summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Master.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java71
1 files changed, 64 insertions, 7 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 511da08..71a95dd 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -1,6 +1,6 @@
package com.orbekk.same;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
+import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.UpdateComponentResponse;
@@ -138,18 +139,74 @@ public class Master {
};
WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
+ class UpdateStateCallback
+ implements RpcCallback<Empty> {
+ private final String participantLocation;
+ private final Rpc rpc;
+
+ public UpdateStateCallback(String participantLocation, Rpc rpc) {
+ this.participantLocation = participantLocation;
+ this.rpc = rpc;
+ }
+
+ @Override
+ public void run(Empty unused) {
+ if (rpc.isOk()) {
+ if (rpc.failed()) {
+ removeParticipant(participantLocation);
+ }
+ }
+ }
+ }
+
@Override protected void onChange() {
List<String> pending = getAndClear();
logger.info("updateStateRequestThread: Updated state: {}",
pending);
- for (String componentName : pending) {
- Component component = state.getComponent(componentName);
- List<String> participants = state.getList(".participants");
- broadcastNewComponents(participants,
- Collections.singletonList(component));
+// for (String componentName : pending) {
+// Component component = state.getComponent(componentName);
+// List<String> participants = state.getList(".participants");
+// broadcastNewComponents(participants,
+// Collections.singletonList(component));
+// }
+ for (String clientLocation : state.getList(
+ com.orbekk.same.State.PARTICIPANTS)) {
+
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
+ continue;
+ }
+
+ for (String componentName : pending) {
+ Services.Component updatedComponent =
+ Services.Component.newBuilder()
+ .setId(componentName)
+ .setData(state.getDataOf(componentName))
+ .setRevision(state.getRevision(componentName))
+ .build();
+
+ Rpc rpc = new Rpc();
+ UpdateStateCallback done = new UpdateStateCallback(
+ clientLocation, rpc);
+ client.setState(rpc, updatedComponent, done);
+ }
}
}
};
+
+ public List<Services.Client> getClients() {
+ List<Services.Client> clients = new ArrayList<Services.Client>();
+ for (String location : state.getList(State.PARTICIPANTS)) {
+ Services.Client client = connections.getClient0(location);
+ if (client == null) {
+ removeParticipant(location);
+ } else {
+ clients.add(client);
+ }
+ }
+ return clients;
+ }
WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
@Override protected void onChange() {
@@ -230,7 +287,7 @@ public class Master {
updateStateRequestThread.add(State.PARTICIPANTS);
}
}
-
+
private void broadcastNewComponents(List<String> destinations,
final List<State.Component> components) {
broadcaster.broadcast(destinations, new ServiceOperation() {