From a4a0c66483fd763901c43513f8fbca65b0e7c5a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 25 Apr 2012 10:59:52 +0200 Subject: Set timeouts for all RPCs. Implemented with an RpcFactory. --- same/src/main/java/com/orbekk/paxos/MasterProposer.java | 9 ++++++--- same/src/main/java/com/orbekk/same/Client.java | 12 +++++++----- .../main/java/com/orbekk/same/ConnectionManagerImpl.java | 1 - same/src/main/java/com/orbekk/same/Master.java | 14 ++++++++------ same/src/main/java/com/orbekk/same/SameController.java | 16 +++++++++++----- .../java/com/orbekk/same/benchmark/ClientBenchmark.java | 12 ++++++++---- .../com/orbekk/paxos/PaxosServiceFunctionalTest.java | 14 ++++++++------ same/src/test/java/com/orbekk/same/FunctionalTest.java | 11 ++++++----- same/src/test/java/com/orbekk/same/MasterTest.java | 5 +++-- 9 files changed, 57 insertions(+), 37 deletions(-) diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 048995e..e2723a3 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.orbekk.protobuf.Rpc; import com.orbekk.same.ConnectionManager; +import com.orbekk.same.RpcFactory; import com.orbekk.same.Services; import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.PaxosRequest; @@ -23,12 +24,14 @@ public class MasterProposer extends Thread { private final ClientState client; private final List paxosLocations; private final ConnectionManager connections; + private final RpcFactory rpcf; public MasterProposer(ClientState client, List paxosLocations, - ConnectionManager connections) { + ConnectionManager connections, RpcFactory rpcf) { this.client = client; this.paxosLocations = paxosLocations; this.connections = connections; + this.rpcf = rpcf; } private class ResponseHandler implements RpcCallback { @@ -87,7 +90,7 @@ public class MasterProposer extends Thread { ResponseHandler handler = new ResponseHandler(proposalNumber, paxosLocations.size()); for (String location : paxosLocations) { - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); Services.Paxos paxos = connections.getPaxos0(location); if (paxos == null) { handler.run(null); @@ -107,7 +110,7 @@ public class MasterProposer extends Thread { ResponseHandler handler = new ResponseHandler(proposalNumber, paxosLocations.size()); for (String location : paxosLocations) { - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); Services.Paxos paxos = connections.getPaxos0(location); PaxosRequest request = PaxosRequest.newBuilder() .setClient(client) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 6582e4a..27ce787 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -30,6 +30,7 @@ public class Client { private volatile MasterController masterController = null; private volatile Future currentMasterProposal = null; private volatile MasterState masterInfo; + private final RpcFactory rpcf; private List stateListeners = new ArrayList(); @@ -62,7 +63,7 @@ public class Client { startMasterElection(); return op; } - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback done = new RpcCallback() { @Override @@ -160,11 +161,12 @@ public class Client { }; public Client(State state, ConnectionManager connections, - String myUrl, String myLocation) { + String myUrl, String myLocation, RpcFactory rpcf) { this.state = state; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; + this.rpcf = rpcf; } public void start() { @@ -212,7 +214,7 @@ public class Client { Services.Master master = connections.getMaster0(masterInfo.getMasterLocation()); - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback done = new RpcCallback() { @Override public void run(Empty unused) { if (!rpc.isOk()) { @@ -244,7 +246,7 @@ public class Client { List paxosUrls = state.getList(State.PARTICIPANTS); paxosUrls.remove(failedMaster.getMasterLocation()); MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, - connections); + connections, rpcf); if (masterController == null) { logger.warn("Could not become master: No master controller."); return; @@ -297,7 +299,7 @@ public class Client { }; for (String location : participants) { - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); Services.Client client = connections.getClient0(location); if (client != null) { client.masterDown(rpc, failedMaster, done); diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index ac0abe0..0e199d6 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -16,7 +16,6 @@ import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.googlecode.jsonrpc4j.ProxyUtil; import com.orbekk.paxos.PaxosService; import com.orbekk.protobuf.RpcChannel; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index b97b4ab..397aa12 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -23,6 +23,7 @@ public class Master { private String myLocation; // Protobuf server location, i.e., myIp:port State state; private volatile int masterId = 1; + private final RpcFactory rpcf; class RemoveParticipantIfFailsCallback implements RpcCallback { private final String participantLocation; @@ -46,19 +47,20 @@ public class Master { public static Master create(ConnectionManager connections, String myUrl, String networkName, - String myLocation) { + String myLocation, RpcFactory rpcf) { State state = new State(networkName); state.update(".masterUrl", myUrl, 1); state.update(".masterLocation", myLocation, 1); - return new Master(state, connections, myUrl, myLocation); + return new Master(state, connections, myUrl, myLocation, rpcf); } Master(State initialState, ConnectionManager connections, - String myUrl, String myLocation) { + String myUrl, String myLocation, RpcFactory rpcf) { this.state = initialState; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; + this.rpcf = rpcf; } public String getNetworkName() { @@ -131,7 +133,7 @@ public class Master { for (Component component : components) { Services.Component componentProto = componentToProto(component); - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); RpcCallback done = new RemoveParticipantIfFailsCallback(clientLocation, rpc); @@ -152,7 +154,7 @@ public class Master { } { // Send masterTakeover(). - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); RpcCallback done = new RemoveParticipantIfFailsCallback( clientLocation, rpc); @@ -232,7 +234,7 @@ public class Master { for (final String location : state.getList(State.PARTICIPANTS)) { Services.Client client = connections.getClient0(location); - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback done = new RpcCallback() { @Override public void run(Empty unused) { if (!rpc.isOk()) { diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index c793948..9438c7b 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -17,6 +17,7 @@ public class SameController { private PaxosServiceImpl paxos; private Configuration configuration; private ConnectionManager connections; + private final RpcFactory rpcf; /** * Timeout for remote operations in milliseconds. @@ -31,7 +32,8 @@ public class SameController { String masterUrl = configuration.get("baseUrl") + "MasterService.json"; master = Master.create(connections, - masterUrl, configuration.get("networkName"), myLocation); + masterUrl, configuration.get("networkName"), myLocation, + rpcf); master.resumeFrom(lastKnownState, masterId); pServer.registerService(master.getNewService()); master.start(); @@ -51,13 +53,15 @@ public class SameController { ConnectionManagerImpl connections = new ConnectionManagerImpl( timeout, timeout); + RpcFactory rpcf = new RpcFactory(timeout); + State clientState = new State(".InvalidClientNetwork"); String baseUrl = String.format("http://%s:%s/", configuration.get("localIp"), configuration.getInt("port")); String clientUrl = baseUrl + "ClientService.json"; Client client = new Client(clientState, connections, - clientUrl, myLocation); + clientUrl, myLocation, rpcf); PaxosServiceImpl paxos = new PaxosServiceImpl(""); SimpleProtobufServer pServer = SimpleProtobufServer.create(pport); @@ -66,7 +70,7 @@ public class SameController { SameController controller = new SameController( configuration, connections, client, - paxos, pServer); + paxos, pServer, rpcf); return controller; } @@ -75,12 +79,14 @@ public class SameController { ConnectionManager connections, Client client, PaxosServiceImpl paxos, - SimpleProtobufServer pServer) { + SimpleProtobufServer pServer, + RpcFactory rpcf) { this.configuration = configuration; this.connections = connections; this.client = client; this.paxos = paxos; this.pServer = pServer; + this.rpcf = rpcf; } public void start() throws Exception { @@ -139,7 +145,7 @@ public class SameController { .setNetworkName(master.getNetworkName()) .setMasterLocation(master.getLocation()) .build(); - final Rpc rpc = new Rpc(); + final Rpc rpc = rpcf.create(); RpcCallback done = new RpcCallback() { @Override public void run(Services.Empty unused) { if (!rpc.isOk()) { diff --git a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java index 4e18416..861454c 100644 --- a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java +++ b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java @@ -5,23 +5,26 @@ import java.net.UnknownHostException; import java.util.concurrent.CountDownLatch; import com.google.protobuf.RpcCallback; -import com.orbekk.protobuf.RpcChannel; import com.orbekk.protobuf.Rpc; +import com.orbekk.protobuf.RpcChannel; +import com.orbekk.same.RpcFactory; import com.orbekk.same.benchmark.Example.Data; public class ClientBenchmark { private final Example.Service service; private final int warmupIterations; private final int iterations; + private final RpcFactory rpcf; public static void benchmark(String host, int port, int warmupIterations, int iterations) throws InterruptedException { RpcChannel channel = null; try { channel = RpcChannel.create(host, port); + RpcFactory rpcf = new RpcFactory(5000); Example.Service service = Example.Service.newStub(channel); ClientBenchmark benchmark = new ClientBenchmark( - service, warmupIterations, iterations); + service, warmupIterations, iterations, rpcf); benchmark.benchmark(); } catch (UnknownHostException e) { e.printStackTrace(); @@ -35,10 +38,11 @@ public class ClientBenchmark { } public ClientBenchmark(Example.Service service, - int warmupIterations, int iterations) { + int warmupIterations, int iterations, RpcFactory rpcf) { this.service = service; this.warmupIterations = warmupIterations; this.iterations = iterations; + this.rpcf = rpcf; } private void runBenchmark(int iterations) throws InterruptedException { @@ -48,7 +52,7 @@ public class ClientBenchmark { for (int i = 0; i < iterations; i++) { Example.Data request = Example.Data.newBuilder() .setArg1(i).build(); - Rpc rpc = new Rpc(); + Rpc rpc = rpcf.create(); service.methodA(rpc, request, new RpcCallback() { @Override public void run(Data ignored) { diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java index 98631b0..2918f3e 100644 --- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java +++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java @@ -15,12 +15,14 @@ import org.junit.Test; import com.orbekk.protobuf.SimpleProtobufServer; import com.orbekk.same.ConnectionManagerImpl; +import com.orbekk.same.RpcFactory; import com.orbekk.same.Services.ClientState; public class PaxosServiceFunctionalTest { ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500); List paxosUrls = new ArrayList(); List servers = new ArrayList(); + RpcFactory rpcf = new RpcFactory(5000); String myUrl; int successfulProposals = 0; ClientState client1 = ClientState.newBuilder() @@ -70,14 +72,14 @@ public class PaxosServiceFunctionalTest { @Test public void testMasterElection() throws InterruptedException { MasterProposer m1 = new MasterProposer(client1, paxosUrls, - connections); + connections, rpcf); assertTrue(m1.propose(1)); } @Test public void testMasterElectionTask() throws InterruptedException, ExecutionException { MasterProposer m1 = new MasterProposer(client1, paxosUrls, - connections); + connections, rpcf); Future result = m1.startProposalTask(1, null); assertEquals(new Integer(1), result.get()); } @@ -85,7 +87,7 @@ public class PaxosServiceFunctionalTest { @Test public void cancelledElection() throws InterruptedException { MasterProposer m1 = new MasterProposer(client1, paxosUrls, - connections); + connections, rpcf); assertTrue(m1.propose(1)); Future result = m1.startProposalTask(1, sleepForever); @@ -96,10 +98,10 @@ public class PaxosServiceFunctionalTest { @Test public void testOnlyOneCompletes() throws InterruptedException, ExecutionException { MasterProposer m1 = new MasterProposer(client1, paxosUrls, - connections); + connections, rpcf); ClientState client2 = ClientState.newBuilder().setLocation("client2").build(); MasterProposer m2 = new MasterProposer(client2, paxosUrls, - connections); + connections, rpcf); final Future result1 = m1.startProposalTask(1, sleepForever); final Future result2 = m2.startProposalTask(1, sleepForever); @@ -160,7 +162,7 @@ public class PaxosServiceFunctionalTest { .build(); MasterProposer proposer = new MasterProposer(client, paxosUrls, - connections); + connections, rpcf); try { if (proposer.proposeRetry(1)) { incrementSuccessfulProposals(); diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index c5d9a56..275eaac 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -29,10 +29,11 @@ public class FunctionalTest { VariableFactory vf3; List clients = new ArrayList(); TestConnectionManager connections = new TestConnectionManager(); + RpcFactory rpcf = new RpcFactory(5000); @Before public void setUp() { master = Master.create(connections, - masterUrl, "TestMaster", masterLocation); + masterUrl, "TestMaster", masterLocation, rpcf); connections.masterMap0.put(masterLocation, master.getNewService()); client1 = newClient("TestClient1", "http://client1/ClientService.json", "client1"); @@ -47,7 +48,7 @@ public class FunctionalTest { Client newClient(String clientName, String clientUrl, String location) { Client client = new Client(new State(clientName), connections, - clientUrl, location); + clientUrl, location, rpcf); connections.clientMap0.put(location, client.getNewService()); clients.add(client); String paxosUrl = clientUrl.replace("ClientService", "PaxosService"); @@ -112,7 +113,7 @@ public class FunctionalTest { String newMasterUrl = "http://newMaster/MasterService.json"; String newMasterLocation = "newMaster:1"; final Master newMaster = Master.create(connections, - newMasterUrl, "TestMaster", newMasterLocation); + newMasterUrl, "TestMaster", newMasterLocation, rpcf); joinClients(); MasterController controller = new MasterController() { @Override @@ -136,7 +137,7 @@ public class FunctionalTest { String newMasterUrl = "http://newMaster/MasterService.json"; String newMasterLocation = "newMaster:1"; final Master newMaster = Master.create(connections, - newMasterUrl, "TestMaster", newMasterLocation); + newMasterUrl, "TestMaster", newMasterLocation, rpcf); joinClients(); MasterController controller = new MasterController() { boolean firstMaster = true; @@ -164,7 +165,7 @@ public class FunctionalTest { String newMasterUrl = "http://newMaster/MasterService.json"; String newMasterLocation = "newMaster:2"; final Master newMaster = Master.create(connections, - newMasterUrl, "TestMaster", newMasterLocation); + newMasterUrl, "TestMaster", newMasterLocation, rpcf); joinClients(); MasterController controller = new MasterController() { @Override diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index 50b3302..f22e9e1 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -14,6 +14,7 @@ public class MasterTest { private State state = new State("TestNetwork"); private TestConnectionManager connections = new TestConnectionManager(); private Master master; + private RpcFactory rpcf = new RpcFactory(5000); @Before public void setUp() { @@ -21,7 +22,7 @@ public class MasterTest { state.update(".masterUrl", "http://master/MasterService.json", 1); state.update(".masterLocation", masterLocation, 1); master = new Master(state, connections, - "http://master/MasterService.json", masterLocation); + "http://master/MasterService.json", masterLocation, rpcf); connections.masterMap0.put("master:1000", master.getNewService()); } @@ -29,7 +30,7 @@ public class MasterTest { public void clientJoin() throws Exception { Client client = new Client( new State("ClientNetwork"), connections, - "http://client/ClientService.json", "clientLocation"); + "http://client/ClientService.json", "clientLocation", rpcf); connections.clientMap0.put("clientLocation", client.getNewService()); client.joinNetwork(master.getMasterInfo()); master.performWork(); -- cgit v1.2.3