diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Client.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 72 |
1 files changed, 46 insertions, 26 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 8b05821..7d1ce56 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -24,15 +24,16 @@ public class Client { public static long MASTER_TAKEOVER_TIMEOUT = 4000l; private Logger logger = LoggerFactory.getLogger(getClass()); /** TODO: Not really useful yet. Remove? */ - private ConnectionState connectionState = ConnectionState.DISCONNECTED; - private ConnectionManager connections; - State state; - private String myUrl; - String masterUrl; - private int masterId = 0; - private MasterController masterController = null; - private Broadcaster broadcaster; - private Future<Integer> currentMasterProposal = null; + private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; + private final ConnectionManager connections; + volatile State state; + private volatile String myUrl; + volatile String masterUrl; + volatile String masterLocation; + private volatile int masterId = 0; + private volatile MasterController masterController = null; + private final Broadcaster broadcaster; + private volatile Future<Integer> currentMasterProposal = null; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -51,30 +52,47 @@ public class Client { @Override public DelayedOperation set(Component component) { - DelayedOperation op = new DelayedOperation(); + final DelayedOperation op = new DelayedOperation(); if (connectionState != ConnectionState.STABLE) { op.complete(DelayedOperation.Status.createError( "Not connected to master: " + connectionState)); return op; } - MasterService master = connections.getMaster(masterUrl); - try { - boolean success = master.updateStateRequest( - component.getName(), component.getData(), - component.getRevision()); - if (success) { - op.complete(DelayedOperation.Status.createOk()); - } else { - op.complete(DelayedOperation.Status - .createConflict("Conflict from master")); - } - } catch (Exception e) { - logger.error("Unable to contact master. Update fails.", e); - String e_ = throwableToString(e); + + Services.Master master = connections.getMaster0(masterLocation); + if (master == null) { op.complete(DelayedOperation.Status.createError( - "Error contacting master. Update fails: " + e_)); + "Not connected to master.")); startMasterElection(); + return op; } + final Rpc rpc = new Rpc(); + RpcCallback<Services.UpdateComponentResponse> done = + new RpcCallback<Services.UpdateComponentResponse>() { + @Override + public void run(Services.UpdateComponentResponse response) { + if (!rpc.isOk()) { + logger.warn("Master failed to respond to update " + + "request: {}", rpc); + op.complete(DelayedOperation.Status.createError( + "Error contacting master. Try again later.")); + startMasterElection(); + } else { + if (response.getSuccess()) { + op.complete(DelayedOperation.Status.createOk()); + } else { + op.complete(DelayedOperation.Status.createConflict( + "Conflicting update.")); + } + } + } + }; + Services.Component request = Services.Component.newBuilder() + .setId(component.getName()) + .setData(component.getData()) + .setRevision(component.getRevision()) + .build(); + master.updateStateRequest(rpc, request, done); return op; } @@ -126,6 +144,7 @@ public class Client { } abortMasterElection(); Client.this.masterUrl = request.getMasterUrl(); + Client.this.masterLocation = request.getMasterLocation(); Client.this.masterId = request.getMasterId(); connectionState = ConnectionState.STABLE; done.run(Empty.getDefaultInstance()); @@ -165,11 +184,12 @@ public class Client { @Override public synchronized void masterTakeover(String masterUrl, String networkName, - int masterId) throws Exception { + int masterId, String masterLocation) throws Exception { Services.MasterState request = Services.MasterState.newBuilder() .setMasterUrl(masterUrl) .setNetworkName(networkName) .setMasterId(masterId) + .setMasterLocation(masterLocation) .build(); newServiceImpl.masterTakeover(null, request, noOp); } |