diff options
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r-- | same/src/main/java/com/orbekk/same/ClientServiceImpl.java | 6 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/MasterServiceImpl.java | 31 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/State.java | 62 |
3 files changed, 87 insertions, 12 deletions
diff --git a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java index 84ee015..915840e 100644 --- a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java @@ -21,7 +21,7 @@ public class ClientServiceImpl implements ClientService, UrlReceiver { @Override public void setState(String component, String data, long revision) { - logger.error("SetState not yet implemented."); + state.forceUpdate(component, data, revision); } @Override @@ -29,4 +29,8 @@ public class ClientServiceImpl implements ClientService, UrlReceiver { logger.info("My URL is {}.", myUrl); this.myUrl = myUrl; } + + State testGetState() { + return state; + } } diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index 3731542..2101de8 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -1,22 +1,26 @@ package com.orbekk.same; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.orbekk.same.State.Component; + public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { private Logger logger = LoggerFactory.getLogger(getClass()); private ConnectionManager connections; private State state; private boolean stopped = false; private Broadcaster broadcaster; - + private List<String> _fullStateReceivers = new ArrayList<String>(); + public MasterServiceImpl(State initialState, ConnectionManager connections, Broadcaster broadcaster) { state = initialState; this.broadcaster = broadcaster; -} + } @Override public void joinNetworkRequest(String networkName, String clientUrl) { @@ -26,20 +30,21 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { participants.add(clientUrl); synchronized(this) { notifyAll(); + state.updateFromObject(".participants", participants, + state.getRevision(".participants")); } } else { logger.warn("Client {} already part of network. " + "Ignoring participation request", clientUrl); } - state.updateFromObject(".participants", participants, - state.getRevision(".participants")); } else { logger.warn("Client {} tried to join {}, but network name is {}", new Object[]{ clientUrl, networkName, state.getDataOf(".networkName") }); } } - public boolean _handleJoinNetworkRequests() { + + public boolean _sendUpdatedComponents() { boolean worked = false; for (final String component : state.getAndClearUpdatedComponents()) { logger.info("Broadcasting new component {}", state.show(component)); @@ -54,6 +59,19 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { return worked; } + public boolean _sendFullState() { + boolean worked = _fullStateReceivers.size() != 0; + final List<State.Component> components = state.getComponents(); + broadcaster.broadcast(participants(), new ServiceOperation() { + @Override public void run(ClientService client) { + for (Component c : components) { + client.setState(c.getName(), c.getData(), c.getRevision()); + } + } + }); + return worked; + } + private List<String> participants() { return state.getList(".participants"); } @@ -74,7 +92,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { boolean _performWork() { boolean worked = false; - worked |= _handleJoinNetworkRequests(); + worked |= _sendUpdatedComponents(); + worked |= _sendFullState(); return worked; } diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index afd93f5..3371958 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory; /** * This class is thread-safe. */ -public class State { +public class State implements Comparable<State> { private Logger logger = LoggerFactory.getLogger(getClass()); private Map<String, Component> state = new HashMap<String, Component>(); private ObjectMapper mapper = new ObjectMapper(); @@ -30,15 +30,26 @@ public class State { updateFromObject(".participants", new ArrayList<String>(), 0); } - public boolean update(String componentName, String data, long revision) { + public synchronized void forceUpdate(String componentName, + String data, long revision) { + Component oldComponent = state.get(componentName); + Component newComponent = new Component(componentName, revision, data); + logger.info("Force update: {} => {}", oldComponent, newComponent); + state.put(componentName, newComponent); + updatedComponents.add(componentName); + } + + public synchronized boolean update(String componentName, String data, + long revision) { Component component = null; if (!state.containsKey(componentName)) { - component = new Component(0, ""); + component = new Component("", 0, ""); } else { - component = state.get(componentName); + component = state.get(componentName); } if (revision == component.getRevision()) { + component.setName(componentName); component.setRevision(revision + 1); component.setData(data); state.put(componentName, component); @@ -124,6 +135,19 @@ public class State { return componentName + ": " + state.get(componentName); } + /** + * Returns a list of all the components in this State. + * + * This method is thread-safe, and returns a deep copy. + */ + public synchronized List<Component> getComponents() { + ArrayList<Component> list = new ArrayList<Component>(); + for (Component component : state.values()) { + list.add(new Component(component)); + } + return list; + } + public synchronized Set<String> getAndClearUpdatedComponents() { Set<String> copy = new TreeSet<String>(updatedComponents); updatedComponents.clear(); @@ -131,10 +155,21 @@ public class State { } public static class Component { + private String name; private long revision; private String data; - public Component(long revision, String data) { + /** + * Copy constructor. + */ + public Component(Component other) { + this.name = other.name; + this.revision = other.revision; + this.data = other.data; + } + + public Component(String name, long revision, String data) { + this.name = name; this.revision = revision; this.data = data; } @@ -142,6 +177,7 @@ public class State { public long getRevision() { return revision; } + public void setRevision(long revision) { this.revision = revision; } @@ -149,14 +185,28 @@ public class State { public String getData() { return data; } + public void setData(String data) { this.data = data; } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + @Override public String toString() { return this.data + " @" + revision; } } + @Override + public int compareTo(State other) { + return -1; + } + // // @Override // public String toString() { @@ -187,4 +237,6 @@ public class State { // ")", stateIteration, networkName, masterId, data, // participantsString); // } + + } |