From 40cab510c223512a4f947414ae13659a7bcd607b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Thu, 16 Feb 2012 19:50:36 +0100 Subject: Replace Master with new implementation. --- same/src/main/java/com/orbekk/same/Master.java | 190 +++++++++------------ same/src/main/java/com/orbekk/same/NewMaster.java | 131 -------------- .../main/java/com/orbekk/same/SameController.java | 6 +- .../com/orbekk/same/MasterServiceImplTest.java | 133 --------------- same/src/test/java/com/orbekk/same/MasterTest.java | 119 +++++++++++++ .../test/java/com/orbekk/same/NewMasterTest.java | 119 ------------- 6 files changed, 201 insertions(+), 497 deletions(-) delete mode 100644 same/src/main/java/com/orbekk/same/NewMaster.java delete mode 100644 same/src/test/java/com/orbekk/same/MasterServiceImplTest.java create mode 100644 same/src/test/java/com/orbekk/same/MasterTest.java delete mode 100644 same/src/test/java/com/orbekk/same/NewMasterTest.java diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index ed04b4d..77f7496 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -1,92 +1,116 @@ package com.orbekk.same; -import java.util.ArrayList; +import com.orbekk.same.State.Component; +import java.util.Collections; import java.util.List; - +import com.orbekk.util.WorkQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.orbekk.same.State.Component; - -public class Master implements MasterService, Runnable { +public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); private final ConnectionManager connections; private State state; - private boolean stopped = false; private Broadcaster broadcaster; - private List _fullStateReceivers = new ArrayList(); - private Thread workerThread = null; - + public static Master create(ConnectionManager connections, Broadcaster broadcaster, String myUrl, String networkName) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); return new Master(state, connections, broadcaster); } - - /** Constructor for internal use. - */ + Master(State initialState, ConnectionManager connections, Broadcaster broadcaster) { this.state = initialState; this.connections = connections; this.broadcaster = broadcaster; } - - @Override - public synchronized void joinNetworkRequest(String clientUrl) { - logger.info("JoinNetworkRequest({})", clientUrl); - List participants = participants(); - if (!participants.contains(clientUrl)) { - participants.add(clientUrl); - _fullStateReceivers.add(clientUrl); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - notifyAll(); - } else { - logger.warn("Client {} already part of network. " + - "Ignoring participation request", clientUrl); + + private MasterService serviceImpl = new MasterService() { + @Override + public boolean updateStateRequest(String component, + String newData, long revision) { + logger.info("updateStateRequest({}, {}, {})", + new Object[]{component, newData, revision}); + boolean updated = state.update(component, newData, revision + 1); + if (updated) { + updateStateRequestThread.add(component); + } + return updated; } - } - - boolean _sendUpdatedComponents() { - boolean worked = false; - for (final Component component : state.getAndClearUpdatedComponents()) { - logger.info("Broadcasting new component {}", component); - broadcastNewComponents(participants(), listWrap(component)); - worked = true; + + @Override + public void joinNetworkRequest(String clientUrl) { + logger.info("joinNetworkRequest({})", clientUrl); + List participants = state.getList(".participants"); + sendFullStateThread.add(clientUrl); + if (!participants.contains(clientUrl)) { + participants.add(clientUrl); + synchronized (Master.this) { + state.updateFromObject(".participants", participants, + state.getRevision(".participants") + 1); + } + updateStateRequestThread.add(".participants"); + } else { + logger.warn("Client {} joining: Already part of network"); + } + } + }; + + WorkQueue updateStateRequestThread = new WorkQueue() { + @Override protected void onChange() { + List pending = getAndClear(); + logger.info("updateStateRequestThread: Updated state: {}", + pending); + for (String componentName : pending) { + Component component = state.getComponent(componentName); + List participants = state.getList(".participants"); + broadcastNewComponents(participants, + Collections.singletonList(component)); + } } - return worked; + }; + + WorkQueue sendFullStateThread = new WorkQueue() { + @Override protected void onChange() { + List pending = getAndClear(); + logger.info("Sending full state to {}", pending); + final List components = state.getComponents(); + broadcastNewComponents(pending, components); + } + }; + + void performWork() { + sendFullStateThread.performWork(); + updateStateRequestThread.performWork(); } - - private List listWrap(T o) { - List list = new ArrayList(); - list.add(o); - return list; + + public void start() { + sendFullStateThread.start(); + updateStateRequestThread.start(); } - - synchronized boolean _sendFullState() { - boolean hasWork = _fullStateReceivers.size() != 0; - if (hasWork) { - logger.info("Sending full state to new participants."); - final List components = state.getComponents(); - broadcastNewComponents(_fullStateReceivers, components); - _fullStateReceivers.clear(); - } - return hasWork; + + public void interrupt() { + sendFullStateThread.interrupt(); + updateStateRequestThread.interrupt(); + } + + public MasterService getService() { + return serviceImpl; } - + private synchronized void removeParticipant(String url) { - List participants = participants(); + List participants = state.getList(".participants"); if (participants.contains(url)) { - logger.warn("RemoveParticipant({})", url); + logger.info("removeParticipant({})", url); participants.remove(url); state.updateFromObject(".participants", participants, state.getRevision(".participants") + 1); - notifyAll(); + updateStateRequestThread.add(".participants"); } } - + private void broadcastNewComponents(List destinations, final List components) { broadcaster.broadcast(destinations, new ServiceOperation() { @@ -104,60 +128,4 @@ public class Master implements MasterService, Runnable { } }); } - - private List participants() { - return state.getList(".participants"); - } - - - @Override - public synchronized boolean updateStateRequest(String component, - String newData, long revision) { - boolean updated = state.update(component, newData, revision + 1); - if (updated) { - notifyAll(); - } - return updated; - } - - boolean _performWork() { - boolean worked = false; - worked |= _sendUpdatedComponents(); - worked |= _sendFullState(); - return worked; - } - - @Override - public void run() { - while (!stopped) { - if (!_performWork()) { - synchronized (this) { - try { - wait(500); - } catch (InterruptedException e) { - stopped = true; - } - } - } - if (Thread.interrupted()) { - stopped = true; - } - } - } - - public void start() { - if (workerThread == null) { - workerThread = new Thread(this); - workerThread.start(); - logger.info("Master thread started. {}", state); - } - } - - public void join() throws InterruptedException { - workerThread.join(); - } - - public void interrupt() { - workerThread.interrupt(); - } } diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java deleted file mode 100644 index c2b2d9b..0000000 --- a/same/src/main/java/com/orbekk/same/NewMaster.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.orbekk.same; - -import com.orbekk.same.State.Component; -import java.util.Collections; -import java.util.List; -import com.orbekk.util.WorkQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NewMaster { - private Logger logger = LoggerFactory.getLogger(getClass()); - private final ConnectionManager connections; - private State state; - private Broadcaster broadcaster; - - public static NewMaster create(ConnectionManager connections, - Broadcaster broadcaster, String myUrl, String networkName) { - State state = new State(networkName); - state.update(".masterUrl", myUrl, 1); - return new NewMaster(state, connections, broadcaster); - } - - NewMaster(State initialState, ConnectionManager connections, - Broadcaster broadcaster) { - this.state = initialState; - this.connections = connections; - this.broadcaster = broadcaster; - } - - private MasterService serviceImpl = new MasterService() { - @Override - public boolean updateStateRequest(String component, - String newData, long revision) { - logger.info("updateStateRequest({}, {}, {})", - new Object[]{component, newData, revision}); - boolean updated = state.update(component, newData, revision + 1); - if (updated) { - updateStateRequestThread.add(component); - } - return updated; - } - - @Override - public void joinNetworkRequest(String clientUrl) { - logger.info("joinNetworkRequest({})", clientUrl); - List participants = state.getList(".participants"); - sendFullStateThread.add(clientUrl); - if (!participants.contains(clientUrl)) { - participants.add(clientUrl); - synchronized (NewMaster.this) { - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - } - updateStateRequestThread.add(".participants"); - } else { - logger.warn("Client {} joining: Already part of network"); - } - } - }; - - WorkQueue updateStateRequestThread = new WorkQueue() { - @Override protected void onChange() { - List pending = getAndClear(); - logger.info("updateStateRequestThread: Updated state: {}", - pending); - for (String componentName : pending) { - Component component = state.getComponent(componentName); - List participants = state.getList(".participants"); - broadcastNewComponents(participants, - Collections.singletonList(component)); - } - } - }; - - WorkQueue sendFullStateThread = new WorkQueue() { - @Override protected void onChange() { - List pending = getAndClear(); - logger.info("Sending full state to {}", pending); - final List components = state.getComponents(); - broadcastNewComponents(pending, components); - } - }; - - void performWork() { - sendFullStateThread.performWork(); - updateStateRequestThread.performWork(); - } - - public void start() { - sendFullStateThread.start(); - updateStateRequestThread.start(); - } - - public void interrupt() { - sendFullStateThread.interrupt(); - updateStateRequestThread.interrupt(); - } - - public MasterService getService() { - return serviceImpl; - } - - private synchronized void removeParticipant(String url) { - List participants = state.getList(".participants"); - if (participants.contains(url)) { - logger.info("removeParticipant({})", url); - participants.remove(url); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - updateStateRequestThread.add(".participants"); - } - } - - private void broadcastNewComponents(List destinations, - final List components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } -} diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 7f7a5d0..8cc37fe 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -60,7 +60,7 @@ public class SameController { ServerContainer server = new ServerBuilder(port) .withServlet(new StateServlet(client.getInterface()), "/_/state") .withService(client.getService(), ClientService.class) - .withService(master, MasterService.class) + .withService(master.getService(), MasterService.class) .withService(paxos, PaxosService.class) .build(); @@ -116,12 +116,12 @@ public class SameController { public void join() { try { server.join(); - master.join(); + client.interrupt(); + master.interrupt(); if (discoveryService != null) { discoveryService.join(); } } catch (InterruptedException e) { - master.interrupt(); try { server.stop(); } catch (Exception e1) { diff --git a/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java b/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java deleted file mode 100644 index 0e7c81d..0000000 --- a/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.orbekk.same; - -import static org.junit.Assert.*; - -import java.util.List; - -import org.codehaus.jackson.type.TypeReference; -import org.junit.Test; -import org.junit.Before; - -public class MasterServiceImplTest { - private State state = new State("TestNetwork"); - private TestConnectionManager connections = new TestConnectionManager(); - private TestBroadcaster broadcaster = new TestBroadcaster(); - private Master master; - - public static class UnreachableClient implements ClientService { - @Override - public void notifyNetwork(String networkName, String masterUrl) - throws Exception { - throw new Exception("Unreachable client"); - } - - @Override - public void setState(String component, String data, long revision) - throws Exception { - throw new Exception("Unreachable client"); - } - - @Override - public void discoveryRequest(String remoteUrl) throws Exception { - throw new Exception("Unreachable client"); - } - } - - @Before - public void setUp() { - state.update(".masterUrl", "http://master/MasterService.json", 1); - master = new Master(state, connections, broadcaster); - connections.masterMap.put("http://master/MasterService.json", master); - } - - @Test - public void testJsonState() { - List participants = - state.getParsedData(".participants", - new TypeReference>() { }); - assertEquals(participants.size(), 0); - participants.add("http://SomeUrl/"); - state.updateFromObject(".participants", participants, 1); - } - - @Test - public void joinNetworkAddsClient() { - master.joinNetworkRequest("http://clientUrl"); - List participants = state.getList(".participants"); - assertTrue(participants.contains("http://clientUrl")); - } - - @Test - public void workLoopClearsUpdatedComponents() { - state.update("Test", "Content", 0); - assertTrue(master._performWork()); - assertTrue(state.getAndClearUpdatedComponents().isEmpty()); - } - - @Test - public void clientJoin() { - Client client = new Client( - new State("ClientNetwork"), connections, - "http://client/ClientService.json"); - ClientService clientS = client.getService(); - connections.clientMap.put("http://client/ClientService.json", clientS); - client.joinNetwork("http://master/MasterService.json"); - assertTrue(master._performWork()); - assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); - assertEquals(state, client.testGetState()); - } - - @Test - public void validStateRequest() { - Client client1 = new Client( - new State("ClientNetwork"), connections, - "http://client/ClientService.json"); - ClientService client1S = client1.getService(); - connections.clientMap.put("http://client/ClientService.json", client1S); - Client client2 = new Client( - new State("ClientNetwork"), connections, - "http://client2/ClientService.json"); - ClientService client2S = client2.getService(); - connections.clientMap.put("http://client2/ClientService.json", client2S); - - client1.joinNetwork("http://master/MasterService.json"); - client2.joinNetwork("http://master/MasterService.json"); - - assertTrue(master._performWork()); - assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); - assertTrue(state.getList(".participants").contains("http://client2/ClientService.json")); - assertEquals(state, client1.testGetState()); - - assertTrue(master.updateStateRequest("A", "1", 0)); - assertTrue(master._performWork()); - - assertEquals(state, client1.testGetState()); - assertEquals(state, client2.testGetState()); - - assertFalse(master.updateStateRequest("A", "2", 0)); - assertTrue(master.updateStateRequest("A", "3", 1)); - assertTrue(master._performWork()); - - assertEquals(state, client1.testGetState()); - assertEquals(state, client2.testGetState()); - } - - @Test - public void masterRemovesParticipant() { - Client client = new Client( - new State("ClientNetwork"), connections, - "http://client/ClientService.json"); - ClientService clientS = client.getService(); - connections.clientMap.put("http://client/ClientService.json", clientS); - client.joinNetwork("http://master/MasterService.json"); - assertTrue(master._performWork()); - assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); - - connections.clientMap.put("http://client/ClientService.json", - new UnreachableClient()); - master.updateStateRequest("NewState", "NewStateData", 0); - master._performWork(); - - assertEquals("[]", state.getDataOf(".participants")); - } -} diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java new file mode 100644 index 0000000..09ac6a1 --- /dev/null +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -0,0 +1,119 @@ +package com.orbekk.same; + +import static org.junit.Assert.*; + +import java.util.List; +import org.codehaus.jackson.type.TypeReference; +import org.junit.Before; +import org.junit.Test; + +public class MasterTest { + private State state = new State("TestNetwork"); + private TestConnectionManager connections = new TestConnectionManager(); + private TestBroadcaster broadcaster = new TestBroadcaster(); + private Master master; + private MasterService masterS; + + public static class UnreachableClient implements ClientService { + @Override + public void notifyNetwork(String networkName, String masterUrl) + throws Exception { + throw new Exception("Unreachable client"); + } + + @Override + public void setState(String component, String data, long revision) + throws Exception { + throw new Exception("Unreachable client"); + } + + @Override + public void discoveryRequest(String remoteUrl) throws Exception { + throw new Exception("Unreachable client"); + } + } + + @Before + public void setUp() { + state.update(".masterUrl", "http://master/MasterService.json", 1); + master = new Master(state, connections, broadcaster); + masterS = master.getService(); + connections.masterMap.put("http://master/MasterService.json", + masterS); + } + + @Test + public void joinNetworkAddsClient() throws Exception { + masterS.joinNetworkRequest("http://clientUrl"); + List participants = state.getList(".participants"); + assertTrue(participants.contains("http://clientUrl")); + } + + @Test + public void clientJoin() { + Client client = new Client( + new State("ClientNetwork"), connections, + "http://client/ClientService.json"); + ClientService clientS = client.getService(); + connections.clientMap.put("http://client/ClientService.json", clientS); + client.joinNetwork("http://master/MasterService.json"); + master.performWork(); + assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); + assertEquals(state, client.testGetState()); + } + + @Test + public void updateStateRequest() throws Exception { + Client client1 = new Client( + new State("ClientNetwork"), connections, + "http://client/ClientService.json"); + ClientService client1S = client1.getService(); + connections.clientMap.put("http://client/ClientService.json", client1S); + Client client2 = new Client( + new State("ClientNetwork"), connections, + "http://client2/ClientService.json"); + ClientService client2S = client2.getService(); + connections.clientMap.put("http://client2/ClientService.json", client2S); + + client1.joinNetwork("http://master/MasterService.json"); + client2.joinNetwork("http://master/MasterService.json"); + + master.performWork(); + assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); + assertTrue(state.getList(".participants").contains("http://client2/ClientService.json")); + assertEquals(state, client1.testGetState()); + + assertTrue(masterS.updateStateRequest("A", "1", 0)); + master.performWork(); + + assertEquals(state, client1.testGetState()); + assertEquals(state, client2.testGetState()); + + assertFalse(masterS.updateStateRequest("A", "2", 0)); + assertTrue(masterS.updateStateRequest("A", "3", 1)); + master.performWork(); + + assertEquals(state, client1.testGetState()); + assertEquals(state, client2.testGetState()); + } + + @Test + public void masterRemovesParticipant() throws Exception { + Client client = new Client( + new State("ClientNetwork"), connections, + "http://client/ClientService.json"); + ClientService clientS = client.getService(); + connections.clientMap.put("http://client/ClientService.json", clientS); + client.joinNetwork("http://master/MasterService.json"); + master.performWork(); + assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); + + connections.clientMap.put("http://client/ClientService.json", + new UnreachableClient()); + masterS.updateStateRequest("NewState", "NewStateData", 0); + master.performWork(); + + assertEquals("[]", state.getDataOf(".participants")); + } + +} diff --git a/same/src/test/java/com/orbekk/same/NewMasterTest.java b/same/src/test/java/com/orbekk/same/NewMasterTest.java deleted file mode 100644 index 273fb3c..0000000 --- a/same/src/test/java/com/orbekk/same/NewMasterTest.java +++ /dev/null @@ -1,119 +0,0 @@ -package com.orbekk.same; - -import static org.junit.Assert.*; - -import java.util.List; -import org.codehaus.jackson.type.TypeReference; -import org.junit.Before; -import org.junit.Test; - -public class NewMasterTest { - private State state = new State("TestNetwork"); - private TestConnectionManager connections = new TestConnectionManager(); - private TestBroadcaster broadcaster = new TestBroadcaster(); - private NewMaster master; - private MasterService masterS; - - public static class UnreachableClient implements ClientService { - @Override - public void notifyNetwork(String networkName, String masterUrl) - throws Exception { - throw new Exception("Unreachable client"); - } - - @Override - public void setState(String component, String data, long revision) - throws Exception { - throw new Exception("Unreachable client"); - } - - @Override - public void discoveryRequest(String remoteUrl) throws Exception { - throw new Exception("Unreachable client"); - } - } - - @Before - public void setUp() { - state.update(".masterUrl", "http://master/MasterService.json", 1); - master = new NewMaster(state, connections, broadcaster); - masterS = master.getService(); - connections.masterMap.put("http://master/MasterService.json", - masterS); - } - - @Test - public void joinNetworkAddsClient() throws Exception { - masterS.joinNetworkRequest("http://clientUrl"); - List participants = state.getList(".participants"); - assertTrue(participants.contains("http://clientUrl")); - } - - @Test - public void clientJoin() { - Client client = new Client( - new State("ClientNetwork"), connections, - "http://client/ClientService.json"); - ClientService clientS = client.getService(); - connections.clientMap.put("http://client/ClientService.json", clientS); - client.joinNetwork("http://master/MasterService.json"); - master.performWork(); - assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); - assertEquals(state, client.testGetState()); - } - - @Test - public void updateStateRequest() throws Exception { - Client client1 = new Client( - new State("ClientNetwork"), connections, - "http://client/ClientService.json"); - ClientService client1S = client1.getService(); - connections.clientMap.put("http://client/ClientService.json", client1S); - Client client2 = new Client( - new State("ClientNetwork"), connections, - "http://client2/ClientService.json"); - ClientService client2S = client2.getService(); - connections.clientMap.put("http://client2/ClientService.json", client2S); - - client1.joinNetwork("http://master/MasterService.json"); - client2.joinNetwork("http://master/MasterService.json"); - - master.performWork(); - assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); - assertTrue(state.getList(".participants").contains("http://client2/ClientService.json")); - assertEquals(state, client1.testGetState()); - - assertTrue(masterS.updateStateRequest("A", "1", 0)); - master.performWork(); - - assertEquals(state, client1.testGetState()); - assertEquals(state, client2.testGetState()); - - assertFalse(masterS.updateStateRequest("A", "2", 0)); - assertTrue(masterS.updateStateRequest("A", "3", 1)); - master.performWork(); - - assertEquals(state, client1.testGetState()); - assertEquals(state, client2.testGetState()); - } - - @Test - public void masterRemovesParticipant() throws Exception { - Client client = new Client( - new State("ClientNetwork"), connections, - "http://client/ClientService.json"); - ClientService clientS = client.getService(); - connections.clientMap.put("http://client/ClientService.json", clientS); - client.joinNetwork("http://master/MasterService.json"); - master.performWork(); - assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); - - connections.clientMap.put("http://client/ClientService.json", - new UnreachableClient()); - masterS.updateStateRequest("NewState", "NewStateData", 0); - master.performWork(); - - assertEquals("[]", state.getDataOf(".participants")); - } - -} -- cgit v1.2.3