summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-17 15:37:31 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-17 15:37:31 +0200
commita1060ee59fe7c9107641543babe950250835fa4b (patch)
treecec1c4d5d46d40e3e6f9ce1e9e7eba3564145b8b
parenta6a1c550731caaa2e46aab2112b81f318da229aa (diff)
Client.startMasterElection() uses protobuf service.
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java72
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java30
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java2
3 files changed, 57 insertions, 47 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 6386928..51d62e7 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -29,12 +29,10 @@ public class Client {
volatile State state;
private volatile String myUrl;
private volatile String myLocation;
- volatile String masterUrl;
- volatile String masterLocation;
- private volatile int masterId = 0;
private volatile MasterController masterController = null;
private final Broadcaster broadcaster;
private volatile Future<Integer> currentMasterProposal = null;
+ private volatile MasterState masterInfo;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -60,7 +58,8 @@ public class Client {
return op;
}
- Services.Master master = connections.getMaster0(masterLocation);
+ Services.Master master = connections.getMaster0(
+ masterInfo.getMasterLocation());
if (master == null) {
op.complete(DelayedOperation.Status.createError(
"Not connected to master."));
@@ -136,26 +135,25 @@ public class Client {
@Override public void masterTakeover(RpcController controller,
MasterState request, RpcCallback<Empty> done) {
logger.info("MasterTakeover({})", request);
- if (request.getMasterId() <= Client.this.masterId) {
+ if (masterInfo != null &&
+ request.getMasterId() <= masterInfo.getMasterId()) {
logger.warn("{} tried to take over, but current master is " +
"{}:{}. Ignoring", new Object[]{request,
state.getDataOf(".masterUrl"),
- Client.this.masterId});
+ masterInfo.getMasterId()});
return;
}
abortMasterElection();
- Client.this.masterUrl = request.getMasterUrl();
- Client.this.masterLocation = request.getMasterLocation();
- Client.this.masterId = request.getMasterId();
+ masterInfo = request;
connectionState = ConnectionState.STABLE;
done.run(Empty.getDefaultInstance());
}
@Override public void masterDown(RpcController controller, MasterState request,
RpcCallback<Empty> done) {
- if (request.getMasterId() < Client.this.masterId) {
+ if (request.getMasterId() < masterInfo.getMasterId()) {
logger.info("Master {} is down, but current master is {}. Ignoring.",
- request.getMasterId(), Client.this.masterId);
+ request.getMasterId(), masterInfo.getMasterId());
return;
}
logger.warn("Master down.");
@@ -197,9 +195,7 @@ public class Client {
@Override
public void masterDown(int masterId) throws Exception {
- Services.MasterState request = Services.MasterState.newBuilder()
- .setMasterUrl(masterUrl)
- .setNetworkName(state.getDataOf(".networkName"))
+ Services.MasterState request = masterInfo.toBuilder()
.setMasterId(masterId)
.build();
newServiceImpl.masterDown(null, request, noOp);
@@ -236,6 +232,10 @@ public class Client {
.build();
}
+ public MasterState getMaster() {
+ return masterInfo;
+ }
+
public ConnectionState getConnectionState() {
return connectionState;
}
@@ -246,7 +246,7 @@ public class Client {
private synchronized void reset() {
state.clear();
- masterId = 0;
+ masterInfo = null;
}
public Rpc joinNetwork(Services.MasterState masterInfo) {
@@ -288,6 +288,10 @@ public class Client {
return serviceImpl;
}
+ public Services.Client getNewService() {
+ return newServiceImpl;
+ }
+
private List<String> getPaxosUrlsNoMaster() {
List<String> paxosUrls = new ArrayList<String>();
for (String participant : state.getList(".participants")) {
@@ -319,12 +323,12 @@ public class Client {
}
};
synchronized (this) {
- if (failedMasterId < masterId) {
+ if (failedMasterId < masterInfo.getMasterId()) {
logger.info("Master election aborted. Master already chosen.");
return;
}
- currentMasterProposal = proposer.startProposalTask(masterId + 1,
- sleeperTask);
+ currentMasterProposal = proposer.startProposalTask(
+ masterInfo.getMasterId() + 1, sleeperTask);
}
Integer result = null;
try {
@@ -345,23 +349,23 @@ public class Client {
}
}
- private int getMasterIdEstimate() {
- return masterId;
- }
-
public void startMasterElection() {
- List<String> participants = state.getList(".participants");
- final int masterId = getMasterIdEstimate();
- broadcaster.broadcast(participants, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.masterDown(masterId);
- } catch (Exception e) {
- logger.info("{}.masterDown() did not respond (ignored): " +
- url, e);
- }
+ List<String> participants = state.getList(State.PARTICIPANTS);
+ final MasterState failedMaster = masterInfo;
+
+ RpcCallback<Empty> done = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ // Ignore unresponsive clients - master election will take
+ // care of them.
}
- });
+ };
+
+ for (String location : participants) {
+ Rpc rpc = new Rpc();
+ Services.Client client = connections.getClient0(location);
+ if (client != null) {
+ client.masterDown(rpc, failedMaster, done);
+ }
+ }
}
}
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index 73f0a37..0a20fc1 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -38,18 +38,22 @@ public class FunctionalTest {
masterServiceProxy = new MasterServiceProxy(master.getService());
connections.masterMap.put(masterUrl, masterServiceProxy);
connections.masterMap0.put(masterLocation, master.getNewService());
- client1 = newClient("TestClient1", "http://client1/ClientService.json");
+ client1 = newClient("TestClient1", "http://client1/ClientService.json",
+ "client1");
vf1 = new VariableFactory(client1.getInterface());
- client2 = newClient("TestClient2", "http://client2/ClientService.json");
+ client2 = newClient("TestClient2", "http://client2/ClientService.json",
+ "client2");
vf2 = new VariableFactory(client2.getInterface());
- client3 = newClient("TestClient3", "http://client3/ClientService.json");
+ client3 = newClient("TestClient3", "http://client3/ClientService.json",
+ "client3");
vf3 = new VariableFactory(client3.getInterface());
}
- Client newClient(String clientName, String clientUrl) {
+ Client newClient(String clientName, String clientUrl, String location) {
Client client = new Client(new State(clientName), connections,
- clientUrl, "clientLocation", broadcaster);
+ clientUrl, location, broadcaster);
connections.clientMap.put(clientUrl, client.getService());
+ connections.clientMap0.put(location, client.getNewService());
clients.add(client);
String paxosUrl = clientUrl.replace("ClientService", "PaxosService");
PaxosService paxos = new PaxosServiceImpl(paxosUrl);
@@ -92,8 +96,8 @@ public class FunctionalTest {
}
for (Client c : clients) {
assertThat(c.getConnectionState(), is(ConnectionState.STABLE));
- assertThat(c.masterUrl, is(masterUrl));
- assertThat(c.masterLocation, is(masterLocation));
+ assertThat(c.getMaster().getMasterUrl(), is(masterUrl));
+ assertThat(c.getMaster().getMasterLocation(), is(masterLocation));
}
}
@@ -130,8 +134,8 @@ public class FunctionalTest {
client3.setMasterController(controller);
client1.startMasterElection();
newMaster.performWork();
- assertThat(client1.masterUrl, is(newMasterUrl));
- assertThat(client2.masterUrl, is(newMasterUrl));
+ assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl));
+ assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl));
}
@Test public void onlyOneNewMaster() {
@@ -159,8 +163,8 @@ public class FunctionalTest {
client3.setMasterController(controller);
client1.startMasterElection();
newMaster.performWork();
- assertThat(client1.masterUrl, is(newMasterUrl));
- assertThat(client2.masterUrl, is(newMasterUrl));
+ assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl));
+ assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl));
}
@Test public void masterFails() {
@@ -190,7 +194,7 @@ public class FunctionalTest {
is(DelayedOperation.Status.ERROR));
performWork();
newMaster.performWork();
- assertThat(client1.masterUrl, is(newMasterUrl));
- assertThat(client2.masterUrl, is(newMasterUrl));
+ assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl));
+ assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl));
}
}
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index 8830c10..22414fe 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -65,6 +65,8 @@ public class MasterTest {
connections.clientMap.put("http://client/ClientService.json", clientS);
client.joinNetwork(master.getMasterInfo());
master.performWork();
+ System.out.println(state);
+ System.out.println(master.state);
assertTrue(state.getList(".participants").contains("http://client/ClientService.json"));
assertEquals(state, client.testGetState());
}