diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/same/NewMaster.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/NewMaster.java | 131 |
1 files changed, 0 insertions, 131 deletions
diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java deleted file mode 100644 index c2b2d9b..0000000 --- a/same/src/main/java/com/orbekk/same/NewMaster.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.orbekk.same; - -import com.orbekk.same.State.Component; -import java.util.Collections; -import java.util.List; -import com.orbekk.util.WorkQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NewMaster { - private Logger logger = LoggerFactory.getLogger(getClass()); - private final ConnectionManager connections; - private State state; - private Broadcaster broadcaster; - - public static NewMaster create(ConnectionManager connections, - Broadcaster broadcaster, String myUrl, String networkName) { - State state = new State(networkName); - state.update(".masterUrl", myUrl, 1); - return new NewMaster(state, connections, broadcaster); - } - - NewMaster(State initialState, ConnectionManager connections, - Broadcaster broadcaster) { - this.state = initialState; - this.connections = connections; - this.broadcaster = broadcaster; - } - - private MasterService serviceImpl = new MasterService() { - @Override - public boolean updateStateRequest(String component, - String newData, long revision) { - logger.info("updateStateRequest({}, {}, {})", - new Object[]{component, newData, revision}); - boolean updated = state.update(component, newData, revision + 1); - if (updated) { - updateStateRequestThread.add(component); - } - return updated; - } - - @Override - public void joinNetworkRequest(String clientUrl) { - logger.info("joinNetworkRequest({})", clientUrl); - List<String> participants = state.getList(".participants"); - sendFullStateThread.add(clientUrl); - if (!participants.contains(clientUrl)) { - participants.add(clientUrl); - synchronized (NewMaster.this) { - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - } - updateStateRequestThread.add(".participants"); - } else { - logger.warn("Client {} joining: Already part of network"); - } - } - }; - - WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - logger.info("updateStateRequestThread: Updated state: {}", - pending); - for (String componentName : pending) { - Component component = state.getComponent(componentName); - List<String> participants = state.getList(".participants"); - broadcastNewComponents(participants, - Collections.singletonList(component)); - } - } - }; - - WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - logger.info("Sending full state to {}", pending); - final List<Component> components = state.getComponents(); - broadcastNewComponents(pending, components); - } - }; - - void performWork() { - sendFullStateThread.performWork(); - updateStateRequestThread.performWork(); - } - - public void start() { - sendFullStateThread.start(); - updateStateRequestThread.start(); - } - - public void interrupt() { - sendFullStateThread.interrupt(); - updateStateRequestThread.interrupt(); - } - - public MasterService getService() { - return serviceImpl; - } - - private synchronized void removeParticipant(String url) { - List<String> participants = state.getList(".participants"); - if (participants.contains(url)) { - logger.info("removeParticipant({})", url); - participants.remove(url); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - updateStateRequestThread.add(".participants"); - } - } - - private void broadcastNewComponents(List<String> destinations, - final List<State.Component> components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } -} |