diff options
Diffstat (limited to 'same/src/main')
4 files changed, 38 insertions, 22 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 47ee59f..5127884 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -17,11 +17,13 @@ package com.orbekk.same; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; @@ -40,7 +42,7 @@ import com.orbekk.same.State.Component; import com.orbekk.util.DelayedOperation; public class Client { - public static long MASTER_TAKEOVER_TIMEOUT = 500l; + public static int MASTER_TAKEOVER_TIMEOUT = 500; private Logger logger = LoggerFactory.getLogger(getClass()); /** TODO: Not really useful yet. Remove? */ private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; @@ -52,7 +54,7 @@ public class Client { private volatile Future<Integer> currentMasterProposal = null; private volatile MasterState masterInfo; private final RpcFactory rpcf; - private final ExecutorService executor; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ClientInterface clientInterface = new ClientInterfaceImpl(); private final AtomicLong revision = new AtomicLong(0); @@ -246,8 +248,10 @@ public class Client { Runnable sleeperTask = new Runnable() { @Override public synchronized void run() { try { - wait(MASTER_TAKEOVER_TIMEOUT); + long timeout = MASTER_TAKEOVER_TIMEOUT - new Random().nextInt(100); + wait(timeout); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } }; @@ -263,9 +267,14 @@ public class Client { try { result = currentMasterProposal.get(); } catch (InterruptedException e) { + result = null; } catch (ExecutionException e) { logger.error("Error electing master: ", e); } catch (CancellationException e) { + result = null; + } + if (Thread.currentThread().isInterrupted()) { + return; } if (!currentMasterProposal.isCancelled() && result != null && masterInfo.getMasterId() <= failedMaster.getMasterId()) { @@ -278,14 +287,12 @@ public class Client { } public Client(State state, ConnectionManager connections, - String myUrl, String myLocation, RpcFactory rpcf, - ExecutorService executor) { + String myUrl, String myLocation, RpcFactory rpcf) { this.state = state; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; this.rpcf = rpcf; - this.executor = executor; } public void start() { diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 815f12f..627ea04 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -214,15 +214,28 @@ public class Master { @Override public void run() { try { + logger.info("Starting master takeover."); sendTakeovers(); - getMostRecentState(); - updateParticipants(); - sendFullState(); - finishTakeover(); + if (!aborted.get()) { + getMostRecentState(); + } + if (!aborted.get()) { + updateParticipants(); + } + if (!aborted.get()) { + sendFullState(); + } + if (!aborted.get()) { + finishTakeover(); + } } catch (InterruptedException e) { // Abort master takeover. - logger.warn("Master takeover aborted: ", e); aborted.set(true); + Thread.currentThread().interrupt(); + } + + if (aborted.get()) { + logger.warn("Master takeover aborted."); } } } @@ -392,7 +405,7 @@ public class Master { this.masterId = masterId; MasterTakeover takeover = new MasterTakeover( state.getList(State.PARTICIPANTS), getMasterInfo()); - takeover.run(); + new Thread(takeover).start(); } public void updateRevision(long newRevision) { diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 8e6e977..0782e55 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -82,15 +82,14 @@ public class SameController { } @Override - public void killMaster(final RpcController rpc, Empty unused, - final RpcCallback<Empty> done) { + public void killMaster(RpcController rpc, Empty unused, + RpcCallback<Empty> done) { logger.info("KillMaster()."); String clientLocation = client.getClientState().getLocation(); String masterLocation = client.getMaster().getMasterLocation(); if (clientLocation.equals(masterLocation)) { SameController.this.killMaster(); done.run(Empty.getDefaultInstance()); - return; } else { RpcChannel channel = connections.getChannel(masterLocation); if (channel == null) { @@ -99,16 +98,13 @@ public class SameController { return; } Services.SystemService system = Services.SystemService.newStub(channel); - final Rpc rpc_ = rpcf.create(); + Rpc rpc_ = rpcf.create(); RpcCallback<Empty> done_ = new RpcCallback<Empty>() { @Override public void run(Empty unused) { - if (!rpc_.isOk()) { - rpc.setFailed(rpc_.errorText()); - } - done.run(Empty.getDefaultInstance()); } }; system.killMaster(rpc_, Empty.getDefaultInstance(), done_); + done.run(Empty.getDefaultInstance()); } } } @@ -170,9 +166,8 @@ public class SameController { configuration.get("localIp"), configuration.getInt("port")); String clientUrl = baseUrl + "ClientService.json"; - ExecutorService clientExecutor = Executors.newCachedThreadPool(); Client client = new Client(clientState, connections, - clientUrl, myLocation, rpcf, clientExecutor); + clientUrl, myLocation, rpcf); PaxosServiceImpl paxos = new PaxosServiceImpl(""); SimpleProtobufServer pServer = SimpleProtobufServer.create(pport); diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java index 25be4f8..4ac9cc0 100644 --- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java +++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java @@ -64,6 +64,7 @@ public class VariableUpdaterTask<T> extends Thread @Override public void valueChanged(Variable<T> variable) { isReady.countDown(); + variable.update(); } @Override |