From 1a2497d1a6ab3c577faf81ca692c218bbdc33fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Thu, 15 Mar 2012 13:18:25 +0100 Subject: Add tools for concurrent master proposal. --- .../main/java/com/orbekk/paxos/MasterProposer.java | 45 ++++++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) (limited to 'same/src/main/java') diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 5eb7ebb..8469cf4 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,18 +1,24 @@ package com.orbekk.paxos; -import com.orbekk.same.ConnectionManager; import static com.orbekk.same.StackTraceUtil.throwableToString; + import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MasterProposer { +import com.orbekk.same.ConnectionManager; + +public class MasterProposer extends Thread { private Logger logger = LoggerFactory.getLogger(getClass()); private String myUrl; private List paxosUrls = new ArrayList(); private ConnectionManager connections; - + public MasterProposer(String clientUrl, List paxosUrls, ConnectionManager connections) { this.myUrl = clientUrl; @@ -68,7 +74,7 @@ public class MasterProposer { } } - public boolean propose(int proposalNumber) { + boolean propose(int proposalNumber) { int result = internalPropose(proposalNumber); if (result == proposalNumber) { result = internalAcceptRequest(proposalNumber); @@ -80,11 +86,16 @@ public class MasterProposer { } } - public boolean proposeRetry(int proposalNumber) { + boolean proposeRetry(int proposalNumber) { + return proposeRetry(proposalNumber, null) != null; + } + + Integer proposeRetry(int proposalNumber, Runnable retryAction) { + assert proposalNumber >= 0; int nextProposal = proposalNumber; - int result = 0; + int result = -1; - while (result != nextProposal) { + while (!Thread.interrupted() && result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { result = internalAcceptRequest(nextProposal); @@ -92,9 +103,27 @@ public class MasterProposer { logger.info("Proposed value {}, result {}", nextProposal, result); if (result < 0) { nextProposal = -result + 1; + if (retryAction != null) { + retryAction.run(); + } } } + if (Thread.interrupted()) { + return null; + } - return true; + return result; + } + + public Future startProposalTask(final int proposalNumber, + final Runnable retryAction) { + Callable proposalCallable = new Callable() { + @Override public Integer call() throws Exception { + return proposeRetry(proposalNumber, retryAction); + } + }; + FutureTask task = new FutureTask(proposalCallable); + new Thread(task).start(); + return task; } } -- cgit v1.2.3