diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-03-15 13:18:25 +0100 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-03-15 13:18:25 +0100 |
commit | 1a2497d1a6ab3c577faf81ca692c218bbdc33fa6 (patch) | |
tree | 8779e86db89eefe5a1f83671981241563cb126ff /same/src/main/java/com | |
parent | 8b82634fc43b4ffeb899eb03bd52949b7ae9acfe (diff) |
Add tools for concurrent master proposal.
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 5eb7ebb..8469cf4 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,18 +1,24 @@ package com.orbekk.paxos; -import com.orbekk.same.ConnectionManager; import static com.orbekk.same.StackTraceUtil.throwableToString; + import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MasterProposer { +import com.orbekk.same.ConnectionManager; + +public class MasterProposer extends Thread { private Logger logger = LoggerFactory.getLogger(getClass()); private String myUrl; private List<String> paxosUrls = new ArrayList<String>(); private ConnectionManager connections; - + public MasterProposer(String clientUrl, List<String> paxosUrls, ConnectionManager connections) { this.myUrl = clientUrl; @@ -68,7 +74,7 @@ public class MasterProposer { } } - public boolean propose(int proposalNumber) { + boolean propose(int proposalNumber) { int result = internalPropose(proposalNumber); if (result == proposalNumber) { result = internalAcceptRequest(proposalNumber); @@ -80,11 +86,16 @@ public class MasterProposer { } } - public boolean proposeRetry(int proposalNumber) { + boolean proposeRetry(int proposalNumber) { + return proposeRetry(proposalNumber, null) != null; + } + + Integer proposeRetry(int proposalNumber, Runnable retryAction) { + assert proposalNumber >= 0; int nextProposal = proposalNumber; - int result = 0; + int result = -1; - while (result != nextProposal) { + while (!Thread.interrupted() && result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { result = internalAcceptRequest(nextProposal); @@ -92,9 +103,27 @@ public class MasterProposer { logger.info("Proposed value {}, result {}", nextProposal, result); if (result < 0) { nextProposal = -result + 1; + if (retryAction != null) { + retryAction.run(); + } } } + if (Thread.interrupted()) { + return null; + } - return true; + return result; + } + + public Future<Integer> startProposalTask(final int proposalNumber, + final Runnable retryAction) { + Callable<Integer> proposalCallable = new Callable<Integer>() { + @Override public Integer call() throws Exception { + return proposeRetry(proposalNumber, retryAction); + } + }; + FutureTask<Integer> task = new FutureTask<Integer>(proposalCallable); + new Thread(task).start(); + return task; } } |