diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-24 12:18:31 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-24 12:18:31 +0200 |
commit | 808e9f8c51b32cd052254d25cb6f4c27d353de2b (patch) | |
tree | d018cdf117f2172ae866dcb85e80b8ed18b6c6ad /same/src/main/java/com/orbekk | |
parent | b2b40834e22430b56f6857f8670a5c5a81900de4 (diff) |
Asynchronous paxos client using the protobuf interface.
Diffstat (limited to 'same/src/main/java/com/orbekk')
4 files changed, 114 insertions, 51 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 094a749..9b74532 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -5,76 +5,119 @@ import static com.orbekk.same.StackTraceUtil.throwableToString; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcCallback; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.ConnectionManager; +import com.orbekk.same.Services; +import com.orbekk.same.Services.ClientState; +import com.orbekk.same.Services.PaxosRequest; +import com.orbekk.same.Services.PaxosResponse; public class MasterProposer extends Thread { - private Logger logger = LoggerFactory.getLogger(getClass()); - private String myUrl; - private List<String> paxosUrls = new ArrayList<String>(); - private ConnectionManager connections; + private final Logger logger = LoggerFactory.getLogger(getClass()); + private final ClientState client; + private final List<String> paxosLocations; + private final ConnectionManager connections; - public MasterProposer(String clientUrl, List<String> paxosUrls, + public MasterProposer(ClientState client, List<String> paxosLocations, ConnectionManager connections) { - this.myUrl = clientUrl; - this.paxosUrls = paxosUrls; + this.client = client; + this.paxosLocations = paxosLocations; this.connections = connections; } - - private int internalPropose(int proposalNumber) { - int bestPromise = -proposalNumber; - int promises = 0; - for (String url : paxosUrls) { - PaxosService paxos = connections.getPaxos(url); - int result = 0; - try { - result = paxos.propose(myUrl, proposalNumber); - } catch (Exception e) { - logger.warn("Exception from {}: {}", url, - throwableToString(e)); + + private class ResponseHandler implements RpcCallback<PaxosResponse> { + final int proposalNumber; + final int numRequests; + AtomicInteger bestPromise = new AtomicInteger(); + AtomicInteger numPromises = new AtomicInteger(0); + AtomicInteger numResponses = new AtomicInteger(0); + AtomicInteger result = new AtomicInteger(); + CountDownLatch done = new CountDownLatch(1); + + public ResponseHandler(int proposalNumber, int numRequests) { + this.proposalNumber = proposalNumber; + this.numRequests = numRequests; + bestPromise.set(-proposalNumber); + } + + @Override public void run(PaxosResponse response) { + numResponses.incrementAndGet(); + if (response != null) { + int result = response.getResult(); + if (result == proposalNumber) { + numPromises.incrementAndGet(); + } + while (true) { + int oldVal = bestPromise.get(); + int update = Math.min(oldVal, result); + if (bestPromise.compareAndSet(oldVal, update)) { + break; + } + } } - if (result == proposalNumber) { - promises += 1; + checkDone(); + } + + private void checkDone() { + if (done.getCount() > 0) { + if (numPromises.get() >= numRequests / 2) { + result.set(proposalNumber); + done.countDown(); + } else if (numResponses.get() >= numRequests) { + result.set(bestPromise.get()); + done.countDown(); + } } - bestPromise = Math.min(bestPromise, result); } - if (promises > paxosUrls.size() / 2) { - return proposalNumber; - } else { - return bestPromise; + + public int getResult() throws InterruptedException { + done.await(); + return result.get(); } } - private int internalAcceptRequest(int proposalNumber) { - int bestAccepted = -proposalNumber; - int accepts = 0; - for (String url : paxosUrls) { - PaxosService paxos = connections.getPaxos(url); - int result = 0; - try { - result = paxos.acceptRequest(myUrl, proposalNumber); - } catch (Exception e) { - logger.warn("Exception from {}: {}", url, - throwableToString(e)); - } - if (result == proposalNumber) { - accepts += 1; - } - bestAccepted = Math.min(bestAccepted, result); + private int internalPropose(int proposalNumber) + throws InterruptedException { + ResponseHandler handler = new ResponseHandler(proposalNumber, + paxosLocations.size()); + for (String location : paxosLocations) { + Rpc rpc = new Rpc(); + Services.Paxos paxos = connections.getPaxos0(location); + PaxosRequest request = PaxosRequest.newBuilder() + .setClient(client) + .setProposalNumber(proposalNumber) + .build(); + paxos.propose(rpc, request, handler); } - if (accepts > paxosUrls.size() / 2) { - return proposalNumber; - } else { - return bestAccepted; + return handler.getResult(); + } + + private int internalAcceptRequest(int proposalNumber) + throws InterruptedException { + ResponseHandler handler = new ResponseHandler(proposalNumber, + paxosLocations.size()); + for (String location : paxosLocations) { + Rpc rpc = new Rpc(); + Services.Paxos paxos = connections.getPaxos0(location); + PaxosRequest request = PaxosRequest.newBuilder() + .setClient(client) + .setProposalNumber(proposalNumber) + .build(); + paxos.acceptRequest(rpc, request, handler); } + return handler.getResult(); } - boolean propose(int proposalNumber) { + boolean propose(int proposalNumber) throws InterruptedException { int result = internalPropose(proposalNumber); if (result == proposalNumber) { result = internalAcceptRequest(proposalNumber); @@ -86,16 +129,17 @@ public class MasterProposer extends Thread { } } - boolean proposeRetry(int proposalNumber) { + boolean proposeRetry(int proposalNumber) throws InterruptedException { return proposeRetry(proposalNumber, null) != null; } - Integer proposeRetry(int proposalNumber, Runnable retryAction) { + Integer proposeRetry(int proposalNumber, Runnable retryAction) + throws InterruptedException { assert proposalNumber > 0; int nextProposal = proposalNumber; int result = nextProposal - 1; - while (!Thread.interrupted() && result != nextProposal) { + while (!Thread.currentThread().isInterrupted() && result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { result = internalAcceptRequest(nextProposal); @@ -109,7 +153,7 @@ public class MasterProposer extends Thread { } } if (Thread.interrupted()) { - return null; + throw new InterruptedException(); } return result; diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index f1ca65f..e4c7832 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -15,4 +15,5 @@ public interface ConnectionManager { Services.Master getMaster0(String location); Services.Client getClient0(String location); Services.Directory getDirectory(String location); + Services.Paxos getPaxos0(String location); } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index 34536a0..1bf697f 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -145,4 +145,14 @@ public class ConnectionManagerImpl implements ConnectionManager { return null; } } + + @Override + public Services.Paxos getPaxos0(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Paxos.newStub(channel); + } else { + return null; + } + } } diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 2305311..b93e49b 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -6,6 +6,7 @@ import java.util.HashMap; import com.orbekk.paxos.PaxosService; import com.orbekk.same.Services.Directory; import com.orbekk.same.Services.Master; +import com.orbekk.same.Services.Paxos; /** * This class is used in test. @@ -23,6 +24,8 @@ public class TestConnectionManager implements ConnectionManager { new HashMap<String, Services.Master>(); public Map<String, Services.Client> clientMap0 = new HashMap<String, Services.Client>(); + public Map<String, Services.Paxos> paxosMap0 = + new HashMap<String, Services.Paxos>(); public TestConnectionManager() { } @@ -56,4 +59,9 @@ public class TestConnectionManager implements ConnectionManager { public Services.Client getClient0(String location) { return clientMap0.get(location); } + + @Override + public Services.Paxos getPaxos0(String location) { + return paxosMap0.get(location); + } } |