summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk')
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java4
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java47
2 files changed, 15 insertions, 36 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index dbfd3c1..bc4f18d 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -1,8 +1,5 @@
package com.orbekk.paxos;
-import static com.orbekk.same.StackTraceUtil.throwableToString;
-
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -10,7 +7,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.runner.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 94a3e5a..c83d4a6 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -225,24 +225,6 @@ public class Master {
}
}
- private void broadcastNewComponents(List<String> destinations,
- final List<State.Component> components) {
- broadcaster.broadcast(destinations, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- for (Component c : components) {
- client.setState(c.getName(), c.getData(),
- c.getRevision());
- }
- } catch (Exception e) {
- logger.info("Client {} failed to receive state update.", url);
- removeParticipant(url);
- }
- }
- });
- }
-
/** This master should take over from an earlier master. */
public void resumeFrom(State lastKnownState, final int masterId) {
state = lastKnownState;
@@ -250,21 +232,22 @@ public class Master {
state.update(".masterLocation", myLocation,
state.getRevision(".masterLocation") + 100);
this.masterId = masterId;
- broadcaster.broadcast(state.getList(".participants"),
- new ServiceOperation() {
- @Override
- public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.masterTakeover(myUrl,
- state.getDataOf(".networkName"), masterId,
- state.getDataOf(".masterLocation"));
- } catch (Exception e) {
- logger.info("Client {} failed to acknowledge new master. " +
- "Removing {}", url);
- removeParticipant(url);
+
+ for (final String location : state.getList(State.PARTICIPANTS)) {
+ Services.Client client = connections.getClient0(location);
+ final Rpc rpc = new Rpc();
+ RpcCallback<Empty> done = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ if (!rpc.isOk()) {
+ removeParticipant(location);
+ }
}
+ };
+ if (client == null) {
+ removeParticipant(location);
+ continue;
}
- });
+ client.masterTakeover(rpc, getMasterInfo(), done);
+ }
}
}