From ae8791a096626b0196c63e4d9cb68f7a18ad86b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 24 Apr 2012 12:41:25 +0200 Subject: Switch to protobuf based Paxos in functional test. --- .../main/java/com/orbekk/paxos/MasterProposer.java | 5 +++++ same/src/main/java/com/orbekk/same/Client.java | 26 ++++++---------------- .../test/java/com/orbekk/same/FunctionalTest.java | 11 ++++----- 3 files changed, 18 insertions(+), 24 deletions(-) diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 9b74532..dbfd3c1 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -10,6 +10,7 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,10 @@ public class MasterProposer extends Thread { for (String location : paxosLocations) { Rpc rpc = new Rpc(); Services.Paxos paxos = connections.getPaxos0(location); + if (paxos == null) { + handler.run(null); + continue; + } PaxosRequest request = PaxosRequest.newBuilder() .setClient(client) .setProposalNumber(proposalNumber) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 1833410..f9bd2a9 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -151,15 +151,15 @@ public class Client { @Override public void masterDown(RpcController controller, MasterState request, RpcCallback done) { + logger.warn("Master down({})", request); if (request.getMasterId() < masterInfo.getMasterId()) { logger.info("Master {} is down, but current master is {}. Ignoring.", request.getMasterId(), masterInfo.getMasterId()); return; } - logger.warn("Master down."); connectionState = ConnectionState.UNSTABLE; - tryBecomeMaster(request.getMasterId()); done.run(Empty.getDefaultInstance()); + tryBecomeMaster(request); } }; @@ -292,22 +292,9 @@ public class Client { return newServiceImpl; } - private List getPaxosUrlsNoMaster() { - List paxosUrls = new ArrayList(); - for (String participant : state.getList(".participants")) { - String masterPaxos = state.getDataOf(".masterUrl") - .replace("MasterService", "PaxosService"); - String paxos = participant.replace("ClientService", "PaxosService"); - if (!paxos.equals(masterPaxos)) { - paxosUrls.add(participant.replace("ClientService", "PaxosService")); - } - } - logger.info("Paxos urls: {}", paxosUrls); - return paxosUrls; - } - - private void tryBecomeMaster(int failedMasterId) { - List paxosUrls = getPaxosUrlsNoMaster(); + private void tryBecomeMaster(MasterState failedMaster) { + List paxosUrls = state.getList(State.PARTICIPANTS); + paxosUrls.remove(failedMaster.getMasterLocation()); MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, connections); if (masterController == null) { @@ -323,7 +310,7 @@ public class Client { } }; synchronized (this) { - if (failedMasterId < masterInfo.getMasterId()) { + if (failedMaster.getMasterId() < masterInfo.getMasterId()) { logger.info("Master election aborted. Master already chosen."); return; } @@ -335,6 +322,7 @@ public class Client { result = currentMasterProposal.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { + logger.error("Error electing master: ", e); } catch (CancellationException e) { } if (!currentMasterProposal.isCancelled() && result != null) { diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 79bd9b8..c0b710e 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -53,8 +53,9 @@ public class FunctionalTest { connections.clientMap0.put(location, client.getNewService()); clients.add(client); String paxosUrl = clientUrl.replace("ClientService", "PaxosService"); - PaxosService paxos = new PaxosServiceImpl(paxosUrl); + PaxosServiceImpl paxos = new PaxosServiceImpl(paxosUrl); connections.paxosMap.put(paxosUrl, paxos); + connections.paxosMap0.put(location, paxos.getService()); return client; } @@ -86,10 +87,10 @@ public class FunctionalTest { @Test public void testJoin() { joinClients(); for (State s : getStates()) { - List participants = s.getList(".participants"); - assertThat(participants, hasItem("http://client1/ClientService.json")); - assertThat(participants, hasItem("http://client2/ClientService.json")); - assertThat(participants, hasItem("http://client3/ClientService.json")); + List participants = s.getList(State.PARTICIPANTS); + assertThat(participants, hasItem("client1")); + assertThat(participants, hasItem("client2")); + assertThat(participants, hasItem("client3")); } for (Client c : clients) { assertThat(c.getConnectionState(), is(ConnectionState.STABLE)); -- cgit v1.2.3