summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 12:41:25 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 12:41:25 +0200
commitae8791a096626b0196c63e4d9cb68f7a18ad86b0 (patch)
treeb661f744548beaf647e3f734864b2bd15acfcbb7
parent2ef7691606aa86b10e16be93efa9cc7c277c8050 (diff)
Switch to protobuf based Paxos in functional test.
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java5
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java26
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java11
3 files changed, 18 insertions, 24 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index 9b74532..dbfd3c1 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -10,6 +10,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.runner.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +93,10 @@ public class MasterProposer extends Thread {
for (String location : paxosLocations) {
Rpc rpc = new Rpc();
Services.Paxos paxos = connections.getPaxos0(location);
+ if (paxos == null) {
+ handler.run(null);
+ continue;
+ }
PaxosRequest request = PaxosRequest.newBuilder()
.setClient(client)
.setProposalNumber(proposalNumber)
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 1833410..f9bd2a9 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -151,15 +151,15 @@ public class Client {
@Override public void masterDown(RpcController controller, MasterState request,
RpcCallback<Empty> done) {
+ logger.warn("Master down({})", request);
if (request.getMasterId() < masterInfo.getMasterId()) {
logger.info("Master {} is down, but current master is {}. Ignoring.",
request.getMasterId(), masterInfo.getMasterId());
return;
}
- logger.warn("Master down.");
connectionState = ConnectionState.UNSTABLE;
- tryBecomeMaster(request.getMasterId());
done.run(Empty.getDefaultInstance());
+ tryBecomeMaster(request);
}
};
@@ -292,22 +292,9 @@ public class Client {
return newServiceImpl;
}
- private List<String> getPaxosUrlsNoMaster() {
- List<String> paxosUrls = new ArrayList<String>();
- for (String participant : state.getList(".participants")) {
- String masterPaxos = state.getDataOf(".masterUrl")
- .replace("MasterService", "PaxosService");
- String paxos = participant.replace("ClientService", "PaxosService");
- if (!paxos.equals(masterPaxos)) {
- paxosUrls.add(participant.replace("ClientService", "PaxosService"));
- }
- }
- logger.info("Paxos urls: {}", paxosUrls);
- return paxosUrls;
- }
-
- private void tryBecomeMaster(int failedMasterId) {
- List<String> paxosUrls = getPaxosUrlsNoMaster();
+ private void tryBecomeMaster(MasterState failedMaster) {
+ List<String> paxosUrls = state.getList(State.PARTICIPANTS);
+ paxosUrls.remove(failedMaster.getMasterLocation());
MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
connections);
if (masterController == null) {
@@ -323,7 +310,7 @@ public class Client {
}
};
synchronized (this) {
- if (failedMasterId < masterInfo.getMasterId()) {
+ if (failedMaster.getMasterId() < masterInfo.getMasterId()) {
logger.info("Master election aborted. Master already chosen.");
return;
}
@@ -335,6 +322,7 @@ public class Client {
result = currentMasterProposal.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
+ logger.error("Error electing master: ", e);
} catch (CancellationException e) {
}
if (!currentMasterProposal.isCancelled() && result != null) {
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index 79bd9b8..c0b710e 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -53,8 +53,9 @@ public class FunctionalTest {
connections.clientMap0.put(location, client.getNewService());
clients.add(client);
String paxosUrl = clientUrl.replace("ClientService", "PaxosService");
- PaxosService paxos = new PaxosServiceImpl(paxosUrl);
+ PaxosServiceImpl paxos = new PaxosServiceImpl(paxosUrl);
connections.paxosMap.put(paxosUrl, paxos);
+ connections.paxosMap0.put(location, paxos.getService());
return client;
}
@@ -86,10 +87,10 @@ public class FunctionalTest {
@Test public void testJoin() {
joinClients();
for (State s : getStates()) {
- List<String> participants = s.getList(".participants");
- assertThat(participants, hasItem("http://client1/ClientService.json"));
- assertThat(participants, hasItem("http://client2/ClientService.json"));
- assertThat(participants, hasItem("http://client3/ClientService.json"));
+ List<String> participants = s.getList(State.PARTICIPANTS);
+ assertThat(participants, hasItem("client1"));
+ assertThat(participants, hasItem("client2"));
+ assertThat(participants, hasItem("client3"));
}
for (Client c : clients) {
assertThat(c.getConnectionState(), is(ConnectionState.STABLE));