summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 10:59:52 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 10:59:52 +0200
commita4a0c66483fd763901c43513f8fbca65b0e7c5a9 (patch)
tree086172868d6e353b81cfbae028e664fa9a47cfbc
parent478a8b1abb65c5abe1bb8d752d2cae01654e0834 (diff)
Set timeouts for all RPCs.
Implemented with an RpcFactory.
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java9
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java12
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java1
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java14
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java16
-rw-r--r--same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java12
-rw-r--r--same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java14
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java11
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java5
9 files changed, 57 insertions, 37 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 048995e..e2723a3 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.ConnectionManager;
+import com.orbekk.same.RpcFactory;
import com.orbekk.same.Services;
import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.PaxosRequest;
@@ -23,12 +24,14 @@ public class MasterProposer extends Thread {
private final ClientState client;
private final List<String> paxosLocations;
private final ConnectionManager connections;
+ private final RpcFactory rpcf;
public MasterProposer(ClientState client, List<String> paxosLocations,
- ConnectionManager connections) {
+ ConnectionManager connections, RpcFactory rpcf) {
this.client = client;
this.paxosLocations = paxosLocations;
this.connections = connections;
+ this.rpcf = rpcf;
}
private class ResponseHandler implements RpcCallback<PaxosResponse> {
@@ -87,7 +90,7 @@ public class MasterProposer extends Thread {
ResponseHandler handler = new ResponseHandler(proposalNumber,
paxosLocations.size());
for (String location : paxosLocations) {
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
Services.Paxos paxos = connections.getPaxos0(location);
if (paxos == null) {
handler.run(null);
@@ -107,7 +110,7 @@ public class MasterProposer extends Thread {
ResponseHandler handler = new ResponseHandler(proposalNumber,
paxosLocations.size());
for (String location : paxosLocations) {
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
Services.Paxos paxos = connections.getPaxos0(location);
PaxosRequest request = PaxosRequest.newBuilder()
.setClient(client)
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 6582e4a..27ce787 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -30,6 +30,7 @@ public class Client {
private volatile MasterController masterController = null;
private volatile Future<Integer> currentMasterProposal = null;
private volatile MasterState masterInfo;
+ private final RpcFactory rpcf;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -62,7 +63,7 @@ public class Client {
startMasterElection();
return op;
}
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Services.UpdateComponentResponse> done =
new RpcCallback<Services.UpdateComponentResponse>() {
@Override
@@ -160,11 +161,12 @@ public class Client {
};
public Client(State state, ConnectionManager connections,
- String myUrl, String myLocation) {
+ String myUrl, String myLocation, RpcFactory rpcf) {
this.state = state;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
+ this.rpcf = rpcf;
}
public void start() {
@@ -212,7 +214,7 @@ public class Client {
Services.Master master =
connections.getMaster0(masterInfo.getMasterLocation());
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Empty> done = new RpcCallback<Empty>() {
@Override public void run(Empty unused) {
if (!rpc.isOk()) {
@@ -244,7 +246,7 @@ public class Client {
List<String> paxosUrls = state.getList(State.PARTICIPANTS);
paxosUrls.remove(failedMaster.getMasterLocation());
MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
- connections);
+ connections, rpcf);
if (masterController == null) {
logger.warn("Could not become master: No master controller.");
return;
@@ -297,7 +299,7 @@ public class Client {
};
for (String location : participants) {
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
Services.Client client = connections.getClient0(location);
if (client != null) {
client.masterDown(rpc, failedMaster, done);
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index ac0abe0..0e199d6 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -16,7 +16,6 @@ import java.util.concurrent.FutureTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.googlecode.jsonrpc4j.ProxyUtil;
import com.orbekk.paxos.PaxosService;
import com.orbekk.protobuf.RpcChannel;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index b97b4ab..397aa12 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -23,6 +23,7 @@ public class Master {
private String myLocation; // Protobuf server location, i.e., myIp:port
State state;
private volatile int masterId = 1;
+ private final RpcFactory rpcf;
class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> {
private final String participantLocation;
@@ -46,19 +47,20 @@ public class Master {
public static Master create(ConnectionManager connections,
String myUrl, String networkName,
- String myLocation) {
+ String myLocation, RpcFactory rpcf) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
state.update(".masterLocation", myLocation, 1);
- return new Master(state, connections, myUrl, myLocation);
+ return new Master(state, connections, myUrl, myLocation, rpcf);
}
Master(State initialState, ConnectionManager connections,
- String myUrl, String myLocation) {
+ String myUrl, String myLocation, RpcFactory rpcf) {
this.state = initialState;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
+ this.rpcf = rpcf;
}
public String getNetworkName() {
@@ -131,7 +133,7 @@ public class Master {
for (Component component : components) {
Services.Component componentProto = componentToProto(component);
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
rpc);
@@ -152,7 +154,7 @@ public class Master {
}
{ // Send masterTakeover().
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(
clientLocation, rpc);
@@ -232,7 +234,7 @@ public class Master {
for (final String location : state.getList(State.PARTICIPANTS)) {
Services.Client client = connections.getClient0(location);
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Empty> done = new RpcCallback<Empty>() {
@Override public void run(Empty unused) {
if (!rpc.isOk()) {
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index c793948..9438c7b 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -17,6 +17,7 @@ public class SameController {
private PaxosServiceImpl paxos;
private Configuration configuration;
private ConnectionManager connections;
+ private final RpcFactory rpcf;
/**
* Timeout for remote operations in milliseconds.
@@ -31,7 +32,8 @@ public class SameController {
String masterUrl = configuration.get("baseUrl") +
"MasterService.json";
master = Master.create(connections,
- masterUrl, configuration.get("networkName"), myLocation);
+ masterUrl, configuration.get("networkName"), myLocation,
+ rpcf);
master.resumeFrom(lastKnownState, masterId);
pServer.registerService(master.getNewService());
master.start();
@@ -51,13 +53,15 @@ public class SameController {
ConnectionManagerImpl connections = new ConnectionManagerImpl(
timeout, timeout);
+ RpcFactory rpcf = new RpcFactory(timeout);
+
State clientState = new State(".InvalidClientNetwork");
String baseUrl = String.format("http://%s:%s/",
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
Client client = new Client(clientState, connections,
- clientUrl, myLocation);
+ clientUrl, myLocation, rpcf);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
@@ -66,7 +70,7 @@ public class SameController {
SameController controller = new SameController(
configuration, connections, client,
- paxos, pServer);
+ paxos, pServer, rpcf);
return controller;
}
@@ -75,12 +79,14 @@ public class SameController {
ConnectionManager connections,
Client client,
PaxosServiceImpl paxos,
- SimpleProtobufServer pServer) {
+ SimpleProtobufServer pServer,
+ RpcFactory rpcf) {
this.configuration = configuration;
this.connections = connections;
this.client = client;
this.paxos = paxos;
this.pServer = pServer;
+ this.rpcf = rpcf;
}
public void start() throws Exception {
@@ -139,7 +145,7 @@ public class SameController {
.setNetworkName(master.getNetworkName())
.setMasterLocation(master.getLocation())
.build();
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Services.Empty> done = new RpcCallback<Services.Empty>() {
@Override public void run(Services.Empty unused) {
if (!rpc.isOk()) {
diff --git a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java
index 4e18416..861454c 100644
--- a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java
+++ b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java
@@ -5,23 +5,26 @@ import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import com.google.protobuf.RpcCallback;
-import com.orbekk.protobuf.RpcChannel;
import com.orbekk.protobuf.Rpc;
+import com.orbekk.protobuf.RpcChannel;
+import com.orbekk.same.RpcFactory;
import com.orbekk.same.benchmark.Example.Data;
public class ClientBenchmark {
private final Example.Service service;
private final int warmupIterations;
private final int iterations;
+ private final RpcFactory rpcf;
public static void benchmark(String host, int port, int warmupIterations,
int iterations) throws InterruptedException {
RpcChannel channel = null;
try {
channel = RpcChannel.create(host, port);
+ RpcFactory rpcf = new RpcFactory(5000);
Example.Service service = Example.Service.newStub(channel);
ClientBenchmark benchmark = new ClientBenchmark(
- service, warmupIterations, iterations);
+ service, warmupIterations, iterations, rpcf);
benchmark.benchmark();
} catch (UnknownHostException e) {
e.printStackTrace();
@@ -35,10 +38,11 @@ public class ClientBenchmark {
}
public ClientBenchmark(Example.Service service,
- int warmupIterations, int iterations) {
+ int warmupIterations, int iterations, RpcFactory rpcf) {
this.service = service;
this.warmupIterations = warmupIterations;
this.iterations = iterations;
+ this.rpcf = rpcf;
}
private void runBenchmark(int iterations) throws InterruptedException {
@@ -48,7 +52,7 @@ public class ClientBenchmark {
for (int i = 0; i < iterations; i++) {
Example.Data request = Example.Data.newBuilder()
.setArg1(i).build();
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
service.methodA(rpc, request, new RpcCallback<Example.Data>() {
@Override
public void run(Data ignored) {
diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
index 98631b0..2918f3e 100644
--- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
+++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
@@ -15,12 +15,14 @@ import org.junit.Test;
import com.orbekk.protobuf.SimpleProtobufServer;
import com.orbekk.same.ConnectionManagerImpl;
+import com.orbekk.same.RpcFactory;
import com.orbekk.same.Services.ClientState;
public class PaxosServiceFunctionalTest {
ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500);
List<String> paxosUrls = new ArrayList<String>();
List<SimpleProtobufServer> servers = new ArrayList<SimpleProtobufServer>();
+ RpcFactory rpcf = new RpcFactory(5000);
String myUrl;
int successfulProposals = 0;
ClientState client1 = ClientState.newBuilder()
@@ -70,14 +72,14 @@ public class PaxosServiceFunctionalTest {
@Test
public void testMasterElection() throws InterruptedException {
MasterProposer m1 = new MasterProposer(client1, paxosUrls,
- connections);
+ connections, rpcf);
assertTrue(m1.propose(1));
}
@Test
public void testMasterElectionTask() throws InterruptedException, ExecutionException {
MasterProposer m1 = new MasterProposer(client1, paxosUrls,
- connections);
+ connections, rpcf);
Future<Integer> result = m1.startProposalTask(1, null);
assertEquals(new Integer(1), result.get());
}
@@ -85,7 +87,7 @@ public class PaxosServiceFunctionalTest {
@Test
public void cancelledElection() throws InterruptedException {
MasterProposer m1 = new MasterProposer(client1, paxosUrls,
- connections);
+ connections, rpcf);
assertTrue(m1.propose(1));
Future<Integer> result = m1.startProposalTask(1, sleepForever);
@@ -96,10 +98,10 @@ public class PaxosServiceFunctionalTest {
@Test
public void testOnlyOneCompletes() throws InterruptedException, ExecutionException {
MasterProposer m1 = new MasterProposer(client1, paxosUrls,
- connections);
+ connections, rpcf);
ClientState client2 = ClientState.newBuilder().setLocation("client2").build();
MasterProposer m2 = new MasterProposer(client2, paxosUrls,
- connections);
+ connections, rpcf);
final Future<Integer> result1 = m1.startProposalTask(1, sleepForever);
final Future<Integer> result2 = m2.startProposalTask(1, sleepForever);
@@ -160,7 +162,7 @@ public class PaxosServiceFunctionalTest {
.build();
MasterProposer proposer =
new MasterProposer(client, paxosUrls,
- connections);
+ connections, rpcf);
try {
if (proposer.proposeRetry(1)) {
incrementSuccessfulProposals();
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index c5d9a56..275eaac 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -29,10 +29,11 @@ public class FunctionalTest {
VariableFactory vf3;
List<Client> clients = new ArrayList<Client>();
TestConnectionManager connections = new TestConnectionManager();
+ RpcFactory rpcf = new RpcFactory(5000);
@Before public void setUp() {
master = Master.create(connections,
- masterUrl, "TestMaster", masterLocation);
+ masterUrl, "TestMaster", masterLocation, rpcf);
connections.masterMap0.put(masterLocation, master.getNewService());
client1 = newClient("TestClient1", "http://client1/ClientService.json",
"client1");
@@ -47,7 +48,7 @@ public class FunctionalTest {
Client newClient(String clientName, String clientUrl, String location) {
Client client = new Client(new State(clientName), connections,
- clientUrl, location);
+ clientUrl, location, rpcf);
connections.clientMap0.put(location, client.getNewService());
clients.add(client);
String paxosUrl = clientUrl.replace("ClientService", "PaxosService");
@@ -112,7 +113,7 @@ public class FunctionalTest {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:1";
final Master newMaster = Master.create(connections,
- newMasterUrl, "TestMaster", newMasterLocation);
+ newMasterUrl, "TestMaster", newMasterLocation, rpcf);
joinClients();
MasterController controller = new MasterController() {
@Override
@@ -136,7 +137,7 @@ public class FunctionalTest {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:1";
final Master newMaster = Master.create(connections,
- newMasterUrl, "TestMaster", newMasterLocation);
+ newMasterUrl, "TestMaster", newMasterLocation, rpcf);
joinClients();
MasterController controller = new MasterController() {
boolean firstMaster = true;
@@ -164,7 +165,7 @@ public class FunctionalTest {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:2";
final Master newMaster = Master.create(connections,
- newMasterUrl, "TestMaster", newMasterLocation);
+ newMasterUrl, "TestMaster", newMasterLocation, rpcf);
joinClients();
MasterController controller = new MasterController() {
@Override
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index 50b3302..f22e9e1 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -14,6 +14,7 @@ public class MasterTest {
private State state = new State("TestNetwork");
private TestConnectionManager connections = new TestConnectionManager();
private Master master;
+ private RpcFactory rpcf = new RpcFactory(5000);
@Before
public void setUp() {
@@ -21,7 +22,7 @@ public class MasterTest {
state.update(".masterUrl", "http://master/MasterService.json", 1);
state.update(".masterLocation", masterLocation, 1);
master = new Master(state, connections,
- "http://master/MasterService.json", masterLocation);
+ "http://master/MasterService.json", masterLocation, rpcf);
connections.masterMap0.put("master:1000", master.getNewService());
}
@@ -29,7 +30,7 @@ public class MasterTest {
public void clientJoin() throws Exception {
Client client = new Client(
new State("ClientNetwork"), connections,
- "http://client/ClientService.json", "clientLocation");
+ "http://client/ClientService.json", "clientLocation", rpcf);
connections.clientMap0.put("clientLocation", client.getNewService());
client.joinNetwork(master.getMasterInfo());
master.performWork();