diff options
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 5 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 26 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/FunctionalTest.java | 11 |
3 files changed, 18 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 9b74532..dbfd3c1 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -10,6 +10,7 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,10 @@ public class MasterProposer extends Thread { for (String location : paxosLocations) { Rpc rpc = new Rpc(); Services.Paxos paxos = connections.getPaxos0(location); + if (paxos == null) { + handler.run(null); + continue; + } PaxosRequest request = PaxosRequest.newBuilder() .setClient(client) .setProposalNumber(proposalNumber) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 1833410..f9bd2a9 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -151,15 +151,15 @@ public class Client { @Override public void masterDown(RpcController controller, MasterState request, RpcCallback<Empty> done) { + logger.warn("Master down({})", request); if (request.getMasterId() < masterInfo.getMasterId()) { logger.info("Master {} is down, but current master is {}. Ignoring.", request.getMasterId(), masterInfo.getMasterId()); return; } - logger.warn("Master down."); connectionState = ConnectionState.UNSTABLE; - tryBecomeMaster(request.getMasterId()); done.run(Empty.getDefaultInstance()); + tryBecomeMaster(request); } }; @@ -292,22 +292,9 @@ public class Client { return newServiceImpl; } - private List<String> getPaxosUrlsNoMaster() { - List<String> paxosUrls = new ArrayList<String>(); - for (String participant : state.getList(".participants")) { - String masterPaxos = state.getDataOf(".masterUrl") - .replace("MasterService", "PaxosService"); - String paxos = participant.replace("ClientService", "PaxosService"); - if (!paxos.equals(masterPaxos)) { - paxosUrls.add(participant.replace("ClientService", "PaxosService")); - } - } - logger.info("Paxos urls: {}", paxosUrls); - return paxosUrls; - } - - private void tryBecomeMaster(int failedMasterId) { - List<String> paxosUrls = getPaxosUrlsNoMaster(); + private void tryBecomeMaster(MasterState failedMaster) { + List<String> paxosUrls = state.getList(State.PARTICIPANTS); + paxosUrls.remove(failedMaster.getMasterLocation()); MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, connections); if (masterController == null) { @@ -323,7 +310,7 @@ public class Client { } }; synchronized (this) { - if (failedMasterId < masterInfo.getMasterId()) { + if (failedMaster.getMasterId() < masterInfo.getMasterId()) { logger.info("Master election aborted. Master already chosen."); return; } @@ -335,6 +322,7 @@ public class Client { result = currentMasterProposal.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { + logger.error("Error electing master: ", e); } catch (CancellationException e) { } if (!currentMasterProposal.isCancelled() && result != null) { diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 79bd9b8..c0b710e 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -53,8 +53,9 @@ public class FunctionalTest { connections.clientMap0.put(location, client.getNewService()); clients.add(client); String paxosUrl = clientUrl.replace("ClientService", "PaxosService"); - PaxosService paxos = new PaxosServiceImpl(paxosUrl); + PaxosServiceImpl paxos = new PaxosServiceImpl(paxosUrl); connections.paxosMap.put(paxosUrl, paxos); + connections.paxosMap0.put(location, paxos.getService()); return client; } @@ -86,10 +87,10 @@ public class FunctionalTest { @Test public void testJoin() { joinClients(); for (State s : getStates()) { - List<String> participants = s.getList(".participants"); - assertThat(participants, hasItem("http://client1/ClientService.json")); - assertThat(participants, hasItem("http://client2/ClientService.json")); - assertThat(participants, hasItem("http://client3/ClientService.json")); + List<String> participants = s.getList(State.PARTICIPANTS); + assertThat(participants, hasItem("client1")); + assertThat(participants, hasItem("client2")); + assertThat(participants, hasItem("client3")); } for (Client c : clients) { assertThat(c.getConnectionState(), is(ConnectionState.STABLE)); |