summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 16:13:08 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 16:13:08 +0200
commit8cef018aab54ac3f31643a26a75f974c454893ba (patch)
treef50b80f7ead481912ce68159b6b0916fffd5ce1e
parent04d448aade12127b1d8c9d4f26963833102698f8 (diff)
Use new RPC in Client.joinNetwork().
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java31
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManager.java2
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java10
-rw-r--r--same/src/main/java/com/orbekk/same/TestConnectionManager.java8
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java4
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java3
6 files changed, 45 insertions, 13 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index c8d4ef3..8b05821 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -14,11 +14,11 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.orbekk.paxos.MasterProposer;
+import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.MasterState;
import com.orbekk.same.State.Component;
import com.orbekk.util.DelayedOperation;
-import com.orbekk.util.WorkQueue;
public class Client {
public static long MASTER_TAKEOVER_TIMEOUT = 4000l;
@@ -49,8 +49,6 @@ public class Client {
return new State(state);
}
- // TODO: Do this asynchronously? Currently this is already achieved
- // on Android, which makes the Java and Android versions different.
@Override
public DelayedOperation set(Component component) {
DelayedOperation op = new DelayedOperation();
@@ -209,6 +207,12 @@ public class Client {
return myUrl;
}
+ public Services.ClientState getClientState() {
+ return Services.ClientState.newBuilder()
+ .setUrl(getUrl())
+ .build();
+ }
+
public ConnectionState getConnectionState() {
return connectionState;
}
@@ -222,16 +226,23 @@ public class Client {
masterId = 0;
}
- public void joinNetwork(Services.MasterState masterInfo) {
+ public Rpc joinNetwork(Services.MasterState masterInfo) {
logger.info("joinNetwork({})", masterInfo);
connectionState = ConnectionState.UNSTABLE;
- MasterService master = connections.getMaster(masterInfo.getMasterUrl());
reset();
- try {
- master.joinNetworkRequest(myUrl);
- } catch (Exception e) {
- logger.error("Unable to connect to master.", e);
- }
+
+ Services.Master master =
+ connections.getMaster0(masterInfo.getMasterLocation());
+ final Rpc rpc = new Rpc();
+ RpcCallback<Empty> done = new RpcCallback<Empty>() {
+ @Override public void run(Empty unused) {
+ if (!rpc.isOk()) {
+ logger.warn("Failed to join network.");
+ }
+ }
+ };
+ master.joinNetworkRequest(rpc, getClientState(), done);
+ return rpc;
}
public ClientInterface getInterface() {
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java
index 8a6d3a0..4a24da5 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java
@@ -11,5 +11,7 @@ public interface ConnectionManager {
ClientService getClient(String url);
MasterService getMaster(String url);
PaxosService getPaxos(String url);
+
+ Services.Master getMaster0(String location);
Services.Directory getDirectory(String location);
}
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index 9aa4f81..8c3b9df 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -117,6 +117,16 @@ public class ConnectionManagerImpl implements ConnectionManager {
}
@Override
+ public Services.Master getMaster0(String location) {
+ RpcChannel channel = getChannel(location);
+ if (channel != null) {
+ return Services.Master.newStub(channel);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
public Services.Directory getDirectory(String location) {
RpcChannel channel = getChannel(location);
if (channel != null) {
diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
index 93b2e6a..8319467 100644
--- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
@@ -5,6 +5,7 @@ import java.util.HashMap;
import com.orbekk.paxos.PaxosService;
import com.orbekk.same.Services.Directory;
+import com.orbekk.same.Services.Master;
/**
* This class is used in test.
@@ -18,6 +19,8 @@ public class TestConnectionManager implements ConnectionManager {
new HashMap<String, PaxosService>();
public Map<String, Services.Directory> directoryMap =
new HashMap<String, Services.Directory>();
+ public Map<String, Services.Master> masterMap0 =
+ new HashMap<String, Services.Master>();
public TestConnectionManager() {
}
@@ -41,4 +44,9 @@ public class TestConnectionManager implements ConnectionManager {
public Directory getDirectory(String location) {
return directoryMap.get(location);
}
+
+ @Override
+ public Master getMaster0(String location) {
+ return masterMap0.get(location);
+ }
}
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index 12898ad..91322fa 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -36,8 +36,8 @@ public class FunctionalTest {
master = Master.create(connections,
broadcaster, masterUrl, "TestMaster", masterLocation);
masterServiceProxy = new MasterServiceProxy(master.getService());
- connections.masterMap.put(masterUrl,
- masterServiceProxy);
+ connections.masterMap.put(masterUrl, masterServiceProxy);
+ connections.masterMap0.put(masterLocation, master.getNewService());
client1 = newClient("TestClient1", "http://client1/ClientService.json");
vf1 = new VariableFactory(client1.getInterface());
client2 = newClient("TestClient2", "http://client2/ClientService.json");
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index b0bd666..914daaa 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -44,6 +44,7 @@ public class MasterTest {
masterS = master.getService();
connections.masterMap.put("http://master/MasterService.json",
masterS);
+ connections.masterMap0.put("master:1000", master.getNewService());
}
@Test
@@ -54,7 +55,7 @@ public class MasterTest {
}
@Test
- public void clientJoin() {
+ public void clientJoin() throws Exception {
Client client = new Client(
new State("ClientNetwork"), connections,
"http://client/ClientService.json", null);