summaryrefslogtreecommitdiff
path: root/same/src/main/java
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-16 19:50:36 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-16 19:50:36 +0100
commit40cab510c223512a4f947414ae13659a7bcd607b (patch)
treef90f0b07fba6bd8d410e6ff7f2b7669dcbc8eb84 /same/src/main/java
parent25d3d6d1ff6f57ad354f03a0b6cc4f1a897bb342 (diff)
Replace Master with new implementation.
Diffstat (limited to 'same/src/main/java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java190
-rw-r--r--same/src/main/java/com/orbekk/same/NewMaster.java131
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java6
3 files changed, 82 insertions, 245 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index ed04b4d..77f7496 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -1,92 +1,116 @@
package com.orbekk.same;
-import java.util.ArrayList;
+import com.orbekk.same.State.Component;
+import java.util.Collections;
import java.util.List;
-
+import com.orbekk.util.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.orbekk.same.State.Component;
-
-public class Master implements MasterService, Runnable {
+public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
private final ConnectionManager connections;
private State state;
- private boolean stopped = false;
private Broadcaster broadcaster;
- private List<String> _fullStateReceivers = new ArrayList<String>();
- private Thread workerThread = null;
-
+
public static Master create(ConnectionManager connections,
Broadcaster broadcaster, String myUrl, String networkName) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
return new Master(state, connections, broadcaster);
}
-
- /** Constructor for internal use.
- */
+
Master(State initialState, ConnectionManager connections,
Broadcaster broadcaster) {
this.state = initialState;
this.connections = connections;
this.broadcaster = broadcaster;
}
-
- @Override
- public synchronized void joinNetworkRequest(String clientUrl) {
- logger.info("JoinNetworkRequest({})", clientUrl);
- List<String> participants = participants();
- if (!participants.contains(clientUrl)) {
- participants.add(clientUrl);
- _fullStateReceivers.add(clientUrl);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- notifyAll();
- } else {
- logger.warn("Client {} already part of network. " +
- "Ignoring participation request", clientUrl);
+
+ private MasterService serviceImpl = new MasterService() {
+ @Override
+ public boolean updateStateRequest(String component,
+ String newData, long revision) {
+ logger.info("updateStateRequest({}, {}, {})",
+ new Object[]{component, newData, revision});
+ boolean updated = state.update(component, newData, revision + 1);
+ if (updated) {
+ updateStateRequestThread.add(component);
+ }
+ return updated;
}
- }
-
- boolean _sendUpdatedComponents() {
- boolean worked = false;
- for (final Component component : state.getAndClearUpdatedComponents()) {
- logger.info("Broadcasting new component {}", component);
- broadcastNewComponents(participants(), listWrap(component));
- worked = true;
+
+ @Override
+ public void joinNetworkRequest(String clientUrl) {
+ logger.info("joinNetworkRequest({})", clientUrl);
+ List<String> participants = state.getList(".participants");
+ sendFullStateThread.add(clientUrl);
+ if (!participants.contains(clientUrl)) {
+ participants.add(clientUrl);
+ synchronized (Master.this) {
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ }
+ updateStateRequestThread.add(".participants");
+ } else {
+ logger.warn("Client {} joining: Already part of network");
+ }
+ }
+ };
+
+ WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("updateStateRequestThread: Updated state: {}",
+ pending);
+ for (String componentName : pending) {
+ Component component = state.getComponent(componentName);
+ List<String> participants = state.getList(".participants");
+ broadcastNewComponents(participants,
+ Collections.singletonList(component));
+ }
}
- return worked;
+ };
+
+ WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("Sending full state to {}", pending);
+ final List<Component> components = state.getComponents();
+ broadcastNewComponents(pending, components);
+ }
+ };
+
+ void performWork() {
+ sendFullStateThread.performWork();
+ updateStateRequestThread.performWork();
}
-
- private <T>List<T> listWrap(T o) {
- List<T> list = new ArrayList<T>();
- list.add(o);
- return list;
+
+ public void start() {
+ sendFullStateThread.start();
+ updateStateRequestThread.start();
}
-
- synchronized boolean _sendFullState() {
- boolean hasWork = _fullStateReceivers.size() != 0;
- if (hasWork) {
- logger.info("Sending full state to new participants.");
- final List<State.Component> components = state.getComponents();
- broadcastNewComponents(_fullStateReceivers, components);
- _fullStateReceivers.clear();
- }
- return hasWork;
+
+ public void interrupt() {
+ sendFullStateThread.interrupt();
+ updateStateRequestThread.interrupt();
+ }
+
+ public MasterService getService() {
+ return serviceImpl;
}
-
+
private synchronized void removeParticipant(String url) {
- List<String> participants = participants();
+ List<String> participants = state.getList(".participants");
if (participants.contains(url)) {
- logger.warn("RemoveParticipant({})", url);
+ logger.info("removeParticipant({})", url);
participants.remove(url);
state.updateFromObject(".participants", participants,
state.getRevision(".participants") + 1);
- notifyAll();
+ updateStateRequestThread.add(".participants");
}
}
-
+
private void broadcastNewComponents(List<String> destinations,
final List<State.Component> components) {
broadcaster.broadcast(destinations, new ServiceOperation() {
@@ -104,60 +128,4 @@ public class Master implements MasterService, Runnable {
}
});
}
-
- private List<String> participants() {
- return state.getList(".participants");
- }
-
-
- @Override
- public synchronized boolean updateStateRequest(String component,
- String newData, long revision) {
- boolean updated = state.update(component, newData, revision + 1);
- if (updated) {
- notifyAll();
- }
- return updated;
- }
-
- boolean _performWork() {
- boolean worked = false;
- worked |= _sendUpdatedComponents();
- worked |= _sendFullState();
- return worked;
- }
-
- @Override
- public void run() {
- while (!stopped) {
- if (!_performWork()) {
- synchronized (this) {
- try {
- wait(500);
- } catch (InterruptedException e) {
- stopped = true;
- }
- }
- }
- if (Thread.interrupted()) {
- stopped = true;
- }
- }
- }
-
- public void start() {
- if (workerThread == null) {
- workerThread = new Thread(this);
- workerThread.start();
- logger.info("Master thread started. {}", state);
- }
- }
-
- public void join() throws InterruptedException {
- workerThread.join();
- }
-
- public void interrupt() {
- workerThread.interrupt();
- }
}
diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java
deleted file mode 100644
index c2b2d9b..0000000
--- a/same/src/main/java/com/orbekk/same/NewMaster.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.orbekk.same;
-
-import com.orbekk.same.State.Component;
-import java.util.Collections;
-import java.util.List;
-import com.orbekk.util.WorkQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NewMaster {
- private Logger logger = LoggerFactory.getLogger(getClass());
- private final ConnectionManager connections;
- private State state;
- private Broadcaster broadcaster;
-
- public static NewMaster create(ConnectionManager connections,
- Broadcaster broadcaster, String myUrl, String networkName) {
- State state = new State(networkName);
- state.update(".masterUrl", myUrl, 1);
- return new NewMaster(state, connections, broadcaster);
- }
-
- NewMaster(State initialState, ConnectionManager connections,
- Broadcaster broadcaster) {
- this.state = initialState;
- this.connections = connections;
- this.broadcaster = broadcaster;
- }
-
- private MasterService serviceImpl = new MasterService() {
- @Override
- public boolean updateStateRequest(String component,
- String newData, long revision) {
- logger.info("updateStateRequest({}, {}, {})",
- new Object[]{component, newData, revision});
- boolean updated = state.update(component, newData, revision + 1);
- if (updated) {
- updateStateRequestThread.add(component);
- }
- return updated;
- }
-
- @Override
- public void joinNetworkRequest(String clientUrl) {
- logger.info("joinNetworkRequest({})", clientUrl);
- List<String> participants = state.getList(".participants");
- sendFullStateThread.add(clientUrl);
- if (!participants.contains(clientUrl)) {
- participants.add(clientUrl);
- synchronized (NewMaster.this) {
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- }
- updateStateRequestThread.add(".participants");
- } else {
- logger.warn("Client {} joining: Already part of network");
- }
- }
- };
-
- WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("updateStateRequestThread: Updated state: {}",
- pending);
- for (String componentName : pending) {
- Component component = state.getComponent(componentName);
- List<String> participants = state.getList(".participants");
- broadcastNewComponents(participants,
- Collections.singletonList(component));
- }
- }
- };
-
- WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("Sending full state to {}", pending);
- final List<Component> components = state.getComponents();
- broadcastNewComponents(pending, components);
- }
- };
-
- void performWork() {
- sendFullStateThread.performWork();
- updateStateRequestThread.performWork();
- }
-
- public void start() {
- sendFullStateThread.start();
- updateStateRequestThread.start();
- }
-
- public void interrupt() {
- sendFullStateThread.interrupt();
- updateStateRequestThread.interrupt();
- }
-
- public MasterService getService() {
- return serviceImpl;
- }
-
- private synchronized void removeParticipant(String url) {
- List<String> participants = state.getList(".participants");
- if (participants.contains(url)) {
- logger.info("removeParticipant({})", url);
- participants.remove(url);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- updateStateRequestThread.add(".participants");
- }
- }
-
- private void broadcastNewComponents(List<String> destinations,
- final List<State.Component> components) {
- broadcaster.broadcast(destinations, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- for (Component c : components) {
- client.setState(c.getName(), c.getData(),
- c.getRevision());
- }
- } catch (Exception e) {
- logger.info("Client {} failed to receive state update.", url);
- removeParticipant(url);
- }
- }
- });
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 7f7a5d0..8cc37fe 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -60,7 +60,7 @@ public class SameController {
ServerContainer server = new ServerBuilder(port)
.withServlet(new StateServlet(client.getInterface()), "/_/state")
.withService(client.getService(), ClientService.class)
- .withService(master, MasterService.class)
+ .withService(master.getService(), MasterService.class)
.withService(paxos, PaxosService.class)
.build();
@@ -116,12 +116,12 @@ public class SameController {
public void join() {
try {
server.join();
- master.join();
+ client.interrupt();
+ master.interrupt();
if (discoveryService != null) {
discoveryService.join();
}
} catch (InterruptedException e) {
- master.interrupt();
try {
server.stop();
} catch (Exception e1) {