diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Client.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 101 |
1 files changed, 67 insertions, 34 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 53d0ac8..704dc8e 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -11,7 +11,11 @@ import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; import com.orbekk.paxos.MasterProposer; +import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.MasterState; import com.orbekk.same.State.Component; import com.orbekk.util.DelayedOperation; import com.orbekk.util.WorkQueue; @@ -94,58 +98,87 @@ public class Client { private ClientInterface clientInterface = new ClientInterfaceImpl(); - private ClientService serviceImpl = new ClientService() { - @Override - public void setState(String component, String data, long revision) throws Exception { - boolean status = state.update(component, data, revision); + private Services.Client newServiceImpl = new Services.Client() { + @Override public void setState(RpcController controller, + Services.Component request, RpcCallback<Empty> done) { + boolean status = state.update(request.getId(), request.getData(), + request.getRevision()); if (status) { for (StateChangedListener listener : stateListeners) { - listener.stateChanged(state.getComponent(component)); + listener.stateChanged(state.getComponent(request.getId())); } } else { logger.warn("Ignoring update: {) => {}", - state.getComponent(component), - new State.Component(component, revision, data)); - } - } - - @Override - public void notifyNetwork(String networkName, String masterUrl) throws Exception { - logger.info("NotifyNetwork(networkName={}, masterUrl={})", - networkName, masterUrl); - if (networkListener != null) { - networkListener.notifyNetwork(networkName, masterUrl); - } + state.getComponent(request.getId()), + new State.Component(request.getId(), request.getRevision(), + request.getData())); + } } - @Override - public synchronized void masterTakeover(String masterUrl, String networkName, - int masterId) throws Exception { - logger.info("MasterTakeover({}, {}, {})", - new Object[]{masterUrl, networkName, masterId}); - if (masterId <= Client.this.masterId) { - logger.warn("{}:{} tried to take over, but current master is " + - "{}:{}. Ignoring", new Object[]{masterUrl, masterId, - state.getDataOf(".masterUrl"), - Client.this.masterId}); + @Override public void masterTakeover(RpcController controller, + MasterState request, RpcCallback<Empty> done) { + logger.info("MasterTakeover({})", request); + if (request.getMasterId() <= Client.this.masterId) { + logger.warn("{} tried to take over, but current master is " + + "{}:{}. Ignoring", new Object[]{request, + state.getDataOf(".masterUrl"), + Client.this.masterId}); return; } abortMasterElection(); - Client.this.masterUrl = masterUrl; - Client.this.masterId = masterId; + Client.this.masterUrl = request.getMasterUrl(); + Client.this.masterId = request.getMasterId(); connectionState = ConnectionState.STABLE; } - @Override - public void masterDown(int masterId) throws Exception { - if (masterId < Client.this.masterId) { + @Override public void masterDown(RpcController controller, MasterState request, + RpcCallback<Empty> done) { + if (request.getMasterId() < Client.this.masterId) { logger.info("Master {} is down, but current master is {}. Ignoring.", - masterId, Client.this.masterId); + request.getMasterId(), Client.this.masterId); return; } logger.warn("Master down."); connectionState = ConnectionState.UNSTABLE; - tryBecomeMaster(masterId); + tryBecomeMaster(request.getMasterId()); + } + }; + + private ClientService serviceImpl = new ClientService() { + RpcCallback<Empty> noOp = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + } + }; + + @Override + public void setState(String component, String data, long revision) throws Exception { + Services.Component request = Services.Component.newBuilder() + .setId(component) + .setData(data) + .setRevision(revision) + .build(); + newServiceImpl.setState(null, request, noOp); + } + + @Override + public synchronized void masterTakeover(String masterUrl, String networkName, + int masterId) throws Exception { + Services.MasterState request = Services.MasterState.newBuilder() + .setMasterUrl(masterUrl) + .setNetworkName(networkName) + .setMasterId(masterId) + .build(); + newServiceImpl.masterTakeover(null, request, noOp); + } + + @Override + public void masterDown(int masterId) throws Exception { + Services.MasterState request = Services.MasterState.newBuilder() + .setMasterUrl(masterUrl) + .setNetworkName(state.getDataOf(".networkName")) + .setMasterId(masterId) + .build(); + newServiceImpl.masterDown(null, request, noOp); } }; |