From 808e9f8c51b32cd052254d25cb6f4c27d353de2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 24 Apr 2012 12:18:31 +0200 Subject: Asynchronous paxos client using the protobuf interface. --- .../main/java/com/orbekk/paxos/MasterProposer.java | 146 ++++++++++++++------- .../java/com/orbekk/same/ConnectionManager.java | 1 + .../com/orbekk/same/ConnectionManagerImpl.java | 10 ++ .../com/orbekk/same/TestConnectionManager.java | 8 ++ 4 files changed, 114 insertions(+), 51 deletions(-) (limited to 'same/src/main/java/com') diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index 094a749..9b74532 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -5,76 +5,119 @@ import static com.orbekk.same.StackTraceUtil.throwableToString; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcCallback; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.ConnectionManager; +import com.orbekk.same.Services; +import com.orbekk.same.Services.ClientState; +import com.orbekk.same.Services.PaxosRequest; +import com.orbekk.same.Services.PaxosResponse; public class MasterProposer extends Thread { - private Logger logger = LoggerFactory.getLogger(getClass()); - private String myUrl; - private List paxosUrls = new ArrayList(); - private ConnectionManager connections; + private final Logger logger = LoggerFactory.getLogger(getClass()); + private final ClientState client; + private final List paxosLocations; + private final ConnectionManager connections; - public MasterProposer(String clientUrl, List paxosUrls, + public MasterProposer(ClientState client, List paxosLocations, ConnectionManager connections) { - this.myUrl = clientUrl; - this.paxosUrls = paxosUrls; + this.client = client; + this.paxosLocations = paxosLocations; this.connections = connections; } - - private int internalPropose(int proposalNumber) { - int bestPromise = -proposalNumber; - int promises = 0; - for (String url : paxosUrls) { - PaxosService paxos = connections.getPaxos(url); - int result = 0; - try { - result = paxos.propose(myUrl, proposalNumber); - } catch (Exception e) { - logger.warn("Exception from {}: {}", url, - throwableToString(e)); + + private class ResponseHandler implements RpcCallback { + final int proposalNumber; + final int numRequests; + AtomicInteger bestPromise = new AtomicInteger(); + AtomicInteger numPromises = new AtomicInteger(0); + AtomicInteger numResponses = new AtomicInteger(0); + AtomicInteger result = new AtomicInteger(); + CountDownLatch done = new CountDownLatch(1); + + public ResponseHandler(int proposalNumber, int numRequests) { + this.proposalNumber = proposalNumber; + this.numRequests = numRequests; + bestPromise.set(-proposalNumber); + } + + @Override public void run(PaxosResponse response) { + numResponses.incrementAndGet(); + if (response != null) { + int result = response.getResult(); + if (result == proposalNumber) { + numPromises.incrementAndGet(); + } + while (true) { + int oldVal = bestPromise.get(); + int update = Math.min(oldVal, result); + if (bestPromise.compareAndSet(oldVal, update)) { + break; + } + } } - if (result == proposalNumber) { - promises += 1; + checkDone(); + } + + private void checkDone() { + if (done.getCount() > 0) { + if (numPromises.get() >= numRequests / 2) { + result.set(proposalNumber); + done.countDown(); + } else if (numResponses.get() >= numRequests) { + result.set(bestPromise.get()); + done.countDown(); + } } - bestPromise = Math.min(bestPromise, result); } - if (promises > paxosUrls.size() / 2) { - return proposalNumber; - } else { - return bestPromise; + + public int getResult() throws InterruptedException { + done.await(); + return result.get(); } } - private int internalAcceptRequest(int proposalNumber) { - int bestAccepted = -proposalNumber; - int accepts = 0; - for (String url : paxosUrls) { - PaxosService paxos = connections.getPaxos(url); - int result = 0; - try { - result = paxos.acceptRequest(myUrl, proposalNumber); - } catch (Exception e) { - logger.warn("Exception from {}: {}", url, - throwableToString(e)); - } - if (result == proposalNumber) { - accepts += 1; - } - bestAccepted = Math.min(bestAccepted, result); + private int internalPropose(int proposalNumber) + throws InterruptedException { + ResponseHandler handler = new ResponseHandler(proposalNumber, + paxosLocations.size()); + for (String location : paxosLocations) { + Rpc rpc = new Rpc(); + Services.Paxos paxos = connections.getPaxos0(location); + PaxosRequest request = PaxosRequest.newBuilder() + .setClient(client) + .setProposalNumber(proposalNumber) + .build(); + paxos.propose(rpc, request, handler); } - if (accepts > paxosUrls.size() / 2) { - return proposalNumber; - } else { - return bestAccepted; + return handler.getResult(); + } + + private int internalAcceptRequest(int proposalNumber) + throws InterruptedException { + ResponseHandler handler = new ResponseHandler(proposalNumber, + paxosLocations.size()); + for (String location : paxosLocations) { + Rpc rpc = new Rpc(); + Services.Paxos paxos = connections.getPaxos0(location); + PaxosRequest request = PaxosRequest.newBuilder() + .setClient(client) + .setProposalNumber(proposalNumber) + .build(); + paxos.acceptRequest(rpc, request, handler); } + return handler.getResult(); } - boolean propose(int proposalNumber) { + boolean propose(int proposalNumber) throws InterruptedException { int result = internalPropose(proposalNumber); if (result == proposalNumber) { result = internalAcceptRequest(proposalNumber); @@ -86,16 +129,17 @@ public class MasterProposer extends Thread { } } - boolean proposeRetry(int proposalNumber) { + boolean proposeRetry(int proposalNumber) throws InterruptedException { return proposeRetry(proposalNumber, null) != null; } - Integer proposeRetry(int proposalNumber, Runnable retryAction) { + Integer proposeRetry(int proposalNumber, Runnable retryAction) + throws InterruptedException { assert proposalNumber > 0; int nextProposal = proposalNumber; int result = nextProposal - 1; - while (!Thread.interrupted() && result != nextProposal) { + while (!Thread.currentThread().isInterrupted() && result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { result = internalAcceptRequest(nextProposal); @@ -109,7 +153,7 @@ public class MasterProposer extends Thread { } } if (Thread.interrupted()) { - return null; + throw new InterruptedException(); } return result; diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index f1ca65f..e4c7832 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -15,4 +15,5 @@ public interface ConnectionManager { Services.Master getMaster0(String location); Services.Client getClient0(String location); Services.Directory getDirectory(String location); + Services.Paxos getPaxos0(String location); } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index 34536a0..1bf697f 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -145,4 +145,14 @@ public class ConnectionManagerImpl implements ConnectionManager { return null; } } + + @Override + public Services.Paxos getPaxos0(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Paxos.newStub(channel); + } else { + return null; + } + } } diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 2305311..b93e49b 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -6,6 +6,7 @@ import java.util.HashMap; import com.orbekk.paxos.PaxosService; import com.orbekk.same.Services.Directory; import com.orbekk.same.Services.Master; +import com.orbekk.same.Services.Paxos; /** * This class is used in test. @@ -23,6 +24,8 @@ public class TestConnectionManager implements ConnectionManager { new HashMap(); public Map clientMap0 = new HashMap(); + public Map paxosMap0 = + new HashMap(); public TestConnectionManager() { } @@ -56,4 +59,9 @@ public class TestConnectionManager implements ConnectionManager { public Services.Client getClient0(String location) { return clientMap0.get(location); } + + @Override + public Services.Paxos getPaxos0(String location) { + return paxosMap0.get(location); + } } -- cgit v1.2.3