diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-11 16:50:51 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-11 16:50:51 +0200 |
commit | 3f007673deb7b4c4a59f9d5ba501aa379db1dfc1 (patch) | |
tree | b73a1791e2a574bc086441812358e8acb24d5fc1 /same/src/main/java | |
parent | 8cef018aab54ac3f31643a26a75f974c454893ba (diff) |
Client → Master communication now only protobuf-rpc.
Diffstat (limited to 'same/src/main/java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 72 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/ClientService.java | 2 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 10 |
3 files changed, 54 insertions, 30 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 8b05821..7d1ce56 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -24,15 +24,16 @@ public class Client { public static long MASTER_TAKEOVER_TIMEOUT = 4000l; private Logger logger = LoggerFactory.getLogger(getClass()); /** TODO: Not really useful yet. Remove? */ - private ConnectionState connectionState = ConnectionState.DISCONNECTED; - private ConnectionManager connections; - State state; - private String myUrl; - String masterUrl; - private int masterId = 0; - private MasterController masterController = null; - private Broadcaster broadcaster; - private Future<Integer> currentMasterProposal = null; + private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; + private final ConnectionManager connections; + volatile State state; + private volatile String myUrl; + volatile String masterUrl; + volatile String masterLocation; + private volatile int masterId = 0; + private volatile MasterController masterController = null; + private final Broadcaster broadcaster; + private volatile Future<Integer> currentMasterProposal = null; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -51,30 +52,47 @@ public class Client { @Override public DelayedOperation set(Component component) { - DelayedOperation op = new DelayedOperation(); + final DelayedOperation op = new DelayedOperation(); if (connectionState != ConnectionState.STABLE) { op.complete(DelayedOperation.Status.createError( "Not connected to master: " + connectionState)); return op; } - MasterService master = connections.getMaster(masterUrl); - try { - boolean success = master.updateStateRequest( - component.getName(), component.getData(), - component.getRevision()); - if (success) { - op.complete(DelayedOperation.Status.createOk()); - } else { - op.complete(DelayedOperation.Status - .createConflict("Conflict from master")); - } - } catch (Exception e) { - logger.error("Unable to contact master. Update fails.", e); - String e_ = throwableToString(e); + + Services.Master master = connections.getMaster0(masterLocation); + if (master == null) { op.complete(DelayedOperation.Status.createError( - "Error contacting master. Update fails: " + e_)); + "Not connected to master.")); startMasterElection(); + return op; } + final Rpc rpc = new Rpc(); + RpcCallback<Services.UpdateComponentResponse> done = + new RpcCallback<Services.UpdateComponentResponse>() { + @Override + public void run(Services.UpdateComponentResponse response) { + if (!rpc.isOk()) { + logger.warn("Master failed to respond to update " + + "request: {}", rpc); + op.complete(DelayedOperation.Status.createError( + "Error contacting master. Try again later.")); + startMasterElection(); + } else { + if (response.getSuccess()) { + op.complete(DelayedOperation.Status.createOk()); + } else { + op.complete(DelayedOperation.Status.createConflict( + "Conflicting update.")); + } + } + } + }; + Services.Component request = Services.Component.newBuilder() + .setId(component.getName()) + .setData(component.getData()) + .setRevision(component.getRevision()) + .build(); + master.updateStateRequest(rpc, request, done); return op; } @@ -126,6 +144,7 @@ public class Client { } abortMasterElection(); Client.this.masterUrl = request.getMasterUrl(); + Client.this.masterLocation = request.getMasterLocation(); Client.this.masterId = request.getMasterId(); connectionState = ConnectionState.STABLE; done.run(Empty.getDefaultInstance()); @@ -165,11 +184,12 @@ public class Client { @Override public synchronized void masterTakeover(String masterUrl, String networkName, - int masterId) throws Exception { + int masterId, String masterLocation) throws Exception { Services.MasterState request = Services.MasterState.newBuilder() .setMasterUrl(masterUrl) .setNetworkName(networkName) .setMasterId(masterId) + .setMasterLocation(masterLocation) .build(); newServiceImpl.masterTakeover(null, request, noOp); } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index 97215e0..d7122c7 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -10,7 +10,7 @@ public interface ClientService { * than the current master. */ void masterTakeover(String masterUrl, String networkName, - int masterId) throws Exception; + int masterId, String masterLocation) throws Exception; /** The master is down, so start a new master election. */ void masterDown(int masterId) throws Exception; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 55d7daf..d1f27e9 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -29,6 +29,7 @@ public class Master { String myLocation) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); + state.update(".masterLocation", myLocation, 1); return new Master(state, connections, broadcaster, myUrl, myLocation); } @@ -155,9 +156,11 @@ public class Master { client.masterTakeover( state.getDataOf(".masterUrl"), state.getDataOf(".networkName"), - masterId); + masterId, + state.getDataOf(".masterLocation")); } catch (Exception e) { - logger.info("Client {} failed to acknowledge master. Remove."); + logger.info("Client failed to acknowledge master. Remove.", + e); removeParticipant(url); } } @@ -232,7 +235,8 @@ public class Master { ClientService client = connections.getClient(url); try { client.masterTakeover(myUrl, - state.getDataOf(".networkName"), masterId); + state.getDataOf(".networkName"), masterId, + state.getDataOf(".masterLocation")); } catch (Exception e) { logger.info("Client {} failed to acknowledge new master. " + "Removing {}", url); |