diff options
Diffstat (limited to 'same/src')
-rw-r--r-- | same/src/main/java/com/orbekk/same/NewMaster.java | 71 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/NewMasterTest.java | 40 |
2 files changed, 109 insertions, 2 deletions
diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java index 161586f..c2b2d9b 100644 --- a/same/src/main/java/com/orbekk/same/NewMaster.java +++ b/same/src/main/java/com/orbekk/same/NewMaster.java @@ -1,5 +1,7 @@ package com.orbekk.same; +import com.orbekk.same.State.Component; +import java.util.Collections; import java.util.List; import com.orbekk.util.WorkQueue; import org.slf4j.Logger; @@ -29,36 +31,101 @@ public class NewMaster { @Override public boolean updateStateRequest(String component, String newData, long revision) { - return false; + logger.info("updateStateRequest({}, {}, {})", + new Object[]{component, newData, revision}); + boolean updated = state.update(component, newData, revision + 1); + if (updated) { + updateStateRequestThread.add(component); + } + return updated; } @Override public void joinNetworkRequest(String clientUrl) { + logger.info("joinNetworkRequest({})", clientUrl); + List<String> participants = state.getList(".participants"); + sendFullStateThread.add(clientUrl); + if (!participants.contains(clientUrl)) { + participants.add(clientUrl); + synchronized (NewMaster.this) { + state.updateFromObject(".participants", participants, + state.getRevision(".participants") + 1); + } + updateStateRequestThread.add(".participants"); + } else { + logger.warn("Client {} joining: Already part of network"); + } } }; WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { @Override protected void onChange() { List<String> pending = getAndClear(); + logger.info("updateStateRequestThread: Updated state: {}", + pending); for (String componentName : pending) { - logger.info("Component updated: {}", componentName); + Component component = state.getComponent(componentName); + List<String> participants = state.getList(".participants"); + broadcastNewComponents(participants, + Collections.singletonList(component)); } } }; + WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { + @Override protected void onChange() { + List<String> pending = getAndClear(); + logger.info("Sending full state to {}", pending); + final List<Component> components = state.getComponents(); + broadcastNewComponents(pending, components); + } + }; + void performWork() { + sendFullStateThread.performWork(); updateStateRequestThread.performWork(); } public void start() { + sendFullStateThread.start(); updateStateRequestThread.start(); } public void interrupt() { + sendFullStateThread.interrupt(); updateStateRequestThread.interrupt(); } public MasterService getService() { return serviceImpl; } + + private synchronized void removeParticipant(String url) { + List<String> participants = state.getList(".participants"); + if (participants.contains(url)) { + logger.info("removeParticipant({})", url); + participants.remove(url); + state.updateFromObject(".participants", participants, + state.getRevision(".participants") + 1); + updateStateRequestThread.add(".participants"); + } + } + + private void broadcastNewComponents(List<String> destinations, + final List<State.Component> components) { + broadcaster.broadcast(destinations, new ServiceOperation() { + @Override public void run(String url) { + ClientService client = connections.getClient(url); + try { + for (Component c : components) { + client.setState(c.getName(), c.getData(), + c.getRevision()); + } + } catch (Exception e) { + logger.info("Client {} failed to receive state update.", url); + removeParticipant(url); + } + } + }); + } } diff --git a/same/src/test/java/com/orbekk/same/NewMasterTest.java b/same/src/test/java/com/orbekk/same/NewMasterTest.java index 098b141..273fb3c 100644 --- a/same/src/test/java/com/orbekk/same/NewMasterTest.java +++ b/same/src/test/java/com/orbekk/same/NewMasterTest.java @@ -43,6 +43,26 @@ public class NewMasterTest { } @Test + public void joinNetworkAddsClient() throws Exception { + masterS.joinNetworkRequest("http://clientUrl"); + List<String> participants = state.getList(".participants"); + assertTrue(participants.contains("http://clientUrl")); + } + + @Test + public void clientJoin() { + Client client = new Client( + new State("ClientNetwork"), connections, + "http://client/ClientService.json"); + ClientService clientS = client.getService(); + connections.clientMap.put("http://client/ClientService.json", clientS); + client.joinNetwork("http://master/MasterService.json"); + master.performWork(); + assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); + assertEquals(state, client.testGetState()); + } + + @Test public void updateStateRequest() throws Exception { Client client1 = new Client( new State("ClientNetwork"), connections, @@ -76,4 +96,24 @@ public class NewMasterTest { assertEquals(state, client1.testGetState()); assertEquals(state, client2.testGetState()); } + + @Test + public void masterRemovesParticipant() throws Exception { + Client client = new Client( + new State("ClientNetwork"), connections, + "http://client/ClientService.json"); + ClientService clientS = client.getService(); + connections.clientMap.put("http://client/ClientService.json", clientS); + client.joinNetwork("http://master/MasterService.json"); + master.performWork(); + assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); + + connections.clientMap.put("http://client/ClientService.json", + new UnreachableClient()); + masterS.updateStateRequest("NewState", "NewStateData", 0); + master.performWork(); + + assertEquals("[]", state.getDataOf(".participants")); + } + } |