From de32964e8cf5701590f78907ba22d5a114d8c52b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 7 May 2012 16:15:55 +0200 Subject: Update service definitions. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Update Client and Master services with new message types. --- same/src/main/java/com/orbekk/same/Client.java | 23 ++++++++++++++++++++-- same/src/main/java/com/orbekk/same/Master.java | 13 ++++-------- .../main/java/com/orbekk/same/SameController.java | 16 +++------------ 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 5f9cfd4..bbf9ca2 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -31,7 +31,10 @@ import com.google.protobuf.RpcController; import com.orbekk.paxos.MasterProposer; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.FullStateResponse; +import com.orbekk.same.Services.FullStateResponse.Builder; import com.orbekk.same.Services.MasterState; +import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; import com.orbekk.util.DelayedOperation; @@ -149,7 +152,7 @@ public class Client { } @Override public void masterTakeover(RpcController controller, - MasterState request, RpcCallback done) { + MasterState request, RpcCallback done) { logger.info("MasterTakeover({})", request); if (masterInfo != null && request.getMasterId() <= masterInfo.getMasterId()) { @@ -158,9 +161,15 @@ public class Client { return; } abortMasterElection(); + long highestRevision = 0; + if (masterInfo != null && request.getNetworkName().equals(masterInfo.getNetworkName())) { + highestRevision = revision.get(); + } masterInfo = request; connectionState = ConnectionState.STABLE; - done.run(Empty.getDefaultInstance()); + done.run(MasterTakeoverResponse.newBuilder() + .setHighestKnownRevision(highestRevision) + .build()); } @Override public void masterDown(RpcController controller, MasterState request, @@ -175,6 +184,16 @@ public class Client { executor.execute(new MasterStarter(request)); done.run(Empty.getDefaultInstance()); } + + @Override + public void getFullState(RpcController controller, Empty request, + RpcCallback done) { + FullStateResponse.Builder response = FullStateResponse.newBuilder(); + response.setRevision(revision.get()); + response.addAllComponent( + ServicesPbConversion.componentsToPb(state.getComponents())); + done.run(response.build()); + } }; private class MasterStarter implements Runnable { diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 473fd1e..b32fedd 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -27,6 +27,7 @@ import com.google.protobuf.RpcController; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; import com.orbekk.util.WorkQueue; @@ -166,8 +167,8 @@ public class Master { { // Send masterTakeover(). Rpc rpc = rpcf.create(); - RpcCallback done = - new RemoveParticipantIfFailsCallback( + RpcCallback done = + new RemoveParticipantIfFailsCallback( clientLocation, rpc); client.masterTakeover(rpc, getMasterInfo(), done); } @@ -243,13 +244,7 @@ public class Master { for (final String location : state.getList(State.PARTICIPANTS)) { Services.Client client = connections.getClient0(location); final Rpc rpc = rpcf.create(); - RpcCallback done = new RpcCallback() { - @Override public void run(Empty unused) { - if (!rpc.isOk()) { - removeParticipant(location); - } - } - }; + RpcCallback done = new RemoveParticipantIfFailsCallback(location, rpc); if (client == null) { removeParticipant(location); continue; diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 6a0456d..5eebcdd 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -49,17 +49,7 @@ public class SameController { private static final int timeout = 10000; private class SystemServiceImpl extends Services.SystemService { - private List componentsToPb(List components) { - List results = new ArrayList(); - for (State.Component c : components) { - results.add(Services.Component.newBuilder() - .setId(c.getName()) - .setRevision(c.getRevision()) - .setData(c.getData()) - .build()); - } - return results; - } + private void addMasterInfo(SystemStatus.Builder response) { Master currentMaster = master; @@ -67,7 +57,7 @@ public class SameController { response.setMasterStatus(currentMaster.getMasterInfo()); State masterState = new State(currentMaster.state); response.addAllMasterStateComponent( - componentsToPb(masterState.getComponents())); + ServicesPbConversion.componentsToPb(masterState.getComponents())); } } @@ -78,7 +68,7 @@ public class SameController { } State clientState = new State(client.state); response.addAllClientStateComponent( - componentsToPb(clientState.getComponents())); + ServicesPbConversion.componentsToPb(clientState.getComponents())); } @Override -- cgit v1.2.3