From 3f007673deb7b4c4a59f9d5ba501aa379db1dfc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Apr 2012 16:50:51 +0200 Subject: =?UTF-8?q?Client=20=E2=86=92=20Master=20communication=20now=20onl?= =?UTF-8?q?y=20protobuf-rpc.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- same/src/main/java/com/orbekk/same/Client.java | 72 ++++++++++++++-------- .../main/java/com/orbekk/same/ClientService.java | 2 +- same/src/main/java/com/orbekk/same/Master.java | 10 ++- 3 files changed, 54 insertions(+), 30 deletions(-) (limited to 'same/src/main/java/com/orbekk') diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 8b05821..7d1ce56 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -24,15 +24,16 @@ public class Client { public static long MASTER_TAKEOVER_TIMEOUT = 4000l; private Logger logger = LoggerFactory.getLogger(getClass()); /** TODO: Not really useful yet. Remove? */ - private ConnectionState connectionState = ConnectionState.DISCONNECTED; - private ConnectionManager connections; - State state; - private String myUrl; - String masterUrl; - private int masterId = 0; - private MasterController masterController = null; - private Broadcaster broadcaster; - private Future currentMasterProposal = null; + private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; + private final ConnectionManager connections; + volatile State state; + private volatile String myUrl; + volatile String masterUrl; + volatile String masterLocation; + private volatile int masterId = 0; + private volatile MasterController masterController = null; + private final Broadcaster broadcaster; + private volatile Future currentMasterProposal = null; private List stateListeners = new ArrayList(); @@ -51,30 +52,47 @@ public class Client { @Override public DelayedOperation set(Component component) { - DelayedOperation op = new DelayedOperation(); + final DelayedOperation op = new DelayedOperation(); if (connectionState != ConnectionState.STABLE) { op.complete(DelayedOperation.Status.createError( "Not connected to master: " + connectionState)); return op; } - MasterService master = connections.getMaster(masterUrl); - try { - boolean success = master.updateStateRequest( - component.getName(), component.getData(), - component.getRevision()); - if (success) { - op.complete(DelayedOperation.Status.createOk()); - } else { - op.complete(DelayedOperation.Status - .createConflict("Conflict from master")); - } - } catch (Exception e) { - logger.error("Unable to contact master. Update fails.", e); - String e_ = throwableToString(e); + + Services.Master master = connections.getMaster0(masterLocation); + if (master == null) { op.complete(DelayedOperation.Status.createError( - "Error contacting master. Update fails: " + e_)); + "Not connected to master.")); startMasterElection(); + return op; } + final Rpc rpc = new Rpc(); + RpcCallback done = + new RpcCallback() { + @Override + public void run(Services.UpdateComponentResponse response) { + if (!rpc.isOk()) { + logger.warn("Master failed to respond to update " + + "request: {}", rpc); + op.complete(DelayedOperation.Status.createError( + "Error contacting master. Try again later.")); + startMasterElection(); + } else { + if (response.getSuccess()) { + op.complete(DelayedOperation.Status.createOk()); + } else { + op.complete(DelayedOperation.Status.createConflict( + "Conflicting update.")); + } + } + } + }; + Services.Component request = Services.Component.newBuilder() + .setId(component.getName()) + .setData(component.getData()) + .setRevision(component.getRevision()) + .build(); + master.updateStateRequest(rpc, request, done); return op; } @@ -126,6 +144,7 @@ public class Client { } abortMasterElection(); Client.this.masterUrl = request.getMasterUrl(); + Client.this.masterLocation = request.getMasterLocation(); Client.this.masterId = request.getMasterId(); connectionState = ConnectionState.STABLE; done.run(Empty.getDefaultInstance()); @@ -165,11 +184,12 @@ public class Client { @Override public synchronized void masterTakeover(String masterUrl, String networkName, - int masterId) throws Exception { + int masterId, String masterLocation) throws Exception { Services.MasterState request = Services.MasterState.newBuilder() .setMasterUrl(masterUrl) .setNetworkName(networkName) .setMasterId(masterId) + .setMasterLocation(masterLocation) .build(); newServiceImpl.masterTakeover(null, request, noOp); } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index 97215e0..d7122c7 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -10,7 +10,7 @@ public interface ClientService { * than the current master. */ void masterTakeover(String masterUrl, String networkName, - int masterId) throws Exception; + int masterId, String masterLocation) throws Exception; /** The master is down, so start a new master election. */ void masterDown(int masterId) throws Exception; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 55d7daf..d1f27e9 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -29,6 +29,7 @@ public class Master { String myLocation) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); + state.update(".masterLocation", myLocation, 1); return new Master(state, connections, broadcaster, myUrl, myLocation); } @@ -155,9 +156,11 @@ public class Master { client.masterTakeover( state.getDataOf(".masterUrl"), state.getDataOf(".networkName"), - masterId); + masterId, + state.getDataOf(".masterLocation")); } catch (Exception e) { - logger.info("Client {} failed to acknowledge master. Remove."); + logger.info("Client failed to acknowledge master. Remove.", + e); removeParticipant(url); } } @@ -232,7 +235,8 @@ public class Master { ClientService client = connections.getClient(url); try { client.masterTakeover(myUrl, - state.getDataOf(".networkName"), masterId); + state.getDataOf(".networkName"), masterId, + state.getDataOf(".masterLocation")); } catch (Exception e) { logger.info("Client {} failed to acknowledge new master. " + "Removing {}", url); -- cgit v1.2.3