summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/paxos/MasterProposer.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/paxos/MasterProposer.java')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java146
1 files changed, 95 insertions, 51 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 094a749..9b74532 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -5,76 +5,119 @@ import static com.orbekk.same.StackTraceUtil.throwableToString;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.RpcCallback;
+import com.orbekk.protobuf.Rpc;
import com.orbekk.same.ConnectionManager;
+import com.orbekk.same.Services;
+import com.orbekk.same.Services.ClientState;
+import com.orbekk.same.Services.PaxosRequest;
+import com.orbekk.same.Services.PaxosResponse;
public class MasterProposer extends Thread {
- private Logger logger = LoggerFactory.getLogger(getClass());
- private String myUrl;
- private List<String> paxosUrls = new ArrayList<String>();
- private ConnectionManager connections;
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+ private final ClientState client;
+ private final List<String> paxosLocations;
+ private final ConnectionManager connections;
- public MasterProposer(String clientUrl, List<String> paxosUrls,
+ public MasterProposer(ClientState client, List<String> paxosLocations,
ConnectionManager connections) {
- this.myUrl = clientUrl;
- this.paxosUrls = paxosUrls;
+ this.client = client;
+ this.paxosLocations = paxosLocations;
this.connections = connections;
}
-
- private int internalPropose(int proposalNumber) {
- int bestPromise = -proposalNumber;
- int promises = 0;
- for (String url : paxosUrls) {
- PaxosService paxos = connections.getPaxos(url);
- int result = 0;
- try {
- result = paxos.propose(myUrl, proposalNumber);
- } catch (Exception e) {
- logger.warn("Exception from {}: {}", url,
- throwableToString(e));
+
+ private class ResponseHandler implements RpcCallback<PaxosResponse> {
+ final int proposalNumber;
+ final int numRequests;
+ AtomicInteger bestPromise = new AtomicInteger();
+ AtomicInteger numPromises = new AtomicInteger(0);
+ AtomicInteger numResponses = new AtomicInteger(0);
+ AtomicInteger result = new AtomicInteger();
+ CountDownLatch done = new CountDownLatch(1);
+
+ public ResponseHandler(int proposalNumber, int numRequests) {
+ this.proposalNumber = proposalNumber;
+ this.numRequests = numRequests;
+ bestPromise.set(-proposalNumber);
+ }
+
+ @Override public void run(PaxosResponse response) {
+ numResponses.incrementAndGet();
+ if (response != null) {
+ int result = response.getResult();
+ if (result == proposalNumber) {
+ numPromises.incrementAndGet();
+ }
+ while (true) {
+ int oldVal = bestPromise.get();
+ int update = Math.min(oldVal, result);
+ if (bestPromise.compareAndSet(oldVal, update)) {
+ break;
+ }
+ }
}
- if (result == proposalNumber) {
- promises += 1;
+ checkDone();
+ }
+
+ private void checkDone() {
+ if (done.getCount() > 0) {
+ if (numPromises.get() >= numRequests / 2) {
+ result.set(proposalNumber);
+ done.countDown();
+ } else if (numResponses.get() >= numRequests) {
+ result.set(bestPromise.get());
+ done.countDown();
+ }
}
- bestPromise = Math.min(bestPromise, result);
}
- if (promises > paxosUrls.size() / 2) {
- return proposalNumber;
- } else {
- return bestPromise;
+
+ public int getResult() throws InterruptedException {
+ done.await();
+ return result.get();
}
}
- private int internalAcceptRequest(int proposalNumber) {
- int bestAccepted = -proposalNumber;
- int accepts = 0;
- for (String url : paxosUrls) {
- PaxosService paxos = connections.getPaxos(url);
- int result = 0;
- try {
- result = paxos.acceptRequest(myUrl, proposalNumber);
- } catch (Exception e) {
- logger.warn("Exception from {}: {}", url,
- throwableToString(e));
- }
- if (result == proposalNumber) {
- accepts += 1;
- }
- bestAccepted = Math.min(bestAccepted, result);
+ private int internalPropose(int proposalNumber)
+ throws InterruptedException {
+ ResponseHandler handler = new ResponseHandler(proposalNumber,
+ paxosLocations.size());
+ for (String location : paxosLocations) {
+ Rpc rpc = new Rpc();
+ Services.Paxos paxos = connections.getPaxos0(location);
+ PaxosRequest request = PaxosRequest.newBuilder()
+ .setClient(client)
+ .setProposalNumber(proposalNumber)
+ .build();
+ paxos.propose(rpc, request, handler);
}
- if (accepts > paxosUrls.size() / 2) {
- return proposalNumber;
- } else {
- return bestAccepted;
+ return handler.getResult();
+ }
+
+ private int internalAcceptRequest(int proposalNumber)
+ throws InterruptedException {
+ ResponseHandler handler = new ResponseHandler(proposalNumber,
+ paxosLocations.size());
+ for (String location : paxosLocations) {
+ Rpc rpc = new Rpc();
+ Services.Paxos paxos = connections.getPaxos0(location);
+ PaxosRequest request = PaxosRequest.newBuilder()
+ .setClient(client)
+ .setProposalNumber(proposalNumber)
+ .build();
+ paxos.acceptRequest(rpc, request, handler);
}
+ return handler.getResult();
}
- boolean propose(int proposalNumber) {
+ boolean propose(int proposalNumber) throws InterruptedException {
int result = internalPropose(proposalNumber);
if (result == proposalNumber) {
result = internalAcceptRequest(proposalNumber);
@@ -86,16 +129,17 @@ public class MasterProposer extends Thread {
}
}
- boolean proposeRetry(int proposalNumber) {
+ boolean proposeRetry(int proposalNumber) throws InterruptedException {
return proposeRetry(proposalNumber, null) != null;
}
- Integer proposeRetry(int proposalNumber, Runnable retryAction) {
+ Integer proposeRetry(int proposalNumber, Runnable retryAction)
+ throws InterruptedException {
assert proposalNumber > 0;
int nextProposal = proposalNumber;
int result = nextProposal - 1;
- while (!Thread.interrupted() && result != nextProposal) {
+ while (!Thread.currentThread().isInterrupted() && result != nextProposal) {
result = internalPropose(nextProposal);
if (result == nextProposal) {
result = internalAcceptRequest(nextProposal);
@@ -109,7 +153,7 @@ public class MasterProposer extends Thread {
}
}
if (Thread.interrupted()) {
- return null;
+ throw new InterruptedException();
}
return result;