diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/paxos')
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 146 |
1 files changed, 95 insertions, 51 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 094a749..9b74532 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -5,76 +5,119 @@ import static com.orbekk.same.StackTraceUtil.throwableToString; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcCallback; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.ConnectionManager; +import com.orbekk.same.Services; +import com.orbekk.same.Services.ClientState; +import com.orbekk.same.Services.PaxosRequest; +import com.orbekk.same.Services.PaxosResponse; public class MasterProposer extends Thread { - private Logger logger = LoggerFactory.getLogger(getClass()); - private String myUrl; - private List<String> paxosUrls = new ArrayList<String>(); - private ConnectionManager connections; + private final Logger logger = LoggerFactory.getLogger(getClass()); + private final ClientState client; + private final List<String> paxosLocations; + private final ConnectionManager connections; - public MasterProposer(String clientUrl, List<String> paxosUrls, + public MasterProposer(ClientState client, List<String> paxosLocations, ConnectionManager connections) { - this.myUrl = clientUrl; - this.paxosUrls = paxosUrls; + this.client = client; + this.paxosLocations = paxosLocations; this.connections = connections; } - - private int internalPropose(int proposalNumber) { - int bestPromise = -proposalNumber; - int promises = 0; - for (String url : paxosUrls) { - PaxosService paxos = connections.getPaxos(url); - int result = 0; - try { - result = paxos.propose(myUrl, proposalNumber); - } catch (Exception e) { - logger.warn("Exception from {}: {}", url, - throwableToString(e)); + + private class ResponseHandler implements RpcCallback<PaxosResponse> { + final int proposalNumber; + final int numRequests; + AtomicInteger bestPromise = new AtomicInteger(); + AtomicInteger numPromises = new AtomicInteger(0); + AtomicInteger numResponses = new AtomicInteger(0); + AtomicInteger result = new AtomicInteger(); + CountDownLatch done = new CountDownLatch(1); + + public ResponseHandler(int proposalNumber, int numRequests) { + this.proposalNumber = proposalNumber; + this.numRequests = numRequests; + bestPromise.set(-proposalNumber); + } + + @Override public void run(PaxosResponse response) { + numResponses.incrementAndGet(); + if (response != null) { + int result = response.getResult(); + if (result == proposalNumber) { + numPromises.incrementAndGet(); + } + while (true) { + int oldVal = bestPromise.get(); + int update = Math.min(oldVal, result); + if (bestPromise.compareAndSet(oldVal, update)) { + break; + } + } } - if (result == proposalNumber) { - promises += 1; + checkDone(); + } + + private void checkDone() { + if (done.getCount() > 0) { + if (numPromises.get() >= numRequests / 2) { + result.set(proposalNumber); + done.countDown(); + } else if (numResponses.get() >= numRequests) { + result.set(bestPromise.get()); + done.countDown(); + } } - bestPromise = Math.min(bestPromise, result); } - if (promises > paxosUrls.size() / 2) { - return proposalNumber; - } else { - return bestPromise; + + public int getResult() throws InterruptedException { + done.await(); + return result.get(); } } - private int internalAcceptRequest(int proposalNumber) { - int bestAccepted = -proposalNumber; - int accepts = 0; - for (String url : paxosUrls) { - PaxosService paxos = connections.getPaxos(url); - int result = 0; - try { - result = paxos.acceptRequest(myUrl, proposalNumber); - } catch (Exception e) { - logger.warn("Exception from {}: {}", url, - throwableToString(e)); - } - if (result == proposalNumber) { - accepts += 1; - } - bestAccepted = Math.min(bestAccepted, result); + private int internalPropose(int proposalNumber) + throws InterruptedException { + ResponseHandler handler = new ResponseHandler(proposalNumber, + paxosLocations.size()); + for (String location : paxosLocations) { + Rpc rpc = new Rpc(); + Services.Paxos paxos = connections.getPaxos0(location); + PaxosRequest request = PaxosRequest.newBuilder() + .setClient(client) + .setProposalNumber(proposalNumber) + .build(); + paxos.propose(rpc, request, handler); } - if (accepts > paxosUrls.size() / 2) { - return proposalNumber; - } else { - return bestAccepted; + return handler.getResult(); + } + + private int internalAcceptRequest(int proposalNumber) + throws InterruptedException { + ResponseHandler handler = new ResponseHandler(proposalNumber, + paxosLocations.size()); + for (String location : paxosLocations) { + Rpc rpc = new Rpc(); + Services.Paxos paxos = connections.getPaxos0(location); + PaxosRequest request = PaxosRequest.newBuilder() + .setClient(client) + .setProposalNumber(proposalNumber) + .build(); + paxos.acceptRequest(rpc, request, handler); } + return handler.getResult(); } - boolean propose(int proposalNumber) { + boolean propose(int proposalNumber) throws InterruptedException { int result = internalPropose(proposalNumber); if (result == proposalNumber) { result = internalAcceptRequest(proposalNumber); @@ -86,16 +129,17 @@ public class MasterProposer extends Thread { } } - boolean proposeRetry(int proposalNumber) { + boolean proposeRetry(int proposalNumber) throws InterruptedException { return proposeRetry(proposalNumber, null) != null; } - Integer proposeRetry(int proposalNumber, Runnable retryAction) { + Integer proposeRetry(int proposalNumber, Runnable retryAction) + throws InterruptedException { assert proposalNumber > 0; int nextProposal = proposalNumber; int result = nextProposal - 1; - while (!Thread.interrupted() && result != nextProposal) { + while (!Thread.currentThread().isInterrupted() && result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { result = internalAcceptRequest(nextProposal); @@ -109,7 +153,7 @@ public class MasterProposer extends Thread { } } if (Thread.interrupted()) { - return null; + throw new InterruptedException(); } return result; |