summaryrefslogtreecommitdiff
path: root/same
diff options
context:
space:
mode:
Diffstat (limited to 'same')
-rw-r--r--same/src/main/java/com/orbekk/same/Broadcaster.java11
-rw-r--r--same/src/main/java/com/orbekk/same/BroadcasterImpl.java32
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java8
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java13
-rw-r--r--same/src/main/java/com/orbekk/same/TestBroadcaster.java20
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java9
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java3
7 files changed, 11 insertions, 85 deletions
diff --git a/same/src/main/java/com/orbekk/same/Broadcaster.java b/same/src/main/java/com/orbekk/same/Broadcaster.java
deleted file mode 100644
index ce1ad4f..0000000
--- a/same/src/main/java/com/orbekk/same/Broadcaster.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.orbekk.same;
-
-import java.util.List;
-
-/**
- * An interface for broadcasting a message to all clients.
- */
-public interface Broadcaster {
- public void broadcast(final List<String> targets,
- final ServiceOperation operation);
-}
diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
deleted file mode 100644
index 27b8539..0000000
--- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.orbekk.same;
-
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-public class BroadcasterImpl implements Broadcaster {
- private Executor executor;
-
- /**
- * Get a BroadcastRunner for ClientService using a thread pool of size 20.
- */
- public static BroadcasterImpl getDefaultBroadcastRunner() {
- return new BroadcasterImpl(Executors.newFixedThreadPool(20));
- }
-
- public BroadcasterImpl(Executor executor) {
- this.executor = executor;
- }
-
- @Override
- public synchronized void broadcast(final List<String> targets,
- final ServiceOperation operation) {
- for (final String t : targets) {
- executor.execute(new Runnable() {
- @Override public void run() {
- operation.run(t);
- }
- });
- }
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index fb3306e..b97b4ab 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -22,7 +22,6 @@ public class Master {
private String myUrl;
private String myLocation; // Protobuf server location, i.e., myIp:port
State state;
- private Broadcaster broadcaster;
private volatile int masterId = 1;
class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> {
@@ -46,19 +45,18 @@ public class Master {
}
public static Master create(ConnectionManager connections,
- Broadcaster broadcaster, String myUrl, String networkName,
+ String myUrl, String networkName,
String myLocation) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
state.update(".masterLocation", myLocation, 1);
- return new Master(state, connections, broadcaster, myUrl, myLocation);
+ return new Master(state, connections, myUrl, myLocation);
}
Master(State initialState, ConnectionManager connections,
- Broadcaster broadcaster, String myUrl, String myLocation) {
+ String myUrl, String myLocation) {
this.state = initialState;
this.connections = connections;
- this.broadcaster = broadcaster;
this.myUrl = myUrl;
this.myLocation = myLocation;
}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index e28db59..c793948 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -4,7 +4,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
-import com.orbekk.paxos.PaxosService;
import com.orbekk.paxos.PaxosServiceImpl;
import com.orbekk.protobuf.Rpc;
import com.orbekk.protobuf.SimpleProtobufServer;
@@ -18,7 +17,6 @@ public class SameController {
private PaxosServiceImpl paxos;
private Configuration configuration;
private ConnectionManager connections;
- private Broadcaster serviceBroadcaster;
/**
* Timeout for remote operations in milliseconds.
@@ -32,7 +30,7 @@ public class SameController {
configuration.get("pport");
String masterUrl = configuration.get("baseUrl") +
"MasterService.json";
- master = Master.create(connections, serviceBroadcaster,
+ master = Master.create(connections,
masterUrl, configuration.get("networkName"), myLocation);
master.resumeFrom(lastKnownState, masterId);
pServer.registerService(master.getNewService());
@@ -48,14 +46,12 @@ public class SameController {
};
public static SameController create(Configuration configuration) {
- int port = configuration.getInt("port");
int pport = configuration.getInt("pport");
String myLocation = configuration.get("localIp") + ":" + pport;
ConnectionManagerImpl connections = new ConnectionManagerImpl(
timeout, timeout);
State clientState = new State(".InvalidClientNetwork");
- Broadcaster broadcaster = BroadcasterImpl.getDefaultBroadcastRunner();
String baseUrl = String.format("http://%s:%s/",
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
@@ -66,10 +62,11 @@ public class SameController {
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
pServer.registerService(client.getNewService());
+ pServer.registerService(paxos.getService());
SameController controller = new SameController(
configuration, connections, client,
- paxos, broadcaster, pServer);
+ paxos, pServer);
return controller;
}
@@ -78,13 +75,11 @@ public class SameController {
ConnectionManager connections,
Client client,
PaxosServiceImpl paxos,
- Broadcaster serviceBroadcaster,
SimpleProtobufServer pServer) {
this.configuration = configuration;
this.connections = connections;
this.client = client;
this.paxos = paxos;
- this.serviceBroadcaster = serviceBroadcaster;
this.pServer = pServer;
}
@@ -116,8 +111,6 @@ public class SameController {
public void createNetwork(String networkName) {
masterController.disableMaster();
masterController.enableMaster(new State(networkName), 1);
- String masterUrl = configuration.get("baseUrl") +
- "MasterService.json";
joinNetwork(master.getMasterInfo());
}
diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java
deleted file mode 100644
index bac4742..0000000
--- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.orbekk.same;
-
-import java.util.List;
-
-/**
- * This class is used in tests.
- */
-public class TestBroadcaster implements Broadcaster {
-
- public TestBroadcaster() {
- }
-
- @Override
- public void broadcast(final List<String> targets,
- final ServiceOperation operation) {
- for (String t : targets) {
- operation.run(t);
- }
- }
-}
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index 839b107..c5d9a56 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -29,11 +29,10 @@ public class FunctionalTest {
VariableFactory vf3;
List<Client> clients = new ArrayList<Client>();
TestConnectionManager connections = new TestConnectionManager();
- TestBroadcaster broadcaster = new TestBroadcaster();
@Before public void setUp() {
master = Master.create(connections,
- broadcaster, masterUrl, "TestMaster", masterLocation);
+ masterUrl, "TestMaster", masterLocation);
connections.masterMap0.put(masterLocation, master.getNewService());
client1 = newClient("TestClient1", "http://client1/ClientService.json",
"client1");
@@ -113,7 +112,7 @@ public class FunctionalTest {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:1";
final Master newMaster = Master.create(connections,
- broadcaster, newMasterUrl, "TestMaster", newMasterLocation);
+ newMasterUrl, "TestMaster", newMasterLocation);
joinClients();
MasterController controller = new MasterController() {
@Override
@@ -137,7 +136,7 @@ public class FunctionalTest {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:1";
final Master newMaster = Master.create(connections,
- broadcaster, newMasterUrl, "TestMaster", newMasterLocation);
+ newMasterUrl, "TestMaster", newMasterLocation);
joinClients();
MasterController controller = new MasterController() {
boolean firstMaster = true;
@@ -165,7 +164,7 @@ public class FunctionalTest {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:2";
final Master newMaster = Master.create(connections,
- broadcaster, newMasterUrl, "TestMaster", newMasterLocation);
+ newMasterUrl, "TestMaster", newMasterLocation);
joinClients();
MasterController controller = new MasterController() {
@Override
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index 4fa0a5f..50b3302 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -13,7 +13,6 @@ import org.junit.Test;
public class MasterTest {
private State state = new State("TestNetwork");
private TestConnectionManager connections = new TestConnectionManager();
- private TestBroadcaster broadcaster = new TestBroadcaster();
private Master master;
@Before
@@ -21,7 +20,7 @@ public class MasterTest {
String masterLocation = "master:1000";
state.update(".masterUrl", "http://master/MasterService.json", 1);
state.update(".masterLocation", masterLocation, 1);
- master = new Master(state, connections, broadcaster,
+ master = new Master(state, connections,
"http://master/MasterService.json", masterLocation);
connections.masterMap0.put("master:1000", master.getNewService());
}