diff options
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 45 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java | 105 |
2 files changed, 132 insertions, 18 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 5eb7ebb..8469cf4 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,18 +1,24 @@ package com.orbekk.paxos; -import com.orbekk.same.ConnectionManager; import static com.orbekk.same.StackTraceUtil.throwableToString; + import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MasterProposer { +import com.orbekk.same.ConnectionManager; + +public class MasterProposer extends Thread { private Logger logger = LoggerFactory.getLogger(getClass()); private String myUrl; private List<String> paxosUrls = new ArrayList<String>(); private ConnectionManager connections; - + public MasterProposer(String clientUrl, List<String> paxosUrls, ConnectionManager connections) { this.myUrl = clientUrl; @@ -68,7 +74,7 @@ public class MasterProposer { } } - public boolean propose(int proposalNumber) { + boolean propose(int proposalNumber) { int result = internalPropose(proposalNumber); if (result == proposalNumber) { result = internalAcceptRequest(proposalNumber); @@ -80,11 +86,16 @@ public class MasterProposer { } } - public boolean proposeRetry(int proposalNumber) { + boolean proposeRetry(int proposalNumber) { + return proposeRetry(proposalNumber, null) != null; + } + + Integer proposeRetry(int proposalNumber, Runnable retryAction) { + assert proposalNumber >= 0; int nextProposal = proposalNumber; - int result = 0; + int result = -1; - while (result != nextProposal) { + while (!Thread.interrupted() && result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { result = internalAcceptRequest(nextProposal); @@ -92,9 +103,27 @@ public class MasterProposer { logger.info("Proposed value {}, result {}", nextProposal, result); if (result < 0) { nextProposal = -result + 1; + if (retryAction != null) { + retryAction.run(); + } } } + if (Thread.interrupted()) { + return null; + } - return true; + return result; + } + + public Future<Integer> startProposalTask(final int proposalNumber, + final Runnable retryAction) { + Callable<Integer> proposalCallable = new Callable<Integer>() { + @Override public Integer call() throws Exception { + return proposeRetry(proposalNumber, retryAction); + } + }; + FutureTask<Integer> task = new FutureTask<Integer>(proposalCallable); + new Thread(task).start(); + return task; } } diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java index 32c7dff..b8d146c 100644 --- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java +++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java @@ -1,22 +1,24 @@ package com.orbekk.paxos; -import static org.junit.Assert.*; - -import com.googlecode.jsonrpc4j.JsonRpcServer; -import com.orbekk.same.ConnectionManagerImpl; -import com.orbekk.same.http.RpcServlet; -import com.orbekk.same.http.JettyServerBuilder; -import com.orbekk.same.http.JettyServerContainer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; -import java.util.Random; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.ConnectionManagerImpl; +import com.orbekk.same.http.JettyServerBuilder; +import com.orbekk.same.http.JettyServerContainer; +import com.orbekk.same.http.RpcServlet; + public class PaxosServiceFunctionalTest { ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500); List<String> paxosUrls = new ArrayList<String>(); @@ -24,6 +26,17 @@ public class PaxosServiceFunctionalTest { String myUrl; int successfulProposals = 0; + Runnable sleepForever = new Runnable() { + @Override public synchronized void run() { + while (!Thread.interrupted()) { + try { + wait(); + } catch (InterruptedException e) { + } + } + } + }; + @Before public void setUp() throws Exception { JettyServerBuilder builder = new JettyServerBuilder(0); @@ -64,8 +77,80 @@ public class PaxosServiceFunctionalTest { connections); assertTrue(m1.propose(1)); } + + @Test + public void testMasterElectionTask() throws InterruptedException, ExecutionException { + MasterProposer m1 = new MasterProposer("http://client1", paxosUrls, + connections); + Future<Integer> result = m1.startProposalTask(1, null); + assertEquals(new Integer(1), result.get()); + } + + @Test + public void cancelledElection() { + MasterProposer m1 = new MasterProposer("http://client1", paxosUrls, + connections); + assertTrue(m1.propose(1)); + + Future<Integer> result = m1.startProposalTask(1, sleepForever); + result.cancel(true); + assertTrue(result.isCancelled()); + } @Test + public void testOnlyOneCompletes() throws InterruptedException, ExecutionException { + MasterProposer m1 = new MasterProposer("http://OnlyOneCompletes1", paxosUrls, + connections); + MasterProposer m2 = new MasterProposer("http://OnlyOneCompletes2", paxosUrls, + connections); + final Future<Integer> result1 = m1.startProposalTask(1, sleepForever); + final Future<Integer> result2 = m2.startProposalTask(1, sleepForever); + + Thread t1 = new Thread(new Runnable() { + @Override public void run() { + try { + result1.get(); + result2.cancel(true); + } catch (CancellationException e) { + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + } + }); + + Thread t2 = new Thread(new Runnable() { + @Override public void run() { + try { + result2.get(); + result1.cancel(true); + } catch (CancellationException e) { + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + } + }); + + t1.start(); + t2.start(); + try { + t1.join(); + } catch (InterruptedException e) { + } + try { + t2.join(); + } catch (InterruptedException e) { + } + + assertTrue(result1.isCancelled() || result2.isCancelled()); + if (!result1.isCancelled()) { + assertEquals(new Integer(1), result1.get()); + } + if (!result2.isCancelled()) { + assertEquals(new Integer(1), result2.get()); + } + } + + @Test public void testWithCompetition() { int proposers = 5; List<Thread> masterProposers = new ArrayList<Thread>(); |