diff options
5 files changed, 17 insertions, 112 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index dbfd3c1..bc4f18d 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -1,8 +1,5 @@ package com.orbekk.paxos; -import static com.orbekk.same.StackTraceUtil.throwableToString; - -import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -10,7 +7,6 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.runner.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 94a3e5a..c83d4a6 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -225,24 +225,6 @@ public class Master { } } - private void broadcastNewComponents(List<String> destinations, - final List<State.Component> components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } - /** This master should take over from an earlier master. */ public void resumeFrom(State lastKnownState, final int masterId) { state = lastKnownState; @@ -250,21 +232,22 @@ public class Master { state.update(".masterLocation", myLocation, state.getRevision(".masterLocation") + 100); this.masterId = masterId; - broadcaster.broadcast(state.getList(".participants"), - new ServiceOperation() { - @Override - public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterTakeover(myUrl, - state.getDataOf(".networkName"), masterId, - state.getDataOf(".masterLocation")); - } catch (Exception e) { - logger.info("Client {} failed to acknowledge new master. " + - "Removing {}", url); - removeParticipant(url); + + for (final String location : state.getList(State.PARTICIPANTS)) { + Services.Client client = connections.getClient0(location); + final Rpc rpc = new Rpc(); + RpcCallback<Empty> done = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + if (!rpc.isOk()) { + removeParticipant(location); + } } + }; + if (client == null) { + removeParticipant(location); + continue; } - }); + client.masterTakeover(rpc, getMasterInfo(), done); + } } } diff --git a/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java b/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java deleted file mode 100644 index e753d6e..0000000 --- a/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.orbekk.paxos; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Before; -import org.junit.Test; - -import com.orbekk.same.Services.ClientState; -import com.orbekk.same.TestConnectionManager; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class MasterProposerTest { - TestConnectionManager connections = new TestConnectionManager(); - ClientState client = ClientState.newBuilder() - .setLocation("client1Location") - .build(); - PaxosService p1 = mock(PaxosService.class); - PaxosService p2 = mock(PaxosService.class); - PaxosService p3 = mock(PaxosService.class); - PaxosService p4 = mock(PaxosService.class); - PaxosService p5 = mock(PaxosService.class); - - @Before public void setUp() { - } - - List<String> paxosUrls() { - List<String> urls = new ArrayList<String>(); - urls.addAll(connections.paxosMap.keySet()); - return urls; - } - - @Test public void successfulProposal() throws Exception { - connections.paxosMap.put("p1", p1); - when(p1.propose("client1", 1)).thenReturn(1); - when(p1.acceptRequest("client1", 1)).thenReturn(1); - - MasterProposer c1 = new MasterProposer( - client, - paxosUrls(), - connections); - assertTrue(c1.propose(1)); - } - - @Test public void unsucessfulProposal() throws Exception { - connections.paxosMap.put("p1", p1); - when(p1.propose("client1", 1)).thenReturn(-1); - when(p1.acceptRequest("client1", 1)).thenReturn(-1); - - MasterProposer c1 = new MasterProposer( - client, - paxosUrls(), - connections); - assertFalse(c1.propose(1)); - } -} diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java index 6ceb423..98631b0 100644 --- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java +++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java @@ -13,13 +13,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.googlecode.jsonrpc4j.JsonRpcServer; import com.orbekk.protobuf.SimpleProtobufServer; import com.orbekk.same.ConnectionManagerImpl; import com.orbekk.same.Services.ClientState; -import com.orbekk.same.http.JettyServerBuilder; -import com.orbekk.same.http.JettyServerContainer; -import com.orbekk.same.http.RpcServlet; public class PaxosServiceFunctionalTest { ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500); @@ -65,18 +61,6 @@ public class PaxosServiceFunctionalTest { paxosUrls.add(location); } - public List<String> setupPaxos(JettyServerBuilder builder, int instances) { - List<String> tempUrls = new ArrayList<String>(); - for (int i = 1; i <= instances; i++) { - JsonRpcServer jsonServer = new JsonRpcServer( - new PaxosServiceImpl("P" + i + ": "), PaxosService.class); - String serviceId = "/PaxosService" + i + ".json"; - builder.withServlet(new RpcServlet(jsonServer), serviceId); - tempUrls.add(serviceId); - } - return tempUrls; - } - public void addUrls(List<String> services) { for (String url : services) { paxosUrls.add(myUrl + url); diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index c0b710e..b0df4dc 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -131,8 +131,8 @@ public class FunctionalTest { client3.setMasterController(controller); client1.startMasterElection(); newMaster.performWork(); - assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl)); - assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl)); + assertThat(client1.getMaster().getMasterLocation(), is(newMasterLocation)); + assertThat(client2.getMaster().getMasterLocation(), is(newMasterLocation)); } @Test public void onlyOneNewMaster() { |