summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-18 10:23:30 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-18 10:23:30 +0100
commita076bf70f82e4100feddac63224041465f75c64e (patch)
tree4c88483e9a88295177e4d67b4c148d9c21cebe36 /same/src/main/java/com/orbekk
parent5a21215b4ea85bfa4da8d3fbb0f87916c2f4635a (diff)
Refactor Broadcaster.
Take a client url instead of a client parameter, to handle error conditions.
Diffstat (limited to 'same/src/main/java/com/orbekk')
-rw-r--r--same/src/main/java/com/orbekk/same/BroadcasterImpl.java16
-rw-r--r--same/src/main/java/com/orbekk/same/ClientApp.java2
-rw-r--r--same/src/main/java/com/orbekk/same/MasterApp.java2
-rw-r--r--same/src/main/java/com/orbekk/same/MasterServiceImpl.java9
-rw-r--r--same/src/main/java/com/orbekk/same/ServiceOperation.java2
-rw-r--r--same/src/main/java/com/orbekk/same/TestBroadcaster.java8
6 files changed, 15 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
index 9ad7296..5efdcf3 100644
--- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
+++ b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
@@ -5,31 +5,25 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class BroadcasterImpl implements Broadcaster {
- private ConnectionManager connections;
private Executor executor;
/**
* Get a BroadcastRunner for ClientService using a thread pool of size 20.
*/
- public static BroadcasterImpl getDefaultBroadcastRunner(
- ConnectionManager connections) {
- return new BroadcasterImpl(Executors.newFixedThreadPool(20),
- connections);
+ public static BroadcasterImpl getDefaultBroadcastRunner() {
+ return new BroadcasterImpl(Executors.newFixedThreadPool(20));
}
- public BroadcasterImpl(Executor executor,
- ConnectionManager connections) {
- this.connections = connections;
+ public BroadcasterImpl(Executor executor) {
this.executor = executor;
}
public synchronized void broadcast(final List<String> targets,
final ServiceOperation operation) {
- for (String t : targets) {
- final ClientService client = connections.getClient(t);
+ for (final String t : targets) {
executor.execute(new Runnable() {
@Override public void run() {
- operation.run(client);
+ operation.run(t);
}
});
}
diff --git a/same/src/main/java/com/orbekk/same/ClientApp.java b/same/src/main/java/com/orbekk/same/ClientApp.java
index 0effab4..34380a0 100644
--- a/same/src/main/java/com/orbekk/same/ClientApp.java
+++ b/same/src/main/java/com/orbekk/same/ClientApp.java
@@ -19,7 +19,7 @@ public class ClientApp {
timeout);
State state = new State(networkName);
Broadcaster broadcaster =
- BroadcasterImpl.getDefaultBroadcastRunner(connections);
+ BroadcasterImpl.getDefaultBroadcastRunner();
MasterServiceImpl master = new MasterServiceImpl(state, connections,
broadcaster);
ClientServiceImpl client = new ClientServiceImpl(state, connections);
diff --git a/same/src/main/java/com/orbekk/same/MasterApp.java b/same/src/main/java/com/orbekk/same/MasterApp.java
index 1aff6fc..ee5f50b 100644
--- a/same/src/main/java/com/orbekk/same/MasterApp.java
+++ b/same/src/main/java/com/orbekk/same/MasterApp.java
@@ -16,7 +16,7 @@ public class MasterApp {
timeout);
State state = new State("MasterNetwork");
Broadcaster broadcaster =
- BroadcasterImpl.getDefaultBroadcastRunner(connections);
+ BroadcasterImpl.getDefaultBroadcastRunner();
MasterServiceImpl master = new MasterServiceImpl(state, connections,
broadcaster);
JsonRpcServer jsonServer = new JsonRpcServer(master, MasterService.class);
diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
index 5511dbf..a080b1e 100644
--- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
+++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java
@@ -10,7 +10,7 @@ import com.orbekk.same.State.Component;
public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
private Logger logger = LoggerFactory.getLogger(getClass());
- private ConnectionManager connections;
+ private final ConnectionManager connections;
private State state;
private boolean stopped = false;
private Broadcaster broadcaster;
@@ -19,6 +19,7 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
public MasterServiceImpl(State initialState, ConnectionManager connections,
Broadcaster broadcaster) {
state = initialState;
+ this.connections = connections;
this.broadcaster = broadcaster;
}
@@ -45,7 +46,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
for (final String component : state.getAndClearUpdatedComponents()) {
logger.info("Broadcasting new component {}", state.show(component));
broadcaster.broadcast(participants(), new ServiceOperation() {
- @Override public void run(ClientService client) {
+ @Override public void run(String url) {
+ ClientService client = connections.getClient(url);
try {
client.setState(component, state.getDataOf(component),
state.getRevision(component));
@@ -64,7 +66,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable {
if (hasWork) {
final List<State.Component> components = state.getComponents();
broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() {
- @Override public void run(ClientService client) {
+ @Override public void run(String url) {
+ ClientService client = connections.getClient(url);
for (Component c : components) {
try {
client.setState(c.getName(), c.getData(), c.getRevision());
diff --git a/same/src/main/java/com/orbekk/same/ServiceOperation.java b/same/src/main/java/com/orbekk/same/ServiceOperation.java
index 5b99c0b..11c9ec7 100644
--- a/same/src/main/java/com/orbekk/same/ServiceOperation.java
+++ b/same/src/main/java/com/orbekk/same/ServiceOperation.java
@@ -1,5 +1,5 @@
package com.orbekk.same;
public interface ServiceOperation {
- void run(ClientService service);
+ void run(String url);
}
diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java
index 1af605c..b2f9d8c 100644
--- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java
+++ b/same/src/main/java/com/orbekk/same/TestBroadcaster.java
@@ -6,20 +6,14 @@ import java.util.List;
* This class is used in tests.
*/
public class TestBroadcaster implements Broadcaster {
- public ConnectionManager connections;
public TestBroadcaster() {
}
- public TestBroadcaster(ConnectionManager connections) {
- this.connections = connections;
- }
-
public void broadcast(final List<String> targets,
final ServiceOperation operation) {
for (String t : targets) {
- ClientService client = connections.getClient(t);
- operation.run(client);
+ operation.run(t);
}
}
}