diff options
Diffstat (limited to 'same')
4 files changed, 91 insertions, 48 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 5a7918c..bef096b 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.slf4j.Logger; @@ -31,6 +32,7 @@ public class Client { private volatile Future<Integer> currentMasterProposal = null; private volatile MasterState masterInfo; private final RpcFactory rpcf; + private final ExecutorService executor; private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); @@ -155,18 +157,71 @@ public class Client { return; } connectionState = ConnectionState.UNSTABLE; + executor.execute(new MasterStarter(request)); done.run(Empty.getDefaultInstance()); - tryBecomeMaster(request); } }; + private class MasterStarter implements Runnable { + private final MasterState failedMaster; + + public MasterStarter(MasterState failedMaster) { + this.failedMaster = failedMaster; + } + + @Override public void run() { + logger.info("Trying to become master. Failed master: {}.", + failedMaster); + List<String> paxosUrls = state.getList(State.PARTICIPANTS); + paxosUrls.remove(failedMaster.getMasterLocation()); + MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, + connections, rpcf); + if (masterController == null) { + logger.warn("Could not become master: No master controller."); + return; + } + Runnable sleeperTask = new Runnable() { + @Override public synchronized void run() { + try { + wait(MASTER_TAKEOVER_TIMEOUT); + } catch (InterruptedException e) { + } + } + }; + synchronized (this) { + if (failedMaster.getMasterId() < masterInfo.getMasterId()) { + logger.info("Master election aborted. Master already chosen."); + return; + } + currentMasterProposal = proposer.startProposalTask( + masterInfo.getMasterId() + 1, sleeperTask); + } + Integer result = null; + try { + result = currentMasterProposal.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + logger.error("Error electing master: ", e); + } catch (CancellationException e) { + } + if (!currentMasterProposal.isCancelled() && result != null && + masterInfo.getMasterId() <= failedMaster.getMasterId()) { + masterController.enableMaster(new State(state), result); + } else { + logger.info("Master election aborted. Master already chosen."); + } + } + } + public Client(State state, ConnectionManager connections, - String myUrl, String myLocation, RpcFactory rpcf) { + String myUrl, String myLocation, RpcFactory rpcf, + ExecutorService executor) { this.state = state; this.connections = connections; this.myUrl = myUrl; this.myLocation = myLocation; this.rpcf = rpcf; + this.executor = executor; } public void start() { @@ -174,6 +229,7 @@ public class Client { public void interrupt() { connectionState = ConnectionState.DISCONNECTED; + executor.shutdown(); } void performWork() { @@ -242,44 +298,6 @@ public class Client { return newServiceImpl; } - private void tryBecomeMaster(MasterState failedMaster) { - List<String> paxosUrls = state.getList(State.PARTICIPANTS); - paxosUrls.remove(failedMaster.getMasterLocation()); - MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls, - connections, rpcf); - if (masterController == null) { - logger.warn("Could not become master: No master controller."); - return; - } - Runnable sleeperTask = new Runnable() { - @Override public synchronized void run() { - try { - wait(MASTER_TAKEOVER_TIMEOUT); - } catch (InterruptedException e) { - } - } - }; - synchronized (this) { - if (failedMaster.getMasterId() < masterInfo.getMasterId()) { - logger.info("Master election aborted. Master already chosen."); - return; - } - currentMasterProposal = proposer.startProposalTask( - masterInfo.getMasterId() + 1, sleeperTask); - } - Integer result = null; - try { - result = currentMasterProposal.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - logger.error("Error electing master: ", e); - } catch (CancellationException e) { - } - if (!currentMasterProposal.isCancelled() && result != null) { - masterController.enableMaster(new State(state), result); - } - } - private synchronized void abortMasterElection() { if (currentMasterProposal != null && !currentMasterProposal.isDone()) { boolean status = currentMasterProposal.cancel(true); diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 9438c7b..651303c 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -1,5 +1,8 @@ package com.orbekk.same; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +63,9 @@ public class SameController { configuration.get("localIp"), configuration.getInt("port")); String clientUrl = baseUrl + "ClientService.json"; + ExecutorService clientExecutor = Executors.newCachedThreadPool(); Client client = new Client(clientState, connections, - clientUrl, myLocation, rpcf); + clientUrl, myLocation, rpcf, clientExecutor); PaxosServiceImpl paxos = new PaxosServiceImpl(""); SimpleProtobufServer pServer = SimpleProtobufServer.create(pport); diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index a69bdca..2a73656 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -6,6 +6,9 @@ import static org.hamcrest.Matchers.is; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.Before; import org.junit.Test; @@ -16,6 +19,7 @@ import com.orbekk.util.DelayedOperation; /** A functional test that runs with a master and several clients. */ public class FunctionalTest { + ExecutorService executor = Executors.newSingleThreadExecutor(); Master master; String masterUrl = "http://master/MasterService.json"; String masterLocation = "master:1"; @@ -35,6 +39,18 @@ public class FunctionalTest { }; }; + /** Works with a single thread executor. */ + public void awaitExecution() throws InterruptedException { + final CountDownLatch finished = new CountDownLatch(1); + Runnable sendFinished = new Runnable() { + @Override public void run() { + finished.countDown(); + } + }; + executor.execute(sendFinished); + finished.await(); + } + @Before public void setUp() { master = Master.create(connections, masterUrl, "TestMaster", masterLocation, rpcf); @@ -52,7 +68,7 @@ public class FunctionalTest { Client newClient(String clientName, String clientUrl, String location) { Client client = new Client(new State(clientName), connections, - clientUrl, location, rpcf); + clientUrl, location, rpcf, executor); connections.clientMap0.put(location, client.getNewService()); clients.add(client); String paxosUrl = clientUrl.replace("ClientService", "PaxosService"); @@ -113,7 +129,7 @@ public class FunctionalTest { assertThat(x2.get(), is("TestValue1")); } - @Test public void clientBecomesMaster() { + @Test public void clientBecomesMaster() throws Exception { String newMasterUrl = "http://newMaster/MasterService.json"; String newMasterLocation = "newMaster:1"; final Master newMaster = Master.create(connections, @@ -132,12 +148,13 @@ public class FunctionalTest { client2.setMasterController(controller); client3.setMasterController(controller); client1.startMasterElection(); + awaitExecution(); newMaster.performWork(); assertThat(client1.getMaster().getMasterLocation(), is(newMasterLocation)); assertThat(client2.getMaster().getMasterLocation(), is(newMasterLocation)); } - @Test public void onlyOneNewMaster() { + @Test public void onlyOneNewMaster() throws Exception { String newMasterUrl = "http://newMaster/MasterService.json"; String newMasterLocation = "newMaster:1"; final Master newMaster = Master.create(connections, @@ -160,12 +177,13 @@ public class FunctionalTest { client2.setMasterController(controller); client3.setMasterController(controller); client1.startMasterElection(); + awaitExecution(); newMaster.performWork(); assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl)); } - @Test public void masterFails() { + @Test public void masterFails() throws Exception { String newMasterUrl = "http://newMaster/MasterService.json"; String newMasterLocation = "newMaster:2"; final Master newMaster = Master.create(connections, @@ -188,6 +206,7 @@ public class FunctionalTest { connections.masterMap0.put(masterLocation, null); assertThat(x1.set("Woop, woop").getStatus().getStatusCode(), is(DelayedOperation.Status.ERROR)); + awaitExecution(); performWork(); newMaster.performWork(); assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index f22e9e1..6aa3b6e 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -1,16 +1,17 @@ package com.orbekk.same; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; public class MasterTest { + private ExecutorService executor = Executors.newCachedThreadPool(); private State state = new State("TestNetwork"); private TestConnectionManager connections = new TestConnectionManager(); private Master master; @@ -30,7 +31,8 @@ public class MasterTest { public void clientJoin() throws Exception { Client client = new Client( new State("ClientNetwork"), connections, - "http://client/ClientService.json", "clientLocation", rpcf); + "http://client/ClientService.json", "clientLocation", rpcf, + executor); connections.clientMap0.put("clientLocation", client.getNewService()); client.joinNetwork(master.getMasterInfo()); master.performWork(); |