summaryrefslogtreecommitdiff
path: root/jsonrpc/src
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-11 15:33:19 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-11 15:33:19 +0100
commitee0b57e81fce368e931a0d7282d8d84a9ecffd71 (patch)
tree549062eb4601e703c0333199da200fe5abf90eb2 /jsonrpc/src
parent47e43eefebae63afee1a1743602c15e3b1ede8ff (diff)
Add participant synchronization.
- No error handling in the protocol at the moment.
Diffstat (limited to 'jsonrpc/src')
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameService.java23
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java31
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameState.java55
-rw-r--r--jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java13
4 files changed, 102 insertions, 20 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
index 9e1071d..8f239da 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
@@ -14,12 +14,27 @@ public interface SameService {
/**
* A request from the callee to participate in 'networkName'.
*/
- void participateNetwork(String networkName, String clientId,
- String url);
+ void participateNetwork(String networkName, String clientId, String url);
/**
* Notification of participation in network.
*/
- void notifyParticipation(String networkName, String masterId,
- Map<String, String> participants);
+ void notifyParticipation(String networkName, String masterId);
+
+ /**
+ * New state.
+ *
+ * When sent to a non-master from the master, use 'newState' as the
+ * current state.
+ *
+ * When sent to a master, broadcast the new state to all clients.
+ */
+ void setState(String newState);
+
+ /**
+ * Notify all nodes of network participants.
+ *
+ * Only sent from master to non-master.
+ */
+ void setParticipants(Map<String, String> participants);
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
index 0e17f54..a946c61 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
@@ -35,17 +35,26 @@ public class SameServiceImpl implements SameService {
}
@Override
- public void notifyParticipation(String networkName, String masterId,
- Map<String, String> participants) {
+ public void notifyParticipation(String networkName, String masterId) {
logger.info("Joining network {}. Master is {}", networkName, masterId);
- int i = 1;
- for (Map.Entry<String, String> e : participants.entrySet()) {
- String clientId = e.getKey();
- String url = e.getValue();
- logger.info(" {} participant {}: {}, {}",
- new Object[]{networkName, i, clientId, url});
- i++;
- }
- sameState.joinNetwork(networkName, masterId, participants);
+ // int i = 1;
+ // for (Map.Entry<String, String> e : participants.entrySet()) {
+ // String clientId = e.getKey();
+ // String url = e.getValue();
+ // logger.info(" {} participant {}: {}, {}",
+ // new Object[]{networkName, i, clientId, url});
+ // i++;
+ // }
+ sameState.joinNetwork(networkName, masterId);
+ }
+
+ @Override
+ public void setParticipants(Map<String, String> participants) {
+ sameState.setParticipants(participants);
+ }
+
+ @Override
+ public void setState(String newState) {
+ logger.error("setState not implemented.");
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
index 9638252..d44cc1d 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
@@ -38,6 +38,13 @@ public class SameState extends Thread implements UrlReceiver {
private Map<String, String> participants = new HashMap<String, String>();
/**
+ * New participants.
+ *
+ * New participants map to replace participants.
+ */
+ private Map<String, String> _setParticipants = null;
+
+ /**
* The client id of this participant.
*/
private String clientId;
@@ -74,15 +81,16 @@ public class SameState extends Thread implements UrlReceiver {
* TODO: Implement fully.
*/
private synchronized void resetState() {
+ networkName = "";
+ masterId = "";
pendingParticipants.clear();
+ participants.clear();
}
- public synchronized void joinNetwork(String networkName, String masterId,
- Map<String, String> participants) {
+ public synchronized void joinNetwork(String networkName, String masterId) {
resetState();
this.networkName = networkName;
this.masterId = masterId;
- this.participants = participants;
logger.info("Joined network {}.", networkName);
}
@@ -114,6 +122,24 @@ public class SameState extends Thread implements UrlReceiver {
notifyAll();
}
+ public synchronized void setParticipants(Map<String, String> participants) {
+ logger.info("Pending operation: _setParticipants");
+ _setParticipants = participants;
+ notifyAll();
+ }
+
+ private synchronized void handleSetParticipants() {
+ if (_setParticipants != null) {
+ if (isMaster()) {
+ logger.error("{}: Master received setParticipants.", clientId);
+ } else {
+ logger.info("{}: New participants committed.", clientId);
+ participants = _setParticipants;
+ }
+ }
+ _setParticipants = null;
+ }
+
private boolean isMaster() {
return masterId.equals(clientId);
}
@@ -136,8 +162,12 @@ public class SameState extends Thread implements UrlReceiver {
String url = e.getValue();
logger.info("New participant: {} URL({})", clientId, url);
SameService remoteService = connections.getConnection(url);
- remoteService.notifyParticipation(networkName, masterId,
- participants);
+ remoteService.notifyParticipation(networkName, masterId);
+ broadcast(new ServiceOperation(){
+ @Override void run(SameService service) {
+ service.setParticipants(participants);
+ }
+ });
}
}
pendingParticipants.clear();
@@ -151,6 +181,7 @@ public class SameState extends Thread implements UrlReceiver {
*/
synchronized void internalRun() {
handleNewParticipants();
+ handleSetParticipants();
}
public synchronized void run() {
@@ -174,4 +205,18 @@ public class SameState extends Thread implements UrlReceiver {
"to finish. Ignoring.");
}
}
+
+ public abstract static class ServiceOperation {
+ abstract void run(SameService service);
+ }
+
+ public synchronized void broadcast(ServiceOperation operation) {
+ for (Map.Entry<String, String> e : participants.entrySet()) {
+ String clientId = e.getKey();
+ String url = e.getValue();
+ if (!clientId.equals(this.clientId)) {
+ operation.run(connections.getConnection(url));
+ }
+ }
+ }
}
diff --git a/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java b/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java
index 77bbbee..7e21e7a 100644
--- a/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java
+++ b/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java
@@ -56,5 +56,18 @@ public class SameStateTest {
assertTrue(state1.getParticipants().size() == 2);
assertTrue(state2.getParticipants().size() == 2);
assertEquals(state1.getNetworkName(), state2.getNetworkName());
+
+ connections.getConnection(state2.getUrl()).
+ participateNetwork("Network1", state3.getClientId(),
+ state3.getUrl());
+ state2.internalRun();
+ state1.internalRun();
+ state3.internalRun();
+
+ assertTrue(state1.getParticipants().size() == 3);
+ assertTrue(state2.getParticipants().size() == 3);
+ assertTrue(state3.getParticipants().size() == 3);
+ assertEquals(state1.getNetworkName(), state2.getNetworkName());
+ assertEquals(state2.getNetworkName(), state3.getNetworkName());
}
}