From 043dc32807afc0e82b3b89e99bc4eea254fa9062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Sun, 15 Jan 2012 18:29:43 +0100 Subject: Add broadcasting from master. --- .../main/java/com/orbekk/same/BroadcastRunner.java | 33 ++++++++++++++++++++++ .../java/com/orbekk/same/MasterServiceImpl.java | 30 ++++++++++++++------ .../java/com/orbekk/same/ServiceOperation.java | 5 ++++ 3 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 same/src/main/java/com/orbekk/same/BroadcastRunner.java create mode 100644 same/src/main/java/com/orbekk/same/ServiceOperation.java (limited to 'same/src/main/java/com/orbekk') diff --git a/same/src/main/java/com/orbekk/same/BroadcastRunner.java b/same/src/main/java/com/orbekk/same/BroadcastRunner.java new file mode 100644 index 0000000..45935b6 --- /dev/null +++ b/same/src/main/java/com/orbekk/same/BroadcastRunner.java @@ -0,0 +1,33 @@ +package com.orbekk.same; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +public class BroadcastRunner { + private ConnectionManager connections; + private Executor executor; + + /** + * Get a BroadcastRunner for ClientService using a thread pool of size 20. + */ + public static BroadcastRunner getDefaultBroadcastRunner() { + return new BroadcastRunner(Executors.newFixedThreadPool(20)); + } + + public BroadcastRunner(Executor executor, ConnectionManager connections) { + this.executor = executor; + } + + public synchronized void broadcast(final List targets, + final ServiceOperation operation) { + for (String t : targets) { + final ClientService client = connections.getConnection(t); + executor.execute(new Runnable() { + @Override public void run() { + operation.run(client); + } + }); + } + } +} diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java index 272e405..dcebb80 100644 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java @@ -7,17 +7,21 @@ import org.slf4j.LoggerFactory; public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { private Logger logger = LoggerFactory.getLogger(getClass()); + private ConnectionManager connections; private State state; private boolean stopped = false; - - public MasterServiceImpl(State initialState) { + private BroadcastRunner broadcaster; + + public MasterServiceImpl(State initialState, ConnectionManager connections, + BroadcastRunner broadcaster) { state = initialState; - } + this.broadcaster = broadcaster; +} @Override public void joinNetworkRequest(String networkName, String clientUrl) { if (networkName.equals(state.getDataOf(".networkName"))) { - List participants = state.getList(".participants"); + List participants = participants(); if (!participants.contains(clientUrl)) { participants.add(clientUrl); synchronized(this) { @@ -35,17 +39,27 @@ public class MasterServiceImpl implements MasterService, UrlReceiver, Runnable { state.getDataOf(".networkName") }); } } - public boolean _handleJoinNetworkRequests() { boolean worked = false; - for (String component : state.getAndClearUpdatedComponents()) { - logger.error("TODO: Send state update for component {}", - state.show(component)); + for (final String component : state.getAndClearUpdatedComponents()) { + logger.info("Broadcasting new component {}", state.show(component)); + + broadcaster.broadcast(participants(), new ServiceOperation() { + @Override void run(ClientService client) { + client.setState(component, state.getDataOf(component), + state.getRevision(component)); + } + }); worked = true; } return worked; } + private List participants() { + return state.getList(".participants"); + } + + @Override public boolean updateStateRequest(String component, String newData, long revision) { // TODO Auto-generated method stub diff --git a/same/src/main/java/com/orbekk/same/ServiceOperation.java b/same/src/main/java/com/orbekk/same/ServiceOperation.java new file mode 100644 index 0000000..5b99c0b --- /dev/null +++ b/same/src/main/java/com/orbekk/same/ServiceOperation.java @@ -0,0 +1,5 @@ +package com.orbekk.same; + +public interface ServiceOperation { + void run(ClientService service); +} -- cgit v1.2.3