summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--same/src/main/java/com/orbekk/same/BroadcastRunner.java33
-rw-r--r--same/src/main/java/com/orbekk/same/MasterServiceImpl.java30
-rw-r--r--same/src/main/java/com/orbekk/same/ServiceOperation.java5
3 files changed, 60 insertions, 8 deletions
diff --git a/same/src/main/java/com/orbekk/same/BroadcastRunner.java b/same/src/main/java/com/orbekk/same/BroadcastRunner.java
new file mode 100644
index 0000000..45935b6
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/BroadcastRunner.java
@@ -0,0 +1,33 @@
+package com.orbekk.same;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class BroadcastRunner {
+ private ConnectionManager connections;
+ private Executor executor;
+
+ /**
+ * Get a BroadcastRunner for ClientService using a thread pool of size 20.
+ */
+ public static BroadcastRunner getDefaultBroadcastRunner() {
+ return new BroadcastRunner(Executors.newFixedThreadPool(20));
+ }
+
+ public BroadcastRunner(Executor executor, ConnectionManager connections) {
+ this.executor = executor;
+ }
+
+ public synchronized void broadcast(final List<String> targets,
+ final ServiceOperation operation) {
+ for (String t : targets) {
+ final ClientService client = connections.getConnection(t);
+ executor.execute(new Runnable() {
+ @Override public void run() {
+ operation.run(client);
+ }
+ });
+ }
+ }
+}
diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
index 272e405..dcebb80 100644
--- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
+++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
@@ -7,17 +7,21 @@ import org.slf4j.LoggerFactory;
public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
private Logger logger = LoggerFactory.getLogger(getClass());
+ private ConnectionManager connections;
private State state;
private boolean stopped = false;
-
- public MasterServiceImpl(State initialState) {
+ private BroadcastRunner broadcaster;
+
+ public MasterServiceImpl(State initialState, ConnectionManager connections,
+ BroadcastRunner broadcaster) {
state = initialState;
- }
+ this.broadcaster = broadcaster;
+}
@Override
public void joinNetworkRequest(String networkName, String clientUrl) {
if (networkName.equals(state.getDataOf(".networkName"))) {
- List<String> participants = state.getList(".participants");
+ List<String> participants = participants();
if (!participants.contains(clientUrl)) {
participants.add(clientUrl);
synchronized(this) {
@@ -35,17 +39,27 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
state.getDataOf(".networkName") });
}
}
-
public boolean _handleJoinNetworkRequests() {
boolean worked = false;
- for (String component : state.getAndClearUpdatedComponents()) {
- logger.error("TODO: Send state update for component {}",
- state.show(component));
+ for (final String component : state.getAndClearUpdatedComponents()) {
+ logger.info("Broadcasting new component {}", state.show(component));
+
+ broadcaster.broadcast(participants(), new ServiceOperation() {
+ @Override void run(ClientService client) {
+ client.setState(component, state.getDataOf(component),
+ state.getRevision(component));
+ }
+ });
worked = true;
}
return worked;
}
+ private List<String> participants() {
+ return state.getList(".participants");
+ }
+
+
@Override
public boolean updateStateRequest(String component, String newData, long revision) {
// TODO Auto-generated method stub
diff --git a/same/src/main/java/com/orbekk/same/ServiceOperation.java b/same/src/main/java/com/orbekk/same/ServiceOperation.java
new file mode 100644
index 0000000..5b99c0b
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/ServiceOperation.java
@@ -0,0 +1,5 @@
+package com.orbekk.same;
+
+public interface ServiceOperation {
+ void run(ClientService service);
+}