diff options
Diffstat (limited to 'jsonrpc/src/main')
3 files changed, 89 insertions, 20 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index 9e1071d..8f239da 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -14,12 +14,27 @@ public interface SameService {      /**       * A request from the callee to participate in 'networkName'.       */ -    void participateNetwork(String networkName, String clientId, -            String url); +    void participateNetwork(String networkName, String clientId, String url);      /**       * Notification of participation in network.       */ -    void notifyParticipation(String networkName, String masterId, -            Map<String, String> participants); +    void notifyParticipation(String networkName, String masterId); + +    /** +     * New state. +     *  +     * When sent to a non-master from the master, use 'newState' as the +     * current state. +     * +     * When sent to a master, broadcast the new state to all clients. +     */ +    void setState(String newState); + +    /** +     * Notify all nodes of network participants. +     * +     * Only sent from master to non-master. +     */ +    void setParticipants(Map<String, String> participants);  } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index 0e17f54..a946c61 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -35,17 +35,26 @@ public class SameServiceImpl implements SameService {      }      @Override -    public void notifyParticipation(String networkName, String masterId, -            Map<String, String> participants) { +    public void notifyParticipation(String networkName, String masterId) {          logger.info("Joining network {}. Master is {}", networkName, masterId); -        int i = 1; -        for (Map.Entry<String, String> e : participants.entrySet()) { -            String clientId = e.getKey(); -            String url = e.getValue(); -            logger.info("  {} participant {}: {}, {}", -                    new Object[]{networkName, i, clientId, url}); -            i++; -        } -        sameState.joinNetwork(networkName, masterId, participants); +        // int i = 1; +        // for (Map.Entry<String, String> e : participants.entrySet()) { +        //     String clientId = e.getKey(); +        //     String url = e.getValue(); +        //     logger.info("  {} participant {}: {}, {}", +        //             new Object[]{networkName, i, clientId, url}); +        //     i++; +        // } +        sameState.joinNetwork(networkName, masterId); +    } + +    @Override +    public void setParticipants(Map<String, String> participants) { +        sameState.setParticipants(participants); +    } + +    @Override +    public void setState(String newState) { +        logger.error("setState not implemented.");      }  } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java index 9638252..d44cc1d 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -38,6 +38,13 @@ public class SameState extends Thread implements UrlReceiver {      private Map<String, String> participants = new HashMap<String, String>();      /** +     * New participants. +     * +     * New participants map to replace participants. +     */ +    private Map<String, String> _setParticipants = null; + +    /**       * The client id of this participant.       */      private String clientId; @@ -74,15 +81,16 @@ public class SameState extends Thread implements UrlReceiver {       * TODO: Implement fully.       */      private synchronized void resetState() { +        networkName = ""; +        masterId = "";          pendingParticipants.clear(); +        participants.clear();      } -    public synchronized void joinNetwork(String networkName, String masterId, -            Map<String, String> participants) { +    public synchronized void joinNetwork(String networkName, String masterId) {          resetState();          this.networkName = networkName;          this.masterId = masterId; -        this.participants = participants;          logger.info("Joined network {}.", networkName);      } @@ -114,6 +122,24 @@ public class SameState extends Thread implements UrlReceiver {          notifyAll();      } +    public synchronized void setParticipants(Map<String, String> participants) { +        logger.info("Pending operation: _setParticipants"); +        _setParticipants = participants; +        notifyAll(); +    } + +    private synchronized void handleSetParticipants() { +        if (_setParticipants != null) { +            if (isMaster()) { +                logger.error("{}: Master received setParticipants.", clientId); +            } else { +                logger.info("{}: New participants committed.", clientId); +                participants = _setParticipants; +            } +        } +        _setParticipants = null; +    } +      private boolean isMaster() {          return masterId.equals(clientId);      } @@ -136,8 +162,12 @@ public class SameState extends Thread implements UrlReceiver {                  String url = e.getValue();                  logger.info("New participant: {} URL({})", clientId, url);                  SameService remoteService = connections.getConnection(url); -                remoteService.notifyParticipation(networkName, masterId, -                        participants); +                remoteService.notifyParticipation(networkName, masterId); +                broadcast(new ServiceOperation(){ +                    @Override void run(SameService service) { +                        service.setParticipants(participants); +                    } +                });              }          }          pendingParticipants.clear(); @@ -151,6 +181,7 @@ public class SameState extends Thread implements UrlReceiver {       */      synchronized void internalRun() {          handleNewParticipants(); +        handleSetParticipants();      }      public synchronized void run() { @@ -174,4 +205,18 @@ public class SameState extends Thread implements UrlReceiver {                      "to finish. Ignoring.");          }      } + +    public abstract static class ServiceOperation { +        abstract void run(SameService service); +    } +     +    public synchronized void broadcast(ServiceOperation operation) { +        for (Map.Entry<String, String> e : participants.entrySet()) { +            String clientId = e.getKey(); +            String url = e.getValue(); +            if (!clientId.equals(this.clientId)) { +                operation.run(connections.getConnection(url)); +            } +        } +    }  }  | 
