summaryrefslogtreecommitdiff
path: root/same/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java19
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java25
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java15
-rw-r--r--same/src/main/java/com/orbekk/same/VariableUpdaterTask.java1
4 files changed, 38 insertions, 22 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 47ee59f..5127884 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -17,11 +17,13 @@ package com.orbekk.same;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,7 +42,7 @@ import com.orbekk.same.State.Component;
import com.orbekk.util.DelayedOperation;
public class Client {
- public static long MASTER_TAKEOVER_TIMEOUT = 500l;
+ public static int MASTER_TAKEOVER_TIMEOUT = 500;
private Logger logger = LoggerFactory.getLogger(getClass());
/** TODO: Not really useful yet. Remove? */
private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
@@ -52,7 +54,7 @@ public class Client {
private volatile Future<Integer> currentMasterProposal = null;
private volatile MasterState masterInfo;
private final RpcFactory rpcf;
- private final ExecutorService executor;
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final ClientInterface clientInterface = new ClientInterfaceImpl();
private final AtomicLong revision = new AtomicLong(0);
@@ -246,8 +248,10 @@ public class Client {
Runnable sleeperTask = new Runnable() {
@Override public synchronized void run() {
try {
- wait(MASTER_TAKEOVER_TIMEOUT);
+ long timeout = MASTER_TAKEOVER_TIMEOUT - new Random().nextInt(100);
+ wait(timeout);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
};
@@ -263,9 +267,14 @@ public class Client {
try {
result = currentMasterProposal.get();
} catch (InterruptedException e) {
+ result = null;
} catch (ExecutionException e) {
logger.error("Error electing master: ", e);
} catch (CancellationException e) {
+ result = null;
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ return;
}
if (!currentMasterProposal.isCancelled() && result != null &&
masterInfo.getMasterId() <= failedMaster.getMasterId()) {
@@ -278,14 +287,12 @@ public class Client {
}
public Client(State state, ConnectionManager connections,
- String myUrl, String myLocation, RpcFactory rpcf,
- ExecutorService executor) {
+ String myUrl, String myLocation, RpcFactory rpcf) {
this.state = state;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
this.rpcf = rpcf;
- this.executor = executor;
}
public void start() {
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 815f12f..627ea04 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -214,15 +214,28 @@ public class Master {
@Override public void run() {
try {
+ logger.info("Starting master takeover.");
sendTakeovers();
- getMostRecentState();
- updateParticipants();
- sendFullState();
- finishTakeover();
+ if (!aborted.get()) {
+ getMostRecentState();
+ }
+ if (!aborted.get()) {
+ updateParticipants();
+ }
+ if (!aborted.get()) {
+ sendFullState();
+ }
+ if (!aborted.get()) {
+ finishTakeover();
+ }
} catch (InterruptedException e) {
// Abort master takeover.
- logger.warn("Master takeover aborted: ", e);
aborted.set(true);
+ Thread.currentThread().interrupt();
+ }
+
+ if (aborted.get()) {
+ logger.warn("Master takeover aborted.");
}
}
}
@@ -392,7 +405,7 @@ public class Master {
this.masterId = masterId;
MasterTakeover takeover = new MasterTakeover(
state.getList(State.PARTICIPANTS), getMasterInfo());
- takeover.run();
+ new Thread(takeover).start();
}
public void updateRevision(long newRevision) {
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 8e6e977..0782e55 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -82,15 +82,14 @@ public class SameController {
}
@Override
- public void killMaster(final RpcController rpc, Empty unused,
- final RpcCallback<Empty> done) {
+ public void killMaster(RpcController rpc, Empty unused,
+ RpcCallback<Empty> done) {
logger.info("KillMaster().");
String clientLocation = client.getClientState().getLocation();
String masterLocation = client.getMaster().getMasterLocation();
if (clientLocation.equals(masterLocation)) {
SameController.this.killMaster();
done.run(Empty.getDefaultInstance());
- return;
} else {
RpcChannel channel = connections.getChannel(masterLocation);
if (channel == null) {
@@ -99,16 +98,13 @@ public class SameController {
return;
}
Services.SystemService system = Services.SystemService.newStub(channel);
- final Rpc rpc_ = rpcf.create();
+ Rpc rpc_ = rpcf.create();
RpcCallback<Empty> done_ = new RpcCallback<Empty>() {
@Override public void run(Empty unused) {
- if (!rpc_.isOk()) {
- rpc.setFailed(rpc_.errorText());
- }
- done.run(Empty.getDefaultInstance());
}
};
system.killMaster(rpc_, Empty.getDefaultInstance(), done_);
+ done.run(Empty.getDefaultInstance());
}
}
}
@@ -170,9 +166,8 @@ public class SameController {
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
- ExecutorService clientExecutor = Executors.newCachedThreadPool();
Client client = new Client(clientState, connections,
- clientUrl, myLocation, rpcf, clientExecutor);
+ clientUrl, myLocation, rpcf);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
index 25be4f8..4ac9cc0 100644
--- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
+++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
@@ -64,6 +64,7 @@ public class VariableUpdaterTask<T> extends Thread
@Override
public void valueChanged(Variable<T> variable) {
isReady.countDown();
+ variable.update();
}
@Override