diff options
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 6386928..51d62e7 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -29,12 +29,10 @@ public class Client { volatile State state; private volatile String myUrl; private volatile String myLocation; - volatile String masterUrl; - volatile String masterLocation; - private volatile int masterId = 0; private volatile MasterController masterController = null; private final Broadcaster broadcaster; private volatile Future<Integer> currentMasterProposal = null; + private volatile MasterState masterInfo; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -60,7 +58,8 @@ public class Client { return op; } - Services.Master master = connections.getMaster0(masterLocation); + Services.Master master = connections.getMaster0( + masterInfo.getMasterLocation()); if (master == null) { op.complete(DelayedOperation.Status.createError( "Not connected to master.")); @@ -136,26 +135,25 @@ public class Client { @Override public void masterTakeover(RpcController controller, MasterState request, RpcCallback<Empty> done) { logger.info("MasterTakeover({})", request); - if (request.getMasterId() <= Client.this.masterId) { + if (masterInfo != null && + request.getMasterId() <= masterInfo.getMasterId()) { logger.warn("{} tried to take over, but current master is " + "{}:{}. Ignoring", new Object[]{request, state.getDataOf(".masterUrl"), - Client.this.masterId}); + masterInfo.getMasterId()}); return; } abortMasterElection(); - Client.this.masterUrl = request.getMasterUrl(); - Client.this.masterLocation = request.getMasterLocation(); - Client.this.masterId = request.getMasterId(); + masterInfo = request; connectionState = ConnectionState.STABLE; done.run(Empty.getDefaultInstance()); } @Override public void masterDown(RpcController controller, MasterState request, RpcCallback<Empty> done) { - if (request.getMasterId() < Client.this.masterId) { + if (request.getMasterId() < masterInfo.getMasterId()) { logger.info("Master {} is down, but current master is {}. Ignoring.", - request.getMasterId(), Client.this.masterId); + request.getMasterId(), masterInfo.getMasterId()); return; } logger.warn("Master down."); @@ -197,9 +195,7 @@ public class Client { @Override public void masterDown(int masterId) throws Exception { - Services.MasterState request = Services.MasterState.newBuilder() - .setMasterUrl(masterUrl) - .setNetworkName(state.getDataOf(".networkName")) + Services.MasterState request = masterInfo.toBuilder() .setMasterId(masterId) .build(); newServiceImpl.masterDown(null, request, noOp); @@ -236,6 +232,10 @@ public class Client { .build(); } + public MasterState getMaster() { + return masterInfo; + } + public ConnectionState getConnectionState() { return connectionState; } @@ -246,7 +246,7 @@ public class Client { private synchronized void reset() { state.clear(); - masterId = 0; + masterInfo = null; } public Rpc joinNetwork(Services.MasterState masterInfo) { @@ -288,6 +288,10 @@ public class Client { return serviceImpl; } + public Services.Client getNewService() { + return newServiceImpl; + } + private List<String> getPaxosUrlsNoMaster() { List<String> paxosUrls = new ArrayList<String>(); for (String participant : state.getList(".participants")) { @@ -319,12 +323,12 @@ public class Client { } }; synchronized (this) { - if (failedMasterId < masterId) { + if (failedMasterId < masterInfo.getMasterId()) { logger.info("Master election aborted. Master already chosen."); return; } - currentMasterProposal = proposer.startProposalTask(masterId + 1, - sleeperTask); + currentMasterProposal = proposer.startProposalTask( + masterInfo.getMasterId() + 1, sleeperTask); } Integer result = null; try { @@ -345,23 +349,23 @@ public class Client { } } - private int getMasterIdEstimate() { - return masterId; - } - public void startMasterElection() { - List<String> participants = state.getList(".participants"); - final int masterId = getMasterIdEstimate(); - broadcaster.broadcast(participants, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterDown(masterId); - } catch (Exception e) { - logger.info("{}.masterDown() did not respond (ignored): " + - url, e); - } + List<String> participants = state.getList(State.PARTICIPANTS); + final MasterState failedMaster = masterInfo; + + RpcCallback<Empty> done = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + // Ignore unresponsive clients - master election will take + // care of them. } - }); + }; + + for (String location : participants) { + Rpc rpc = new Rpc(); + Services.Client client = connections.getClient0(location); + if (client != null) { + client.masterDown(rpc, failedMaster, done); + } + } } } |