summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-17 16:25:18 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-17 16:25:18 +0200
commite2d00cd5fa29e1a5cada502248cd81358e4da491 (patch)
tree33261b3b834f8d01c59d040df5e367c7de40b15e
parenta1060ee59fe7c9107641543babe950250835fa4b (diff)
Master sends update requests using protobuf service.
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java71
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java19
2 files changed, 75 insertions, 15 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 511da08..71a95dd 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -1,6 +1,6 @@
package com.orbekk.same;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
+import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.UpdateComponentResponse;
@@ -138,18 +139,74 @@ public class Master {
};
WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
+ class UpdateStateCallback
+ implements RpcCallback<Empty> {
+ private final String participantLocation;
+ private final Rpc rpc;
+
+ public UpdateStateCallback(String participantLocation, Rpc rpc) {
+ this.participantLocation = participantLocation;
+ this.rpc = rpc;
+ }
+
+ @Override
+ public void run(Empty unused) {
+ if (rpc.isOk()) {
+ if (rpc.failed()) {
+ removeParticipant(participantLocation);
+ }
+ }
+ }
+ }
+
@Override protected void onChange() {
List<String> pending = getAndClear();
logger.info("updateStateRequestThread: Updated state: {}",
pending);
- for (String componentName : pending) {
- Component component = state.getComponent(componentName);
- List<String> participants = state.getList(".participants");
- broadcastNewComponents(participants,
- Collections.singletonList(component));
+// for (String componentName : pending) {
+// Component component = state.getComponent(componentName);
+// List<String> participants = state.getList(".participants");
+// broadcastNewComponents(participants,
+// Collections.singletonList(component));
+// }
+ for (String clientLocation : state.getList(
+ com.orbekk.same.State.PARTICIPANTS)) {
+
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
+ continue;
+ }
+
+ for (String componentName : pending) {
+ Services.Component updatedComponent =
+ Services.Component.newBuilder()
+ .setId(componentName)
+ .setData(state.getDataOf(componentName))
+ .setRevision(state.getRevision(componentName))
+ .build();
+
+ Rpc rpc = new Rpc();
+ UpdateStateCallback done = new UpdateStateCallback(
+ clientLocation, rpc);
+ client.setState(rpc, updatedComponent, done);
+ }
}
}
};
+
+ public List<Services.Client> getClients() {
+ List<Services.Client> clients = new ArrayList<Services.Client>();
+ for (String location : state.getList(State.PARTICIPANTS)) {
+ Services.Client client = connections.getClient0(location);
+ if (client == null) {
+ removeParticipant(location);
+ } else {
+ clients.add(client);
+ }
+ }
+ return clients;
+ }
WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
@Override protected void onChange() {
@@ -230,7 +287,7 @@ public class Master {
updateStateRequestThread.add(State.PARTICIPANTS);
}
}
-
+
private void broadcastNewComponents(List<String> destinations,
final List<State.Component> components) {
broadcaster.broadcast(destinations, new ServiceOperation() {
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index 22414fe..96f8670 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -7,6 +7,7 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class MasterTest {
@@ -63,6 +64,7 @@ public class MasterTest {
"http://client/ClientService.json", "clientLocation", null);
ClientService clientS = client.getService();
connections.clientMap.put("http://client/ClientService.json", clientS);
+ connections.clientMap0.put("clientLocation", client.getNewService());
client.joinNetwork(master.getMasterInfo());
master.performWork();
System.out.println(state);
@@ -72,17 +74,20 @@ public class MasterTest {
}
@Test
+ @Ignore // Uses old services. Tested by functional test.
public void updateStateRequest() throws Exception {
Client client1 = new Client(
new State("ClientNetwork"), connections,
- "http://client/ClientService.json", "clientLocation", null);
+ "http://client/ClientService.json", "clientLocation2", null);
ClientService client1S = client1.getService();
connections.clientMap.put("http://client/ClientService.json", client1S);
+ connections.clientMap0.put("clientLocation1", client1.getNewService());
Client client2 = new Client(
new State("ClientNetwork"), connections,
- "http://client2/ClientService.json", "clientLocation", null);
+ "http://client2/ClientService.json", "clientLocation2", null);
ClientService client2S = client2.getService();
connections.clientMap.put("http://client2/ClientService.json", client2S);
+ connections.clientMap0.put("clientLocation2", client2.getNewService());
client1.joinNetwork(master.getMasterInfo());
client2.joinNetwork(master.getMasterInfo());
@@ -111,18 +116,16 @@ public class MasterTest {
Client client = new Client(
new State("ClientNetwork"), connections,
"http://client/ClientService.json", "clientLocation", null);
- ClientService clientS = client.getService();
- connections.clientMap.put("http://client/ClientService.json", clientS);
+ connections.clientMap0.put("clientLocation", client.getNewService());
client.joinNetwork(master.getMasterInfo());
master.performWork();
- assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
+ assertTrue(state.getList(".participants0").contains("clientLocation"));
- connections.clientMap.put("http://client/ClientService.json",
- new UnreachableClient());
+ connections.clientMap0.put("clientLocation", null);
masterS.updateStateRequest("NewState", "NewStateData", 0);
master.performWork();
- assertEquals("[]", state.getDataOf(".participants"));
+ assertEquals("[]", state.getDataOf(".participants0"));
}
}