From a1060ee59fe7c9107641543babe950250835fa4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 17 Apr 2012 15:37:31 +0200 Subject: Client.startMasterElection() uses protobuf service. --- same/src/main/java/com/orbekk/same/Client.java | 72 ++++++++++++++------------ 1 file changed, 38 insertions(+), 34 deletions(-) (limited to 'same/src/main') diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 6386928..51d62e7 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -29,12 +29,10 @@ public class Client { volatile State state; private volatile String myUrl; private volatile String myLocation; - volatile String masterUrl; - volatile String masterLocation; - private volatile int masterId = 0; private volatile MasterController masterController = null; private final Broadcaster broadcaster; private volatile Future currentMasterProposal = null; + private volatile MasterState masterInfo; private List stateListeners = new ArrayList(); @@ -60,7 +58,8 @@ public class Client { return op; } - Services.Master master = connections.getMaster0(masterLocation); + Services.Master master = connections.getMaster0( + masterInfo.getMasterLocation()); if (master == null) { op.complete(DelayedOperation.Status.createError( "Not connected to master.")); @@ -136,26 +135,25 @@ public class Client { @Override public void masterTakeover(RpcController controller, MasterState request, RpcCallback done) { logger.info("MasterTakeover({})", request); - if (request.getMasterId() <= Client.this.masterId) { + if (masterInfo != null && + request.getMasterId() <= masterInfo.getMasterId()) { logger.warn("{} tried to take over, but current master is " + "{}:{}. Ignoring", new Object[]{request, state.getDataOf(".masterUrl"), - Client.this.masterId}); + masterInfo.getMasterId()}); return; } abortMasterElection(); - Client.this.masterUrl = request.getMasterUrl(); - Client.this.masterLocation = request.getMasterLocation(); - Client.this.masterId = request.getMasterId(); + masterInfo = request; connectionState = ConnectionState.STABLE; done.run(Empty.getDefaultInstance()); } @Override public void masterDown(RpcController controller, MasterState request, RpcCallback done) { - if (request.getMasterId() < Client.this.masterId) { + if (request.getMasterId() < masterInfo.getMasterId()) { logger.info("Master {} is down, but current master is {}. Ignoring.", - request.getMasterId(), Client.this.masterId); + request.getMasterId(), masterInfo.getMasterId()); return; } logger.warn("Master down."); @@ -197,9 +195,7 @@ public class Client { @Override public void masterDown(int masterId) throws Exception { - Services.MasterState request = Services.MasterState.newBuilder() - .setMasterUrl(masterUrl) - .setNetworkName(state.getDataOf(".networkName")) + Services.MasterState request = masterInfo.toBuilder() .setMasterId(masterId) .build(); newServiceImpl.masterDown(null, request, noOp); @@ -236,6 +232,10 @@ public class Client { .build(); } + public MasterState getMaster() { + return masterInfo; + } + public ConnectionState getConnectionState() { return connectionState; } @@ -246,7 +246,7 @@ public class Client { private synchronized void reset() { state.clear(); - masterId = 0; + masterInfo = null; } public Rpc joinNetwork(Services.MasterState masterInfo) { @@ -288,6 +288,10 @@ public class Client { return serviceImpl; } + public Services.Client getNewService() { + return newServiceImpl; + } + private List getPaxosUrlsNoMaster() { List paxosUrls = new ArrayList(); for (String participant : state.getList(".participants")) { @@ -319,12 +323,12 @@ public class Client { } }; synchronized (this) { - if (failedMasterId < masterId) { + if (failedMasterId < masterInfo.getMasterId()) { logger.info("Master election aborted. Master already chosen."); return; } - currentMasterProposal = proposer.startProposalTask(masterId + 1, - sleeperTask); + currentMasterProposal = proposer.startProposalTask( + masterInfo.getMasterId() + 1, sleeperTask); } Integer result = null; try { @@ -345,23 +349,23 @@ public class Client { } } - private int getMasterIdEstimate() { - return masterId; - } - public void startMasterElection() { - List participants = state.getList(".participants"); - final int masterId = getMasterIdEstimate(); - broadcaster.broadcast(participants, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterDown(masterId); - } catch (Exception e) { - logger.info("{}.masterDown() did not respond (ignored): " + - url, e); - } + List participants = state.getList(State.PARTICIPANTS); + final MasterState failedMaster = masterInfo; + + RpcCallback done = new RpcCallback() { + @Override public void run(Empty unused) { + // Ignore unresponsive clients - master election will take + // care of them. } - }); + }; + + for (String location : participants) { + Rpc rpc = new Rpc(); + Services.Client client = connections.getClient0(location); + if (client != null) { + client.masterDown(rpc, failedMaster, done); + } + } } } -- cgit v1.2.3