summaryrefslogtreecommitdiff
path: root/same/src/main
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-18 10:40:15 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-18 10:40:15 +0100
commite9773489eb1bc3f13d7b1619e03a33039937fb0a (patch)
treeef7bbc893d9bbc1e12ee698b60569c6178461b8f /same/src/main
parenta076bf70f82e4100feddac63224041465f75c64e (diff)
Refactor broadcast operations.
– Add handling of client errors.
Diffstat (limited to 'same/src/main')
-rw-r--r--same/src/main/java/com/orbekk/same/MasterServiceImpl.java57
-rw-r--r--same/src/main/java/com/orbekk/same/State.java9
2 files changed, 38 insertions, 28 deletions
diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
index a080b1e..f8044e4 100644
--- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
+++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
@@ -43,45 +43,52 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
public boolean _sendUpdatedComponents() {
boolean worked = false;
- for (final String component : state.getAndClearUpdatedComponents()) {
- logger.info("Broadcasting new component {}", state.show(component));
- broadcaster.broadcast(participants(), new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.setState(component, state.getDataOf(component),
- state.getRevision(component));
- } catch (Exception e) {
- logger.warn("Exception when connecting to client.", e);
- }
- }
- });
+ for (final Component component : state.getAndClearUpdatedComponents()) {
+ logger.info("Broadcasting new component {}", component);
+ broadcastNewComponents(participants(), listWrap(component));
worked = true;
}
return worked;
}
+ private <T>List<T> listWrap(T o) {
+ List<T> list = new ArrayList<T>();
+ list.add(o);
+ return list;
+ }
+
public synchronized boolean _sendFullState() {
boolean hasWork = _fullStateReceivers.size() != 0;
if (hasWork) {
final List<State.Component> components = state.getComponents();
- broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- for (Component c : components) {
- try {
- client.setState(c.getName(), c.getData(), c.getRevision());
- } catch (Exception e) {
- logger.warn("Exception when connecting to client.", e);
- }
- }
- }
- });
+ broadcastNewComponents(participants(), components);
_fullStateReceivers.clear();
}
return hasWork;
}
+ private synchronized void removeParticipant(String url) {
+ logger.error("Remove participant {}: Operation not supported", url);
+ }
+
+ private void broadcastNewComponents(List<String> destinations,
+ final List<State.Component> components) {
+ broadcaster.broadcast(destinations, new ServiceOperation() {
+ @Override public void run(String url) {
+ ClientService client = connections.getClient(url);
+ try {
+ for (Component c : components) {
+ client.setState(c.getName(), c.getData(),
+ c.getRevision());
+ }
+ } catch (Exception e) {
+ logger.warn("Client {} failed to receive state update.");
+ removeParticipant(url);
+ }
+ }
+ });
+ }
+
private List<String> participants() {
return state.getList(".participants");
}
diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java
index 19b67e5..58cb5b5 100644
--- a/same/src/main/java/com/orbekk/same/State.java
+++ b/same/src/main/java/com/orbekk/same/State.java
@@ -156,10 +156,13 @@ public class State {
return list;
}
- public synchronized Set<String> getAndClearUpdatedComponents() {
- Set<String> copy = new TreeSet<String>(updatedComponents);
+ public synchronized List<Component> getAndClearUpdatedComponents() {
+ List<Component> components = new ArrayList<Component>();
+ for (String name : updatedComponents) {
+ components.add(state.get(name));
+ }
updatedComponents.clear();
- return copy;
+ return components;
}
public static class Component {