summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java45
-rw-r--r--same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java105
2 files changed, 132 insertions, 18 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 5eb7ebb..8469cf4 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -1,18 +1,24 @@
package com.orbekk.paxos;
-import com.orbekk.same.ConnectionManager;
import static com.orbekk.same.StackTraceUtil.throwableToString;
+
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MasterProposer {
+import com.orbekk.same.ConnectionManager;
+
+public class MasterProposer extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
private String myUrl;
private List<String> paxosUrls = new ArrayList<String>();
private ConnectionManager connections;
-
+
public MasterProposer(String clientUrl, List<String> paxosUrls,
ConnectionManager connections) {
this.myUrl = clientUrl;
@@ -68,7 +74,7 @@ public class MasterProposer {
}
}
- public boolean propose(int proposalNumber) {
+ boolean propose(int proposalNumber) {
int result = internalPropose(proposalNumber);
if (result == proposalNumber) {
result = internalAcceptRequest(proposalNumber);
@@ -80,11 +86,16 @@ public class MasterProposer {
}
}
- public boolean proposeRetry(int proposalNumber) {
+ boolean proposeRetry(int proposalNumber) {
+ return proposeRetry(proposalNumber, null) != null;
+ }
+
+ Integer proposeRetry(int proposalNumber, Runnable retryAction) {
+ assert proposalNumber >= 0;
int nextProposal = proposalNumber;
- int result = 0;
+ int result = -1;
- while (result != nextProposal) {
+ while (!Thread.interrupted() && result != nextProposal) {
result = internalPropose(nextProposal);
if (result == nextProposal) {
result = internalAcceptRequest(nextProposal);
@@ -92,9 +103,27 @@ public class MasterProposer {
logger.info("Proposed value {}, result {}", nextProposal, result);
if (result < 0) {
nextProposal = -result + 1;
+ if (retryAction != null) {
+ retryAction.run();
+ }
}
}
+ if (Thread.interrupted()) {
+ return null;
+ }
- return true;
+ return result;
+ }
+
+ public Future<Integer> startProposalTask(final int proposalNumber,
+ final Runnable retryAction) {
+ Callable<Integer> proposalCallable = new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return proposeRetry(proposalNumber, retryAction);
+ }
+ };
+ FutureTask<Integer> task = new FutureTask<Integer>(proposalCallable);
+ new Thread(task).start();
+ return task;
}
}
diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
index 32c7dff..b8d146c 100644
--- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
+++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
@@ -1,22 +1,24 @@
package com.orbekk.paxos;
-import static org.junit.Assert.*;
-
-import com.googlecode.jsonrpc4j.JsonRpcServer;
-import com.orbekk.same.ConnectionManagerImpl;
-import com.orbekk.same.http.RpcServlet;
-import com.orbekk.same.http.JettyServerBuilder;
-import com.orbekk.same.http.JettyServerContainer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.Server;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.ConnectionManagerImpl;
+import com.orbekk.same.http.JettyServerBuilder;
+import com.orbekk.same.http.JettyServerContainer;
+import com.orbekk.same.http.RpcServlet;
+
public class PaxosServiceFunctionalTest {
ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500);
List<String> paxosUrls = new ArrayList<String>();
@@ -24,6 +26,17 @@ public class PaxosServiceFunctionalTest {
String myUrl;
int successfulProposals = 0;
+ Runnable sleepForever = new Runnable() {
+ @Override public synchronized void run() {
+ while (!Thread.interrupted()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+
@Before
public void setUp() throws Exception {
JettyServerBuilder builder = new JettyServerBuilder(0);
@@ -64,8 +77,80 @@ public class PaxosServiceFunctionalTest {
connections);
assertTrue(m1.propose(1));
}
+
+ @Test
+ public void testMasterElectionTask() throws InterruptedException, ExecutionException {
+ MasterProposer m1 = new MasterProposer("http://client1", paxosUrls,
+ connections);
+ Future<Integer> result = m1.startProposalTask(1, null);
+ assertEquals(new Integer(1), result.get());
+ }
+
+ @Test
+ public void cancelledElection() {
+ MasterProposer m1 = new MasterProposer("http://client1", paxosUrls,
+ connections);
+ assertTrue(m1.propose(1));
+
+ Future<Integer> result = m1.startProposalTask(1, sleepForever);
+ result.cancel(true);
+ assertTrue(result.isCancelled());
+ }
@Test
+ public void testOnlyOneCompletes() throws InterruptedException, ExecutionException {
+ MasterProposer m1 = new MasterProposer("http://OnlyOneCompletes1", paxosUrls,
+ connections);
+ MasterProposer m2 = new MasterProposer("http://OnlyOneCompletes2", paxosUrls,
+ connections);
+ final Future<Integer> result1 = m1.startProposalTask(1, sleepForever);
+ final Future<Integer> result2 = m2.startProposalTask(1, sleepForever);
+
+ Thread t1 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ result1.get();
+ result2.cancel(true);
+ } catch (CancellationException e) {
+ } catch (InterruptedException e) {
+ } catch (ExecutionException e) {
+ }
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ result2.get();
+ result1.cancel(true);
+ } catch (CancellationException e) {
+ } catch (InterruptedException e) {
+ } catch (ExecutionException e) {
+ }
+ }
+ });
+
+ t1.start();
+ t2.start();
+ try {
+ t1.join();
+ } catch (InterruptedException e) {
+ }
+ try {
+ t2.join();
+ } catch (InterruptedException e) {
+ }
+
+ assertTrue(result1.isCancelled() || result2.isCancelled());
+ if (!result1.isCancelled()) {
+ assertEquals(new Integer(1), result1.get());
+ }
+ if (!result2.isCancelled()) {
+ assertEquals(new Integer(1), result2.get());
+ }
+ }
+
+ @Test
public void testWithCompetition() {
int proposers = 5;
List<Thread> masterProposers = new ArrayList<Thread>();