summaryrefslogtreecommitdiff
path: root/same
diff options
context:
space:
mode:
Diffstat (limited to 'same')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java34
-rw-r--r--same/src/main/java/com/orbekk/same/ClientService.java9
-rw-r--r--same/src/test/java/com/orbekk/same/ClientTest.java35
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java5
4 files changed, 78 insertions, 5 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 693a1ba..35e3f0e 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -5,10 +5,10 @@ import static com.orbekk.same.StackTraceUtil.throwableToString;
import java.util.ArrayList;
import java.util.List;
-import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.orbekk.paxos.MasterProposer;
import com.orbekk.same.State.Component;
import com.orbekk.same.discovery.DiscoveryListener;
import com.orbekk.util.DelayedOperation;
@@ -21,6 +21,9 @@ public class Client implements DiscoveryListener {
private ConnectionManager connections;
private State state;
private String myUrl;
+ private String masterUrl;
+ private int masterId = -1;
+
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
private NetworkNotificationListener networkListener;
@@ -41,7 +44,11 @@ public class Client implements DiscoveryListener {
@Override
public DelayedOperation set(Component component) {
DelayedOperation op = new DelayedOperation();
- String masterUrl = state.getDataOf(".masterUrl");
+ if (connectionState != ConnectionState.STABLE) {
+ op.complete(DelayedOperation.Status.createError(
+ "Not connected to master: " + connectionState));
+ return op;
+ }
MasterService master = connections.getMaster(masterUrl);
try {
boolean success = master.updateStateRequest(
@@ -83,7 +90,6 @@ public class Client implements DiscoveryListener {
private ClientService serviceImpl = new ClientService() {
@Override
public void setState(String component, String data, long revision) throws Exception {
- connectionState = ConnectionState.STABLE;
boolean status = state.update(component, data, revision);
if (status) {
for (StateChangedListener listener : stateListeners) {
@@ -108,6 +114,13 @@ public class Client implements DiscoveryListener {
public void discoveryRequest(String remoteUrl) {
discoveryThread.add(remoteUrl);
}
+
+ @Override
+ public void masterTakeover(String masterUrl, String networkName,
+ int masterId) throws Exception {
+ Client.this.masterUrl = masterUrl;
+ connectionState = ConnectionState.STABLE;
+ }
};
private WorkQueue<String> discoveryThread = new WorkQueue<String>() {
@@ -207,4 +220,19 @@ public class Client implements DiscoveryListener {
public ClientService getService() {
return serviceImpl;
}
+
+ private List<String> getPaxosUrls() {
+ List<String> paxosUrls = new ArrayList<String>();
+ for (String participant : state.getList(".participants")) {
+ paxosUrls.add(participant.replace("ClientService", "PaxosService"));
+ }
+ return paxosUrls;
+ }
+
+ private void startMasterElection() {
+ List<String> paxosUrls = getPaxosUrls();
+ MasterProposer proposer = new MasterProposer(getUrl(), paxosUrls,
+ connections);
+ // TODO: Run election.
+ }
}
diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java
index 04f423d..453dfef 100644
--- a/same/src/main/java/com/orbekk/same/ClientService.java
+++ b/same/src/main/java/com/orbekk/same/ClientService.java
@@ -7,4 +7,13 @@ public interface ClientService {
// Manual discovery request by client.
void discoveryRequest(String remoteUrl) throws Exception;
+
+ /** A new master takes over.
+ *
+ * @param masterUrl The new master URL.
+ * @param masterId The ID of the new master. Only accept if this is higher
+ * than the current master.
+ */
+ void masterTakeover(String masterUrl, String networkName,
+ int masterId) throws Exception;
}
diff --git a/same/src/test/java/com/orbekk/same/ClientTest.java b/same/src/test/java/com/orbekk/same/ClientTest.java
index 91f9760..d20b1d3 100644
--- a/same/src/test/java/com/orbekk/same/ClientTest.java
+++ b/same/src/test/java/com/orbekk/same/ClientTest.java
@@ -1,9 +1,17 @@
package com.orbekk.same;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+
+import com.orbekk.util.DelayedOperation;
public class ClientTest {
private State state = new State("ClientNetwork");
@@ -11,6 +19,29 @@ public class ClientTest {
private Client client = new Client(state, connections,
"http://client/ClientService.json");
private ClientService clientS = client.getService();
+ private MasterService mockMaster = mock(MasterService.class);
+
+ @Before public void setUp() {
+ connections.masterMap.put("master", mockMaster);
+ }
+
+ @Test public void disconnectedFailsUpdate() throws Exception {
+ ClientInterface clientI = client.getInterface();
+ DelayedOperation op = clientI.set(null);
+ assertTrue(op.isDone());
+ assertFalse(op.getStatus().isOk());
+ }
+
+ @Test public void connectedUpdateWorks() throws Exception {
+ clientS.masterTakeover("master", null, 0);
+ ClientInterface clientI = client.getInterface();
+ State.Component component = new State.Component(
+ "TestVariable", 1, "meow");
+ when(mockMaster.updateStateRequest("TestVariable", "meow", 1))
+ .thenReturn(true);
+ DelayedOperation op = clientI.set(component);
+ assertTrue(op.getStatus().isOk());
+ }
@Test public void testSetState() throws Exception {
clientS.setState("TestState", "Test data", 100);
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index 09ac6a1..3a8bd5a 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -31,6 +31,11 @@ public class MasterTest {
public void discoveryRequest(String remoteUrl) throws Exception {
throw new Exception("Unreachable client");
}
+
+ @Override
+ public void masterTakeover(String masterUrl, String networkName, int masterId)
+ throws Exception {
+ }
}
@Before