diff options
Diffstat (limited to 'same/src/main/java/com')
| -rw-r--r-- | same/src/main/java/com/orbekk/same/ClientServiceImpl.java | 6 | ||||
| -rw-r--r-- | same/src/main/java/com/orbekk/same/MasterServiceImpl.java | 31 | ||||
| -rw-r--r-- | same/src/main/java/com/orbekk/same/State.java | 62 | 
3 files changed, 87 insertions, 12 deletions
diff --git a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java index 84ee015..915840e 100644 --- a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java @@ -21,7 +21,7 @@ public class ClientServiceImpl implements ClientService, UrlReceiver {      @Override      public void setState(String component, String data, long revision) { -        logger.error("SetState not yet implemented."); +        state.forceUpdate(component, data, revision);      }      @Override @@ -29,4 +29,8 @@ public class ClientServiceImpl implements ClientService, UrlReceiver {          logger.info("My URL is {}.", myUrl);          this.myUrl = myUrl;      } +     +    State testGetState() { +        return state; +    }  } diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index 3731542..2101de8 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -1,22 +1,26 @@  package com.orbekk.same; +import java.util.ArrayList;  import java.util.List;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; +import com.orbekk.same.State.Component; +  public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {      private Logger logger = LoggerFactory.getLogger(getClass());      private ConnectionManager connections;      private State state;      private boolean stopped = false;      private Broadcaster broadcaster; - +    private List<String> _fullStateReceivers = new ArrayList<String>(); +          public MasterServiceImpl(State initialState, ConnectionManager connections,              Broadcaster broadcaster) {          state = initialState;          this.broadcaster = broadcaster; -} +    }      @Override      public void joinNetworkRequest(String networkName, String clientUrl) { @@ -26,20 +30,21 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {                  participants.add(clientUrl);                  synchronized(this) {                      notifyAll(); +                    state.updateFromObject(".participants", participants, +                            state.getRevision(".participants"));                  }              } else {                                  logger.warn("Client {} already part of network. " +                          "Ignoring participation request", clientUrl);              } -            state.updateFromObject(".participants", participants, -                    state.getRevision(".participants"));          } else {              logger.warn("Client {} tried to join {}, but network name is {}",                      new Object[]{ clientUrl, networkName,                               state.getDataOf(".networkName") });          }      } -    public boolean _handleJoinNetworkRequests() { +     +    public boolean _sendUpdatedComponents() {          boolean worked = false;          for (final String component : state.getAndClearUpdatedComponents()) {              logger.info("Broadcasting new component {}", state.show(component)); @@ -54,6 +59,19 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {          return worked;      } +    public boolean _sendFullState() { +        boolean worked = _fullStateReceivers.size() != 0; +        final List<State.Component> components = state.getComponents(); +        broadcaster.broadcast(participants(), new ServiceOperation() { +            @Override public void run(ClientService client) { +                for (Component c : components) { +                    client.setState(c.getName(), c.getData(), c.getRevision()); +                } +            } +        }); +        return worked; +    } +          private List<String> participants() {          return state.getList(".participants");      } @@ -74,7 +92,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {      boolean _performWork() {          boolean worked = false; -        worked |= _handleJoinNetworkRequests(); +        worked |= _sendUpdatedComponents(); +        worked |= _sendFullState();          return worked;      } diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index afd93f5..3371958 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory;  /**   * This class is thread-safe.   */ -public class State { +public class State implements Comparable<State> {      private Logger logger = LoggerFactory.getLogger(getClass());      private Map<String, Component> state = new HashMap<String, Component>();       private ObjectMapper mapper = new ObjectMapper(); @@ -30,15 +30,26 @@ public class State {          updateFromObject(".participants", new ArrayList<String>(), 0);      } -    public boolean update(String componentName, String data, long revision) { +    public synchronized void forceUpdate(String componentName, +            String data, long revision) { +        Component oldComponent = state.get(componentName); +        Component newComponent = new Component(componentName, revision, data); +        logger.info("Force update: {} => {}", oldComponent, newComponent); +        state.put(componentName, newComponent); +        updatedComponents.add(componentName); +    } +     +    public synchronized boolean update(String componentName, String data, +            long revision) {          Component component = null;          if (!state.containsKey(componentName)) { -            component = new Component(0, ""); +            component = new Component("", 0, "");          } else { -            component = state.get(componentName);            +            component = state.get(componentName);                 }          if (revision == component.getRevision()) { +            component.setName(componentName);              component.setRevision(revision + 1);              component.setData(data);              state.put(componentName, component); @@ -124,6 +135,19 @@ public class State {          return componentName + ": " + state.get(componentName);      } +    /** +     * Returns a list of all the components in this State. +     *  +     * This method is thread-safe, and returns a deep copy. +     */ +    public synchronized List<Component> getComponents() { +        ArrayList<Component> list = new ArrayList<Component>(); +        for (Component component : state.values()) { +            list.add(new Component(component)); +        } +        return list; +    } +          public synchronized Set<String> getAndClearUpdatedComponents() {          Set<String> copy = new TreeSet<String>(updatedComponents);          updatedComponents.clear(); @@ -131,10 +155,21 @@ public class State {      }      public static class Component { +        private String name;          private long revision;          private String data; -        public Component(long revision, String data) { +        /** +         * Copy constructor. +         */ +        public Component(Component other) { +            this.name = other.name; +            this.revision = other.revision; +            this.data = other.data; +        } +         +        public Component(String name, long revision, String data) { +            this.name = name;              this.revision = revision;              this.data = data;          } @@ -142,6 +177,7 @@ public class State {          public long getRevision() {              return revision;          } +                  public void setRevision(long revision) {              this.revision = revision;          } @@ -149,14 +185,28 @@ public class State {          public String getData() {              return data;          } +          public void setData(String data) {              this.data = data;          }        +        public String getName() { +            return name; +        } +         +        public void setName(String name) { +            this.name = name; +        } +                  @Override public String toString() {              return this.data + " @" + revision;          }      } +    @Override +    public int compareTo(State other) { +        return -1; +    } +      //      //    @Override  //    public String toString() { @@ -187,4 +237,6 @@ public class State {  //            ")", stateIteration, networkName, masterId, data,  //            participantsString);  //    } + +  }  | 
