diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-01-18 10:40:15 +0100 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-01-18 10:40:15 +0100 |
commit | e9773489eb1bc3f13d7b1619e03a33039937fb0a (patch) | |
tree | ef7bbc893d9bbc1e12ee698b60569c6178461b8f | |
parent | a076bf70f82e4100feddac63224041465f75c64e (diff) |
Refactor broadcast operations.
– Add handling of client errors.
-rw-r--r-- | same/src/main/java/com/orbekk/same/MasterServiceImpl.java | 57 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/State.java | 9 |
2 files changed, 38 insertions, 28 deletions
diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index a080b1e..f8044e4 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -43,45 +43,52 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { public boolean _sendUpdatedComponents() { boolean worked = false; - for (final String component : state.getAndClearUpdatedComponents()) { - logger.info("Broadcasting new component {}", state.show(component)); - broadcaster.broadcast(participants(), new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.setState(component, state.getDataOf(component), - state.getRevision(component)); - } catch (Exception e) { - logger.warn("Exception when connecting to client.", e); - } - } - }); + for (final Component component : state.getAndClearUpdatedComponents()) { + logger.info("Broadcasting new component {}", component); + broadcastNewComponents(participants(), listWrap(component)); worked = true; } return worked; } + private <T>List<T> listWrap(T o) { + List<T> list = new ArrayList<T>(); + list.add(o); + return list; + } + public synchronized boolean _sendFullState() { boolean hasWork = _fullStateReceivers.size() != 0; if (hasWork) { final List<State.Component> components = state.getComponents(); - broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - for (Component c : components) { - try { - client.setState(c.getName(), c.getData(), c.getRevision()); - } catch (Exception e) { - logger.warn("Exception when connecting to client.", e); - } - } - } - }); + broadcastNewComponents(participants(), components); _fullStateReceivers.clear(); } return hasWork; } + private synchronized void removeParticipant(String url) { + logger.error("Remove participant {}: Operation not supported", url); + } + + private void broadcastNewComponents(List<String> destinations, + final List<State.Component> components) { + broadcaster.broadcast(destinations, new ServiceOperation() { + @Override public void run(String url) { + ClientService client = connections.getClient(url); + try { + for (Component c : components) { + client.setState(c.getName(), c.getData(), + c.getRevision()); + } + } catch (Exception e) { + logger.warn("Client {} failed to receive state update."); + removeParticipant(url); + } + } + }); + } + private List<String> participants() { return state.getList(".participants"); } diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index 19b67e5..58cb5b5 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -156,10 +156,13 @@ public class State { return list; } - public synchronized Set<String> getAndClearUpdatedComponents() { - Set<String> copy = new TreeSet<String>(updatedComponents); + public synchronized List<Component> getAndClearUpdatedComponents() { + List<Component> components = new ArrayList<Component>(); + for (String name : updatedComponents) { + components.add(state.get(name)); + } updatedComponents.clear(); - return copy; + return components; } public static class Component { |