diff options
Diffstat (limited to 'same')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 72 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/FunctionalTest.java | 30 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/MasterTest.java | 2 |
3 files changed, 57 insertions, 47 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 6386928..51d62e7 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -29,12 +29,10 @@ public class Client { volatile State state; private volatile String myUrl; private volatile String myLocation; - volatile String masterUrl; - volatile String masterLocation; - private volatile int masterId = 0; private volatile MasterController masterController = null; private final Broadcaster broadcaster; private volatile Future<Integer> currentMasterProposal = null; + private volatile MasterState masterInfo; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -60,7 +58,8 @@ public class Client { return op; } - Services.Master master = connections.getMaster0(masterLocation); + Services.Master master = connections.getMaster0( + masterInfo.getMasterLocation()); if (master == null) { op.complete(DelayedOperation.Status.createError( "Not connected to master.")); @@ -136,26 +135,25 @@ public class Client { @Override public void masterTakeover(RpcController controller, MasterState request, RpcCallback<Empty> done) { logger.info("MasterTakeover({})", request); - if (request.getMasterId() <= Client.this.masterId) { + if (masterInfo != null && + request.getMasterId() <= masterInfo.getMasterId()) { logger.warn("{} tried to take over, but current master is " + "{}:{}. Ignoring", new Object[]{request, state.getDataOf(".masterUrl"), - Client.this.masterId}); + masterInfo.getMasterId()}); return; } abortMasterElection(); - Client.this.masterUrl = request.getMasterUrl(); - Client.this.masterLocation = request.getMasterLocation(); - Client.this.masterId = request.getMasterId(); + masterInfo = request; connectionState = ConnectionState.STABLE; done.run(Empty.getDefaultInstance()); } @Override public void masterDown(RpcController controller, MasterState request, RpcCallback<Empty> done) { - if (request.getMasterId() < Client.this.masterId) { + if (request.getMasterId() < masterInfo.getMasterId()) { logger.info("Master {} is down, but current master is {}. Ignoring.", - request.getMasterId(), Client.this.masterId); + request.getMasterId(), masterInfo.getMasterId()); return; } logger.warn("Master down."); @@ -197,9 +195,7 @@ public class Client { @Override public void masterDown(int masterId) throws Exception { - Services.MasterState request = Services.MasterState.newBuilder() - .setMasterUrl(masterUrl) - .setNetworkName(state.getDataOf(".networkName")) + Services.MasterState request = masterInfo.toBuilder() .setMasterId(masterId) .build(); newServiceImpl.masterDown(null, request, noOp); @@ -236,6 +232,10 @@ public class Client { .build(); } + public MasterState getMaster() { + return masterInfo; + } + public ConnectionState getConnectionState() { return connectionState; } @@ -246,7 +246,7 @@ public class Client { private synchronized void reset() { state.clear(); - masterId = 0; + masterInfo = null; } public Rpc joinNetwork(Services.MasterState masterInfo) { @@ -288,6 +288,10 @@ public class Client { return serviceImpl; } + public Services.Client getNewService() { + return newServiceImpl; + } + private List<String> getPaxosUrlsNoMaster() { List<String> paxosUrls = new ArrayList<String>(); for (String participant : state.getList(".participants")) { @@ -319,12 +323,12 @@ public class Client { } }; synchronized (this) { - if (failedMasterId < masterId) { + if (failedMasterId < masterInfo.getMasterId()) { logger.info("Master election aborted. Master already chosen."); return; } - currentMasterProposal = proposer.startProposalTask(masterId + 1, - sleeperTask); + currentMasterProposal = proposer.startProposalTask( + masterInfo.getMasterId() + 1, sleeperTask); } Integer result = null; try { @@ -345,23 +349,23 @@ public class Client { } } - private int getMasterIdEstimate() { - return masterId; - } - public void startMasterElection() { - List<String> participants = state.getList(".participants"); - final int masterId = getMasterIdEstimate(); - broadcaster.broadcast(participants, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterDown(masterId); - } catch (Exception e) { - logger.info("{}.masterDown() did not respond (ignored): " + - url, e); - } + List<String> participants = state.getList(State.PARTICIPANTS); + final MasterState failedMaster = masterInfo; + + RpcCallback<Empty> done = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + // Ignore unresponsive clients - master election will take + // care of them. } - }); + }; + + for (String location : participants) { + Rpc rpc = new Rpc(); + Services.Client client = connections.getClient0(location); + if (client != null) { + client.masterDown(rpc, failedMaster, done); + } + } } } diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 73f0a37..0a20fc1 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -38,18 +38,22 @@ public class FunctionalTest { masterServiceProxy = new MasterServiceProxy(master.getService()); connections.masterMap.put(masterUrl, masterServiceProxy); connections.masterMap0.put(masterLocation, master.getNewService()); - client1 = newClient("TestClient1", "http://client1/ClientService.json"); + client1 = newClient("TestClient1", "http://client1/ClientService.json", + "client1"); vf1 = new VariableFactory(client1.getInterface()); - client2 = newClient("TestClient2", "http://client2/ClientService.json"); + client2 = newClient("TestClient2", "http://client2/ClientService.json", + "client2"); vf2 = new VariableFactory(client2.getInterface()); - client3 = newClient("TestClient3", "http://client3/ClientService.json"); + client3 = newClient("TestClient3", "http://client3/ClientService.json", + "client3"); vf3 = new VariableFactory(client3.getInterface()); } - Client newClient(String clientName, String clientUrl) { + Client newClient(String clientName, String clientUrl, String location) { Client client = new Client(new State(clientName), connections, - clientUrl, "clientLocation", broadcaster); + clientUrl, location, broadcaster); connections.clientMap.put(clientUrl, client.getService()); + connections.clientMap0.put(location, client.getNewService()); clients.add(client); String paxosUrl = clientUrl.replace("ClientService", "PaxosService"); PaxosService paxos = new PaxosServiceImpl(paxosUrl); @@ -92,8 +96,8 @@ public class FunctionalTest { } for (Client c : clients) { assertThat(c.getConnectionState(), is(ConnectionState.STABLE)); - assertThat(c.masterUrl, is(masterUrl)); - assertThat(c.masterLocation, is(masterLocation)); + assertThat(c.getMaster().getMasterUrl(), is(masterUrl)); + assertThat(c.getMaster().getMasterLocation(), is(masterLocation)); } } @@ -130,8 +134,8 @@ public class FunctionalTest { client3.setMasterController(controller); client1.startMasterElection(); newMaster.performWork(); - assertThat(client1.masterUrl, is(newMasterUrl)); - assertThat(client2.masterUrl, is(newMasterUrl)); + assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); + assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl)); } @Test public void onlyOneNewMaster() { @@ -159,8 +163,8 @@ public class FunctionalTest { client3.setMasterController(controller); client1.startMasterElection(); newMaster.performWork(); - assertThat(client1.masterUrl, is(newMasterUrl)); - assertThat(client2.masterUrl, is(newMasterUrl)); + assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); + assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl)); } @Test public void masterFails() { @@ -190,7 +194,7 @@ public class FunctionalTest { is(DelayedOperation.Status.ERROR)); performWork(); newMaster.performWork(); - assertThat(client1.masterUrl, is(newMasterUrl)); - assertThat(client2.masterUrl, is(newMasterUrl)); + assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); + assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl)); } } diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index 8830c10..22414fe 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -65,6 +65,8 @@ public class MasterTest { connections.clientMap.put("http://client/ClientService.json", clientS); client.joinNetwork(master.getMasterInfo()); master.performWork(); + System.out.println(state); + System.out.println(master.state); assertTrue(state.getList(".participants").contains("http://client/ClientService.json")); assertEquals(state, client.testGetState()); } |