diff options
Diffstat (limited to 'same/src/main')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 71 |
1 files changed, 64 insertions, 7 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 511da08..71a95dd 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -1,6 +1,6 @@ package com.orbekk.same; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.UpdateComponentResponse; @@ -138,18 +139,74 @@ public class Master { }; WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { + class UpdateStateCallback + implements RpcCallback<Empty> { + private final String participantLocation; + private final Rpc rpc; + + public UpdateStateCallback(String participantLocation, Rpc rpc) { + this.participantLocation = participantLocation; + this.rpc = rpc; + } + + @Override + public void run(Empty unused) { + if (rpc.isOk()) { + if (rpc.failed()) { + removeParticipant(participantLocation); + } + } + } + } + @Override protected void onChange() { List<String> pending = getAndClear(); logger.info("updateStateRequestThread: Updated state: {}", pending); - for (String componentName : pending) { - Component component = state.getComponent(componentName); - List<String> participants = state.getList(".participants"); - broadcastNewComponents(participants, - Collections.singletonList(component)); +// for (String componentName : pending) { +// Component component = state.getComponent(componentName); +// List<String> participants = state.getList(".participants"); +// broadcastNewComponents(participants, +// Collections.singletonList(component)); +// } + for (String clientLocation : state.getList( + com.orbekk.same.State.PARTICIPANTS)) { + + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); + continue; + } + + for (String componentName : pending) { + Services.Component updatedComponent = + Services.Component.newBuilder() + .setId(componentName) + .setData(state.getDataOf(componentName)) + .setRevision(state.getRevision(componentName)) + .build(); + + Rpc rpc = new Rpc(); + UpdateStateCallback done = new UpdateStateCallback( + clientLocation, rpc); + client.setState(rpc, updatedComponent, done); + } } } }; + + public List<Services.Client> getClients() { + List<Services.Client> clients = new ArrayList<Services.Client>(); + for (String location : state.getList(State.PARTICIPANTS)) { + Services.Client client = connections.getClient0(location); + if (client == null) { + removeParticipant(location); + } else { + clients.add(client); + } + } + return clients; + } WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { @Override protected void onChange() { @@ -230,7 +287,7 @@ public class Master { updateStateRequestThread.add(State.PARTICIPANTS); } } - + private void broadcastNewComponents(List<String> destinations, final List<State.Component> components) { broadcaster.broadcast(destinations, new ServiceOperation() { |