From 9d2ecae975b90ef46fb93fa04fc692108bd2ed60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 24 Apr 2012 16:40:59 +0200 Subject: Remove Broadcaster. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Broadcaster is not needed anymore with asynchronous RPC. --- .../src/main/java/com/orbekk/same/Broadcaster.java | 11 -------- .../main/java/com/orbekk/same/BroadcasterImpl.java | 32 ---------------------- same/src/main/java/com/orbekk/same/Master.java | 8 ++---- .../main/java/com/orbekk/same/SameController.java | 13 ++------- .../main/java/com/orbekk/same/TestBroadcaster.java | 20 -------------- 5 files changed, 6 insertions(+), 78 deletions(-) delete mode 100644 same/src/main/java/com/orbekk/same/Broadcaster.java delete mode 100644 same/src/main/java/com/orbekk/same/BroadcasterImpl.java delete mode 100644 same/src/main/java/com/orbekk/same/TestBroadcaster.java (limited to 'same/src/main') diff --git a/same/src/main/java/com/orbekk/same/Broadcaster.java b/same/src/main/java/com/orbekk/same/Broadcaster.java deleted file mode 100644 index ce1ad4f..0000000 --- a/same/src/main/java/com/orbekk/same/Broadcaster.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.orbekk.same; - -import java.util.List; - -/** - * An interface for broadcasting a message to all clients. - */ -public interface Broadcaster { - public void broadcast(final List targets, - final ServiceOperation operation); -} diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java deleted file mode 100644 index 27b8539..0000000 --- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.orbekk.same; - -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -public class BroadcasterImpl implements Broadcaster { - private Executor executor; - - /** - * Get a BroadcastRunner for ClientService using a thread pool of size 20. - */ - public static BroadcasterImpl getDefaultBroadcastRunner() { - return new BroadcasterImpl(Executors.newFixedThreadPool(20)); - } - - public BroadcasterImpl(Executor executor) { - this.executor = executor; - } - - @Override - public synchronized void broadcast(final List targets, - final ServiceOperation operation) { - for (final String t : targets) { - executor.execute(new Runnable() { - @Override public void run() { - operation.run(t); - } - }); - } - } -} diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index fb3306e..b97b4ab 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -22,7 +22,6 @@ public class Master { private String myUrl; private String myLocation; // Protobuf server location, i.e., myIp:port State state; - private Broadcaster broadcaster; private volatile int masterId = 1; class RemoveParticipantIfFailsCallback implements RpcCallback { @@ -46,19 +45,18 @@ public class Master { } public static Master create(ConnectionManager connections, - Broadcaster broadcaster, String myUrl, String networkName, + String myUrl, String networkName, String myLocation) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); state.update(".masterLocation", myLocation, 1); - return new Master(state, connections, broadcaster, myUrl, myLocation); + return new Master(state, connections, myUrl, myLocation); } Master(State initialState, ConnectionManager connections, - Broadcaster broadcaster, String myUrl, String myLocation) { + String myUrl, String myLocation) { this.state = initialState; this.connections = connections; - this.broadcaster = broadcaster; this.myUrl = myUrl; this.myLocation = myLocation; } diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index e28db59..c793948 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -4,7 +4,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; -import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; import com.orbekk.protobuf.Rpc; import com.orbekk.protobuf.SimpleProtobufServer; @@ -18,7 +17,6 @@ public class SameController { private PaxosServiceImpl paxos; private Configuration configuration; private ConnectionManager connections; - private Broadcaster serviceBroadcaster; /** * Timeout for remote operations in milliseconds. @@ -32,7 +30,7 @@ public class SameController { configuration.get("pport"); String masterUrl = configuration.get("baseUrl") + "MasterService.json"; - master = Master.create(connections, serviceBroadcaster, + master = Master.create(connections, masterUrl, configuration.get("networkName"), myLocation); master.resumeFrom(lastKnownState, masterId); pServer.registerService(master.getNewService()); @@ -48,14 +46,12 @@ public class SameController { }; public static SameController create(Configuration configuration) { - int port = configuration.getInt("port"); int pport = configuration.getInt("pport"); String myLocation = configuration.get("localIp") + ":" + pport; ConnectionManagerImpl connections = new ConnectionManagerImpl( timeout, timeout); State clientState = new State(".InvalidClientNetwork"); - Broadcaster broadcaster = BroadcasterImpl.getDefaultBroadcastRunner(); String baseUrl = String.format("http://%s:%s/", configuration.get("localIp"), configuration.getInt("port")); String clientUrl = baseUrl + "ClientService.json"; @@ -66,10 +62,11 @@ public class SameController { SimpleProtobufServer pServer = SimpleProtobufServer.create(pport); pServer.registerService(client.getNewService()); + pServer.registerService(paxos.getService()); SameController controller = new SameController( configuration, connections, client, - paxos, broadcaster, pServer); + paxos, pServer); return controller; } @@ -78,13 +75,11 @@ public class SameController { ConnectionManager connections, Client client, PaxosServiceImpl paxos, - Broadcaster serviceBroadcaster, SimpleProtobufServer pServer) { this.configuration = configuration; this.connections = connections; this.client = client; this.paxos = paxos; - this.serviceBroadcaster = serviceBroadcaster; this.pServer = pServer; } @@ -116,8 +111,6 @@ public class SameController { public void createNetwork(String networkName) { masterController.disableMaster(); masterController.enableMaster(new State(networkName), 1); - String masterUrl = configuration.get("baseUrl") + - "MasterService.json"; joinNetwork(master.getMasterInfo()); } diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java deleted file mode 100644 index bac4742..0000000 --- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.orbekk.same; - -import java.util.List; - -/** - * This class is used in tests. - */ -public class TestBroadcaster implements Broadcaster { - - public TestBroadcaster() { - } - - @Override - public void broadcast(final List targets, - final ServiceOperation operation) { - for (String t : targets) { - operation.run(t); - } - } -} -- cgit v1.2.3