From 8c0633f4d9055c6710b170e40bb006ed8fc3a0c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 24 Apr 2012 13:38:20 +0200 Subject: Fix master takeover code. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Use the new services to send a MasterTakeover(). – Remove some old broadcast code. – Remove MasterProposerTest because this functionality is well enough covered by the functional test. – Remove HTTP services from PaxosServiceFunctionalTest. – Fix master takeover test. --- .../main/java/com/orbekk/paxos/MasterProposer.java | 4 -- same/src/main/java/com/orbekk/same/Master.java | 47 +++++++--------------- 2 files changed, 15 insertions(+), 36 deletions(-) (limited to 'same/src/main/java/com/orbekk') diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index dbfd3c1..bc4f18d 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,8 +1,5 @@ package com.orbekk.paxos; -import static com.orbekk.same.StackTraceUtil.throwableToString; - -import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -10,7 +7,6 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 94a3e5a..c83d4a6 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -225,24 +225,6 @@ public class Master { } } - private void broadcastNewComponents(List destinations, - final List components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } - /** This master should take over from an earlier master. */ public void resumeFrom(State lastKnownState, final int masterId) { state = lastKnownState; @@ -250,21 +232,22 @@ public class Master { state.update(".masterLocation", myLocation, state.getRevision(".masterLocation") + 100); this.masterId = masterId; - broadcaster.broadcast(state.getList(".participants"), - new ServiceOperation() { - @Override - public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterTakeover(myUrl, - state.getDataOf(".networkName"), masterId, - state.getDataOf(".masterLocation")); - } catch (Exception e) { - logger.info("Client {} failed to acknowledge new master. " + - "Removing {}", url); - removeParticipant(url); + + for (final String location : state.getList(State.PARTICIPANTS)) { + Services.Client client = connections.getClient0(location); + final Rpc rpc = new Rpc(); + RpcCallback done = new RpcCallback() { + @Override public void run(Empty unused) { + if (!rpc.isOk()) { + removeParticipant(location); + } } + }; + if (client == null) { + removeParticipant(location); + continue; } - }); + client.masterTakeover(rpc, getMasterInfo(), done); + } } } -- cgit v1.2.3