diff options
6 files changed, 45 insertions, 13 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index c8d4ef3..8b05821 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -14,11 +14,11 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.orbekk.paxos.MasterProposer; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.MasterState; import com.orbekk.same.State.Component; import com.orbekk.util.DelayedOperation; -import com.orbekk.util.WorkQueue; public class Client { public static long MASTER_TAKEOVER_TIMEOUT = 4000l; @@ -49,8 +49,6 @@ public class Client { return new State(state); } - // TODO: Do this asynchronously? Currently this is already achieved - // on Android, which makes the Java and Android versions different. @Override public DelayedOperation set(Component component) { DelayedOperation op = new DelayedOperation(); @@ -209,6 +207,12 @@ public class Client { return myUrl; } + public Services.ClientState getClientState() { + return Services.ClientState.newBuilder() + .setUrl(getUrl()) + .build(); + } + public ConnectionState getConnectionState() { return connectionState; } @@ -222,16 +226,23 @@ public class Client { masterId = 0; } - public void joinNetwork(Services.MasterState masterInfo) { + public Rpc joinNetwork(Services.MasterState masterInfo) { logger.info("joinNetwork({})", masterInfo); connectionState = ConnectionState.UNSTABLE; - MasterService master = connections.getMaster(masterInfo.getMasterUrl()); reset(); - try { - master.joinNetworkRequest(myUrl); - } catch (Exception e) { - logger.error("Unable to connect to master.", e); - } + + Services.Master master = + connections.getMaster0(masterInfo.getMasterLocation()); + final Rpc rpc = new Rpc(); + RpcCallback<Empty> done = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + if (!rpc.isOk()) { + logger.warn("Failed to join network."); + } + } + }; + master.joinNetworkRequest(rpc, getClientState(), done); + return rpc; } public ClientInterface getInterface() { diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index 8a6d3a0..4a24da5 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -11,5 +11,7 @@ public interface ConnectionManager { ClientService getClient(String url); MasterService getMaster(String url); PaxosService getPaxos(String url); + + Services.Master getMaster0(String location); Services.Directory getDirectory(String location); } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index 9aa4f81..8c3b9df 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -117,6 +117,16 @@ public class ConnectionManagerImpl implements ConnectionManager { } @Override + public Services.Master getMaster0(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Master.newStub(channel); + } else { + return null; + } + } + + @Override public Services.Directory getDirectory(String location) { RpcChannel channel = getChannel(location); if (channel != null) { diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 93b2e6a..8319467 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -5,6 +5,7 @@ import java.util.HashMap; import com.orbekk.paxos.PaxosService; import com.orbekk.same.Services.Directory; +import com.orbekk.same.Services.Master; /** * This class is used in test. @@ -18,6 +19,8 @@ public class TestConnectionManager implements ConnectionManager { new HashMap<String, PaxosService>(); public Map<String, Services.Directory> directoryMap = new HashMap<String, Services.Directory>(); + public Map<String, Services.Master> masterMap0 = + new HashMap<String, Services.Master>(); public TestConnectionManager() { } @@ -41,4 +44,9 @@ public class TestConnectionManager implements ConnectionManager { public Directory getDirectory(String location) { return directoryMap.get(location); } + + @Override + public Master getMaster0(String location) { + return masterMap0.get(location); + } } diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 12898ad..91322fa 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -36,8 +36,8 @@ public class FunctionalTest { master = Master.create(connections, broadcaster, masterUrl, "TestMaster", masterLocation); masterServiceProxy = new MasterServiceProxy(master.getService()); - connections.masterMap.put(masterUrl, - masterServiceProxy); + connections.masterMap.put(masterUrl, masterServiceProxy); + connections.masterMap0.put(masterLocation, master.getNewService()); client1 = newClient("TestClient1", "http://client1/ClientService.json"); vf1 = new VariableFactory(client1.getInterface()); client2 = newClient("TestClient2", "http://client2/ClientService.json"); diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index b0bd666..914daaa 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -44,6 +44,7 @@ public class MasterTest { masterS = master.getService(); connections.masterMap.put("http://master/MasterService.json", masterS); + connections.masterMap0.put("master:1000", master.getNewService()); } @Test @@ -54,7 +55,7 @@ public class MasterTest { } @Test - public void clientJoin() { + public void clientJoin() throws Exception { Client client = new Client( new State("ClientNetwork"), connections, "http://client/ClientService.json", null); |