summaryrefslogtreecommitdiff
path: root/jsonrpc/src/main/java/com/orbekk/same/SameState.java
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/main/java/com/orbekk/same/SameState.java')
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameState.java55
1 files changed, 50 insertions, 5 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
index 9638252..d44cc1d 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
@@ -38,6 +38,13 @@ public class SameState extends Thread implements UrlReceiver {
private Map<String, String> participants = new HashMap<String, String>();
/**
+ * New participants.
+ *
+ * New participants map to replace participants.
+ */
+ private Map<String, String> _setParticipants = null;
+
+ /**
* The client id of this participant.
*/
private String clientId;
@@ -74,15 +81,16 @@ public class SameState extends Thread implements UrlReceiver {
* TODO: Implement fully.
*/
private synchronized void resetState() {
+ networkName = "";
+ masterId = "";
pendingParticipants.clear();
+ participants.clear();
}
- public synchronized void joinNetwork(String networkName, String masterId,
- Map<String, String> participants) {
+ public synchronized void joinNetwork(String networkName, String masterId) {
resetState();
this.networkName = networkName;
this.masterId = masterId;
- this.participants = participants;
logger.info("Joined network {}.", networkName);
}
@@ -114,6 +122,24 @@ public class SameState extends Thread implements UrlReceiver {
notifyAll();
}
+ public synchronized void setParticipants(Map<String, String> participants) {
+ logger.info("Pending operation: _setParticipants");
+ _setParticipants = participants;
+ notifyAll();
+ }
+
+ private synchronized void handleSetParticipants() {
+ if (_setParticipants != null) {
+ if (isMaster()) {
+ logger.error("{}: Master received setParticipants.", clientId);
+ } else {
+ logger.info("{}: New participants committed.", clientId);
+ participants = _setParticipants;
+ }
+ }
+ _setParticipants = null;
+ }
+
private boolean isMaster() {
return masterId.equals(clientId);
}
@@ -136,8 +162,12 @@ public class SameState extends Thread implements UrlReceiver {
String url = e.getValue();
logger.info("New participant: {} URL({})", clientId, url);
SameService remoteService = connections.getConnection(url);
- remoteService.notifyParticipation(networkName, masterId,
- participants);
+ remoteService.notifyParticipation(networkName, masterId);
+ broadcast(new ServiceOperation(){
+ @Override void run(SameService service) {
+ service.setParticipants(participants);
+ }
+ });
}
}
pendingParticipants.clear();
@@ -151,6 +181,7 @@ public class SameState extends Thread implements UrlReceiver {
*/
synchronized void internalRun() {
handleNewParticipants();
+ handleSetParticipants();
}
public synchronized void run() {
@@ -174,4 +205,18 @@ public class SameState extends Thread implements UrlReceiver {
"to finish. Ignoring.");
}
}
+
+ public abstract static class ServiceOperation {
+ abstract void run(SameService service);
+ }
+
+ public synchronized void broadcast(ServiceOperation operation) {
+ for (Map.Entry<String, String> e : participants.entrySet()) {
+ String clientId = e.getKey();
+ String url = e.getValue();
+ if (!clientId.equals(this.clientId)) {
+ operation.run(connections.getConnection(url));
+ }
+ }
+ }
}