diff options
Diffstat (limited to 'same')
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 4 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 17 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/FunctionalTest.java | 28 |
3 files changed, 42 insertions, 7 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 8469cf4..094a749 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -91,9 +91,9 @@ public class MasterProposer extends Thread { } Integer proposeRetry(int proposalNumber, Runnable retryAction) { - assert proposalNumber >= 0; + assert proposalNumber > 0; int nextProposal = proposalNumber; - int result = -1; + int result = nextProposal - 1; while (!Thread.interrupted() && result != nextProposal) { result = internalPropose(nextProposal); diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 18fecd2..ceacd12 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -123,7 +123,7 @@ public class Client implements DiscoveryListener { } @Override - public void masterTakeover(String masterUrl, String networkName, + public synchronized void masterTakeover(String masterUrl, String networkName, int masterId) throws Exception { if (masterId <= Client.this.masterId) { logger.warn("{}:{} tried to take over, but current master is " + @@ -135,6 +135,7 @@ public class Client implements DiscoveryListener { new Object[]{masterUrl, networkName, masterId}); abortMasterElection(); Client.this.masterUrl = masterUrl; + Client.this.masterId = masterId; connectionState = ConnectionState.STABLE; } @@ -147,7 +148,7 @@ public class Client implements DiscoveryListener { } logger.warn("Master down."); connectionState = ConnectionState.UNSTABLE; - tryBecomeMaster(); + tryBecomeMaster(masterId); } }; @@ -266,7 +267,7 @@ public class Client implements DiscoveryListener { return paxosUrls; } - private void tryBecomeMaster() { + private void tryBecomeMaster(int failedMasterId) { List<String> paxosUrls = getPaxosUrls(); MasterProposer proposer = new MasterProposer(getUrl(), paxosUrls, connections); @@ -283,7 +284,12 @@ public class Client implements DiscoveryListener { } }; synchronized (this) { - currentMasterProposal = proposer.startProposalTask(1, sleeperTask); + if (failedMasterId < masterId) { + logger.info("Master election aborted. Master already chosen."); + return; + } + currentMasterProposal = proposer.startProposalTask(masterId + 1, + sleeperTask); } Integer result = null; try { @@ -310,11 +316,12 @@ public class Client implements DiscoveryListener { public void startMasterElection() { List<String> participants = state.getList(".participants"); + final int masterId = getMasterIdEstimate(); broadcaster.broadcast(participants, new ServiceOperation() { @Override public void run(String url) { ClientService client = connections.getClient(url); try { - client.masterDown(getMasterIdEstimate()); + client.masterDown(masterId); } catch (Exception e) { logger.info("{}.masterDown() did not respond (ignored): " + url, e); diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 0da38a6..ea920c4 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -127,4 +127,32 @@ public class FunctionalTest { assertThat(client1.masterUrl, is(newMasterUrl)); assertThat(client2.masterUrl, is(newMasterUrl)); } + + @Test public void onlyOneNewMaster() { + String newMasterUrl = "http://newMaster/MasterService.json"; + final Master newMaster = Master.create(connections, + broadcaster, newMasterUrl, "TestMaster"); + connections.masterMap.put(newMasterUrl, newMaster.getService()); + joinClients(); + MasterController controller = new MasterController() { + boolean firstMaster = true; + @Override + public synchronized void enableMaster(State lastKnownState, + int masterId) { + assertThat(firstMaster, is(true)); + newMaster.resumeFrom(lastKnownState, masterId); + firstMaster = false; + } + @Override + public void disableMaster() { + } + }; + client1.setMasterController(controller); + client2.setMasterController(controller); + client3.setMasterController(controller); + client1.startMasterElection(); + newMaster.performWork(); + assertThat(client1.masterUrl, is(newMasterUrl)); + assertThat(client2.masterUrl, is(newMasterUrl)); + } } |