summaryrefslogtreecommitdiff
path: root/same/src/main/java/com
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-03-15 13:18:25 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-03-15 13:18:25 +0100
commit1a2497d1a6ab3c577faf81ca692c218bbdc33fa6 (patch)
tree8779e86db89eefe5a1f83671981241563cb126ff /same/src/main/java/com
parent8b82634fc43b4ffeb899eb03bd52949b7ae9acfe (diff)
Add tools for concurrent master proposal.
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java45
1 files changed, 37 insertions, 8 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 5eb7ebb..8469cf4 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -1,18 +1,24 @@
package com.orbekk.paxos;
-import com.orbekk.same.ConnectionManager;
import static com.orbekk.same.StackTraceUtil.throwableToString;
+
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MasterProposer {
+import com.orbekk.same.ConnectionManager;
+
+public class MasterProposer extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
private String myUrl;
private List<String> paxosUrls = new ArrayList<String>();
private ConnectionManager connections;
-
+
public MasterProposer(String clientUrl, List<String> paxosUrls,
ConnectionManager connections) {
this.myUrl = clientUrl;
@@ -68,7 +74,7 @@ public class MasterProposer {
}
}
- public boolean propose(int proposalNumber) {
+ boolean propose(int proposalNumber) {
int result = internalPropose(proposalNumber);
if (result == proposalNumber) {
result = internalAcceptRequest(proposalNumber);
@@ -80,11 +86,16 @@ public class MasterProposer {
}
}
- public boolean proposeRetry(int proposalNumber) {
+ boolean proposeRetry(int proposalNumber) {
+ return proposeRetry(proposalNumber, null) != null;
+ }
+
+ Integer proposeRetry(int proposalNumber, Runnable retryAction) {
+ assert proposalNumber >= 0;
int nextProposal = proposalNumber;
- int result = 0;
+ int result = -1;
- while (result != nextProposal) {
+ while (!Thread.interrupted() && result != nextProposal) {
result = internalPropose(nextProposal);
if (result == nextProposal) {
result = internalAcceptRequest(nextProposal);
@@ -92,9 +103,27 @@ public class MasterProposer {
logger.info("Proposed value {}, result {}", nextProposal, result);
if (result < 0) {
nextProposal = -result + 1;
+ if (retryAction != null) {
+ retryAction.run();
+ }
}
}
+ if (Thread.interrupted()) {
+ return null;
+ }
- return true;
+ return result;
+ }
+
+ public Future<Integer> startProposalTask(final int proposalNumber,
+ final Runnable retryAction) {
+ Callable<Integer> proposalCallable = new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return proposeRetry(proposalNumber, retryAction);
+ }
+ };
+ FutureTask<Integer> task = new FutureTask<Integer>(proposalCallable);
+ new Thread(task).start();
+ return task;
}
}