summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Master.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java195
1 files changed, 66 insertions, 129 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 71a95dd..94a3e5a 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -25,6 +25,26 @@ public class Master {
private Broadcaster broadcaster;
private volatile int masterId = 1;
+ class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> {
+ private final String participantLocation;
+ private final Rpc rpc;
+
+ public RemoveParticipantIfFailsCallback(
+ String participantLocation, Rpc rpc) {
+ this.participantLocation = participantLocation;
+ this.rpc = rpc;
+ }
+
+ @Override
+ public void run(T unused) {
+ if (rpc.isOk()) {
+ if (rpc.failed()) {
+ removeParticipant(participantLocation);
+ }
+ }
+ }
+ }
+
public static Master create(ConnectionManager connections,
Broadcaster broadcaster, String myUrl, String networkName,
String myLocation) {
@@ -68,24 +88,8 @@ public class Master {
@Override public void joinNetworkRequest(RpcController controller,
ClientState request, RpcCallback<Empty> done) {
logger.info("joinNetworkRequest({})", request);
-
- /** Old participant code. */
- List<String> participants = state.getList(".participants");
- sendFullStateThread.add(request.getUrl());
- if (!participants.contains(request.getUrl())) {
- participants.add(request.getUrl());
- synchronized (Master.this) {
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- }
- updateStateRequestThread.add(".participants");
- } else {
- logger.warn("Client {} joining: Already part of network");
- }
-
- /** New participant code. */
+ sendFullStateThread.add(request.getLocation());
addParticipant(request.getLocation());
-
done.run(Empty.getDefaultInstance());
}
@@ -104,135 +108,72 @@ public class Master {
}
};
- private MasterService serviceImpl = new MasterService() {
- @Override
- public boolean updateStateRequest(String component,
- String newData, long revision) {
- Services.Component request = Services.Component.newBuilder()
- .setId(component)
- .setData(newData)
- .setRevision(revision)
- .build();
- final AtomicBoolean result = new AtomicBoolean(false);
- RpcCallback<Services.UpdateComponentResponse> done =
- new RpcCallback<Services.UpdateComponentResponse>() {
- @Override public void run(UpdateComponentResponse response) {
- result.set(response.getSuccess());
- }
- };
- newMasterImpl.updateStateRequest(null, request, done);
- return result.get();
- }
-
- @Override
- public void joinNetworkRequest(String clientUrl) {
- Services.ClientState request = Services.ClientState.newBuilder()
- .setUrl(clientUrl)
- .build();
- RpcCallback<Services.Empty> done =
- new RpcCallback<Services.Empty>() {
- @Override public void run(Empty response) {
- }
- };
- newMasterImpl.joinNetworkRequest(null, request, done);
- }
- };
-
WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
- class UpdateStateCallback
- implements RpcCallback<Empty> {
- private final String participantLocation;
- private final Rpc rpc;
-
- public UpdateStateCallback(String participantLocation, Rpc rpc) {
- this.participantLocation = participantLocation;
- this.rpc = rpc;
- }
-
- @Override
- public void run(Empty unused) {
- if (rpc.isOk()) {
- if (rpc.failed()) {
- removeParticipant(participantLocation);
- }
- }
- }
- }
-
@Override protected void onChange() {
List<String> pending = getAndClear();
+ List<Component> updatedComponents = new ArrayList<Component>();
+ for (String component : pending) {
+ updatedComponents.add(state.getComponent(component));
+ }
+
logger.info("updateStateRequestThread: Updated state: {}",
pending);
-// for (String componentName : pending) {
-// Component component = state.getComponent(componentName);
-// List<String> participants = state.getList(".participants");
-// broadcastNewComponents(participants,
-// Collections.singletonList(component));
-// }
for (String clientLocation : state.getList(
com.orbekk.same.State.PARTICIPANTS)) {
-
- Services.Client client = connections.getClient0(clientLocation);
- if (client == null) {
- removeParticipant(clientLocation);
- continue;
- }
-
- for (String componentName : pending) {
- Services.Component updatedComponent =
- Services.Component.newBuilder()
- .setId(componentName)
- .setData(state.getDataOf(componentName))
- .setRevision(state.getRevision(componentName))
- .build();
-
- Rpc rpc = new Rpc();
- UpdateStateCallback done = new UpdateStateCallback(
- clientLocation, rpc);
- client.setState(rpc, updatedComponent, done);
- }
+ sendComponents(clientLocation, updatedComponents);
}
}
};
-
- public List<Services.Client> getClients() {
- List<Services.Client> clients = new ArrayList<Services.Client>();
- for (String location : state.getList(State.PARTICIPANTS)) {
- Services.Client client = connections.getClient0(location);
- if (client == null) {
- removeParticipant(location);
- } else {
- clients.add(client);
- }
+
+ public void sendComponents(String clientLocation,
+ List<Component> components) {
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
}
- return clients;
- }
+ for (Component component : components) {
+ Services.Component componentProto = componentToProto(component);
+ Rpc rpc = new Rpc();
+ RpcCallback<Empty> done =
+ new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
+ rpc);
+ client.setState(rpc, componentProto, done);
+ }
+ }
+
WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
@Override protected void onChange() {
List<String> pending = getAndClear();
logger.info("Sending full state to {}", pending);
final List<Component> components = state.getComponents();
- broadcaster.broadcast(pending, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.masterTakeover(
- state.getDataOf(".masterUrl"),
- state.getDataOf(".networkName"),
- masterId,
- state.getDataOf(".masterLocation"));
- } catch (Exception e) {
- logger.info("Client failed to acknowledge master. Remove.",
- e);
- removeParticipant(url);
- }
+ for (String clientLocation : pending) {
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
+ continue;
+ }
+
+ { // Send masterTakeover().
+ Rpc rpc = new Rpc();
+ RpcCallback<Empty> done =
+ new RemoveParticipantIfFailsCallback<Empty>(
+ clientLocation, rpc);
+ client.masterTakeover(rpc, getMasterInfo(), done);
}
- });
- broadcastNewComponents(pending, components);
+ sendComponents(clientLocation, components);
+ }
}
};
+ private Services.Component componentToProto(State.Component component) {
+ return Services.Component.newBuilder()
+ .setId(component.getName())
+ .setData(component.getData())
+ .setRevision(component.getRevision())
+ .build();
+ }
+
void performWork() {
sendFullStateThread.performWork();
updateStateRequestThread.performWork();
@@ -252,10 +193,6 @@ public class Master {
return newMasterImpl;
}
- public MasterService getService() {
- return serviceImpl;
- }
-
private synchronized void addParticipant(String location) {
List<String> participants = state.getList(State.PARTICIPANTS);
if (!participants.contains(location)) {