diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-11 16:13:08 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-11 16:13:08 +0200 |
commit | 8cef018aab54ac3f31643a26a75f974c454893ba (patch) | |
tree | f50b80f7ead481912ce68159b6b0916fffd5ce1e /same/src/main/java/com/orbekk | |
parent | 04d448aade12127b1d8c9d4f26963833102698f8 (diff) |
Use new RPC in Client.joinNetwork().
Diffstat (limited to 'same/src/main/java/com/orbekk')
4 files changed, 41 insertions, 10 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index c8d4ef3..8b05821 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -14,11 +14,11 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.orbekk.paxos.MasterProposer; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.MasterState; import com.orbekk.same.State.Component; import com.orbekk.util.DelayedOperation; -import com.orbekk.util.WorkQueue; public class Client { public static long MASTER_TAKEOVER_TIMEOUT = 4000l; @@ -49,8 +49,6 @@ public class Client { return new State(state); } - // TODO: Do this asynchronously? Currently this is already achieved - // on Android, which makes the Java and Android versions different. @Override public DelayedOperation set(Component component) { DelayedOperation op = new DelayedOperation(); @@ -209,6 +207,12 @@ public class Client { return myUrl; } + public Services.ClientState getClientState() { + return Services.ClientState.newBuilder() + .setUrl(getUrl()) + .build(); + } + public ConnectionState getConnectionState() { return connectionState; } @@ -222,16 +226,23 @@ public class Client { masterId = 0; } - public void joinNetwork(Services.MasterState masterInfo) { + public Rpc joinNetwork(Services.MasterState masterInfo) { logger.info("joinNetwork({})", masterInfo); connectionState = ConnectionState.UNSTABLE; - MasterService master = connections.getMaster(masterInfo.getMasterUrl()); reset(); - try { - master.joinNetworkRequest(myUrl); - } catch (Exception e) { - logger.error("Unable to connect to master.", e); - } + + Services.Master master = + connections.getMaster0(masterInfo.getMasterLocation()); + final Rpc rpc = new Rpc(); + RpcCallback<Empty> done = new RpcCallback<Empty>() { + @Override public void run(Empty unused) { + if (!rpc.isOk()) { + logger.warn("Failed to join network."); + } + } + }; + master.joinNetworkRequest(rpc, getClientState(), done); + return rpc; } public ClientInterface getInterface() { diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index 8a6d3a0..4a24da5 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -11,5 +11,7 @@ public interface ConnectionManager { ClientService getClient(String url); MasterService getMaster(String url); PaxosService getPaxos(String url); + + Services.Master getMaster0(String location); Services.Directory getDirectory(String location); } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index 9aa4f81..8c3b9df 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -117,6 +117,16 @@ public class ConnectionManagerImpl implements ConnectionManager { } @Override + public Services.Master getMaster0(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Master.newStub(channel); + } else { + return null; + } + } + + @Override public Services.Directory getDirectory(String location) { RpcChannel channel = getChannel(location); if (channel != null) { diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 93b2e6a..8319467 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -5,6 +5,7 @@ import java.util.HashMap; import com.orbekk.paxos.PaxosService; import com.orbekk.same.Services.Directory; +import com.orbekk.same.Services.Master; /** * This class is used in test. @@ -18,6 +19,8 @@ public class TestConnectionManager implements ConnectionManager { new HashMap<String, PaxosService>(); public Map<String, Services.Directory> directoryMap = new HashMap<String, Services.Directory>(); + public Map<String, Services.Master> masterMap0 = + new HashMap<String, Services.Master>(); public TestConnectionManager() { } @@ -41,4 +44,9 @@ public class TestConnectionManager implements ConnectionManager { public Directory getDirectory(String location) { return directoryMap.get(location); } + + @Override + public Master getMaster0(String location) { + return masterMap0.get(location); + } } |