diff options
4 files changed, 78 insertions, 5 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 693a1ba..35e3f0e 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -5,10 +5,10 @@ import static com.orbekk.same.StackTraceUtil.throwableToString; import java.util.ArrayList; import java.util.List; -import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.orbekk.paxos.MasterProposer; import com.orbekk.same.State.Component; import com.orbekk.same.discovery.DiscoveryListener; import com.orbekk.util.DelayedOperation; @@ -21,6 +21,9 @@ public class Client implements DiscoveryListener { private ConnectionManager connections; private State state; private String myUrl; + private String masterUrl; + private int masterId = -1; + private List<StateChangedListener> stateListeners = new ArrayList<StateChangedListener>(); private NetworkNotificationListener networkListener; @@ -41,7 +44,11 @@ public class Client implements DiscoveryListener { @Override public DelayedOperation set(Component component) { DelayedOperation op = new DelayedOperation(); - String masterUrl = state.getDataOf(".masterUrl"); + if (connectionState != ConnectionState.STABLE) { + op.complete(DelayedOperation.Status.createError( + "Not connected to master: " + connectionState)); + return op; + } MasterService master = connections.getMaster(masterUrl); try { boolean success = master.updateStateRequest( @@ -83,7 +90,6 @@ public class Client implements DiscoveryListener { private ClientService serviceImpl = new ClientService() { @Override public void setState(String component, String data, long revision) throws Exception { - connectionState = ConnectionState.STABLE; boolean status = state.update(component, data, revision); if (status) { for (StateChangedListener listener : stateListeners) { @@ -108,6 +114,13 @@ public class Client implements DiscoveryListener { public void discoveryRequest(String remoteUrl) { discoveryThread.add(remoteUrl); } + + @Override + public void masterTakeover(String masterUrl, String networkName, + int masterId) throws Exception { + Client.this.masterUrl = masterUrl; + connectionState = ConnectionState.STABLE; + } }; private WorkQueue<String> discoveryThread = new WorkQueue<String>() { @@ -207,4 +220,19 @@ public class Client implements DiscoveryListener { public ClientService getService() { return serviceImpl; } + + private List<String> getPaxosUrls() { + List<String> paxosUrls = new ArrayList<String>(); + for (String participant : state.getList(".participants")) { + paxosUrls.add(participant.replace("ClientService", "PaxosService")); + } + return paxosUrls; + } + + private void startMasterElection() { + List<String> paxosUrls = getPaxosUrls(); + MasterProposer proposer = new MasterProposer(getUrl(), paxosUrls, + connections); + // TODO: Run election. + } } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index 04f423d..453dfef 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -7,4 +7,13 @@ public interface ClientService { // Manual discovery request by client. void discoveryRequest(String remoteUrl) throws Exception; + + /** A new master takes over. + * + * @param masterUrl The new master URL. + * @param masterId The ID of the new master. Only accept if this is higher + * than the current master. + */ + void masterTakeover(String masterUrl, String networkName, + int masterId) throws Exception; } diff --git a/same/src/test/java/com/orbekk/same/ClientTest.java b/same/src/test/java/com/orbekk/same/ClientTest.java index 91f9760..d20b1d3 100644 --- a/same/src/test/java/com/orbekk/same/ClientTest.java +++ b/same/src/test/java/com/orbekk/same/ClientTest.java @@ -1,9 +1,17 @@ package com.orbekk.same; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.*; + +import com.orbekk.util.DelayedOperation; public class ClientTest { private State state = new State("ClientNetwork"); @@ -11,6 +19,29 @@ public class ClientTest { private Client client = new Client(state, connections, "http://client/ClientService.json"); private ClientService clientS = client.getService(); + private MasterService mockMaster = mock(MasterService.class); + + @Before public void setUp() { + connections.masterMap.put("master", mockMaster); + } + + @Test public void disconnectedFailsUpdate() throws Exception { + ClientInterface clientI = client.getInterface(); + DelayedOperation op = clientI.set(null); + assertTrue(op.isDone()); + assertFalse(op.getStatus().isOk()); + } + + @Test public void connectedUpdateWorks() throws Exception { + clientS.masterTakeover("master", null, 0); + ClientInterface clientI = client.getInterface(); + State.Component component = new State.Component( + "TestVariable", 1, "meow"); + when(mockMaster.updateStateRequest("TestVariable", "meow", 1)) + .thenReturn(true); + DelayedOperation op = clientI.set(component); + assertTrue(op.getStatus().isOk()); + } @Test public void testSetState() throws Exception { clientS.setState("TestState", "Test data", 100); diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index 09ac6a1..3a8bd5a 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -31,6 +31,11 @@ public class MasterTest { public void discoveryRequest(String remoteUrl) throws Exception { throw new Exception("Unreachable client"); } + + @Override + public void masterTakeover(String masterUrl, String networkName, int masterId) + throws Exception { + } } @Before |