diff options
| -rw-r--r-- | same/src/main/java/com/orbekk/same/MasterServiceImpl.java | 57 | ||||
| -rw-r--r-- | same/src/main/java/com/orbekk/same/State.java | 9 | 
2 files changed, 38 insertions, 28 deletions
| diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index a080b1e..f8044e4 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -43,45 +43,52 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {      public boolean _sendUpdatedComponents() {          boolean worked = false; -        for (final String component : state.getAndClearUpdatedComponents()) { -            logger.info("Broadcasting new component {}", state.show(component)); -            broadcaster.broadcast(participants(), new ServiceOperation() { -                @Override public void run(String url) { -                    ClientService client = connections.getClient(url); -                    try { -                        client.setState(component, state.getDataOf(component), -                                state.getRevision(component)); -                    } catch (Exception e) { -                        logger.warn("Exception when connecting to client.", e); -                    } -               } -            }); +        for (final Component component : state.getAndClearUpdatedComponents()) { +            logger.info("Broadcasting new component {}", component); +            broadcastNewComponents(participants(), listWrap(component));              worked = true;          }          return worked;      } +    private <T>List<T> listWrap(T o) { +        List<T> list = new ArrayList<T>(); +        list.add(o); +        return list; +    } +          public synchronized boolean _sendFullState() {          boolean hasWork = _fullStateReceivers.size() != 0;          if (hasWork) {              final List<State.Component> components = state.getComponents(); -            broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() { -                @Override public void run(String url) { -                    ClientService client = connections.getClient(url); -                    for (Component c : components) { -                        try { -                            client.setState(c.getName(), c.getData(), c.getRevision()); -                        } catch (Exception e) { -                            logger.warn("Exception when connecting to client.", e); -                        } -                    } -                } -            }); +            broadcastNewComponents(participants(), components);              _fullStateReceivers.clear();          }          return hasWork;      } +    private synchronized void removeParticipant(String url) { +        logger.error("Remove participant {}: Operation not supported", url); +    } +     +    private void broadcastNewComponents(List<String> destinations, +            final List<State.Component> components) { +        broadcaster.broadcast(destinations, new ServiceOperation() { +            @Override public void run(String url) { +                 ClientService client = connections.getClient(url); +                 try { +                     for (Component c : components) { +                         client.setState(c.getName(), c.getData(), +                                 c.getRevision()); +                     } +                 } catch (Exception e) { +                     logger.warn("Client {} failed to receive state update."); +                     removeParticipant(url); +                 } +            } +        }); +    } +          private List<String> participants() {          return state.getList(".participants");      } diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index 19b67e5..58cb5b5 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -156,10 +156,13 @@ public class State {          return list;      } -    public synchronized Set<String> getAndClearUpdatedComponents() { -        Set<String> copy = new TreeSet<String>(updatedComponents); +    public synchronized List<Component> getAndClearUpdatedComponents() { +        List<Component> components = new ArrayList<Component>(); +        for (String name : updatedComponents) { +            components.add(state.get(name)); +        }          updatedComponents.clear(); -        return copy; +        return components;      }      public static class Component { | 
