diff options
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r-- | same/src/main/java/com/orbekk/paxos/MasterProposer.java | 4 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 47 |
2 files changed, 15 insertions, 36 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index dbfd3c1..bc4f18d 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,8 +1,5 @@ package com.orbekk.paxos; -import static com.orbekk.same.StackTraceUtil.throwableToString; - -import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -10,7 +7,6 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 94a3e5a..c83d4a6 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -225,24 +225,6 @@ public class Master { } } - private void broadcastNewComponents(List<String> destinations, - final List<State.Component> components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } - /** This master should take over from an earlier master. */ public void resumeFrom(State lastKnownState, final int masterId) { state = lastKnownState; @@ -250,21 +232,22 @@ public class Master { state.update(".masterLocation", myLocation, state.getRevision(".masterLocation") + 100); this.masterId = masterId; - broadcaster.broadcast(state.getList(".participants"), - new ServiceOperation() { - @Override - public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterTakeover(myUrl, - state.getDataOf(".networkName"), masterId, - state.getDataOf(".masterLocation")); - } catch (Exception e) { - logger.info("Client {} failed to acknowledge new master. " + - "Removing {}", url); - removeParticipant(url); + + for (final String location : state.getList(State.PARTICIPANTS)) { + Services.Client client = connections.getClient0(location); + final Rpc rpc = new Rpc(); + RpcCallback<Empty> done = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + if (!rpc.isOk()) { + removeParticipant(location); + } } + }; + if (client == null) { + removeParticipant(location); + continue; } - }); + client.masterTakeover(rpc, getMasterInfo(), done); + } } } |