summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 13:38:20 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 13:38:20 +0200
commit8c0633f4d9055c6710b170e40bb006ed8fc3a0c0 (patch)
tree20a3e6673df64a4c42408079dd3c682e2bf4f211
parentae8791a096626b0196c63e4d9cb68f7a18ad86b0 (diff)
Fix master takeover code.
– Use the new services to send a MasterTakeover(). – Remove some old broadcast code. – Remove MasterProposerTest because this functionality is well enough covered by the functional test. – Remove HTTP services from PaxosServiceFunctionalTest. – Fix master takeover test.
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java4
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java47
-rw-r--r--same/src/test/java/com/orbekk/paxos/MasterProposerTest.java58
-rw-r--r--same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java16
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java4
5 files changed, 17 insertions, 112 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index dbfd3c1..bc4f18d 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -1,8 +1,5 @@
package com.orbekk.paxos;
-import static com.orbekk.same.StackTraceUtil.throwableToString;
-
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -10,7 +7,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.runner.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 94a3e5a..c83d4a6 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -225,24 +225,6 @@ public class Master {
}
}
- private void broadcastNewComponents(List<String> destinations,
- final List<State.Component> components) {
- broadcaster.broadcast(destinations, new ServiceOperation() {
- @Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- for (Component c : components) {
- client.setState(c.getName(), c.getData(),
- c.getRevision());
- }
- } catch (Exception e) {
- logger.info("Client {} failed to receive state update.", url);
- removeParticipant(url);
- }
- }
- });
- }
-
/** This master should take over from an earlier master. */
public void resumeFrom(State lastKnownState, final int masterId) {
state = lastKnownState;
@@ -250,21 +232,22 @@ public class Master {
state.update(".masterLocation", myLocation,
state.getRevision(".masterLocation") + 100);
this.masterId = masterId;
- broadcaster.broadcast(state.getList(".participants"),
- new ServiceOperation() {
- @Override
- public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- client.masterTakeover(myUrl,
- state.getDataOf(".networkName"), masterId,
- state.getDataOf(".masterLocation"));
- } catch (Exception e) {
- logger.info("Client {} failed to acknowledge new master. " +
- "Removing {}", url);
- removeParticipant(url);
+
+ for (final String location : state.getList(State.PARTICIPANTS)) {
+ Services.Client client = connections.getClient0(location);
+ final Rpc rpc = new Rpc();
+ RpcCallback<Empty> done = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ if (!rpc.isOk()) {
+ removeParticipant(location);
+ }
}
+ };
+ if (client == null) {
+ removeParticipant(location);
+ continue;
}
- });
+ client.masterTakeover(rpc, getMasterInfo(), done);
+ }
}
}
diff --git a/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java b/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java
deleted file mode 100644
index e753d6e..0000000
--- a/same/src/test/java/com/orbekk/paxos/MasterProposerTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.orbekk.paxos;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.orbekk.same.Services.ClientState;
-import com.orbekk.same.TestConnectionManager;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class MasterProposerTest {
- TestConnectionManager connections = new TestConnectionManager();
- ClientState client = ClientState.newBuilder()
- .setLocation("client1Location")
- .build();
- PaxosService p1 = mock(PaxosService.class);
- PaxosService p2 = mock(PaxosService.class);
- PaxosService p3 = mock(PaxosService.class);
- PaxosService p4 = mock(PaxosService.class);
- PaxosService p5 = mock(PaxosService.class);
-
- @Before public void setUp() {
- }
-
- List<String> paxosUrls() {
- List<String> urls = new ArrayList<String>();
- urls.addAll(connections.paxosMap.keySet());
- return urls;
- }
-
- @Test public void successfulProposal() throws Exception {
- connections.paxosMap.put("p1", p1);
- when(p1.propose("client1", 1)).thenReturn(1);
- when(p1.acceptRequest("client1", 1)).thenReturn(1);
-
- MasterProposer c1 = new MasterProposer(
- client,
- paxosUrls(),
- connections);
- assertTrue(c1.propose(1));
- }
-
- @Test public void unsucessfulProposal() throws Exception {
- connections.paxosMap.put("p1", p1);
- when(p1.propose("client1", 1)).thenReturn(-1);
- when(p1.acceptRequest("client1", 1)).thenReturn(-1);
-
- MasterProposer c1 = new MasterProposer(
- client,
- paxosUrls(),
- connections);
- assertFalse(c1.propose(1));
- }
-}
diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
index 6ceb423..98631b0 100644
--- a/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
+++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceFunctionalTest.java
@@ -13,13 +13,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.googlecode.jsonrpc4j.JsonRpcServer;
import com.orbekk.protobuf.SimpleProtobufServer;
import com.orbekk.same.ConnectionManagerImpl;
import com.orbekk.same.Services.ClientState;
-import com.orbekk.same.http.JettyServerBuilder;
-import com.orbekk.same.http.JettyServerContainer;
-import com.orbekk.same.http.RpcServlet;
public class PaxosServiceFunctionalTest {
ConnectionManagerImpl connections = new ConnectionManagerImpl(500, 500);
@@ -65,18 +61,6 @@ public class PaxosServiceFunctionalTest {
paxosUrls.add(location);
}
- public List<String> setupPaxos(JettyServerBuilder builder, int instances) {
- List<String> tempUrls = new ArrayList<String>();
- for (int i = 1; i <= instances; i++) {
- JsonRpcServer jsonServer = new JsonRpcServer(
- new PaxosServiceImpl("P" + i + ": "), PaxosService.class);
- String serviceId = "/PaxosService" + i + ".json";
- builder.withServlet(new RpcServlet(jsonServer), serviceId);
- tempUrls.add(serviceId);
- }
- return tempUrls;
- }
-
public void addUrls(List<String> services) {
for (String url : services) {
paxosUrls.add(myUrl + url);
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index c0b710e..b0df4dc 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -131,8 +131,8 @@ public class FunctionalTest {
client3.setMasterController(controller);
client1.startMasterElection();
newMaster.performWork();
- assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl));
- assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl));
+ assertThat(client1.getMaster().getMasterLocation(), is(newMasterLocation));
+ assertThat(client2.getMaster().getMasterLocation(), is(newMasterLocation));
}
@Test public void onlyOneNewMaster() {