summaryrefslogtreecommitdiff
path: root/same/src/main/java/com
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 10:59:52 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 10:59:52 +0200
commita4a0c66483fd763901c43513f8fbca65b0e7c5a9 (patch)
tree086172868d6e353b81cfbae028e664fa9a47cfbc /same/src/main/java/com
parent478a8b1abb65c5abe1bb8d752d2cae01654e0834 (diff)
Set timeouts for all RPCs.
Implemented with an RpcFactory.
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java9
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java12
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java1
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java14
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java16
-rw-r--r--same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java12
6 files changed, 40 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 048995e..e2723a3 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.ConnectionManager;
+import com.orbekk.same.RpcFactory;
import com.orbekk.same.Services;
import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.PaxosRequest;
@@ -23,12 +24,14 @@ public class MasterProposer extends Thread {
private final ClientState client;
private final List<String> paxosLocations;
private final ConnectionManager connections;
+ private final RpcFactory rpcf;
public MasterProposer(ClientState client, List<String> paxosLocations,
- ConnectionManager connections) {
+ ConnectionManager connections, RpcFactory rpcf) {
this.client = client;
this.paxosLocations = paxosLocations;
this.connections = connections;
+ this.rpcf = rpcf;
}
private class ResponseHandler implements RpcCallback<PaxosResponse> {
@@ -87,7 +90,7 @@ public class MasterProposer extends Thread {
ResponseHandler handler = new ResponseHandler(proposalNumber,
paxosLocations.size());
for (String location : paxosLocations) {
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
Services.Paxos paxos = connections.getPaxos0(location);
if (paxos == null) {
handler.run(null);
@@ -107,7 +110,7 @@ public class MasterProposer extends Thread {
ResponseHandler handler = new ResponseHandler(proposalNumber,
paxosLocations.size());
for (String location : paxosLocations) {
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
Services.Paxos paxos = connections.getPaxos0(location);
PaxosRequest request = PaxosRequest.newBuilder()
.setClient(client)
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 6582e4a..27ce787 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -30,6 +30,7 @@ public class Client {
private volatile MasterController masterController = null;
private volatile Future<Integer> currentMasterProposal = null;
private volatile MasterState masterInfo;
+ private final RpcFactory rpcf;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -62,7 +63,7 @@ public class Client {
startMasterElection();
return op;
}
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Services.UpdateComponentResponse> done =
new RpcCallback<Services.UpdateComponentResponse>() {
@Override
@@ -160,11 +161,12 @@ public class Client {
};
public Client(State state, ConnectionManager connections,
- String myUrl, String myLocation) {
+ String myUrl, String myLocation, RpcFactory rpcf) {
this.state = state;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
+ this.rpcf = rpcf;
}
public void start() {
@@ -212,7 +214,7 @@ public class Client {
Services.Master master =
connections.getMaster0(masterInfo.getMasterLocation());
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Empty> done = new RpcCallback<Empty>() {
@Override public void run(Empty unused) {
if (!rpc.isOk()) {
@@ -244,7 +246,7 @@ public class Client {
List<String> paxosUrls = state.getList(State.PARTICIPANTS);
paxosUrls.remove(failedMaster.getMasterLocation());
MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
- connections);
+ connections, rpcf);
if (masterController == null) {
logger.warn("Could not become master: No master controller.");
return;
@@ -297,7 +299,7 @@ public class Client {
};
for (String location : participants) {
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
Services.Client client = connections.getClient0(location);
if (client != null) {
client.masterDown(rpc, failedMaster, done);
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index ac0abe0..0e199d6 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -16,7 +16,6 @@ import java.util.concurrent.FutureTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.googlecode.jsonrpc4j.ProxyUtil;
import com.orbekk.paxos.PaxosService;
import com.orbekk.protobuf.RpcChannel;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index b97b4ab..397aa12 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -23,6 +23,7 @@ public class Master {
private String myLocation; // Protobuf server location, i.e., myIp:port
State state;
private volatile int masterId = 1;
+ private final RpcFactory rpcf;
class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> {
private final String participantLocation;
@@ -46,19 +47,20 @@ public class Master {
public static Master create(ConnectionManager connections,
String myUrl, String networkName,
- String myLocation) {
+ String myLocation, RpcFactory rpcf) {
State state = new State(networkName);
state.update(".masterUrl", myUrl, 1);
state.update(".masterLocation", myLocation, 1);
- return new Master(state, connections, myUrl, myLocation);
+ return new Master(state, connections, myUrl, myLocation, rpcf);
}
Master(State initialState, ConnectionManager connections,
- String myUrl, String myLocation) {
+ String myUrl, String myLocation, RpcFactory rpcf) {
this.state = initialState;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
+ this.rpcf = rpcf;
}
public String getNetworkName() {
@@ -131,7 +133,7 @@ public class Master {
for (Component component : components) {
Services.Component componentProto = componentToProto(component);
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
rpc);
@@ -152,7 +154,7 @@ public class Master {
}
{ // Send masterTakeover().
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(
clientLocation, rpc);
@@ -232,7 +234,7 @@ public class Master {
for (final String location : state.getList(State.PARTICIPANTS)) {
Services.Client client = connections.getClient0(location);
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Empty> done = new RpcCallback<Empty>() {
@Override public void run(Empty unused) {
if (!rpc.isOk()) {
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index c793948..9438c7b 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -17,6 +17,7 @@ public class SameController {
private PaxosServiceImpl paxos;
private Configuration configuration;
private ConnectionManager connections;
+ private final RpcFactory rpcf;
/**
* Timeout for remote operations in milliseconds.
@@ -31,7 +32,8 @@ public class SameController {
String masterUrl = configuration.get("baseUrl") +
"MasterService.json";
master = Master.create(connections,
- masterUrl, configuration.get("networkName"), myLocation);
+ masterUrl, configuration.get("networkName"), myLocation,
+ rpcf);
master.resumeFrom(lastKnownState, masterId);
pServer.registerService(master.getNewService());
master.start();
@@ -51,13 +53,15 @@ public class SameController {
ConnectionManagerImpl connections = new ConnectionManagerImpl(
timeout, timeout);
+ RpcFactory rpcf = new RpcFactory(timeout);
+
State clientState = new State(".InvalidClientNetwork");
String baseUrl = String.format("http://%s:%s/",
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
Client client = new Client(clientState, connections,
- clientUrl, myLocation);
+ clientUrl, myLocation, rpcf);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
@@ -66,7 +70,7 @@ public class SameController {
SameController controller = new SameController(
configuration, connections, client,
- paxos, pServer);
+ paxos, pServer, rpcf);
return controller;
}
@@ -75,12 +79,14 @@ public class SameController {
ConnectionManager connections,
Client client,
PaxosServiceImpl paxos,
- SimpleProtobufServer pServer) {
+ SimpleProtobufServer pServer,
+ RpcFactory rpcf) {
this.configuration = configuration;
this.connections = connections;
this.client = client;
this.paxos = paxos;
this.pServer = pServer;
+ this.rpcf = rpcf;
}
public void start() throws Exception {
@@ -139,7 +145,7 @@ public class SameController {
.setNetworkName(master.getNetworkName())
.setMasterLocation(master.getLocation())
.build();
- final Rpc rpc = new Rpc();
+ final Rpc rpc = rpcf.create();
RpcCallback<Services.Empty> done = new RpcCallback<Services.Empty>() {
@Override public void run(Services.Empty unused) {
if (!rpc.isOk()) {
diff --git a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java
index 4e18416..861454c 100644
--- a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java
+++ b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java
@@ -5,23 +5,26 @@ import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import com.google.protobuf.RpcCallback;
-import com.orbekk.protobuf.RpcChannel;
import com.orbekk.protobuf.Rpc;
+import com.orbekk.protobuf.RpcChannel;
+import com.orbekk.same.RpcFactory;
import com.orbekk.same.benchmark.Example.Data;
public class ClientBenchmark {
private final Example.Service service;
private final int warmupIterations;
private final int iterations;
+ private final RpcFactory rpcf;
public static void benchmark(String host, int port, int warmupIterations,
int iterations) throws InterruptedException {
RpcChannel channel = null;
try {
channel = RpcChannel.create(host, port);
+ RpcFactory rpcf = new RpcFactory(5000);
Example.Service service = Example.Service.newStub(channel);
ClientBenchmark benchmark = new ClientBenchmark(
- service, warmupIterations, iterations);
+ service, warmupIterations, iterations, rpcf);
benchmark.benchmark();
} catch (UnknownHostException e) {
e.printStackTrace();
@@ -35,10 +38,11 @@ public class ClientBenchmark {
}
public ClientBenchmark(Example.Service service,
- int warmupIterations, int iterations) {
+ int warmupIterations, int iterations, RpcFactory rpcf) {
this.service = service;
this.warmupIterations = warmupIterations;
this.iterations = iterations;
+ this.rpcf = rpcf;
}
private void runBenchmark(int iterations) throws InterruptedException {
@@ -48,7 +52,7 @@ public class ClientBenchmark {
for (int i = 0; i < iterations; i++) {
Example.Data request = Example.Data.newBuilder()
.setArg1(i).build();
- Rpc rpc = new Rpc();
+ Rpc rpc = rpcf.create();
service.methodA(rpc, request, new RpcCallback<Example.Data>() {
@Override
public void run(Data ignored) {