summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-16 19:50:36 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-16 19:50:36 +0100
commit40cab510c223512a4f947414ae13659a7bcd607b (patch)
treef90f0b07fba6bd8d410e6ff7f2b7669dcbc8eb84
parent25d3d6d1ff6f57ad354f03a0b6cc4f1a897bb342 (diff)
Replace Master with new implementation.
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java190
-rw-r--r--same/src/main/java/com/orbekk/same/NewMaster.java131
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java6
-rw-r--r--same/src/test/java/com/orbekk/same/MasterServiceImplTest.java133
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java (renamed from same/src/test/java/com/orbekk/same/NewMasterTest.java)6
5 files changed, 85 insertions, 381 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index ed04b4d..77f7496 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -1,92 +1,116 @@
package com.orbekk.same;
-import java.util.ArrayList;
+import com.orbekk.same.State.Component;
+import java.util.Collections;
import java.util.List;
-
+import com.orbekk.util.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.orbekk.same.State.Component;
-
-public class Master implements MasterService, Runnable {
+public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
private final ConnectionManager connections;
private State state;
- private boolean stopped = false;
private Broadcaster broadcaster;
- private List<String> _fullStateReceivers = new ArrayList<String>();
- private Thread workerThread = null;
-
+
public static Master create(ConnectionManager connections,
Broadcaster broadcaster, String myUrl, String networkName) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
return new Master(state, connections, broadcaster);
}
-
- /** Constructor for internal use.
- */
+
Master(State initialState, ConnectionManager connections,
Broadcaster broadcaster) {
this.state = initialState;
this.connections = connections;
this.broadcaster = broadcaster;
}
-
- @Override
- public synchronized void joinNetworkRequest(String clientUrl) {
- logger.info("JoinNetworkRequest({})", clientUrl);
- List<String> participants = participants();
- if (!participants.contains(clientUrl)) {
- participants.add(clientUrl);
- _fullStateReceivers.add(clientUrl);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- notifyAll();
- } else {
- logger.warn("Client {} already part of network. " +
- "Ignoring participation request", clientUrl);
+
+ private MasterService serviceImpl = new MasterService() {
+ @Override
+ public boolean updateStateRequest(String component,
+ String newData, long revision) {
+ logger.info("updateStateRequest({}, {}, {})",
+ new Object[]{component, newData, revision});
+ boolean updated = state.update(component, newData, revision + 1);
+ if (updated) {
+ updateStateRequestThread.add(component);
+ }
+ return updated;
}
- }
-
- boolean _sendUpdatedComponents() {
- boolean worked = false;
- for (final Component component : state.getAndClearUpdatedComponents()) {
- logger.info("Broadcasting new component {}", component);
- broadcastNewComponents(participants(), listWrap(component));
- worked = true;
+
+ @Override
+ public void joinNetworkRequest(String clientUrl) {
+ logger.info("joinNetworkRequest({})", clientUrl);
+ List<String> participants = state.getList(".participants");
+ sendFullStateThread.add(clientUrl);
+ if (!participants.contains(clientUrl)) {
+ participants.add(clientUrl);
+ synchronized (Master.this) {
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ }
+ updateStateRequestThread.add(".participants");
+ } else {
+ logger.warn("Client {} joining: Already part of network");
+ }
+ }
+ };
+
+ WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("updateStateRequestThread: Updated state: {}",
+ pending);
+ for (String componentName : pending) {
+ Component component = state.getComponent(componentName);
+ List<String> participants = state.getList(".participants");
+ broadcastNewComponents(participants,
+ Collections.singletonList(component));
+ }
}
- return worked;
+ };
+
+ WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("Sending full state to {}", pending);
+ final List<Component> components = state.getComponents();
+ broadcastNewComponents(pending, components);
+ }
+ };
+
+ void performWork() {
+ sendFullStateThread.performWork();
+ updateStateRequestThread.performWork();
}
-
- private <T>List<T> listWrap(T o) {
- List<T> list = new ArrayList<T>();
- list.add(o);
- return list;
+
+ public void start() {
+ sendFullStateThread.start();
+ updateStateRequestThread.start();
}
-
- synchronized boolean _sendFullState() {
- boolean hasWork = _fullStateReceivers.size() != 0;
- if (hasWork) {
- logger.info("Sending full state to new participants.");
- final List<State.Component> components = state.getComponents();
- broadcastNewComponents(_fullStateReceivers, components);
- _fullStateReceivers.clear();
- }
- return hasWork;
+
+ public void interrupt() {
+ sendFullStateThread.interrupt();
+ updateStateRequestThread.interrupt();
+ }
+
+ public MasterService getService() {
+ return serviceImpl;
}
-
+
private synchronized void removeParticipant(String url) {
- List<String> participants = participants();
+ List<String> participants = state.getList(".participants");
if (participants.contains(url)) {
- logger.warn("RemoveParticipant({})", url);
+ logger.info("removeParticipant({})", url);
participants.remove(url);
state.updateFromObject(".participants", participants,
state.getRevision(".participants") + 1);
- notifyAll();
+ updateStateRequestThread.add(".participants");
}
}
-
+
private void broadcastNewComponents(List<String> destinations,
final List<State.Component> components) {
broadcaster.broadcast(destinations, new ServiceOperation() {
@@ -104,60 +128,4 @@ public class Master implements MasterService, Runnable {
}
});
}
-
- private List<String> participants() {
- return state.getList(".participants");
- }
-
-
- @Override
- public synchronized boolean updateStateRequest(String component,
- String newData, long revision) {
- boolean updated = state.update(component, newData, revision + 1);
- if (updated) {
- notifyAll();
- }
- return updated;
- }
-
- boolean _performWork() {
- boolean worked = false;
- worked |= _sendUpdatedComponents();
- worked |= _sendFullState();
- return worked;
- }
-
- @Override
- public void run() {
- while (!stopped) {
- if (!_performWork()) {
- synchronized (this) {
- try {
- wait(500);
- } catch (InterruptedException e) {
- stopped = true;
- }
- }
- }
- if (Thread.interrupted()) {
- stopped = true;
- }
- }
- }
-
- public void start() {
- if (workerThread == null) {
- workerThread = new Thread(this);
- workerThread.start();
- logger.info("Master thread started. {}", state);
- }
- }
-
- public void join() throws InterruptedException {
- workerThread.join();
- }
-
- public void interrupt() {
- workerThread.interrupt();
- }
}
diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java
deleted file mode 100644
index c2b2d9b..0000000
--- a/same/src/main/java/com/orbekk/same/NewMaster.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.orbekk.same;
-
-import com.orbekk.same.State.Component;
-import java.util.Collections;
-import java.util.List;
-import com.orbekk.util.WorkQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NewMaster {
- private Logger logger = LoggerFactory.getLogger(getClass());
- private final ConnectionManager connections;
- private State state;
- private Broadcaster broadcaster;
-
- public static NewMaster create(ConnectionManager connections,
- Broadcaster broadcaster, String myUrl, String networkName) {
- State state = new State(networkName);
- state.update(".masterUrl", myUrl, 1);
- return new NewMaster(state, connections, broadcaster);
- }
-
- NewMaster(State initialState, ConnectionManager connections,
- Broadcaster broadcaster) {
- this.state = initialState;
- this.connections = connections;
- this.broadcaster = broadcaster;
- }
-
- private MasterService serviceImpl = new MasterService() {
- @Override
- public boolean updateStateRequest(String component,
- String newData, long revision) {
- logger.info("updateStateRequest({}, {}, {})",
- new Object[]{component, newData, revision});
- boolean updated = state.update(component, newData, revision + 1);
- if (updated) {
- updateStateRequestThread.add(component);
- }
- return updated;
- }
-
- @Override
- public void joinNetworkRequest(String clientUrl) {
- logger.info("joinNetworkRequest({})", clientUrl);
- List<String> participants = state.getList(".participants");
- sendFullStateThread.add(clientUrl);
- if (!participants.contains(clientUrl)) {
- participants.add(clientUrl);
- synchronized (NewMaster.this) {
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- }
- updateStateRequestThread.add(".participants");
- } else {
- logger.warn("Client {} joining: Already part of network");
- }
- }
- };
-
- WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("updateStateRequestThread: Updated state: {}",
- pending);
- for (String componentName : pending) {
- Component component = state.getComponent(componentName);
- List<String> participants = state.getList(".participants");
- broadcastNewComponents(participants,
- Collections.singletonList(component));
- }
- }
- };
-
- WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("Sending full state to {}", pending);
- final List<Component> components = state.getComponents();
- broadcastNewComponents(pending, components);
- }
- };
-
- void performWork() {
- sendFullStateThread.performWork();
- updateStateRequestThread.performWork();
- }
-
- public void start() {
- sendFullStateThread.start();
- updateStateRequestThread.start();
- }
-
- public void interrupt() {
- sendFullStateThread.interrupt();
- updateStateRequestThread.interrupt();
- }
-
- public MasterService getService() {
- return serviceImpl;
- }
-
- private synchronized void removeParticipant(String url) {
- List<String> participants = state.getList(".participants");
- if (participants.contains(url)) {
- logger.info("removeParticipant({})", url);
- participants.remove(url);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- updateStateRequestThread.add(".participants");
- }
- }
-
- private void broadcastNewComponents(List<String> destinations,
- final List<State.Component> components) {
- broadcaster.broadcast(destinations, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- for (Component c : components) {
- client.setState(c.getName(), c.getData(),
- c.getRevision());
- }
- } catch (Exception e) {
- logger.info("Client {} failed to receive state update.", url);
- removeParticipant(url);
- }
- }
- });
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 7f7a5d0..8cc37fe 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -60,7 +60,7 @@ public class SameController {
ServerContainer server = new ServerBuilder(port)
.withServlet(new StateServlet(client.getInterface()), "/_/state")
.withService(client.getService(), ClientService.class)
- .withService(master, MasterService.class)
+ .withService(master.getService(), MasterService.class)
.withService(paxos, PaxosService.class)
.build();
@@ -116,12 +116,12 @@ public class SameController {
public void join() {
try {
server.join();
- master.join();
+ client.interrupt();
+ master.interrupt();
if (discoveryService != null) {
discoveryService.join();
}
} catch (InterruptedException e) {
- master.interrupt();
try {
server.stop();
} catch (Exception e1) {
diff --git a/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java b/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java
deleted file mode 100644
index 0e7c81d..0000000
--- a/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.orbekk.same;
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-
-import org.codehaus.jackson.type.TypeReference;
-import org.junit.Test;
-import org.junit.Before;
-
-public class MasterServiceImplTest {
- private State state = new State("TestNetwork");
- private TestConnectionManager connections = new TestConnectionManager();
- private TestBroadcaster broadcaster = new TestBroadcaster();
- private Master master;
-
- public static class UnreachableClient implements ClientService {
- @Override
- public void notifyNetwork(String networkName, String masterUrl)
- throws Exception {
- throw new Exception("Unreachable client");
- }
-
- @Override
- public void setState(String component, String data, long revision)
- throws Exception {
- throw new Exception("Unreachable client");
- }
-
- @Override
- public void discoveryRequest(String remoteUrl) throws Exception {
- throw new Exception("Unreachable client");
- }
- }
-
- @Before
- public void setUp() {
- state.update(".masterUrl", "http://master/MasterService.json", 1);
- master = new Master(state, connections, broadcaster);
- connections.masterMap.put("http://master/MasterService.json", master);
- }
-
- @Test
- public void testJsonState() {
- List<String> participants =
- state.getParsedData(".participants",
- new TypeReference<List<String>>() { });
- assertEquals(participants.size(), 0);
- participants.add("http://SomeUrl/");
- state.updateFromObject(".participants", participants, 1);
- }
-
- @Test
- public void joinNetworkAddsClient() {
- master.joinNetworkRequest("http://clientUrl");
- List<String> participants = state.getList(".participants");
- assertTrue(participants.contains("http://clientUrl"));
- }
-
- @Test
- public void workLoopClearsUpdatedComponents() {
- state.update("Test", "Content", 0);
- assertTrue(master._performWork());
- assertTrue(state.getAndClearUpdatedComponents().isEmpty());
- }
-
- @Test
- public void clientJoin() {
- Client client = new Client(
- new State("ClientNetwork"), connections,
- "http://client/ClientService.json");
- ClientService clientS = client.getService();
- connections.clientMap.put("http://client/ClientService.json", clientS);
- client.joinNetwork("http://master/MasterService.json");
- assertTrue(master._performWork());
- assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
- assertEquals(state, client.testGetState());
- }
-
- @Test
- public void validStateRequest() {
- Client client1 = new Client(
- new State("ClientNetwork"), connections,
- "http://client/ClientService.json");
- ClientService client1S = client1.getService();
- connections.clientMap.put("http://client/ClientService.json", client1S);
- Client client2 = new Client(
- new State("ClientNetwork"), connections,
- "http://client2/ClientService.json");
- ClientService client2S = client2.getService();
- connections.clientMap.put("http://client2/ClientService.json", client2S);
-
- client1.joinNetwork("http://master/MasterService.json");
- client2.joinNetwork("http://master/MasterService.json");
-
- assertTrue(master._performWork());
- assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
- assertTrue(state.getList(".participants").contains("http://client2/ClientService.json"));
- assertEquals(state, client1.testGetState());
-
- assertTrue(master.updateStateRequest("A", "1", 0));
- assertTrue(master._performWork());
-
- assertEquals(state, client1.testGetState());
- assertEquals(state, client2.testGetState());
-
- assertFalse(master.updateStateRequest("A", "2", 0));
- assertTrue(master.updateStateRequest("A", "3", 1));
- assertTrue(master._performWork());
-
- assertEquals(state, client1.testGetState());
- assertEquals(state, client2.testGetState());
- }
-
- @Test
- public void masterRemovesParticipant() {
- Client client = new Client(
- new State("ClientNetwork"), connections,
- "http://client/ClientService.json");
- ClientService clientS = client.getService();
- connections.clientMap.put("http://client/ClientService.json", clientS);
- client.joinNetwork("http://master/MasterService.json");
- assertTrue(master._performWork());
- assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
-
- connections.clientMap.put("http://client/ClientService.json",
- new UnreachableClient());
- master.updateStateRequest("NewState", "NewStateData", 0);
- master._performWork();
-
- assertEquals("[]", state.getDataOf(".participants"));
- }
-}
diff --git a/same/src/test/java/com/orbekk/same/NewMasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index 273fb3c..09ac6a1 100644
--- a/same/src/test/java/com/orbekk/same/NewMasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -7,11 +7,11 @@ import org.codehaus.jackson.type.TypeReference;
import org.junit.Before;
import org.junit.Test;
-public class NewMasterTest {
+public class MasterTest {
private State state = new State("TestNetwork");
private TestConnectionManager connections = new TestConnectionManager();
private TestBroadcaster broadcaster = new TestBroadcaster();
- private NewMaster master;
+ private Master master;
private MasterService masterS;
public static class UnreachableClient implements ClientService {
@@ -36,7 +36,7 @@ public class NewMasterTest {
@Before
public void setUp() {
state.update(".masterUrl", "http://master/MasterService.json", 1);
- master = new NewMaster(state, connections, broadcaster);
+ master = new Master(state, connections, broadcaster);
masterS = master.getService();
connections.masterMap.put("http://master/MasterService.json",
masterS);