From ee0b57e81fce368e931a0d7282d8d84a9ecffd71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Jan 2012 15:33:19 +0100 Subject: Add participant synchronization. - No error handling in the protocol at the moment. --- .../src/main/java/com/orbekk/same/SameService.java | 23 +++++++-- .../main/java/com/orbekk/same/SameServiceImpl.java | 31 +++++++----- .../src/main/java/com/orbekk/same/SameState.java | 55 ++++++++++++++++++++-- .../test/java/com/orbekk/same/SameStateTest.java | 13 +++++ 4 files changed, 102 insertions(+), 20 deletions(-) (limited to 'jsonrpc/src') diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index 9e1071d..8f239da 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -14,12 +14,27 @@ public interface SameService { /** * A request from the callee to participate in 'networkName'. */ - void participateNetwork(String networkName, String clientId, - String url); + void participateNetwork(String networkName, String clientId, String url); /** * Notification of participation in network. */ - void notifyParticipation(String networkName, String masterId, - Map participants); + void notifyParticipation(String networkName, String masterId); + + /** + * New state. + * + * When sent to a non-master from the master, use 'newState' as the + * current state. + * + * When sent to a master, broadcast the new state to all clients. + */ + void setState(String newState); + + /** + * Notify all nodes of network participants. + * + * Only sent from master to non-master. + */ + void setParticipants(Map participants); } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index 0e17f54..a946c61 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -35,17 +35,26 @@ public class SameServiceImpl implements SameService { } @Override - public void notifyParticipation(String networkName, String masterId, - Map participants) { + public void notifyParticipation(String networkName, String masterId) { logger.info("Joining network {}. Master is {}", networkName, masterId); - int i = 1; - for (Map.Entry e : participants.entrySet()) { - String clientId = e.getKey(); - String url = e.getValue(); - logger.info(" {} participant {}: {}, {}", - new Object[]{networkName, i, clientId, url}); - i++; - } - sameState.joinNetwork(networkName, masterId, participants); + // int i = 1; + // for (Map.Entry e : participants.entrySet()) { + // String clientId = e.getKey(); + // String url = e.getValue(); + // logger.info(" {} participant {}: {}, {}", + // new Object[]{networkName, i, clientId, url}); + // i++; + // } + sameState.joinNetwork(networkName, masterId); + } + + @Override + public void setParticipants(Map participants) { + sameState.setParticipants(participants); + } + + @Override + public void setState(String newState) { + logger.error("setState not implemented."); } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java index 9638252..d44cc1d 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -37,6 +37,13 @@ public class SameState extends Thread implements UrlReceiver { */ private Map participants = new HashMap(); + /** + * New participants. + * + * New participants map to replace participants. + */ + private Map _setParticipants = null; + /** * The client id of this participant. */ @@ -74,15 +81,16 @@ public class SameState extends Thread implements UrlReceiver { * TODO: Implement fully. */ private synchronized void resetState() { + networkName = ""; + masterId = ""; pendingParticipants.clear(); + participants.clear(); } - public synchronized void joinNetwork(String networkName, String masterId, - Map participants) { + public synchronized void joinNetwork(String networkName, String masterId) { resetState(); this.networkName = networkName; this.masterId = masterId; - this.participants = participants; logger.info("Joined network {}.", networkName); } @@ -114,6 +122,24 @@ public class SameState extends Thread implements UrlReceiver { notifyAll(); } + public synchronized void setParticipants(Map participants) { + logger.info("Pending operation: _setParticipants"); + _setParticipants = participants; + notifyAll(); + } + + private synchronized void handleSetParticipants() { + if (_setParticipants != null) { + if (isMaster()) { + logger.error("{}: Master received setParticipants.", clientId); + } else { + logger.info("{}: New participants committed.", clientId); + participants = _setParticipants; + } + } + _setParticipants = null; + } + private boolean isMaster() { return masterId.equals(clientId); } @@ -136,8 +162,12 @@ public class SameState extends Thread implements UrlReceiver { String url = e.getValue(); logger.info("New participant: {} URL({})", clientId, url); SameService remoteService = connections.getConnection(url); - remoteService.notifyParticipation(networkName, masterId, - participants); + remoteService.notifyParticipation(networkName, masterId); + broadcast(new ServiceOperation(){ + @Override void run(SameService service) { + service.setParticipants(participants); + } + }); } } pendingParticipants.clear(); @@ -151,6 +181,7 @@ public class SameState extends Thread implements UrlReceiver { */ synchronized void internalRun() { handleNewParticipants(); + handleSetParticipants(); } public synchronized void run() { @@ -174,4 +205,18 @@ public class SameState extends Thread implements UrlReceiver { "to finish. Ignoring."); } } + + public abstract static class ServiceOperation { + abstract void run(SameService service); + } + + public synchronized void broadcast(ServiceOperation operation) { + for (Map.Entry e : participants.entrySet()) { + String clientId = e.getKey(); + String url = e.getValue(); + if (!clientId.equals(this.clientId)) { + operation.run(connections.getConnection(url)); + } + } + } } diff --git a/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java b/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java index 77bbbee..7e21e7a 100644 --- a/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java +++ b/jsonrpc/src/test/java/com/orbekk/same/SameStateTest.java @@ -56,5 +56,18 @@ public class SameStateTest { assertTrue(state1.getParticipants().size() == 2); assertTrue(state2.getParticipants().size() == 2); assertEquals(state1.getNetworkName(), state2.getNetworkName()); + + connections.getConnection(state2.getUrl()). + participateNetwork("Network1", state3.getClientId(), + state3.getUrl()); + state2.internalRun(); + state1.internalRun(); + state3.internalRun(); + + assertTrue(state1.getParticipants().size() == 3); + assertTrue(state2.getParticipants().size() == 3); + assertTrue(state3.getParticipants().size() == 3); + assertEquals(state1.getNetworkName(), state2.getNetworkName()); + assertEquals(state2.getNetworkName(), state3.getNetworkName()); } } -- cgit v1.2.3