summaryrefslogtreecommitdiff
path: root/same/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r--same/src/main/java/com/orbekk/same/ClientServiceImpl.java6
-rw-r--r--same/src/main/java/com/orbekk/same/MasterServiceImpl.java31
-rw-r--r--same/src/main/java/com/orbekk/same/State.java62
3 files changed, 87 insertions, 12 deletions
diff --git a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java
index 84ee015..915840e 100644
--- a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java
+++ b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java
@@ -21,7 +21,7 @@ public class ClientServiceImpl implements ClientService, UrlReceiver {
@Override
public void setState(String component, String data, long revision) {
- logger.error("SetState not yet implemented.");
+ state.forceUpdate(component, data, revision);
}
@Override
@@ -29,4 +29,8 @@ public class ClientServiceImpl implements ClientService, UrlReceiver {
logger.info("My URL is {}.", myUrl);
this.myUrl = myUrl;
}
+
+ State testGetState() {
+ return state;
+ }
}
diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
index 3731542..2101de8 100644
--- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
+++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
@@ -1,22 +1,26 @@
package com.orbekk.same;
+import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.orbekk.same.State.Component;
+
public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
private Logger logger = LoggerFactory.getLogger(getClass());
private ConnectionManager connections;
private State state;
private boolean stopped = false;
private Broadcaster broadcaster;
-
+ private List<String> _fullStateReceivers = new ArrayList<String>();
+
public MasterServiceImpl(State initialState, ConnectionManager connections,
Broadcaster broadcaster) {
state = initialState;
this.broadcaster = broadcaster;
-}
+ }
@Override
public void joinNetworkRequest(String networkName, String clientUrl) {
@@ -26,20 +30,21 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
participants.add(clientUrl);
synchronized(this) {
notifyAll();
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants"));
}
} else {
logger.warn("Client {} already part of network. " +
"Ignoring participation request", clientUrl);
}
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants"));
} else {
logger.warn("Client {} tried to join {}, but network name is {}",
new Object[]{ clientUrl, networkName,
state.getDataOf(".networkName") });
}
}
- public boolean _handleJoinNetworkRequests() {
+
+ public boolean _sendUpdatedComponents() {
boolean worked = false;
for (final String component : state.getAndClearUpdatedComponents()) {
logger.info("Broadcasting new component {}", state.show(component));
@@ -54,6 +59,19 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
return worked;
}
+ public boolean _sendFullState() {
+ boolean worked = _fullStateReceivers.size() != 0;
+ final List<State.Component> components = state.getComponents();
+ broadcaster.broadcast(participants(), new ServiceOperation() {
+ @Override public void run(ClientService client) {
+ for (Component c : components) {
+ client.setState(c.getName(), c.getData(), c.getRevision());
+ }
+ }
+ });
+ return worked;
+ }
+
private List<String> participants() {
return state.getList(".participants");
}
@@ -74,7 +92,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
boolean _performWork() {
boolean worked = false;
- worked |= _handleJoinNetworkRequests();
+ worked |= _sendUpdatedComponents();
+ worked |= _sendFullState();
return worked;
}
diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java
index afd93f5..3371958 100644
--- a/same/src/main/java/com/orbekk/same/State.java
+++ b/same/src/main/java/com/orbekk/same/State.java
@@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory;
/**
* This class is thread-safe.
*/
-public class State {
+public class State implements Comparable<State> {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String, Component> state = new HashMap<String, Component>();
private ObjectMapper mapper = new ObjectMapper();
@@ -30,15 +30,26 @@ public class State {
updateFromObject(".participants", new ArrayList<String>(), 0);
}
- public boolean update(String componentName, String data, long revision) {
+ public synchronized void forceUpdate(String componentName,
+ String data, long revision) {
+ Component oldComponent = state.get(componentName);
+ Component newComponent = new Component(componentName, revision, data);
+ logger.info("Force update: {} => {}", oldComponent, newComponent);
+ state.put(componentName, newComponent);
+ updatedComponents.add(componentName);
+ }
+
+ public synchronized boolean update(String componentName, String data,
+ long revision) {
Component component = null;
if (!state.containsKey(componentName)) {
- component = new Component(0, "");
+ component = new Component("", 0, "");
} else {
- component = state.get(componentName);
+ component = state.get(componentName);
}
if (revision == component.getRevision()) {
+ component.setName(componentName);
component.setRevision(revision + 1);
component.setData(data);
state.put(componentName, component);
@@ -124,6 +135,19 @@ public class State {
return componentName + ": " + state.get(componentName);
}
+ /**
+ * Returns a list of all the components in this State.
+ *
+ * This method is thread-safe, and returns a deep copy.
+ */
+ public synchronized List<Component> getComponents() {
+ ArrayList<Component> list = new ArrayList<Component>();
+ for (Component component : state.values()) {
+ list.add(new Component(component));
+ }
+ return list;
+ }
+
public synchronized Set<String> getAndClearUpdatedComponents() {
Set<String> copy = new TreeSet<String>(updatedComponents);
updatedComponents.clear();
@@ -131,10 +155,21 @@ public class State {
}
public static class Component {
+ private String name;
private long revision;
private String data;
- public Component(long revision, String data) {
+ /**
+ * Copy constructor.
+ */
+ public Component(Component other) {
+ this.name = other.name;
+ this.revision = other.revision;
+ this.data = other.data;
+ }
+
+ public Component(String name, long revision, String data) {
+ this.name = name;
this.revision = revision;
this.data = data;
}
@@ -142,6 +177,7 @@ public class State {
public long getRevision() {
return revision;
}
+
public void setRevision(long revision) {
this.revision = revision;
}
@@ -149,14 +185,28 @@ public class State {
public String getData() {
return data;
}
+
public void setData(String data) {
this.data = data;
}
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
@Override public String toString() {
return this.data + " @" + revision;
}
}
+ @Override
+ public int compareTo(State other) {
+ return -1;
+ }
+
//
// @Override
// public String toString() {
@@ -187,4 +237,6 @@ public class State {
// ")", stateIteration, networkName, masterId, data,
// participantsString);
// }
+
+
}