summaryrefslogtreecommitdiff
path: root/same/src/main/java/com
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 13:38:20 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 13:38:20 +0200
commit8c0633f4d9055c6710b170e40bb006ed8fc3a0c0 (patch)
tree20a3e6673df64a4c42408079dd3c682e2bf4f211 /same/src/main/java/com
parentae8791a096626b0196c63e4d9cb68f7a18ad86b0 (diff)
Fix master takeover code.
– Use the new services to send a MasterTakeover(). – Remove some old broadcast code. – Remove MasterProposerTest because this functionality is well enough covered by the functional test. – Remove HTTP services from PaxosServiceFunctionalTest. – Fix master takeover test.
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java4
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java47
2 files changed, 15 insertions, 36 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index dbfd3c1..bc4f18d 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -1,8 +1,5 @@
package com.orbekk.paxos;
-import static com.orbekk.same.StackTraceUtil.throwableToString;
-
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -10,7 +7,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.runner.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 94a3e5a..c83d4a6 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -225,24 +225,6 @@ public class Master {
}
}
- private void broadcastNewComponents(List<String> destinations,
- final List<State.Component> components) {
- broadcaster.broadcast(destinations, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- for (Component c : components) {
- client.setState(c.getName(), c.getData(),
- c.getRevision());
- }
- } catch (Exception e) {
- logger.info("Client {} failed to receive state update.", url);
- removeParticipant(url);
- }
- }
- });
- }
-
/** This master should take over from an earlier master. */
public void resumeFrom(State lastKnownState, final int masterId) {
state = lastKnownState;
@@ -250,21 +232,22 @@ public class Master {
state.update(".masterLocation", myLocation,
state.getRevision(".masterLocation") + 100);
this.masterId = masterId;
- broadcaster.broadcast(state.getList(".participants"),
- new ServiceOperation() {
- @Override
- public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.masterTakeover(myUrl,
- state.getDataOf(".networkName"), masterId,
- state.getDataOf(".masterLocation"));
- } catch (Exception e) {
- logger.info("Client {} failed to acknowledge new master. " +
- "Removing {}", url);
- removeParticipant(url);
+
+ for (final String location : state.getList(State.PARTICIPANTS)) {
+ Services.Client client = connections.getClient0(location);
+ final Rpc rpc = new Rpc();
+ RpcCallback<Empty> done = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ if (!rpc.isOk()) {
+ removeParticipant(location);
+ }
}
+ };
+ if (client == null) {
+ removeParticipant(location);
+ continue;
}
- });
+ client.masterTakeover(rpc, getMasterInfo(), done);
+ }
}
}