diff options
Diffstat (limited to 'same/src')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 21 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/FunctionalTest.java | 2 |
2 files changed, 19 insertions, 4 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 69d0e33..4f42c4e 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -18,7 +18,7 @@ import com.orbekk.util.DelayedOperation; import com.orbekk.util.WorkQueue; public class Client implements DiscoveryListener { - public static final long MASTER_TAKEOVER_TIMEOUT = 4000l; + public static long MASTER_TAKEOVER_TIMEOUT = 4000l; private Logger logger = LoggerFactory.getLogger(getClass()); /** TODO: Not really useful yet. Remove? */ private ConnectionState connectionState = ConnectionState.DISCONNECTED; @@ -29,6 +29,7 @@ public class Client implements DiscoveryListener { private int masterId = -1; private MasterController masterController = null; private Broadcaster broadcaster; + private Future<Integer> currentMasterProposal = null; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -124,6 +125,9 @@ public class Client implements DiscoveryListener { @Override public void masterTakeover(String masterUrl, String networkName, int masterId) throws Exception { + logger.info("MasterTakeover({}, {}, {})", + new Object[]{masterUrl, networkName, masterId}); + abortMasterElection(); Client.this.masterUrl = masterUrl; connectionState = ConnectionState.STABLE; } @@ -267,19 +271,28 @@ public class Client implements DiscoveryListener { } } }; - Future<Integer> proposal = proposer.startProposalTask(1, sleeperTask); + synchronized (this) { + currentMasterProposal = proposer.startProposalTask(1, sleeperTask); + } Integer result = null; try { - result = proposal.get(); + result = currentMasterProposal.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } catch (CancellationException e) { } - if (!proposal.isCancelled() && result != null) { + if (!currentMasterProposal.isCancelled() && result != null) { masterController.enableMaster(state); } } + private synchronized void abortMasterElection() { + if (currentMasterProposal != null && !currentMasterProposal.isDone()) { + boolean status = currentMasterProposal.cancel(true); + logger.info("Abort status: {}", status); + } + } + public void startMasterElection() { List<String> participants = state.getList(".participants"); broadcaster.broadcast(participants, new ServiceOperation() { diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 9380409..fb8bd10 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -120,6 +120,8 @@ public class FunctionalTest { } }; client1.setMasterController(controller); + client2.setMasterController(controller); + client3.setMasterController(controller); client1.startMasterElection(); newMaster.performWork(); assertThat(client1.masterUrl, is(newMasterUrl)); |