summaryrefslogtreecommitdiff
path: root/same/src/main
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 16:40:59 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 16:40:59 +0200
commit9d2ecae975b90ef46fb93fa04fc692108bd2ed60 (patch)
tree23a77c16bb235f5c2af458a34fe5e2b89aaa95e2 /same/src/main
parentb9469eb43b5647605ad7b9662f584c2efbef5153 (diff)
Remove Broadcaster.
– Broadcaster is not needed anymore with asynchronous RPC.
Diffstat (limited to 'same/src/main')
-rw-r--r--same/src/main/java/com/orbekk/same/Broadcaster.java11
-rw-r--r--same/src/main/java/com/orbekk/same/BroadcasterImpl.java32
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java8
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java13
-rw-r--r--same/src/main/java/com/orbekk/same/TestBroadcaster.java20
5 files changed, 6 insertions, 78 deletions
diff --git a/same/src/main/java/com/orbekk/same/Broadcaster.java b/same/src/main/java/com/orbekk/same/Broadcaster.java
deleted file mode 100644
index ce1ad4f..0000000
--- a/same/src/main/java/com/orbekk/same/Broadcaster.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.orbekk.same;
-
-import java.util.List;
-
-/**
- * An interface for broadcasting a message to all clients.
- */
-public interface Broadcaster {
- public void broadcast(final List<String> targets,
- final ServiceOperation operation);
-}
diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
deleted file mode 100644
index 27b8539..0000000
--- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.orbekk.same;
-
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-public class BroadcasterImpl implements Broadcaster {
- private Executor executor;
-
- /**
- * Get a BroadcastRunner for ClientService using a thread pool of size 20.
- */
- public static BroadcasterImpl getDefaultBroadcastRunner() {
- return new BroadcasterImpl(Executors.newFixedThreadPool(20));
- }
-
- public BroadcasterImpl(Executor executor) {
- this.executor = executor;
- }
-
- @Override
- public synchronized void broadcast(final List<String> targets,
- final ServiceOperation operation) {
- for (final String t : targets) {
- executor.execute(new Runnable() {
- @Override public void run() {
- operation.run(t);
- }
- });
- }
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index fb3306e..b97b4ab 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -22,7 +22,6 @@ public class Master {
private String myUrl;
private String myLocation; // Protobuf server location, i.e., myIp:port
State state;
- private Broadcaster broadcaster;
private volatile int masterId = 1;
class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> {
@@ -46,19 +45,18 @@ public class Master {
}
public static Master create(ConnectionManager connections,
- Broadcaster broadcaster, String myUrl, String networkName,
+ String myUrl, String networkName,
String myLocation) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
state.update(".masterLocation", myLocation, 1);
- return new Master(state, connections, broadcaster, myUrl, myLocation);
+ return new Master(state, connections, myUrl, myLocation);
}
Master(State initialState, ConnectionManager connections,
- Broadcaster broadcaster, String myUrl, String myLocation) {
+ String myUrl, String myLocation) {
this.state = initialState;
this.connections = connections;
- this.broadcaster = broadcaster;
this.myUrl = myUrl;
this.myLocation = myLocation;
}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index e28db59..c793948 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -4,7 +4,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
-import com.orbekk.paxos.PaxosService;
import com.orbekk.paxos.PaxosServiceImpl;
import com.orbekk.protobuf.Rpc;
import com.orbekk.protobuf.SimpleProtobufServer;
@@ -18,7 +17,6 @@ public class SameController {
private PaxosServiceImpl paxos;
private Configuration configuration;
private ConnectionManager connections;
- private Broadcaster serviceBroadcaster;
/**
* Timeout for remote operations in milliseconds.
@@ -32,7 +30,7 @@ public class SameController {
configuration.get("pport");
String masterUrl = configuration.get("baseUrl") +
"MasterService.json";
- master = Master.create(connections, serviceBroadcaster,
+ master = Master.create(connections,
masterUrl, configuration.get("networkName"), myLocation);
master.resumeFrom(lastKnownState, masterId);
pServer.registerService(master.getNewService());
@@ -48,14 +46,12 @@ public class SameController {
};
public static SameController create(Configuration configuration) {
- int port = configuration.getInt("port");
int pport = configuration.getInt("pport");
String myLocation = configuration.get("localIp") + ":" + pport;
ConnectionManagerImpl connections = new ConnectionManagerImpl(
timeout, timeout);
State clientState = new State(".InvalidClientNetwork");
- Broadcaster broadcaster = BroadcasterImpl.getDefaultBroadcastRunner();
String baseUrl = String.format("http://%s:%s/",
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
@@ -66,10 +62,11 @@ public class SameController {
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
pServer.registerService(client.getNewService());
+ pServer.registerService(paxos.getService());
SameController controller = new SameController(
configuration, connections, client,
- paxos, broadcaster, pServer);
+ paxos, pServer);
return controller;
}
@@ -78,13 +75,11 @@ public class SameController {
ConnectionManager connections,
Client client,
PaxosServiceImpl paxos,
- Broadcaster serviceBroadcaster,
SimpleProtobufServer pServer) {
this.configuration = configuration;
this.connections = connections;
this.client = client;
this.paxos = paxos;
- this.serviceBroadcaster = serviceBroadcaster;
this.pServer = pServer;
}
@@ -116,8 +111,6 @@ public class SameController {
public void createNetwork(String networkName) {
masterController.disableMaster();
masterController.enableMaster(new State(networkName), 1);
- String masterUrl = configuration.get("baseUrl") +
- "MasterService.json";
joinNetwork(master.getMasterInfo());
}
diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java
deleted file mode 100644
index bac4742..0000000
--- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.orbekk.same;
-
-import java.util.List;
-
-/**
- * This class is used in tests.
- */
-public class TestBroadcaster implements Broadcaster {
-
- public TestBroadcaster() {
- }
-
- @Override
- public void broadcast(final List<String> targets,
- final ServiceOperation operation) {
- for (String t : targets) {
- operation.run(t);
- }
- }
-}