summaryrefslogtreecommitdiff
path: root/same/src/main/java
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 12:41:25 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 12:41:25 +0200
commitae8791a096626b0196c63e4d9cb68f7a18ad86b0 (patch)
treeb661f744548beaf647e3f734864b2bd15acfcbb7 /same/src/main/java
parent2ef7691606aa86b10e16be93efa9cc7c277c8050 (diff)
Switch to protobuf based Paxos in functional test.
Diffstat (limited to 'same/src/main/java')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java5
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java26
2 files changed, 12 insertions, 19 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 9b74532..dbfd3c1 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -10,6 +10,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.runner.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +93,10 @@ public class MasterProposer extends Thread {
for (String location : paxosLocations) {
Rpc rpc = new Rpc();
Services.Paxos paxos = connections.getPaxos0(location);
+ if (paxos == null) {
+ handler.run(null);
+ continue;
+ }
PaxosRequest request = PaxosRequest.newBuilder()
.setClient(client)
.setProposalNumber(proposalNumber)
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 1833410..f9bd2a9 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -151,15 +151,15 @@ public class Client {
@Override public void masterDown(RpcController controller, MasterState request,
RpcCallback<Empty> done) {
+ logger.warn("Master down({})", request);
if (request.getMasterId() < masterInfo.getMasterId()) {
logger.info("Master {} is down, but current master is {}. Ignoring.",
request.getMasterId(), masterInfo.getMasterId());
return;
}
- logger.warn("Master down.");
connectionState = ConnectionState.UNSTABLE;
- tryBecomeMaster(request.getMasterId());
done.run(Empty.getDefaultInstance());
+ tryBecomeMaster(request);
}
};
@@ -292,22 +292,9 @@ public class Client {
return newServiceImpl;
}
- private List<String> getPaxosUrlsNoMaster() {
- List<String> paxosUrls = new ArrayList<String>();
- for (String participant : state.getList(".participants")) {
- String masterPaxos = state.getDataOf(".masterUrl")
- .replace("MasterService", "PaxosService");
- String paxos = participant.replace("ClientService", "PaxosService");
- if (!paxos.equals(masterPaxos)) {
- paxosUrls.add(participant.replace("ClientService", "PaxosService"));
- }
- }
- logger.info("Paxos urls: {}", paxosUrls);
- return paxosUrls;
- }
-
- private void tryBecomeMaster(int failedMasterId) {
- List<String> paxosUrls = getPaxosUrlsNoMaster();
+ private void tryBecomeMaster(MasterState failedMaster) {
+ List<String> paxosUrls = state.getList(State.PARTICIPANTS);
+ paxosUrls.remove(failedMaster.getMasterLocation());
MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
connections);
if (masterController == null) {
@@ -323,7 +310,7 @@ public class Client {
}
};
synchronized (this) {
- if (failedMasterId < masterInfo.getMasterId()) {
+ if (failedMaster.getMasterId() < masterInfo.getMasterId()) {
logger.info("Master election aborted. Master already chosen.");
return;
}
@@ -335,6 +322,7 @@ public class Client {
result = currentMasterProposal.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
+ logger.error("Error electing master: ", e);
} catch (CancellationException e) {
}
if (!currentMasterProposal.isCancelled() && result != null) {