From e9773489eb1bc3f13d7b1619e03a33039937fb0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 18 Jan 2012 10:40:15 +0100 Subject: Refactor broadcast operations. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Add handling of client errors. --- .../java/com/orbekk/same/MasterServiceImpl.java | 57 ++++++++++++---------- same/src/main/java/com/orbekk/same/State.java | 9 ++-- 2 files changed, 38 insertions(+), 28 deletions(-) (limited to 'same/src/main') diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index a080b1e..f8044e4 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -43,45 +43,52 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { public boolean _sendUpdatedComponents() { boolean worked = false; - for (final String component : state.getAndClearUpdatedComponents()) { - logger.info("Broadcasting new component {}", state.show(component)); - broadcaster.broadcast(participants(), new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.setState(component, state.getDataOf(component), - state.getRevision(component)); - } catch (Exception e) { - logger.warn("Exception when connecting to client.", e); - } - } - }); + for (final Component component : state.getAndClearUpdatedComponents()) { + logger.info("Broadcasting new component {}", component); + broadcastNewComponents(participants(), listWrap(component)); worked = true; } return worked; } + private List listWrap(T o) { + List list = new ArrayList(); + list.add(o); + return list; + } + public synchronized boolean _sendFullState() { boolean hasWork = _fullStateReceivers.size() != 0; if (hasWork) { final List components = state.getComponents(); - broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - for (Component c : components) { - try { - client.setState(c.getName(), c.getData(), c.getRevision()); - } catch (Exception e) { - logger.warn("Exception when connecting to client.", e); - } - } - } - }); + broadcastNewComponents(participants(), components); _fullStateReceivers.clear(); } return hasWork; } + private synchronized void removeParticipant(String url) { + logger.error("Remove participant {}: Operation not supported", url); + } + + private void broadcastNewComponents(List destinations, + final List components) { + broadcaster.broadcast(destinations, new ServiceOperation() { + @Override public void run(String url) { + ClientService client = connections.getClient(url); + try { + for (Component c : components) { + client.setState(c.getName(), c.getData(), + c.getRevision()); + } + } catch (Exception e) { + logger.warn("Client {} failed to receive state update."); + removeParticipant(url); + } + } + }); + } + private List participants() { return state.getList(".participants"); } diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index 19b67e5..58cb5b5 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -156,10 +156,13 @@ public class State { return list; } - public synchronized Set getAndClearUpdatedComponents() { - Set copy = new TreeSet(updatedComponents); + public synchronized List getAndClearUpdatedComponents() { + List components = new ArrayList(); + for (String name : updatedComponents) { + components.add(state.get(name)); + } updatedComponents.clear(); - return copy; + return components; } public static class Component { -- cgit v1.2.3