From 8c0633f4d9055c6710b170e40bb006ed8fc3a0c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 24 Apr 2012 13:38:20 +0200 Subject: Fix master takeover code. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Use the new services to send a MasterTakeover(). – Remove some old broadcast code. – Remove MasterProposerTest because this functionality is well enough covered by the functional test. – Remove HTTP services from PaxosServiceFunctionalTest. – Fix master takeover test. --- .../main/java/com/orbekk/paxos/MasterProposer.java | 4 -- same/src/main/java/com/orbekk/same/Master.java | 47 ++++++------------ .../java/com/orbekk/paxos/MasterProposerTest.java | 58 ---------------------- .../orbekk/paxos/PaxosServiceFunctionalTest.java | 16 ------ .../test/java/com/orbekk/same/FunctionalTest.java | 4 +- 5 files changed, 17 insertions(+), 112 deletions(-) delete mode 100644 same/src/test/java/com/orbekk/paxos/MasterProposerTest.java diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index dbfd3c1..bc4f18d 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,8 +1,5 @@ package com.orbekk.paxos; -import static com.orbekk.same.StackTraceUtil.throwableToString; - -import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -10,7 +7,6 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 94a3e5a..c83d4a6 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -225,24 +225,6 @@ public class Master { } } - private void broadcastNewComponents(List destinations, - final List components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } - /** This master should take over from an earlier master. */ public void resumeFrom(State lastKnownState, final int masterId) { state = lastKnownState; @@ -250,21 +232,22 @@ public class Master { state.update(".masterLocation", myLocation, state.getRevision(".masterLocation") + 100); this.masterId = masterId; - broadcaster.broadcast(state.getList(".participants"), - new ServiceOperation() { - @Override - public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterTakeover(myUrl, - state.getDataOf(".networkName"), masterId, - state.getDataOf(".masterLocation")); - } catch (Exception e) { - logger.info("Client {} failed to acknowledge new master. " + - "Removing {}", url); - removeParticipant(url); + + for (final String location : state.getList(State.PARTICIPANTS)) { + Services.Client client = connections.getClient0(location); + final Rpc rpc = new Rpc(); + RpcCallback done = new RpcCallback() { + @Override public void run(Empty unused) { + if (!rpc.isOk()) { + removeParticipant(location); + } } + }; + if (client == null) { + removeParticipant(location); + continue; } - }); + client.masterTakeover(rpc, getMasterInfo(), done); + } } } diff --git a/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java b/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java deleted file mode 100644 index e753d6e..0000000 --- a/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.orbekk.paxos; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Before; -import org.junit.Test; - -import com.orbekk.same.Services.ClientState; -import com.orbekk.same.TestConnectionManager; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class MasterProposerTest { - TestConnectionManager connections = new TestConnectionManager(); - ClientState client = ClientState.newBuilder() - .setLocation("client1Location") - .build(); - PaxosService p1 = mock(PaxosService.class); - PaxosService p2 = mock(PaxosService.class); - PaxosService p3 = mock(PaxosService.class); - PaxosService p4 = mock(PaxosService.class); - PaxosService p5 = mock(PaxosService.class); - - @Before public void setUp() { - } - - List paxosUrls() { - List urls = new ArrayList(); - urls.addAll(connections.paxosMap.keySet()); - return urls; - } - - @Test public void successfulProposal() throws Exception { - connections.paxosMap.put("p1", p1); - when(p1.propose("client1", 1)).thenReturn(1); - when(p1.acceptRequest("client1", 1)).thenReturn(1); - - MasterProposer c1 = new MasterProposer( - client, - paxosUrls(), - connections); - assertTrue(c1.propose(1)); - } - - @Test public void unsucessfulProposal() throws Exception { - connections.paxosMap.put("p1", p1); - when(p1.propose("client1", 1)).thenReturn(-1); - when(p1.acceptRequest("client1", 1)).thenReturn(-1); - - MasterProposer c1 = new MasterProposer( - client, - paxosUrls(), - connections); - assertFalse(c1.propose(1)); - } -} diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java index 6ceb423..98631b0 100644 --- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java +++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java @@ -13,13 +13,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.googlecode.jsonrpc4j.JsonRpcServer; import com.orbekk.protobuf.SimpleProtobufServer; import com.orbekk.same.ConnectionManagerImpl; import com.orbekk.same.Services.ClientState; -import com.orbekk.same.http.JettyServerBuilder; -import com.orbekk.same.http.JettyServerContainer; -import com.orbekk.same.http.RpcServlet; public class PaxosServiceFunctionalTest { ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500); @@ -65,18 +61,6 @@ public class PaxosServiceFunctionalTest { paxosUrls.add(location); } - public List setupPaxos(JettyServerBuilder builder, int instances) { - List tempUrls = new ArrayList(); - for (int i = 1; i <= instances; i++) { - JsonRpcServer jsonServer = new JsonRpcServer( - new PaxosServiceImpl("P" + i + ": "), PaxosService.class); - String serviceId = "/PaxosService" + i + ".json"; - builder.withServlet(new RpcServlet(jsonServer), serviceId); - tempUrls.add(serviceId); - } - return tempUrls; - } - public void addUrls(List services) { for (String url : services) { paxosUrls.add(myUrl + url); diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index c0b710e..b0df4dc 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -131,8 +131,8 @@ public class FunctionalTest { client3.setMasterController(controller); client1.startMasterElection(); newMaster.performWork(); - assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); - assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl)); + assertThat(client1.getMaster().getMasterLocation(), is(newMasterLocation)); + assertThat(client2.getMaster().getMasterLocation(), is(newMasterLocation)); } @Test public void onlyOneNewMaster() { -- cgit v1.2.3