diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-25 10:59:52 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-25 10:59:52 +0200 |
commit | a4a0c66483fd763901c43513f8fbca65b0e7c5a9 (patch) | |
tree | 086172868d6e353b81cfbae028e664fa9a47cfbc /same/src/main/java/com | |
parent | 478a8b1abb65c5abe1bb8d752d2cae01654e0834 (diff) |
Set timeouts for all RPCs.
Implemented with an RpcFactory.
Diffstat (limited to 'same/src/main/java/com')
6 files changed, 40 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 048995e..e2723a3 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.orbekk.protobuf.Rpc; import com.orbekk.same.ConnectionManager; +import com.orbekk.same.RpcFactory; import com.orbekk.same.Services; import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.PaxosRequest; @@ -23,12 +24,14 @@ public class MasterProposer extends Thread { private final ClientState client; private final List<String> paxosLocations; private final ConnectionManager connections; + private final RpcFactory rpcf; public MasterProposer(ClientState client, List<String> paxosLocations, - ConnectionManager connections) { + ConnectionManager connections, RpcFactory rpcf) { this.client = client; this.paxosLocations = paxosLocations; this.connections = connections; + this.rpcf = rpcf; } private class ResponseHandler implements RpcCallback<PaxosResponse> { @@ -87,7 +90,7 @@ public class MasterProposer extends Thread { ResponseHandler handler = new ResponseHandler(proposalNumber, paxosLocations.size()); for (String location : paxosLocations) { - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); Services.Paxos paxos = connections.getPaxos0(location); if (paxos == null) { handler.run(null); @@ -107,7 +110,7 @@ public class MasterProposer extends Thread { ResponseHandler handler = new ResponseHandler(proposalNumber, paxosLocations.size()); for (String location : paxosLocations) { - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); Services.Paxos paxos = connections.getPaxos0(location); PaxosRequest request = PaxosRequest.newBuilder() .setClient(client) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 6582e4a..27ce787 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -30,6 +30,7 @@ public class Client { private volatile MasterController masterController = null; private volatile Future<Integer> currentMasterProposal = null; private volatile MasterState masterInfo; + private final RpcFactory rpcf; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -62,7 +63,7 @@ public class Client { startMasterElection(); return op; } - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback<Services.UpdateComponentResponse> done = new RpcCallback<Services.UpdateComponentResponse>() { @Override @@ -160,11 +161,12 @@ public class Client { }; public Client(State state, ConnectionManager connections, - String myUrl, String myLocation) { + String myUrl, String myLocation, RpcFactory rpcf) { this.state = state; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; + this.rpcf = rpcf; } public void start() { @@ -212,7 +214,7 @@ public class Client { Services.Master master = connections.getMaster0(masterInfo.getMasterLocation()); - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback<Empty> done = new RpcCallback<Empty>() { @Override public void run(Empty unused) { if (!rpc.isOk()) { @@ -244,7 +246,7 @@ public class Client { List<String> paxosUrls = state.getList(State.PARTICIPANTS); paxosUrls.remove(failedMaster.getMasterLocation()); MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, - connections); + connections, rpcf); if (masterController == null) { logger.warn("Could not become master: No master controller."); return; @@ -297,7 +299,7 @@ public class Client { }; for (String location : participants) { - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); Services.Client client = connections.getClient0(location); if (client != null) { client.masterDown(rpc, failedMaster, done); diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index ac0abe0..0e199d6 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -16,7 +16,6 @@ import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.googlecode.jsonrpc4j.ProxyUtil; import com.orbekk.paxos.PaxosService; import com.orbekk.protobuf.RpcChannel; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index b97b4ab..397aa12 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -23,6 +23,7 @@ public class Master { private String myLocation; // Protobuf server location, i.e., myIp:port State state; private volatile int masterId = 1; + private final RpcFactory rpcf; class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> { private final String participantLocation; @@ -46,19 +47,20 @@ public class Master { public static Master create(ConnectionManager connections, String myUrl, String networkName, - String myLocation) { + String myLocation, RpcFactory rpcf) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); state.update(".masterLocation", myLocation, 1); - return new Master(state, connections, myUrl, myLocation); + return new Master(state, connections, myUrl, myLocation, rpcf); } Master(State initialState, ConnectionManager connections, - String myUrl, String myLocation) { + String myUrl, String myLocation, RpcFactory rpcf) { this.state = initialState; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; + this.rpcf = rpcf; } public String getNetworkName() { @@ -131,7 +133,7 @@ public class Master { for (Component component : components) { Services.Component componentProto = componentToProto(component); - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); RpcCallback<Empty> done = new RemoveParticipantIfFailsCallback<Empty>(clientLocation, rpc); @@ -152,7 +154,7 @@ public class Master { } { // Send masterTakeover(). - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); RpcCallback<Empty> done = new RemoveParticipantIfFailsCallback<Empty>( clientLocation, rpc); @@ -232,7 +234,7 @@ public class Master { for (final String location : state.getList(State.PARTICIPANTS)) { Services.Client client = connections.getClient0(location); - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback<Empty> done = new RpcCallback<Empty>() { @Override public void run(Empty unused) { if (!rpc.isOk()) { diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index c793948..9438c7b 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -17,6 +17,7 @@ public class SameController { private PaxosServiceImpl paxos; private Configuration configuration; private ConnectionManager connections; + private final RpcFactory rpcf; /** * Timeout for remote operations in milliseconds. @@ -31,7 +32,8 @@ public class SameController { String masterUrl = configuration.get("baseUrl") + "MasterService.json"; master = Master.create(connections, - masterUrl, configuration.get("networkName"), myLocation); + masterUrl, configuration.get("networkName"), myLocation, + rpcf); master.resumeFrom(lastKnownState, masterId); pServer.registerService(master.getNewService()); master.start(); @@ -51,13 +53,15 @@ public class SameController { ConnectionManagerImpl connections = new ConnectionManagerImpl( timeout, timeout); + RpcFactory rpcf = new RpcFactory(timeout); + State clientState = new State(".InvalidClientNetwork"); String baseUrl = String.format("http://%s:%s/", configuration.get("localIp"), configuration.getInt("port")); String clientUrl = baseUrl + "ClientService.json"; Client client = new Client(clientState, connections, - clientUrl, myLocation); + clientUrl, myLocation, rpcf); PaxosServiceImpl paxos = new PaxosServiceImpl(""); SimpleProtobufServer pServer = SimpleProtobufServer.create(pport); @@ -66,7 +70,7 @@ public class SameController { SameController controller = new SameController( configuration, connections, client, - paxos, pServer); + paxos, pServer, rpcf); return controller; } @@ -75,12 +79,14 @@ public class SameController { ConnectionManager connections, Client client, PaxosServiceImpl paxos, - SimpleProtobufServer pServer) { + SimpleProtobufServer pServer, + RpcFactory rpcf) { this.configuration = configuration; this.connections = connections; this.client = client; this.paxos = paxos; this.pServer = pServer; + this.rpcf = rpcf; } public void start() throws Exception { @@ -139,7 +145,7 @@ public class SameController { .setNetworkName(master.getNetworkName()) .setMasterLocation(master.getLocation()) .build(); - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback<Services.Empty> done = new RpcCallback<Services.Empty>() { @Override public void run(Services.Empty unused) { if (!rpc.isOk()) { diff --git a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java index 4e18416..861454c 100644 --- a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java +++ b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java @@ -5,23 +5,26 @@ import java.net.UnknownHostException; import java.util.concurrent.CountDownLatch; import com.google.protobuf.RpcCallback; -import com.orbekk.protobuf.RpcChannel; import com.orbekk.protobuf.Rpc; +import com.orbekk.protobuf.RpcChannel; +import com.orbekk.same.RpcFactory; import com.orbekk.same.benchmark.Example.Data; public class ClientBenchmark { private final Example.Service service; private final int warmupIterations; private final int iterations; + private final RpcFactory rpcf; public static void benchmark(String host, int port, int warmupIterations, int iterations) throws InterruptedException { RpcChannel channel = null; try { channel = RpcChannel.create(host, port); + RpcFactory rpcf = new RpcFactory(5000); Example.Service service = Example.Service.newStub(channel); ClientBenchmark benchmark = new ClientBenchmark( - service, warmupIterations, iterations); + service, warmupIterations, iterations, rpcf); benchmark.benchmark(); } catch (UnknownHostException e) { e.printStackTrace(); @@ -35,10 +38,11 @@ public class ClientBenchmark { } public ClientBenchmark(Example.Service service, - int warmupIterations, int iterations) { + int warmupIterations, int iterations, RpcFactory rpcf) { this.service = service; this.warmupIterations = warmupIterations; this.iterations = iterations; + this.rpcf = rpcf; } private void runBenchmark(int iterations) throws InterruptedException { @@ -48,7 +52,7 @@ public class ClientBenchmark { for (int i = 0; i < iterations; i++) { Example.Data request = Example.Data.newBuilder() .setArg1(i).build(); - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); service.methodA(rpc, request, new RpcCallback<Example.Data>() { @Override public void run(Data ignored) { |