summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-17 15:37:31 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-17 15:37:31 +0200
commita1060ee59fe7c9107641543babe950250835fa4b (patch)
treecec1c4d5d46d40e3e6f9ce1e9e7eba3564145b8b /same/src/main/java/com/orbekk
parenta6a1c550731caaa2e46aab2112b81f318da229aa (diff)
Client.startMasterElection() uses protobuf service.
Diffstat (limited to 'same/src/main/java/com/orbekk')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java72
1 files changed, 38 insertions, 34 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 6386928..51d62e7 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -29,12 +29,10 @@ public class Client {
volatile State state;
private volatile String myUrl;
private volatile String myLocation;
- volatile String masterUrl;
- volatile String masterLocation;
- private volatile int masterId = 0;
private volatile MasterController masterController = null;
private final Broadcaster broadcaster;
private volatile Future<Integer> currentMasterProposal = null;
+ private volatile MasterState masterInfo;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -60,7 +58,8 @@ public class Client {
return op;
}
- Services.Master master = connections.getMaster0(masterLocation);
+ Services.Master master = connections.getMaster0(
+ masterInfo.getMasterLocation());
if (master == null) {
op.complete(DelayedOperation.Status.createError(
"Not connected to master."));
@@ -136,26 +135,25 @@ public class Client {
@Override public void masterTakeover(RpcController controller,
MasterState request, RpcCallback<Empty> done) {
logger.info("MasterTakeover({})", request);
- if (request.getMasterId() <= Client.this.masterId) {
+ if (masterInfo != null &&
+ request.getMasterId() <= masterInfo.getMasterId()) {
logger.warn("{} tried to take over, but current master is " +
"{}:{}. Ignoring", new Object[]{request,
state.getDataOf(".masterUrl"),
- Client.this.masterId});
+ masterInfo.getMasterId()});
return;
}
abortMasterElection();
- Client.this.masterUrl = request.getMasterUrl();
- Client.this.masterLocation = request.getMasterLocation();
- Client.this.masterId = request.getMasterId();
+ masterInfo = request;
connectionState = ConnectionState.STABLE;
done.run(Empty.getDefaultInstance());
}
@Override public void masterDown(RpcController controller, MasterState request,
RpcCallback<Empty> done) {
- if (request.getMasterId() < Client.this.masterId) {
+ if (request.getMasterId() < masterInfo.getMasterId()) {
logger.info("Master {} is down, but current master is {}. Ignoring.",
- request.getMasterId(), Client.this.masterId);
+ request.getMasterId(), masterInfo.getMasterId());
return;
}
logger.warn("Master down.");
@@ -197,9 +195,7 @@ public class Client {
@Override
public void masterDown(int masterId) throws Exception {
- Services.MasterState request = Services.MasterState.newBuilder()
- .setMasterUrl(masterUrl)
- .setNetworkName(state.getDataOf(".networkName"))
+ Services.MasterState request = masterInfo.toBuilder()
.setMasterId(masterId)
.build();
newServiceImpl.masterDown(null, request, noOp);
@@ -236,6 +232,10 @@ public class Client {
.build();
}
+ public MasterState getMaster() {
+ return masterInfo;
+ }
+
public ConnectionState getConnectionState() {
return connectionState;
}
@@ -246,7 +246,7 @@ public class Client {
private synchronized void reset() {
state.clear();
- masterId = 0;
+ masterInfo = null;
}
public Rpc joinNetwork(Services.MasterState masterInfo) {
@@ -288,6 +288,10 @@ public class Client {
return serviceImpl;
}
+ public Services.Client getNewService() {
+ return newServiceImpl;
+ }
+
private List<String> getPaxosUrlsNoMaster() {
List<String> paxosUrls = new ArrayList<String>();
for (String participant : state.getList(".participants")) {
@@ -319,12 +323,12 @@ public class Client {
}
};
synchronized (this) {
- if (failedMasterId < masterId) {
+ if (failedMasterId < masterInfo.getMasterId()) {
logger.info("Master election aborted. Master already chosen.");
return;
}
- currentMasterProposal = proposer.startProposalTask(masterId + 1,
- sleeperTask);
+ currentMasterProposal = proposer.startProposalTask(
+ masterInfo.getMasterId() + 1, sleeperTask);
}
Integer result = null;
try {
@@ -345,23 +349,23 @@ public class Client {
}
}
- private int getMasterIdEstimate() {
- return masterId;
- }
-
public void startMasterElection() {
- List<String> participants = state.getList(".participants");
- final int masterId = getMasterIdEstimate();
- broadcaster.broadcast(participants, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.masterDown(masterId);
- } catch (Exception e) {
- logger.info("{}.masterDown() did not respond (ignored): " +
- url, e);
- }
+ List<String> participants = state.getList(State.PARTICIPANTS);
+ final MasterState failedMaster = masterInfo;
+
+ RpcCallback<Empty> done = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ // Ignore unresponsive clients - master election will take
+ // care of them.
}
- });
+ };
+
+ for (String location : participants) {
+ Rpc rpc = new Rpc();
+ Services.Client client = connections.getClient0(location);
+ if (client != null) {
+ client.masterDown(rpc, failedMaster, done);
+ }
+ }
}
}