summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--same/src/main/java/com/orbekk/same/NewMaster.java71
-rw-r--r--same/src/test/java/com/orbekk/same/NewMasterTest.java40
2 files changed, 109 insertions, 2 deletions
diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java
index 161586f..c2b2d9b 100644
--- a/same/src/main/java/com/orbekk/same/NewMaster.java
+++ b/same/src/main/java/com/orbekk/same/NewMaster.java
@@ -1,5 +1,7 @@
package com.orbekk.same;
+import com.orbekk.same.State.Component;
+import java.util.Collections;
import java.util.List;
import com.orbekk.util.WorkQueue;
import org.slf4j.Logger;
@@ -29,36 +31,101 @@ public class NewMaster {
@Override
public boolean updateStateRequest(String component,
String newData, long revision) {
- return false;
+ logger.info("updateStateRequest({}, {}, {})",
+ new Object[]{component, newData, revision});
+ boolean updated = state.update(component, newData, revision + 1);
+ if (updated) {
+ updateStateRequestThread.add(component);
+ }
+ return updated;
}
@Override
public void joinNetworkRequest(String clientUrl) {
+ logger.info("joinNetworkRequest({})", clientUrl);
+ List<String> participants = state.getList(".participants");
+ sendFullStateThread.add(clientUrl);
+ if (!participants.contains(clientUrl)) {
+ participants.add(clientUrl);
+ synchronized (NewMaster.this) {
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ }
+ updateStateRequestThread.add(".participants");
+ } else {
+ logger.warn("Client {} joining: Already part of network");
+ }
}
};
WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
@Override protected void onChange() {
List<String> pending = getAndClear();
+ logger.info("updateStateRequestThread: Updated state: {}",
+ pending);
for (String componentName : pending) {
- logger.info("Component updated: {}", componentName);
+ Component component = state.getComponent(componentName);
+ List<String> participants = state.getList(".participants");
+ broadcastNewComponents(participants,
+ Collections.singletonList(component));
}
}
};
+ WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("Sending full state to {}", pending);
+ final List<Component> components = state.getComponents();
+ broadcastNewComponents(pending, components);
+ }
+ };
+
void performWork() {
+ sendFullStateThread.performWork();
updateStateRequestThread.performWork();
}
public void start() {
+ sendFullStateThread.start();
updateStateRequestThread.start();
}
public void interrupt() {
+ sendFullStateThread.interrupt();
updateStateRequestThread.interrupt();
}
public MasterService getService() {
return serviceImpl;
}
+
+ private synchronized void removeParticipant(String url) {
+ List<String> participants = state.getList(".participants");
+ if (participants.contains(url)) {
+ logger.info("removeParticipant({})", url);
+ participants.remove(url);
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ updateStateRequestThread.add(".participants");
+ }
+ }
+
+ private void broadcastNewComponents(List<String> destinations,
+ final List<State.Component> components) {
+ broadcaster.broadcast(destinations, new ServiceOperation() {
+ @Override public void run(String url) {
+ ClientService client = connections.getClient(url);
+ try {
+ for (Component c : components) {
+ client.setState(c.getName(), c.getData(),
+ c.getRevision());
+ }
+ } catch (Exception e) {
+ logger.info("Client {} failed to receive state update.", url);
+ removeParticipant(url);
+ }
+ }
+ });
+ }
}
diff --git a/same/src/test/java/com/orbekk/same/NewMasterTest.java b/same/src/test/java/com/orbekk/same/NewMasterTest.java
index 098b141..273fb3c 100644
--- a/same/src/test/java/com/orbekk/same/NewMasterTest.java
+++ b/same/src/test/java/com/orbekk/same/NewMasterTest.java
@@ -43,6 +43,26 @@ public class NewMasterTest {
}
@Test
+ public void joinNetworkAddsClient() throws Exception {
+ masterS.joinNetworkRequest("http://clientUrl");
+ List<String> participants = state.getList(".participants");
+ assertTrue(participants.contains("http://clientUrl"));
+ }
+
+ @Test
+ public void clientJoin() {
+ Client client = new Client(
+ new State("ClientNetwork"), connections,
+ "http://client/ClientService.json");
+ ClientService clientS = client.getService();
+ connections.clientMap.put("http://client/ClientService.json", clientS);
+ client.joinNetwork("http://master/MasterService.json");
+ master.performWork();
+ assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
+ assertEquals(state, client.testGetState());
+ }
+
+ @Test
public void updateStateRequest() throws Exception {
Client client1 = new Client(
new State("ClientNetwork"), connections,
@@ -76,4 +96,24 @@ public class NewMasterTest {
assertEquals(state, client1.testGetState());
assertEquals(state, client2.testGetState());
}
+
+ @Test
+ public void masterRemovesParticipant() throws Exception {
+ Client client = new Client(
+ new State("ClientNetwork"), connections,
+ "http://client/ClientService.json");
+ ClientService clientS = client.getService();
+ connections.clientMap.put("http://client/ClientService.json", clientS);
+ client.joinNetwork("http://master/MasterService.json");
+ master.performWork();
+ assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
+
+ connections.clientMap.put("http://client/ClientService.json",
+ new UnreachableClient());
+ masterS.updateStateRequest("NewState", "NewStateData", 0);
+ master.performWork();
+
+ assertEquals("[]", state.getDataOf(".participants"));
+ }
+
}