From a076bf70f82e4100feddac63224041465f75c64e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 18 Jan 2012 10:23:30 +0100 Subject: Refactor Broadcaster. Take a client url instead of a client parameter, to handle error conditions. --- same/src/main/java/com/orbekk/same/BroadcasterImpl.java | 16 +++++----------- same/src/main/java/com/orbekk/same/ClientApp.java | 2 +- same/src/main/java/com/orbekk/same/MasterApp.java | 2 +- .../src/main/java/com/orbekk/same/MasterServiceImpl.java | 9 ++++++--- same/src/main/java/com/orbekk/same/ServiceOperation.java | 2 +- same/src/main/java/com/orbekk/same/TestBroadcaster.java | 8 +------- 6 files changed, 15 insertions(+), 24 deletions(-) (limited to 'same/src/main') diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java index 9ad7296..5efdcf3 100644 --- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java +++ b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java @@ -5,31 +5,25 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class BroadcasterImpl implements Broadcaster { - private ConnectionManager connections; private Executor executor; /** * Get a BroadcastRunner for ClientService using a thread pool of size 20. */ - public static BroadcasterImpl getDefaultBroadcastRunner( - ConnectionManager connections) { - return new BroadcasterImpl(Executors.newFixedThreadPool(20), - connections); + public static BroadcasterImpl getDefaultBroadcastRunner() { + return new BroadcasterImpl(Executors.newFixedThreadPool(20)); } - public BroadcasterImpl(Executor executor, - ConnectionManager connections) { - this.connections = connections; + public BroadcasterImpl(Executor executor) { this.executor = executor; } public synchronized void broadcast(final List targets, final ServiceOperation operation) { - for (String t : targets) { - final ClientService client = connections.getClient(t); + for (final String t : targets) { executor.execute(new Runnable() { @Override public void run() { - operation.run(client); + operation.run(t); } }); } diff --git a/same/src/main/java/com/orbekk/same/ClientApp.java b/same/src/main/java/com/orbekk/same/ClientApp.java index 0effab4..34380a0 100644 --- a/same/src/main/java/com/orbekk/same/ClientApp.java +++ b/same/src/main/java/com/orbekk/same/ClientApp.java @@ -19,7 +19,7 @@ public class ClientApp { timeout); State state = new State(networkName); Broadcaster broadcaster = - BroadcasterImpl.getDefaultBroadcastRunner(connections); + BroadcasterImpl.getDefaultBroadcastRunner(); MasterServiceImpl master = new MasterServiceImpl(state, connections, broadcaster); ClientServiceImpl client = new ClientServiceImpl(state, connections); diff --git a/same/src/main/java/com/orbekk/same/MasterApp.java b/same/src/main/java/com/orbekk/same/MasterApp.java index 1aff6fc..ee5f50b 100644 --- a/same/src/main/java/com/orbekk/same/MasterApp.java +++ b/same/src/main/java/com/orbekk/same/MasterApp.java @@ -16,7 +16,7 @@ public class MasterApp { timeout); State state = new State("MasterNetwork"); Broadcaster broadcaster = - BroadcasterImpl.getDefaultBroadcastRunner(connections); + BroadcasterImpl.getDefaultBroadcastRunner(); MasterServiceImpl master = new MasterServiceImpl(state, connections, broadcaster); JsonRpcServer jsonServer = new JsonRpcServer(master, MasterService.class); diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index 5511dbf..a080b1e 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -10,7 +10,7 @@ import com.orbekk.same.State.Component; public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { private Logger logger = LoggerFactory.getLogger(getClass()); - private ConnectionManager connections; + private final ConnectionManager connections; private State state; private boolean stopped = false; private Broadcaster broadcaster; @@ -19,6 +19,7 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { public MasterServiceImpl(State initialState, ConnectionManager connections, Broadcaster broadcaster) { state = initialState; + this.connections = connections; this.broadcaster = broadcaster; } @@ -45,7 +46,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { for (final String component : state.getAndClearUpdatedComponents()) { logger.info("Broadcasting new component {}", state.show(component)); broadcaster.broadcast(participants(), new ServiceOperation() { - @Override public void run(ClientService client) { + @Override public void run(String url) { + ClientService client = connections.getClient(url); try { client.setState(component, state.getDataOf(component), state.getRevision(component)); @@ -64,7 +66,8 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { if (hasWork) { final List components = state.getComponents(); broadcaster.broadcast(_fullStateReceivers, new ServiceOperation() { - @Override public void run(ClientService client) { + @Override public void run(String url) { + ClientService client = connections.getClient(url); for (Component c : components) { try { client.setState(c.getName(), c.getData(), c.getRevision()); diff --git a/same/src/main/java/com/orbekk/same/ServiceOperation.java b/same/src/main/java/com/orbekk/same/ServiceOperation.java index 5b99c0b..11c9ec7 100644 --- a/same/src/main/java/com/orbekk/same/ServiceOperation.java +++ b/same/src/main/java/com/orbekk/same/ServiceOperation.java @@ -1,5 +1,5 @@ package com.orbekk.same; public interface ServiceOperation { - void run(ClientService service); + void run(String url); } diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java index 1af605c..b2f9d8c 100644 --- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java +++ b/same/src/main/java/com/orbekk/same/TestBroadcaster.java @@ -6,20 +6,14 @@ import java.util.List; * This class is used in tests. */ public class TestBroadcaster implements Broadcaster { - public ConnectionManager connections; public TestBroadcaster() { } - public TestBroadcaster(ConnectionManager connections) { - this.connections = connections; - } - public void broadcast(final List targets, final ServiceOperation operation) { for (String t : targets) { - ClientService client = connections.getClient(t); - operation.run(client); + operation.run(t); } } } -- cgit v1.2.3