summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java23
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java13
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java16
3 files changed, 28 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 5f9cfd4..bbf9ca2 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -31,7 +31,10 @@ import com.google.protobuf.RpcController;
import com.orbekk.paxos.MasterProposer;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.Empty;
+import com.orbekk.same.Services.FullStateResponse;
+import com.orbekk.same.Services.FullStateResponse.Builder;
import com.orbekk.same.Services.MasterState;
+import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
import com.orbekk.util.DelayedOperation;
@@ -149,7 +152,7 @@ public class Client {
}
@Override public void masterTakeover(RpcController controller,
- MasterState request, RpcCallback<Empty> done) {
+ MasterState request, RpcCallback<MasterTakeoverResponse> done) {
logger.info("MasterTakeover({})", request);
if (masterInfo != null &&
request.getMasterId() <= masterInfo.getMasterId()) {
@@ -158,9 +161,15 @@ public class Client {
return;
}
abortMasterElection();
+ long highestRevision = 0;
+ if (masterInfo != null && request.getNetworkName().equals(masterInfo.getNetworkName())) {
+ highestRevision = revision.get();
+ }
masterInfo = request;
connectionState = ConnectionState.STABLE;
- done.run(Empty.getDefaultInstance());
+ done.run(MasterTakeoverResponse.newBuilder()
+ .setHighestKnownRevision(highestRevision)
+ .build());
}
@Override public void masterDown(RpcController controller, MasterState request,
@@ -175,6 +184,16 @@ public class Client {
executor.execute(new MasterStarter(request));
done.run(Empty.getDefaultInstance());
}
+
+ @Override
+ public void getFullState(RpcController controller, Empty request,
+ RpcCallback<FullStateResponse> done) {
+ FullStateResponse.Builder response = FullStateResponse.newBuilder();
+ response.setRevision(revision.get());
+ response.addAllComponent(
+ ServicesPbConversion.componentsToPb(state.getComponents()));
+ done.run(response.build());
+ }
};
private class MasterStarter implements Runnable {
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 473fd1e..b32fedd 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -27,6 +27,7 @@ import com.google.protobuf.RpcController;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
+import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
import com.orbekk.util.WorkQueue;
@@ -166,8 +167,8 @@ public class Master {
{ // Send masterTakeover().
Rpc rpc = rpcf.create();
- RpcCallback<Empty> done =
- new RemoveParticipantIfFailsCallback<Empty>(
+ RpcCallback<MasterTakeoverResponse> done =
+ new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
clientLocation, rpc);
client.masterTakeover(rpc, getMasterInfo(), done);
}
@@ -243,13 +244,7 @@ public class Master {
for (final String location : state.getList(State.PARTICIPANTS)) {
Services.Client client = connections.getClient0(location);
final Rpc rpc = rpcf.create();
- RpcCallback<Empty> done = new RpcCallback<Empty>() {
- @Override public void run(Empty unused) {
- if (!rpc.isOk()) {
- removeParticipant(location);
- }
- }
- };
+ RpcCallback<MasterTakeoverResponse> done = new RemoveParticipantIfFailsCallback<Services.MasterTakeoverResponse>(location, rpc);
if (client == null) {
removeParticipant(location);
continue;
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 6a0456d..5eebcdd 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -49,17 +49,7 @@ public class SameController {
private static final int timeout = 10000;
private class SystemServiceImpl extends Services.SystemService {
- private List<Services.Component> componentsToPb(List<State.Component> components) {
- List<Services.Component> results = new ArrayList<Services.Component>();
- for (State.Component c : components) {
- results.add(Services.Component.newBuilder()
- .setId(c.getName())
- .setRevision(c.getRevision())
- .setData(c.getData())
- .build());
- }
- return results;
- }
+
private void addMasterInfo(SystemStatus.Builder response) {
Master currentMaster = master;
@@ -67,7 +57,7 @@ public class SameController {
response.setMasterStatus(currentMaster.getMasterInfo());
State masterState = new State(currentMaster.state);
response.addAllMasterStateComponent(
- componentsToPb(masterState.getComponents()));
+ ServicesPbConversion.componentsToPb(masterState.getComponents()));
}
}
@@ -78,7 +68,7 @@ public class SameController {
}
State clientState = new State(client.state);
response.addAllClientStateComponent(
- componentsToPb(clientState.getComponents()));
+ ServicesPbConversion.componentsToPb(clientState.getComponents()));
}
@Override