From 1cde431ec7e0f5f29ec329c1949c7f5c76366ce5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 25 Apr 2012 14:27:33 +0200 Subject: Fix thread starvation bug in Client. --- same/src/main/java/com/orbekk/same/Client.java | 98 +++++++++++++++----------- 1 file changed, 58 insertions(+), 40 deletions(-) (limited to 'same/src/main/java/com/orbekk/same/Client.java') diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 5a7918c..bef096b 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.slf4j.Logger; @@ -31,6 +32,7 @@ public class Client { private volatile Future currentMasterProposal = null; private volatile MasterState masterInfo; private final RpcFactory rpcf; + private final ExecutorService executor; private List stateListeners = new ArrayList(); @@ -155,18 +157,71 @@ public class Client { return; } connectionState = ConnectionState.UNSTABLE; + executor.execute(new MasterStarter(request)); done.run(Empty.getDefaultInstance()); - tryBecomeMaster(request); } }; + private class MasterStarter implements Runnable { + private final MasterState failedMaster; + + public MasterStarter(MasterState failedMaster) { + this.failedMaster = failedMaster; + } + + @Override public void run() { + logger.info("Trying to become master. Failed master: {}.", + failedMaster); + List paxosUrls = state.getList(State.PARTICIPANTS); + paxosUrls.remove(failedMaster.getMasterLocation()); + MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, + connections, rpcf); + if (masterController == null) { + logger.warn("Could not become master: No master controller."); + return; + } + Runnable sleeperTask = new Runnable() { + @Override public synchronized void run() { + try { + wait(MASTER_TAKEOVER_TIMEOUT); + } catch (InterruptedException e) { + } + } + }; + synchronized (this) { + if (failedMaster.getMasterId() < masterInfo.getMasterId()) { + logger.info("Master election aborted. Master already chosen."); + return; + } + currentMasterProposal = proposer.startProposalTask( + masterInfo.getMasterId() + 1, sleeperTask); + } + Integer result = null; + try { + result = currentMasterProposal.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + logger.error("Error electing master: ", e); + } catch (CancellationException e) { + } + if (!currentMasterProposal.isCancelled() && result != null && + masterInfo.getMasterId() <= failedMaster.getMasterId()) { + masterController.enableMaster(new State(state), result); + } else { + logger.info("Master election aborted. Master already chosen."); + } + } + } + public Client(State state, ConnectionManager connections, - String myUrl, String myLocation, RpcFactory rpcf) { + String myUrl, String myLocation, RpcFactory rpcf, + ExecutorService executor) { this.state = state; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; this.rpcf = rpcf; + this.executor = executor; } public void start() { @@ -174,6 +229,7 @@ public class Client { public void interrupt() { connectionState = ConnectionState.DISCONNECTED; + executor.shutdown(); } void performWork() { @@ -242,44 +298,6 @@ public class Client { return newServiceImpl; } - private void tryBecomeMaster(MasterState failedMaster) { - List paxosUrls = state.getList(State.PARTICIPANTS); - paxosUrls.remove(failedMaster.getMasterLocation()); - MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, - connections, rpcf); - if (masterController == null) { - logger.warn("Could not become master: No master controller."); - return; - } - Runnable sleeperTask = new Runnable() { - @Override public synchronized void run() { - try { - wait(MASTER_TAKEOVER_TIMEOUT); - } catch (InterruptedException e) { - } - } - }; - synchronized (this) { - if (failedMaster.getMasterId() < masterInfo.getMasterId()) { - logger.info("Master election aborted. Master already chosen."); - return; - } - currentMasterProposal = proposer.startProposalTask( - masterInfo.getMasterId() + 1, sleeperTask); - } - Integer result = null; - try { - result = currentMasterProposal.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - logger.error("Error electing master: ", e); - } catch (CancellationException e) { - } - if (!currentMasterProposal.isCancelled() && result != null) { - masterController.enableMaster(new State(state), result); - } - } - private synchronized void abortMasterElection() { if (currentMasterProposal != null && !currentMasterProposal.isDone()) { boolean status = currentMasterProposal.cancel(true); -- cgit v1.2.3