summaryrefslogtreecommitdiff
path: root/same/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java146
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManager.java1
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java10
-rw-r--r--same/src/main/java/com/orbekk/same/TestConnectionManager.java8
4 files changed, 114 insertions, 51 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 094a749..9b74532 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -5,76 +5,119 @@ import static com.orbekk.same.StackTraceUtil.throwableToString;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.RpcCallback;
+import com.orbekk.protobuf.Rpc;
import com.orbekk.same.ConnectionManager;
+import com.orbekk.same.Services;
+import com.orbekk.same.Services.ClientState;
+import com.orbekk.same.Services.PaxosRequest;
+import com.orbekk.same.Services.PaxosResponse;
public class MasterProposer extends Thread {
- private Logger logger = LoggerFactory.getLogger(getClass());
- private String myUrl;
- private List<String> paxosUrls = new ArrayList<String>();
- private ConnectionManager connections;
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+ private final ClientState client;
+ private final List<String> paxosLocations;
+ private final ConnectionManager connections;
- public MasterProposer(String clientUrl, List<String> paxosUrls,
+ public MasterProposer(ClientState client, List<String> paxosLocations,
ConnectionManager connections) {
- this.myUrl = clientUrl;
- this.paxosUrls = paxosUrls;
+ this.client = client;
+ this.paxosLocations = paxosLocations;
this.connections = connections;
}
-
- private int internalPropose(int proposalNumber) {
- int bestPromise = -proposalNumber;
- int promises = 0;
- for (String url : paxosUrls) {
- PaxosService paxos = connections.getPaxos(url);
- int result = 0;
- try {
- result = paxos.propose(myUrl, proposalNumber);
- } catch (Exception e) {
- logger.warn("Exception from {}: {}", url,
- throwableToString(e));
+
+ private class ResponseHandler implements RpcCallback<PaxosResponse> {
+ final int proposalNumber;
+ final int numRequests;
+ AtomicInteger bestPromise = new AtomicInteger();
+ AtomicInteger numPromises = new AtomicInteger(0);
+ AtomicInteger numResponses = new AtomicInteger(0);
+ AtomicInteger result = new AtomicInteger();
+ CountDownLatch done = new CountDownLatch(1);
+
+ public ResponseHandler(int proposalNumber, int numRequests) {
+ this.proposalNumber = proposalNumber;
+ this.numRequests = numRequests;
+ bestPromise.set(-proposalNumber);
+ }
+
+ @Override public void run(PaxosResponse response) {
+ numResponses.incrementAndGet();
+ if (response != null) {
+ int result = response.getResult();
+ if (result == proposalNumber) {
+ numPromises.incrementAndGet();
+ }
+ while (true) {
+ int oldVal = bestPromise.get();
+ int update = Math.min(oldVal, result);
+ if (bestPromise.compareAndSet(oldVal, update)) {
+ break;
+ }
+ }
}
- if (result == proposalNumber) {
- promises += 1;
+ checkDone();
+ }
+
+ private void checkDone() {
+ if (done.getCount() > 0) {
+ if (numPromises.get() >= numRequests / 2) {
+ result.set(proposalNumber);
+ done.countDown();
+ } else if (numResponses.get() >= numRequests) {
+ result.set(bestPromise.get());
+ done.countDown();
+ }
}
- bestPromise = Math.min(bestPromise, result);
}
- if (promises > paxosUrls.size() / 2) {
- return proposalNumber;
- } else {
- return bestPromise;
+
+ public int getResult() throws InterruptedException {
+ done.await();
+ return result.get();
}
}
- private int internalAcceptRequest(int proposalNumber) {
- int bestAccepted = -proposalNumber;
- int accepts = 0;
- for (String url : paxosUrls) {
- PaxosService paxos = connections.getPaxos(url);
- int result = 0;
- try {
- result = paxos.acceptRequest(myUrl, proposalNumber);
- } catch (Exception e) {
- logger.warn("Exception from {}: {}", url,
- throwableToString(e));
- }
- if (result == proposalNumber) {
- accepts += 1;
- }
- bestAccepted = Math.min(bestAccepted, result);
+ private int internalPropose(int proposalNumber)
+ throws InterruptedException {
+ ResponseHandler handler = new ResponseHandler(proposalNumber,
+ paxosLocations.size());
+ for (String location : paxosLocations) {
+ Rpc rpc = new Rpc();
+ Services.Paxos paxos = connections.getPaxos0(location);
+ PaxosRequest request = PaxosRequest.newBuilder()
+ .setClient(client)
+ .setProposalNumber(proposalNumber)
+ .build();
+ paxos.propose(rpc, request, handler);
}
- if (accepts > paxosUrls.size() / 2) {
- return proposalNumber;
- } else {
- return bestAccepted;
+ return handler.getResult();
+ }
+
+ private int internalAcceptRequest(int proposalNumber)
+ throws InterruptedException {
+ ResponseHandler handler = new ResponseHandler(proposalNumber,
+ paxosLocations.size());
+ for (String location : paxosLocations) {
+ Rpc rpc = new Rpc();
+ Services.Paxos paxos = connections.getPaxos0(location);
+ PaxosRequest request = PaxosRequest.newBuilder()
+ .setClient(client)
+ .setProposalNumber(proposalNumber)
+ .build();
+ paxos.acceptRequest(rpc, request, handler);
}
+ return handler.getResult();
}
- boolean propose(int proposalNumber) {
+ boolean propose(int proposalNumber) throws InterruptedException {
int result = internalPropose(proposalNumber);
if (result == proposalNumber) {
result = internalAcceptRequest(proposalNumber);
@@ -86,16 +129,17 @@ public class MasterProposer extends Thread {
}
}
- boolean proposeRetry(int proposalNumber) {
+ boolean proposeRetry(int proposalNumber) throws InterruptedException {
return proposeRetry(proposalNumber, null) != null;
}
- Integer proposeRetry(int proposalNumber, Runnable retryAction) {
+ Integer proposeRetry(int proposalNumber, Runnable retryAction)
+ throws InterruptedException {
assert proposalNumber > 0;
int nextProposal = proposalNumber;
int result = nextProposal - 1;
- while (!Thread.interrupted() && result != nextProposal) {
+ while (!Thread.currentThread().isInterrupted() && result != nextProposal) {
result = internalPropose(nextProposal);
if (result == nextProposal) {
result = internalAcceptRequest(nextProposal);
@@ -109,7 +153,7 @@ public class MasterProposer extends Thread {
}
}
if (Thread.interrupted()) {
- return null;
+ throw new InterruptedException();
}
return result;
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java
index f1ca65f..e4c7832 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java
@@ -15,4 +15,5 @@ public interface ConnectionManager {
Services.Master getMaster0(String location);
Services.Client getClient0(String location);
Services.Directory getDirectory(String location);
+ Services.Paxos getPaxos0(String location);
}
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index 34536a0..1bf697f 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -145,4 +145,14 @@ public class ConnectionManagerImpl implements ConnectionManager {
return null;
}
}
+
+ @Override
+ public Services.Paxos getPaxos0(String location) {
+ RpcChannel channel = getChannel(location);
+ if (channel != null) {
+ return Services.Paxos.newStub(channel);
+ } else {
+ return null;
+ }
+ }
}
diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
index 2305311..b93e49b 100644
--- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
@@ -6,6 +6,7 @@ import java.util.HashMap;
import com.orbekk.paxos.PaxosService;
import com.orbekk.same.Services.Directory;
import com.orbekk.same.Services.Master;
+import com.orbekk.same.Services.Paxos;
/**
* This class is used in test.
@@ -23,6 +24,8 @@ public class TestConnectionManager implements ConnectionManager {
new HashMap<String, Services.Master>();
public Map<String, Services.Client> clientMap0 =
new HashMap<String, Services.Client>();
+ public Map<String, Services.Paxos> paxosMap0 =
+ new HashMap<String, Services.Paxos>();
public TestConnectionManager() {
}
@@ -56,4 +59,9 @@ public class TestConnectionManager implements ConnectionManager {
public Services.Client getClient0(String location) {
return clientMap0.get(location);
}
+
+ @Override
+ public Services.Paxos getPaxos0(String location) {
+ return paxosMap0.get(location);
+ }
}