summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Master.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java190
1 files changed, 79 insertions, 111 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index ed04b4d..77f7496 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -1,92 +1,116 @@
package com.orbekk.same;
-import java.util.ArrayList;
+import com.orbekk.same.State.Component;
+import java.util.Collections;
import java.util.List;
-
+import com.orbekk.util.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.orbekk.same.State.Component;
-
-public class Master implements MasterService, Runnable {
+public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
private final ConnectionManager connections;
private State state;
- private boolean stopped = false;
private Broadcaster broadcaster;
- private List<String> _fullStateReceivers = new ArrayList<String>();
- private Thread workerThread = null;
-
+
public static Master create(ConnectionManager connections,
Broadcaster broadcaster, String myUrl, String networkName) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
return new Master(state, connections, broadcaster);
}
-
- /** Constructor for internal use.
- */
+
Master(State initialState, ConnectionManager connections,
Broadcaster broadcaster) {
this.state = initialState;
this.connections = connections;
this.broadcaster = broadcaster;
}
-
- @Override
- public synchronized void joinNetworkRequest(String clientUrl) {
- logger.info("JoinNetworkRequest({})", clientUrl);
- List<String> participants = participants();
- if (!participants.contains(clientUrl)) {
- participants.add(clientUrl);
- _fullStateReceivers.add(clientUrl);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- notifyAll();
- } else {
- logger.warn("Client {} already part of network. " +
- "Ignoring participation request", clientUrl);
+
+ private MasterService serviceImpl = new MasterService() {
+ @Override
+ public boolean updateStateRequest(String component,
+ String newData, long revision) {
+ logger.info("updateStateRequest({}, {}, {})",
+ new Object[]{component, newData, revision});
+ boolean updated = state.update(component, newData, revision + 1);
+ if (updated) {
+ updateStateRequestThread.add(component);
+ }
+ return updated;
}
- }
-
- boolean _sendUpdatedComponents() {
- boolean worked = false;
- for (final Component component : state.getAndClearUpdatedComponents()) {
- logger.info("Broadcasting new component {}", component);
- broadcastNewComponents(participants(), listWrap(component));
- worked = true;
+
+ @Override
+ public void joinNetworkRequest(String clientUrl) {
+ logger.info("joinNetworkRequest({})", clientUrl);
+ List<String> participants = state.getList(".participants");
+ sendFullStateThread.add(clientUrl);
+ if (!participants.contains(clientUrl)) {
+ participants.add(clientUrl);
+ synchronized (Master.this) {
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ }
+ updateStateRequestThread.add(".participants");
+ } else {
+ logger.warn("Client {} joining: Already part of network");
+ }
+ }
+ };
+
+ WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("updateStateRequestThread: Updated state: {}",
+ pending);
+ for (String componentName : pending) {
+ Component component = state.getComponent(componentName);
+ List<String> participants = state.getList(".participants");
+ broadcastNewComponents(participants,
+ Collections.singletonList(component));
+ }
}
- return worked;
+ };
+
+ WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("Sending full state to {}", pending);
+ final List<Component> components = state.getComponents();
+ broadcastNewComponents(pending, components);
+ }
+ };
+
+ void performWork() {
+ sendFullStateThread.performWork();
+ updateStateRequestThread.performWork();
}
-
- private <T>List<T> listWrap(T o) {
- List<T> list = new ArrayList<T>();
- list.add(o);
- return list;
+
+ public void start() {
+ sendFullStateThread.start();
+ updateStateRequestThread.start();
}
-
- synchronized boolean _sendFullState() {
- boolean hasWork = _fullStateReceivers.size() != 0;
- if (hasWork) {
- logger.info("Sending full state to new participants.");
- final List<State.Component> components = state.getComponents();
- broadcastNewComponents(_fullStateReceivers, components);
- _fullStateReceivers.clear();
- }
- return hasWork;
+
+ public void interrupt() {
+ sendFullStateThread.interrupt();
+ updateStateRequestThread.interrupt();
+ }
+
+ public MasterService getService() {
+ return serviceImpl;
}
-
+
private synchronized void removeParticipant(String url) {
- List<String> participants = participants();
+ List<String> participants = state.getList(".participants");
if (participants.contains(url)) {
- logger.warn("RemoveParticipant({})", url);
+ logger.info("removeParticipant({})", url);
participants.remove(url);
state.updateFromObject(".participants", participants,
state.getRevision(".participants") + 1);
- notifyAll();
+ updateStateRequestThread.add(".participants");
}
}
-
+
private void broadcastNewComponents(List<String> destinations,
final List<State.Component> components) {
broadcaster.broadcast(destinations, new ServiceOperation() {
@@ -104,60 +128,4 @@ public class Master implements MasterService, Runnable {
}
});
}
-
- private List<String> participants() {
- return state.getList(".participants");
- }
-
-
- @Override
- public synchronized boolean updateStateRequest(String component,
- String newData, long revision) {
- boolean updated = state.update(component, newData, revision + 1);
- if (updated) {
- notifyAll();
- }
- return updated;
- }
-
- boolean _performWork() {
- boolean worked = false;
- worked |= _sendUpdatedComponents();
- worked |= _sendFullState();
- return worked;
- }
-
- @Override
- public void run() {
- while (!stopped) {
- if (!_performWork()) {
- synchronized (this) {
- try {
- wait(500);
- } catch (InterruptedException e) {
- stopped = true;
- }
- }
- }
- if (Thread.interrupted()) {
- stopped = true;
- }
- }
- }
-
- public void start() {
- if (workerThread == null) {
- workerThread = new Thread(this);
- workerThread.start();
- logger.info("Master thread started. {}", state);
- }
- }
-
- public void join() throws InterruptedException {
- workerThread.join();
- }
-
- public void interrupt() {
- workerThread.interrupt();
- }
}