summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Client.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Client.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java101
1 files changed, 67 insertions, 34 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 53d0ac8..704dc8e 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -11,7 +11,11 @@ import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
import com.orbekk.paxos.MasterProposer;
+import com.orbekk.same.Services.Empty;
+import com.orbekk.same.Services.MasterState;
import com.orbekk.same.State.Component;
import com.orbekk.util.DelayedOperation;
import com.orbekk.util.WorkQueue;
@@ -94,58 +98,87 @@ public class Client {
private ClientInterface clientInterface = new ClientInterfaceImpl();
- private ClientService serviceImpl = new ClientService() {
- @Override
- public void setState(String component, String data, long revision) throws Exception {
- boolean status = state.update(component, data, revision);
+ private Services.Client newServiceImpl = new Services.Client() {
+ @Override public void setState(RpcController controller,
+ Services.Component request, RpcCallback<Empty> done) {
+ boolean status = state.update(request.getId(), request.getData(),
+ request.getRevision());
if (status) {
for (StateChangedListener listener : stateListeners) {
- listener.stateChanged(state.getComponent(component));
+ listener.stateChanged(state.getComponent(request.getId()));
}
} else {
logger.warn("Ignoring update: {) => {}",
- state.getComponent(component),
- new State.Component(component, revision, data));
- }
- }
-
- @Override
- public void notifyNetwork(String networkName, String masterUrl) throws Exception {
- logger.info("NotifyNetwork(networkName={}, masterUrl={})",
- networkName, masterUrl);
- if (networkListener != null) {
- networkListener.notifyNetwork(networkName, masterUrl);
- }
+ state.getComponent(request.getId()),
+ new State.Component(request.getId(), request.getRevision(),
+ request.getData()));
+ }
}
- @Override
- public synchronized void masterTakeover(String masterUrl, String networkName,
- int masterId) throws Exception {
- logger.info("MasterTakeover({}, {}, {})",
- new Object[]{masterUrl, networkName, masterId});
- if (masterId <= Client.this.masterId) {
- logger.warn("{}:{} tried to take over, but current master is " +
- "{}:{}. Ignoring", new Object[]{masterUrl, masterId,
- state.getDataOf(".masterUrl"),
- Client.this.masterId});
+ @Override public void masterTakeover(RpcController controller,
+ MasterState request, RpcCallback<Empty> done) {
+ logger.info("MasterTakeover({})", request);
+ if (request.getMasterId() <= Client.this.masterId) {
+ logger.warn("{} tried to take over, but current master is " +
+ "{}:{}. Ignoring", new Object[]{request,
+ state.getDataOf(".masterUrl"),
+ Client.this.masterId});
return;
}
abortMasterElection();
- Client.this.masterUrl = masterUrl;
- Client.this.masterId = masterId;
+ Client.this.masterUrl = request.getMasterUrl();
+ Client.this.masterId = request.getMasterId();
connectionState = ConnectionState.STABLE;
}
- @Override
- public void masterDown(int masterId) throws Exception {
- if (masterId < Client.this.masterId) {
+ @Override public void masterDown(RpcController controller, MasterState request,
+ RpcCallback<Empty> done) {
+ if (request.getMasterId() < Client.this.masterId) {
logger.info("Master {} is down, but current master is {}. Ignoring.",
- masterId, Client.this.masterId);
+ request.getMasterId(), Client.this.masterId);
return;
}
logger.warn("Master down.");
connectionState = ConnectionState.UNSTABLE;
- tryBecomeMaster(masterId);
+ tryBecomeMaster(request.getMasterId());
+ }
+ };
+
+ private ClientService serviceImpl = new ClientService() {
+ RpcCallback<Empty> noOp = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ }
+ };
+
+ @Override
+ public void setState(String component, String data, long revision) throws Exception {
+ Services.Component request = Services.Component.newBuilder()
+ .setId(component)
+ .setData(data)
+ .setRevision(revision)
+ .build();
+ newServiceImpl.setState(null, request, noOp);
+ }
+
+ @Override
+ public synchronized void masterTakeover(String masterUrl, String networkName,
+ int masterId) throws Exception {
+ Services.MasterState request = Services.MasterState.newBuilder()
+ .setMasterUrl(masterUrl)
+ .setNetworkName(networkName)
+ .setMasterId(masterId)
+ .build();
+ newServiceImpl.masterTakeover(null, request, noOp);
+ }
+
+ @Override
+ public void masterDown(int masterId) throws Exception {
+ Services.MasterState request = Services.MasterState.newBuilder()
+ .setMasterUrl(masterUrl)
+ .setNetworkName(state.getDataOf(".networkName"))
+ .setMasterId(masterId)
+ .build();
+ newServiceImpl.masterDown(null, request, noOp);
}
};