summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 16:50:51 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 16:50:51 +0200
commit3f007673deb7b4c4a59f9d5ba501aa379db1dfc1 (patch)
treeb73a1791e2a574bc086441812358e8acb24d5fc1 /same/src/main/java/com/orbekk
parent8cef018aab54ac3f31643a26a75f974c454893ba (diff)
Client → Master communication now only protobuf-rpc.
Diffstat (limited to 'same/src/main/java/com/orbekk')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java72
-rw-r--r--same/src/main/java/com/orbekk/same/ClientService.java2
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java10
3 files changed, 54 insertions, 30 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 8b05821..7d1ce56 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -24,15 +24,16 @@ public class Client {
public static long MASTER_TAKEOVER_TIMEOUT = 4000l;
private Logger logger = LoggerFactory.getLogger(getClass());
/** TODO: Not really useful yet. Remove? */
- private ConnectionState connectionState = ConnectionState.DISCONNECTED;
- private ConnectionManager connections;
- State state;
- private String myUrl;
- String masterUrl;
- private int masterId = 0;
- private MasterController masterController = null;
- private Broadcaster broadcaster;
- private Future<Integer> currentMasterProposal = null;
+ private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
+ private final ConnectionManager connections;
+ volatile State state;
+ private volatile String myUrl;
+ volatile String masterUrl;
+ volatile String masterLocation;
+ private volatile int masterId = 0;
+ private volatile MasterController masterController = null;
+ private final Broadcaster broadcaster;
+ private volatile Future<Integer> currentMasterProposal = null;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -51,30 +52,47 @@ public class Client {
@Override
public DelayedOperation set(Component component) {
- DelayedOperation op = new DelayedOperation();
+ final DelayedOperation op = new DelayedOperation();
if (connectionState != ConnectionState.STABLE) {
op.complete(DelayedOperation.Status.createError(
"Not connected to master: " + connectionState));
return op;
}
- MasterService master = connections.getMaster(masterUrl);
- try {
- boolean success = master.updateStateRequest(
- component.getName(), component.getData(),
- component.getRevision());
- if (success) {
- op.complete(DelayedOperation.Status.createOk());
- } else {
- op.complete(DelayedOperation.Status
- .createConflict("Conflict from master"));
- }
- } catch (Exception e) {
- logger.error("Unable to contact master. Update fails.", e);
- String e_ = throwableToString(e);
+
+ Services.Master master = connections.getMaster0(masterLocation);
+ if (master == null) {
op.complete(DelayedOperation.Status.createError(
- "Error contacting master. Update fails: " + e_));
+ "Not connected to master."));
startMasterElection();
+ return op;
}
+ final Rpc rpc = new Rpc();
+ RpcCallback<Services.UpdateComponentResponse> done =
+ new RpcCallback<Services.UpdateComponentResponse>() {
+ @Override
+ public void run(Services.UpdateComponentResponse response) {
+ if (!rpc.isOk()) {
+ logger.warn("Master failed to respond to update " +
+ "request: {}", rpc);
+ op.complete(DelayedOperation.Status.createError(
+ "Error contacting master. Try again later."));
+ startMasterElection();
+ } else {
+ if (response.getSuccess()) {
+ op.complete(DelayedOperation.Status.createOk());
+ } else {
+ op.complete(DelayedOperation.Status.createConflict(
+ "Conflicting update."));
+ }
+ }
+ }
+ };
+ Services.Component request = Services.Component.newBuilder()
+ .setId(component.getName())
+ .setData(component.getData())
+ .setRevision(component.getRevision())
+ .build();
+ master.updateStateRequest(rpc, request, done);
return op;
}
@@ -126,6 +144,7 @@ public class Client {
}
abortMasterElection();
Client.this.masterUrl = request.getMasterUrl();
+ Client.this.masterLocation = request.getMasterLocation();
Client.this.masterId = request.getMasterId();
connectionState = ConnectionState.STABLE;
done.run(Empty.getDefaultInstance());
@@ -165,11 +184,12 @@ public class Client {
@Override
public synchronized void masterTakeover(String masterUrl, String networkName,
- int masterId) throws Exception {
+ int masterId, String masterLocation) throws Exception {
Services.MasterState request = Services.MasterState.newBuilder()
.setMasterUrl(masterUrl)
.setNetworkName(networkName)
.setMasterId(masterId)
+ .setMasterLocation(masterLocation)
.build();
newServiceImpl.masterTakeover(null, request, noOp);
}
diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java
index 97215e0..d7122c7 100644
--- a/same/src/main/java/com/orbekk/same/ClientService.java
+++ b/same/src/main/java/com/orbekk/same/ClientService.java
@@ -10,7 +10,7 @@ public interface ClientService {
* than the current master.
*/
void masterTakeover(String masterUrl, String networkName,
- int masterId) throws Exception;
+ int masterId, String masterLocation) throws Exception;
/** The master is down, so start a new master election. */
void masterDown(int masterId) throws Exception;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 55d7daf..d1f27e9 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -29,6 +29,7 @@ public class Master {
String myLocation) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
+ state.update(".masterLocation", myLocation, 1);
return new Master(state, connections, broadcaster, myUrl, myLocation);
}
@@ -155,9 +156,11 @@ public class Master {
client.masterTakeover(
state.getDataOf(".masterUrl"),
state.getDataOf(".networkName"),
- masterId);
+ masterId,
+ state.getDataOf(".masterLocation"));
} catch (Exception e) {
- logger.info("Client {} failed to acknowledge master. Remove.");
+ logger.info("Client failed to acknowledge master. Remove.",
+ e);
removeParticipant(url);
}
}
@@ -232,7 +235,8 @@ public class Master {
ClientService client = connections.getClient(url);
try {
client.masterTakeover(myUrl,
- state.getDataOf(".networkName"), masterId);
+ state.getDataOf(".networkName"), masterId,
+ state.getDataOf(".masterLocation"));
} catch (Exception e) {
logger.info("Client {} failed to acknowledge new master. " +
"Removing {}", url);