diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-24 12:41:25 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-24 12:41:25 +0200 |
commit | ae8791a096626b0196c63e4d9cb68f7a18ad86b0 (patch) | |
tree | b661f744548beaf647e3f734864b2bd15acfcbb7 /same/src/main/java/com | |
parent | 2ef7691606aa86b10e16be93efa9cc7c277c8050 (diff) |
Switch to protobuf based Paxos in functional test.
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 5 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 26 |
2 files changed, 12 insertions, 19 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 9b74532..dbfd3c1 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -10,6 +10,7 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,10 @@ public class MasterProposer extends Thread { for (String location : paxosLocations) { Rpc rpc = new Rpc(); Services.Paxos paxos = connections.getPaxos0(location); + if (paxos == null) { + handler.run(null); + continue; + } PaxosRequest request = PaxosRequest.newBuilder() .setClient(client) .setProposalNumber(proposalNumber) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 1833410..f9bd2a9 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -151,15 +151,15 @@ public class Client { @Override public void masterDown(RpcController controller, MasterState request, RpcCallback<Empty> done) { + logger.warn("Master down({})", request); if (request.getMasterId() < masterInfo.getMasterId()) { logger.info("Master {} is down, but current master is {}. Ignoring.", request.getMasterId(), masterInfo.getMasterId()); return; } - logger.warn("Master down."); connectionState = ConnectionState.UNSTABLE; - tryBecomeMaster(request.getMasterId()); done.run(Empty.getDefaultInstance()); + tryBecomeMaster(request); } }; @@ -292,22 +292,9 @@ public class Client { return newServiceImpl; } - private List<String> getPaxosUrlsNoMaster() { - List<String> paxosUrls = new ArrayList<String>(); - for (String participant : state.getList(".participants")) { - String masterPaxos = state.getDataOf(".masterUrl") - .replace("MasterService", "PaxosService"); - String paxos = participant.replace("ClientService", "PaxosService"); - if (!paxos.equals(masterPaxos)) { - paxosUrls.add(participant.replace("ClientService", "PaxosService")); - } - } - logger.info("Paxos urls: {}", paxosUrls); - return paxosUrls; - } - - private void tryBecomeMaster(int failedMasterId) { - List<String> paxosUrls = getPaxosUrlsNoMaster(); + private void tryBecomeMaster(MasterState failedMaster) { + List<String> paxosUrls = state.getList(State.PARTICIPANTS); + paxosUrls.remove(failedMaster.getMasterLocation()); MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, connections); if (masterController == null) { @@ -323,7 +310,7 @@ public class Client { } }; synchronized (this) { - if (failedMasterId < masterInfo.getMasterId()) { + if (failedMaster.getMasterId() < masterInfo.getMasterId()) { logger.info("Master election aborted. Master already chosen."); return; } @@ -335,6 +322,7 @@ public class Client { result = currentMasterProposal.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { + logger.error("Error electing master: ", e); } catch (CancellationException e) { } if (!currentMasterProposal.isCancelled() && result != null) { |