summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Client.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Client.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java72
1 files changed, 46 insertions, 26 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 8b05821..7d1ce56 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -24,15 +24,16 @@ public class Client {
public static long MASTER_TAKEOVER_TIMEOUT = 4000l;
private Logger logger = LoggerFactory.getLogger(getClass());
/** TODO: Not really useful yet. Remove? */
- private ConnectionState connectionState = ConnectionState.DISCONNECTED;
- private ConnectionManager connections;
- State state;
- private String myUrl;
- String masterUrl;
- private int masterId = 0;
- private MasterController masterController = null;
- private Broadcaster broadcaster;
- private Future<Integer> currentMasterProposal = null;
+ private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
+ private final ConnectionManager connections;
+ volatile State state;
+ private volatile String myUrl;
+ volatile String masterUrl;
+ volatile String masterLocation;
+ private volatile int masterId = 0;
+ private volatile MasterController masterController = null;
+ private final Broadcaster broadcaster;
+ private volatile Future<Integer> currentMasterProposal = null;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -51,30 +52,47 @@ public class Client {
@Override
public DelayedOperation set(Component component) {
- DelayedOperation op = new DelayedOperation();
+ final DelayedOperation op = new DelayedOperation();
if (connectionState != ConnectionState.STABLE) {
op.complete(DelayedOperation.Status.createError(
"Not connected to master: " + connectionState));
return op;
}
- MasterService master = connections.getMaster(masterUrl);
- try {
- boolean success = master.updateStateRequest(
- component.getName(), component.getData(),
- component.getRevision());
- if (success) {
- op.complete(DelayedOperation.Status.createOk());
- } else {
- op.complete(DelayedOperation.Status
- .createConflict("Conflict from master"));
- }
- } catch (Exception e) {
- logger.error("Unable to contact master. Update fails.", e);
- String e_ = throwableToString(e);
+
+ Services.Master master = connections.getMaster0(masterLocation);
+ if (master == null) {
op.complete(DelayedOperation.Status.createError(
- "Error contacting master. Update fails: " + e_));
+ "Not connected to master."));
startMasterElection();
+ return op;
}
+ final Rpc rpc = new Rpc();
+ RpcCallback<Services.UpdateComponentResponse> done =
+ new RpcCallback<Services.UpdateComponentResponse>() {
+ @Override
+ public void run(Services.UpdateComponentResponse response) {
+ if (!rpc.isOk()) {
+ logger.warn("Master failed to respond to update " +
+ "request: {}", rpc);
+ op.complete(DelayedOperation.Status.createError(
+ "Error contacting master. Try again later."));
+ startMasterElection();
+ } else {
+ if (response.getSuccess()) {
+ op.complete(DelayedOperation.Status.createOk());
+ } else {
+ op.complete(DelayedOperation.Status.createConflict(
+ "Conflicting update."));
+ }
+ }
+ }
+ };
+ Services.Component request = Services.Component.newBuilder()
+ .setId(component.getName())
+ .setData(component.getData())
+ .setRevision(component.getRevision())
+ .build();
+ master.updateStateRequest(rpc, request, done);
return op;
}
@@ -126,6 +144,7 @@ public class Client {
}
abortMasterElection();
Client.this.masterUrl = request.getMasterUrl();
+ Client.this.masterLocation = request.getMasterLocation();
Client.this.masterId = request.getMasterId();
connectionState = ConnectionState.STABLE;
done.run(Empty.getDefaultInstance());
@@ -165,11 +184,12 @@ public class Client {
@Override
public synchronized void masterTakeover(String masterUrl, String networkName,
- int masterId) throws Exception {
+ int masterId, String masterLocation) throws Exception {
Services.MasterState request = Services.MasterState.newBuilder()
.setMasterUrl(masterUrl)
.setNetworkName(networkName)
.setMasterId(masterId)
+ .setMasterLocation(masterLocation)
.build();
newServiceImpl.masterTakeover(null, request, noOp);
}