diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 190 |
1 files changed, 79 insertions, 111 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index ed04b4d..77f7496 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -1,92 +1,116 @@ package com.orbekk.same; -import java.util.ArrayList; +import com.orbekk.same.State.Component; +import java.util.Collections; import java.util.List; - +import com.orbekk.util.WorkQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.orbekk.same.State.Component; - -public class Master implements MasterService, Runnable { +public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); private final ConnectionManager connections; private State state; - private boolean stopped = false; private Broadcaster broadcaster; - private List<String> _fullStateReceivers = new ArrayList<String>(); - private Thread workerThread = null; - + public static Master create(ConnectionManager connections, Broadcaster broadcaster, String myUrl, String networkName) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); return new Master(state, connections, broadcaster); } - - /** Constructor for internal use. - */ + Master(State initialState, ConnectionManager connections, Broadcaster broadcaster) { this.state = initialState; this.connections = connections; this.broadcaster = broadcaster; } - - @Override - public synchronized void joinNetworkRequest(String clientUrl) { - logger.info("JoinNetworkRequest({})", clientUrl); - List<String> participants = participants(); - if (!participants.contains(clientUrl)) { - participants.add(clientUrl); - _fullStateReceivers.add(clientUrl); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - notifyAll(); - } else { - logger.warn("Client {} already part of network. " + - "Ignoring participation request", clientUrl); + + private MasterService serviceImpl = new MasterService() { + @Override + public boolean updateStateRequest(String component, + String newData, long revision) { + logger.info("updateStateRequest({}, {}, {})", + new Object[]{component, newData, revision}); + boolean updated = state.update(component, newData, revision + 1); + if (updated) { + updateStateRequestThread.add(component); + } + return updated; } - } - - boolean _sendUpdatedComponents() { - boolean worked = false; - for (final Component component : state.getAndClearUpdatedComponents()) { - logger.info("Broadcasting new component {}", component); - broadcastNewComponents(participants(), listWrap(component)); - worked = true; + + @Override + public void joinNetworkRequest(String clientUrl) { + logger.info("joinNetworkRequest({})", clientUrl); + List<String> participants = state.getList(".participants"); + sendFullStateThread.add(clientUrl); + if (!participants.contains(clientUrl)) { + participants.add(clientUrl); + synchronized (Master.this) { + state.updateFromObject(".participants", participants, + state.getRevision(".participants") + 1); + } + updateStateRequestThread.add(".participants"); + } else { + logger.warn("Client {} joining: Already part of network"); + } + } + }; + + WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { + @Override protected void onChange() { + List<String> pending = getAndClear(); + logger.info("updateStateRequestThread: Updated state: {}", + pending); + for (String componentName : pending) { + Component component = state.getComponent(componentName); + List<String> participants = state.getList(".participants"); + broadcastNewComponents(participants, + Collections.singletonList(component)); + } } - return worked; + }; + + WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { + @Override protected void onChange() { + List<String> pending = getAndClear(); + logger.info("Sending full state to {}", pending); + final List<Component> components = state.getComponents(); + broadcastNewComponents(pending, components); + } + }; + + void performWork() { + sendFullStateThread.performWork(); + updateStateRequestThread.performWork(); } - - private <T>List<T> listWrap(T o) { - List<T> list = new ArrayList<T>(); - list.add(o); - return list; + + public void start() { + sendFullStateThread.start(); + updateStateRequestThread.start(); } - - synchronized boolean _sendFullState() { - boolean hasWork = _fullStateReceivers.size() != 0; - if (hasWork) { - logger.info("Sending full state to new participants."); - final List<State.Component> components = state.getComponents(); - broadcastNewComponents(_fullStateReceivers, components); - _fullStateReceivers.clear(); - } - return hasWork; + + public void interrupt() { + sendFullStateThread.interrupt(); + updateStateRequestThread.interrupt(); + } + + public MasterService getService() { + return serviceImpl; } - + private synchronized void removeParticipant(String url) { - List<String> participants = participants(); + List<String> participants = state.getList(".participants"); if (participants.contains(url)) { - logger.warn("RemoveParticipant({})", url); + logger.info("removeParticipant({})", url); participants.remove(url); state.updateFromObject(".participants", participants, state.getRevision(".participants") + 1); - notifyAll(); + updateStateRequestThread.add(".participants"); } } - + private void broadcastNewComponents(List<String> destinations, final List<State.Component> components) { broadcaster.broadcast(destinations, new ServiceOperation() { @@ -104,60 +128,4 @@ public class Master implements MasterService, Runnable { } }); } - - private List<String> participants() { - return state.getList(".participants"); - } - - - @Override - public synchronized boolean updateStateRequest(String component, - String newData, long revision) { - boolean updated = state.update(component, newData, revision + 1); - if (updated) { - notifyAll(); - } - return updated; - } - - boolean _performWork() { - boolean worked = false; - worked |= _sendUpdatedComponents(); - worked |= _sendFullState(); - return worked; - } - - @Override - public void run() { - while (!stopped) { - if (!_performWork()) { - synchronized (this) { - try { - wait(500); - } catch (InterruptedException e) { - stopped = true; - } - } - } - if (Thread.interrupted()) { - stopped = true; - } - } - } - - public void start() { - if (workerThread == null) { - workerThread = new Thread(this); - workerThread.start(); - logger.info("Master thread started. {}", state); - } - } - - public void join() throws InterruptedException { - workerThread.join(); - } - - public void interrupt() { - workerThread.interrupt(); - } } |