From 14205be283ad5b54a94c07308ab3e336f3069653 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Jan 2012 14:27:48 +0100 Subject: Implement master functionality. - The first participant in the network is considered the master. - Forward join requests to the master. --- .../src/main/java/com/orbekk/same/SameService.java | 2 +- .../main/java/com/orbekk/same/SameServiceImpl.java | 6 +- .../src/main/java/com/orbekk/same/SameState.java | 66 ++++++++++++++++++---- 3 files changed, 59 insertions(+), 15 deletions(-) diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index 0db9698..9e1071d 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -20,6 +20,6 @@ public interface SameService { /** * Notification of participation in network. */ - void notifyParticipation(String networkName, + void notifyParticipation(String networkName, String masterId, Map participants); } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index 7e61a2d..0e17f54 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -35,9 +35,9 @@ public class SameServiceImpl implements SameService { } @Override - public void notifyParticipation(String networkName, + public void notifyParticipation(String networkName, String masterId, Map participants) { - logger.info("Joining network {}.", networkName); + logger.info("Joining network {}. Master is {}", networkName, masterId); int i = 1; for (Map.Entry e : participants.entrySet()) { String clientId = e.getKey(); @@ -46,6 +46,6 @@ public class SameServiceImpl implements SameService { new Object[]{networkName, i, clientId, url}); i++; } - logger.warn("Joining not implemented."); + sameState.joinNetwork(networkName, masterId, participants); } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java index cd1c84a..c3d03f9 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -18,6 +18,18 @@ public class SameState extends Thread implements UrlReceiver { private String currentState = ""; private String networkName; + /** + * The master participant id. + */ + private String masterId; + + /** + * TODO: Remove. + */ + public void setMasterId(String masterId) { + this.masterId = masterId; + } + /** * The participants of this network. * @@ -43,6 +55,7 @@ public class SameState extends Thread implements UrlReceiver { this.networkName = networkName; this.clientId = clientId; this.connections = connections; + this.masterId = clientId; participants.put(clientId, null); } @@ -55,6 +68,24 @@ public class SameState extends Thread implements UrlReceiver { return participants; } + /** + * Reset this SameService to an initial state. + * + * TODO: Implement fully. + */ + private synchronized void resetState() { + pendingParticipants.clear(); + } + + public synchronized void joinNetwork(String networkName, String masterId, + Map participants) { + resetState(); + this.networkName = networkName; + this.masterId = masterId; + this.participants = participants; + logger.info("Joined network {}.", networkName); + } + public String getClientId() { return clientId; } @@ -83,18 +114,31 @@ public class SameState extends Thread implements UrlReceiver { notifyAll(); } + private boolean isMaster() { + return masterId.equals(clientId); + } + private synchronized void handleNewParticipants() { - // Adding all pending participants ensures that each of the new - // participants is informed of all participants. - // - // TODO: Does not inform old participants. - participants.putAll(pendingParticipants); - for (Map.Entry e : pendingParticipants.entrySet()) { - String clientId = e.getKey(); - String url = e.getValue(); - logger.info("New participant: {} URL({})", clientId, url); - SameService remoteService = connections.getConnection(url); - remoteService.notifyParticipation(networkName, participants); + if (!isMaster()) { + for (Map.Entry e : pendingParticipants.entrySet()) { + SameService master = connections.getConnection( + participants.get(masterId)); + logger.info("Redirecting participant request to {}", masterId); + String clientId = e.getKey(); + String url = e.getValue(); + master.participateNetwork(networkName, clientId, url); + } + } else { + participants.putAll(pendingParticipants); + for (Map.Entry e : + pendingParticipants.entrySet()) { + String clientId = e.getKey(); + String url = e.getValue(); + logger.info("New participant: {} URL({})", clientId, url); + SameService remoteService = connections.getConnection(url); + remoteService.notifyParticipation(networkName, masterId, + participants); + } } pendingParticipants.clear(); } -- cgit v1.2.3