diff options
Diffstat (limited to 'same/src/main')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 23 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 13 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/SameController.java | 16 |
3 files changed, 28 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 5f9cfd4..bbf9ca2 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -31,7 +31,10 @@ import com.google.protobuf.RpcController; import com.orbekk.paxos.MasterProposer; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.FullStateResponse; +import com.orbekk.same.Services.FullStateResponse.Builder; import com.orbekk.same.Services.MasterState; +import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; import com.orbekk.util.DelayedOperation; @@ -149,7 +152,7 @@ public class Client { } @Override public void masterTakeover(RpcController controller, - MasterState request, RpcCallback<Empty> done) { + MasterState request, RpcCallback<MasterTakeoverResponse> done) { logger.info("MasterTakeover({})", request); if (masterInfo != null && request.getMasterId() <= masterInfo.getMasterId()) { @@ -158,9 +161,15 @@ public class Client { return; } abortMasterElection(); + long highestRevision = 0; + if (masterInfo != null && request.getNetworkName().equals(masterInfo.getNetworkName())) { + highestRevision = revision.get(); + } masterInfo = request; connectionState = ConnectionState.STABLE; - done.run(Empty.getDefaultInstance()); + done.run(MasterTakeoverResponse.newBuilder() + .setHighestKnownRevision(highestRevision) + .build()); } @Override public void masterDown(RpcController controller, MasterState request, @@ -175,6 +184,16 @@ public class Client { executor.execute(new MasterStarter(request)); done.run(Empty.getDefaultInstance()); } + + @Override + public void getFullState(RpcController controller, Empty request, + RpcCallback<FullStateResponse> done) { + FullStateResponse.Builder response = FullStateResponse.newBuilder(); + response.setRevision(revision.get()); + response.addAllComponent( + ServicesPbConversion.componentsToPb(state.getComponents())); + done.run(response.build()); + } }; private class MasterStarter implements Runnable { diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 473fd1e..b32fedd 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -27,6 +27,7 @@ import com.google.protobuf.RpcController; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; import com.orbekk.util.WorkQueue; @@ -166,8 +167,8 @@ public class Master { { // Send masterTakeover(). Rpc rpc = rpcf.create(); - RpcCallback<Empty> done = - new RemoveParticipantIfFailsCallback<Empty>( + RpcCallback<MasterTakeoverResponse> done = + new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>( clientLocation, rpc); client.masterTakeover(rpc, getMasterInfo(), done); } @@ -243,13 +244,7 @@ public class Master { for (final String location : state.getList(State.PARTICIPANTS)) { Services.Client client = connections.getClient0(location); final Rpc rpc = rpcf.create(); - RpcCallback<Empty> done = new RpcCallback<Empty>() { - @Override public void run(Empty unused) { - if (!rpc.isOk()) { - removeParticipant(location); - } - } - }; + RpcCallback<MasterTakeoverResponse> done = new RemoveParticipantIfFailsCallback<Services.MasterTakeoverResponse>(location, rpc); if (client == null) { removeParticipant(location); continue; diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 6a0456d..5eebcdd 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -49,17 +49,7 @@ public class SameController { private static final int timeout = 10000; private class SystemServiceImpl extends Services.SystemService { - private List<Services.Component> componentsToPb(List<State.Component> components) { - List<Services.Component> results = new ArrayList<Services.Component>(); - for (State.Component c : components) { - results.add(Services.Component.newBuilder() - .setId(c.getName()) - .setRevision(c.getRevision()) - .setData(c.getData()) - .build()); - } - return results; - } + private void addMasterInfo(SystemStatus.Builder response) { Master currentMaster = master; @@ -67,7 +57,7 @@ public class SameController { response.setMasterStatus(currentMaster.getMasterInfo()); State masterState = new State(currentMaster.state); response.addAllMasterStateComponent( - componentsToPb(masterState.getComponents())); + ServicesPbConversion.componentsToPb(masterState.getComponents())); } } @@ -78,7 +68,7 @@ public class SameController { } State clientState = new State(client.state); response.addAllClientStateComponent( - componentsToPb(clientState.getComponents())); + ServicesPbConversion.componentsToPb(clientState.getComponents())); } @Override |