diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-25 14:27:33 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-25 14:27:33 +0200 |
commit | 1cde431ec7e0f5f29ec329c1949c7f5c76366ce5 (patch) | |
tree | c1ae7f8804b396c502213cc8d2bb6a19fef8e3a9 /same/src/main | |
parent | dc9e014d453e469442cb3555f6c62d496e04dee7 (diff) |
Fix thread starvation bug in Client.
Diffstat (limited to 'same/src/main')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 98 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/SameController.java | 6 |
2 files changed, 63 insertions, 41 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 5a7918c..bef096b 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.slf4j.Logger; @@ -31,6 +32,7 @@ public class Client { private volatile Future<Integer> currentMasterProposal = null; private volatile MasterState masterInfo; private final RpcFactory rpcf; + private final ExecutorService executor; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -155,18 +157,71 @@ public class Client { return; } connectionState = ConnectionState.UNSTABLE; + executor.execute(new MasterStarter(request)); done.run(Empty.getDefaultInstance()); - tryBecomeMaster(request); } }; + private class MasterStarter implements Runnable { + private final MasterState failedMaster; + + public MasterStarter(MasterState failedMaster) { + this.failedMaster = failedMaster; + } + + @Override public void run() { + logger.info("Trying to become master. Failed master: {}.", + failedMaster); + List<String> paxosUrls = state.getList(State.PARTICIPANTS); + paxosUrls.remove(failedMaster.getMasterLocation()); + MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, + connections, rpcf); + if (masterController == null) { + logger.warn("Could not become master: No master controller."); + return; + } + Runnable sleeperTask = new Runnable() { + @Override public synchronized void run() { + try { + wait(MASTER_TAKEOVER_TIMEOUT); + } catch (InterruptedException e) { + } + } + }; + synchronized (this) { + if (failedMaster.getMasterId() < masterInfo.getMasterId()) { + logger.info("Master election aborted. Master already chosen."); + return; + } + currentMasterProposal = proposer.startProposalTask( + masterInfo.getMasterId() + 1, sleeperTask); + } + Integer result = null; + try { + result = currentMasterProposal.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + logger.error("Error electing master: ", e); + } catch (CancellationException e) { + } + if (!currentMasterProposal.isCancelled() && result != null && + masterInfo.getMasterId() <= failedMaster.getMasterId()) { + masterController.enableMaster(new State(state), result); + } else { + logger.info("Master election aborted. Master already chosen."); + } + } + } + public Client(State state, ConnectionManager connections, - String myUrl, String myLocation, RpcFactory rpcf) { + String myUrl, String myLocation, RpcFactory rpcf, + ExecutorService executor) { this.state = state; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; this.rpcf = rpcf; + this.executor = executor; } public void start() { @@ -174,6 +229,7 @@ public class Client { public void interrupt() { connectionState = ConnectionState.DISCONNECTED; + executor.shutdown(); } void performWork() { @@ -242,44 +298,6 @@ public class Client { return newServiceImpl; } - private void tryBecomeMaster(MasterState failedMaster) { - List<String> paxosUrls = state.getList(State.PARTICIPANTS); - paxosUrls.remove(failedMaster.getMasterLocation()); - MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, - connections, rpcf); - if (masterController == null) { - logger.warn("Could not become master: No master controller."); - return; - } - Runnable sleeperTask = new Runnable() { - @Override public synchronized void run() { - try { - wait(MASTER_TAKEOVER_TIMEOUT); - } catch (InterruptedException e) { - } - } - }; - synchronized (this) { - if (failedMaster.getMasterId() < masterInfo.getMasterId()) { - logger.info("Master election aborted. Master already chosen."); - return; - } - currentMasterProposal = proposer.startProposalTask( - masterInfo.getMasterId() + 1, sleeperTask); - } - Integer result = null; - try { - result = currentMasterProposal.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - logger.error("Error electing master: ", e); - } catch (CancellationException e) { - } - if (!currentMasterProposal.isCancelled() && result != null) { - masterController.enableMaster(new State(state), result); - } - } - private synchronized void abortMasterElection() { if (currentMasterProposal != null && !currentMasterProposal.isDone()) { boolean status = currentMasterProposal.cancel(true); diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 9438c7b..651303c 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -1,5 +1,8 @@ package com.orbekk.same; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +63,9 @@ public class SameController { configuration.get("localIp"), configuration.getInt("port")); String clientUrl = baseUrl + "ClientService.json"; + ExecutorService clientExecutor = Executors.newCachedThreadPool(); Client client = new Client(clientState, connections, - clientUrl, myLocation, rpcf); + clientUrl, myLocation, rpcf, clientExecutor); PaxosServiceImpl paxos = new PaxosServiceImpl(""); SimpleProtobufServer pServer = SimpleProtobufServer.create(pport); |