diff options
Diffstat (limited to 'same/src/main/java/com')
6 files changed, 15 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java index 9ad7296..5efdcf3 100644 --- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java +++ b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java @@ -5,31 +5,25 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class BroadcasterImpl implements Broadcaster { - private ConnectionManager connections; private Executor executor; /** * Get a BroadcastRunner for ClientService using a thread pool of size 20. */ - public static BroadcasterImpl getDefaultBroadcastRunner( - ConnectionManager connections) { - return new BroadcasterImpl(Executors.newFixedThreadPool(20), - connections); + public static BroadcasterImpl getDefaultBroadcastRunner() { + return new BroadcasterImpl(Executors.newFixedThreadPool(20)); } - public BroadcasterImpl(Executor executor, - ConnectionManager connections) { - this.connections = connections; + public BroadcasterImpl(Executor executor) { this.executor = executor; } public synchronized void broadcast(final List<String> targets, final ServiceOperation operation) { - for (String t : targets) { - final ClientService client = connections.getClient(t); + for (final String t : targets) { executor.execute(new Runnable() { @Override public void run() { - operation.run(client); + operation.run(t); } }); } diff --git a/same/src/main/java/com/orbekk/same/ClientApp.java b/same/src/main/java/com/orbekk/same/ClientApp.java index 0effab4..34380a0 100644 --- a/same/src/main/java/com/orbekk/same/ClientApp.java +++ b/same/src/main/java/com/orbekk/same/ClientApp.java @@ -19,7 +19,7 @@ public class ClientApp { timeout); State state = new State(networkName); Broadcaster broadcaster = - BroadcasterImpl.getDefaultBroadcastRunner(connections); + BroadcasterImpl.getDefaultBroadcastRunner(); MasterServiceImpl master = new MasterServiceImpl(state, connections, broadcaster); ClientServiceImpl client = new ClientServiceImpl(state, connections); diff --git a/same/src/main/java/com/orbekk/same/MasterApp.java b/same/src/main/java/com/orbekk/same/MasterApp.java index 1aff6fc..ee5f50b 100644 --- a/same/src/main/java/com/orbekk/same/MasterApp.java +++ b/same/src/main/java/com/orbekk/same/MasterApp.java @@ -16,7 +16,7 @@ public class MasterApp { timeout); State state = new State("MasterNetwork"); Broadcaster broadcaster = - BroadcasterImpl.getDefaultBroadcastRunner(connections); + BroadcasterImpl.getDefaultBroadcastRunner(); MasterServiceImpl master = new MasterServiceImpl(state, connections, broadcaster); JsonRpcServer jsonServer = new JsonRpcServer(master, MasterService.class); diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index 5511dbf..a080b1e 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -10,7 +10,7 @@ import com.orbekk.same.State.Component; public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { private Logger logger = LoggerFactory.getLogger(getClass()); - private ConnectionManager connections; + private final ConnectionManager connections; private State state; private boolean stopped = false; private Broadcaster broadcaster; @@ -19,6 +19,7 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { public MasterServiceImpl(State initialState, ConnectionManager connections, Broadcaster broadcaster) { state = initialState; + this.connections = connections; this.broadcaster = broadcaster; } @@ -45,7 +46,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { for (final String component : state.getAndClearUpdatedComponents()) { logger.info("Broadcasting new component {}", state.show(component)); broadcaster.broadcast(participants(), new ServiceOperation() { - @Override public void run(ClientService client) { + @Override public void run(String url) { + ClientService client = connections.getClient(url); try { client.setState(component, state.getDataOf(component), state.getRevision(component)); @@ -64,7 +66,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { if (hasWork) { final List<State.Component> components = state.getComponents(); broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() { - @Override public void run(ClientService client) { + @Override public void run(String url) { + ClientService client = connections.getClient(url); for (Component c : components) { try { client.setState(c.getName(), c.getData(), c.getRevision()); diff --git a/same/src/main/java/com/orbekk/same/ServiceOperation.java b/same/src/main/java/com/orbekk/same/ServiceOperation.java index 5b99c0b..11c9ec7 100644 --- a/same/src/main/java/com/orbekk/same/ServiceOperation.java +++ b/same/src/main/java/com/orbekk/same/ServiceOperation.java @@ -1,5 +1,5 @@ package com.orbekk.same; public interface ServiceOperation { - void run(ClientService service); + void run(String url); } diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java index 1af605c..b2f9d8c 100644 --- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java +++ b/same/src/main/java/com/orbekk/same/TestBroadcaster.java @@ -6,20 +6,14 @@ import java.util.List; * This class is used in tests. */ public class TestBroadcaster implements Broadcaster { - public ConnectionManager connections; public TestBroadcaster() { } - public TestBroadcaster(ConnectionManager connections) { - this.connections = connections; - } - public void broadcast(final List<String> targets, final ServiceOperation operation) { for (String t : targets) { - ClientService client = connections.getClient(t); - operation.run(client); + operation.run(t); } } } |