diff options
9 files changed, 108 insertions, 95 deletions
diff --git a/directory/src/main/java/com/orbekk/same/directory/DirectoryApp.java b/directory/src/main/java/com/orbekk/same/directory/DirectoryApp.java index 667d4f0..45ad75a 100644 --- a/directory/src/main/java/com/orbekk/same/directory/DirectoryApp.java +++ b/directory/src/main/java/com/orbekk/same/directory/DirectoryApp.java @@ -4,7 +4,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.orbekk.protobuf.SimpleProtobufServer; -import com.orbekk.same.discovery.DirectoryService; import com.orbekk.same.http.JettyServerBuilder; import com.orbekk.same.http.ServerContainer; diff --git a/directory/src/main/java/com/orbekk/same/directory/DirectoryServiceImpl.java b/directory/src/main/java/com/orbekk/same/directory/DirectoryServiceImpl.java index 9c38446..ff1120e 100644 --- a/directory/src/main/java/com/orbekk/same/directory/DirectoryServiceImpl.java +++ b/directory/src/main/java/com/orbekk/same/directory/DirectoryServiceImpl.java @@ -9,13 +9,12 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; -import com.orbekk.same.discovery.DirectoryService; import com.orbekk.same.Services; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.MasterState; import com.orbekk.same.Services.NetworkDirectory; -public class DirectoryServiceImpl extends Services.Directory implements DirectoryService { +public class DirectoryServiceImpl extends Services.Directory { private Logger logger = LoggerFactory.getLogger(getClass()); public final static long EXPIRE_TIME = 15 * 60l * 1000; // 15 minutes List<NetworkEntry> networkList = new ArrayList<NetworkEntry>(); @@ -31,20 +30,7 @@ public class DirectoryServiceImpl extends Services.Directory implements Director logger.info("Cleaned network list. Networks: {}", networkList); } - @Override - public List<String> getNetworks() throws Exception { - cleanNetworkList(); - List<String> networks = new ArrayList<String>(); - for (NetworkEntry e : networkList) { - networks.add(e.networkName); - networks.add(e.masterUrl); - } - return networks; - } - - @Override - public void registerNetwork(String networkName, String masterUrl) - throws Exception { + public void registerNetwork(String networkName, String masterUrl) { cleanNetworkList(); NetworkEntry entry = new NetworkEntry(networkName, masterUrl); entry.register(System.currentTimeMillis()); @@ -55,11 +41,7 @@ public class DirectoryServiceImpl extends Services.Directory implements Director @Override public void registerNetwork(RpcController controller, MasterState request, RpcCallback<Empty> done) { - try { - registerNetwork(request.getNetworkName(), request.getMasterUrl()); - } catch (Exception e) { - // No RPC call here. - } + registerNetwork(request.getNetworkName(), request.getMasterUrl()); done.run(Empty.getDefaultInstance()); } diff --git a/same-android/src/main/java/com/orbekk/same/android/SameService.java b/same-android/src/main/java/com/orbekk/same/android/SameService.java index 004f5e9..47b87b9 100644 --- a/same-android/src/main/java/com/orbekk/same/android/SameService.java +++ b/same-android/src/main/java/com/orbekk/same/android/SameService.java @@ -17,14 +17,17 @@ import android.os.Messenger; import android.os.RemoteException; import android.widget.Toast; +import com.google.protobuf.RpcCallback; +import com.orbekk.protobuf.Rpc; import com.orbekk.same.NetworkNotificationListener; import com.orbekk.same.SameController; +import com.orbekk.same.Services; +import com.orbekk.same.Services.NetworkDirectory; import com.orbekk.same.State; import com.orbekk.same.State.Component; import com.orbekk.same.StateChangedListener; import com.orbekk.same.android.net.Networking; import com.orbekk.same.config.Configuration; -import com.orbekk.same.discovery.DirectoryService; import com.orbekk.util.DelayedOperation; public class SameService extends Service { @@ -65,8 +68,7 @@ public class SameService extends Service { final static int SERVICE_PORT = 15068; final static int DISCOVERY_PORT = 15066; - final static String DIRECTORY_URL = - "http://flode.pvv.ntnu.no:15072/DirectoryService.json"; + final static String DIRECTORY_URL = "flode.pvv.ntnu.no:15072"; private Logger logger = LoggerFactory.getLogger(getClass()); private SameController sameController = null; @@ -195,27 +197,27 @@ public class SameService extends Service { } private void findNetworks() { - new Thread(new Runnable() { - @Override - public void run() { - logger.info("Looking up networks."); - DirectoryService directory = sameController.getDirectory(); - if (directory == null) { - logger.warn("No discovery service configured."); + logger.info("Looking up networks."); + Services.Directory directory = sameController.getDirectory(); + if (directory == null) { + logger.warn("No discovery service configured."); + return; + } + final Rpc rpc = new Rpc(); + RpcCallback<Services.NetworkDirectory> done = + new RpcCallback<Services.NetworkDirectory>() { + @Override public void run(Services.NetworkDirectory networks) { + if (!rpc.isOk()) { + logger.warn("Unable to find networks: {}", rpc.errorText()); return; } - try { - List<String> networks = directory.getNetworks(); - for (int i = 0; i < networks.size(); i += 2) { - String name = networks.get(i); - String url = networks.get(i + 1); - networkListener.notifyNetwork(name, url); - } - } catch (Exception e) { - logger.warn("Unable to contact discovery service.", e); + for (Services.MasterState network : networks.getNetworkList()) { + networkListener.notifyNetwork(network.getNetworkName(), + network.getMasterUrl()); } } - }).start(); + }; + directory.getNetworks(rpc, Services.Empty.getDefaultInstance(), done); } private void initializeConfiguration() { @@ -229,24 +231,24 @@ public class SameService extends Service { properties.setProperty("enableDiscovery", "true"); properties.setProperty("discoveryPort", ""+DISCOVERY_PORT); properties.setProperty("networkName", "AndroidNetwork"); - properties.setProperty("directoryUrl", DIRECTORY_URL); + properties.setProperty("directoryLocation", DIRECTORY_URL); configuration = new Configuration(properties); } /** Create a public network. */ private void create() { sameController.createNetwork(configuration.get("networkName")); - try { - // SameController should take care of this. - sameController.getDirectory().registerNetwork( - configuration.get("networkName"), - sameController.getMaster().getUrl()); - } catch (Exception e) { - Toast.makeText(this, "Unable to register network. " + - "Use manual address to join.", - Toast.LENGTH_LONG).show(); - logger.warn("Unable to advertise network.", e); - } +// try { +// // SameController should take care of this. +// sameController.getDirectory().registerNetwork( +// configuration.get("networkName"), +// sameController.getMaster().getUrl()); +// } catch (Exception e) { +// Toast.makeText(this, "Unable to register network. " + +// "Use manual address to join.", +// Toast.LENGTH_LONG).show(); +// logger.warn("Unable to advertise network.", e); +// } } @Override diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index 4637617..8a6d3a0 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -1,7 +1,6 @@ package com.orbekk.same; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; /** * An interface that returns a connection for a participant. @@ -12,5 +11,5 @@ public interface ConnectionManager { ClientService getClient(String url); MasterService getMaster(String url); PaxosService getPaxos(String url); - DirectoryService getDirectory(String url); + Services.Directory getDirectory(String location); } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index ade469e..a57bbdf 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -1,24 +1,35 @@ package com.orbekk.same; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcChannel; import com.googlecode.jsonrpc4j.ProxyUtil; import com.orbekk.net.MyJsonRpcHttpClient; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; +import com.orbekk.protobuf.NewRpcChannel; public class ConnectionManagerImpl implements ConnectionManager { private int connectionTimeout; private int readTimeout; private Map<String, MyJsonRpcHttpClient> connectionCache = new HashMap<String, MyJsonRpcHttpClient>(); - + private ConcurrentMap<String, Future<NewRpcChannel>> channels = + new ConcurrentHashMap<String, Future<NewRpcChannel>>(); + private Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -52,6 +63,44 @@ public class ConnectionManagerImpl implements ConnectionManager { } return service; } + + private NewRpcChannel getChannel(String location) { + Future<NewRpcChannel> channel = channels.get(location); + if (channel == null) { + final String hostname = location.split(":")[0]; + final int port = Integer.valueOf(location.split(":")[1]); + Callable<NewRpcChannel> channelFactory = + new Callable<NewRpcChannel>() { + @Override public NewRpcChannel call() { + try { + return NewRpcChannel.create(hostname, port); + } catch (UnknownHostException e) { + logger.error("Could not connect: ", e); + return null; + } catch (IOException e) { + logger.error("Could not connect: ", e); + return null; + } + } + }; + FutureTask<NewRpcChannel> task = + new FutureTask<NewRpcChannel>(channelFactory); + channel = channels.putIfAbsent(location, task); + if (channel == null) { + task.run(); + channel = task; + } + } + try { + return channel.get(); + } catch (ExecutionException e) { + logger.error("Could not connect: ", e); + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } @Override public ClientService getClient(String url) { @@ -69,7 +118,12 @@ public class ConnectionManagerImpl implements ConnectionManager { } @Override - public DirectoryService getDirectory(String url) { - return getClassProxy(url, DirectoryService.class); + public Services.Directory getDirectory(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Directory.newStub(channel); + } else { + return null; + } } } diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 8c17807..855daa7 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -6,7 +6,6 @@ import org.slf4j.LoggerFactory; import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; import com.orbekk.same.config.Configuration; -import com.orbekk.same.discovery.DirectoryService; import com.orbekk.same.http.ServerContainer; import com.orbekk.same.http.StateServlet; import com.orbekk.same.http.JettyServerBuilder; @@ -143,13 +142,13 @@ public class SameController { return master; } - public DirectoryService getDirectory() { - String directoryUrl = configuration.get("directoryUrl"); - DirectoryService directory = null; - if (directoryUrl != null) { - directory = connections.getDirectory(directoryUrl); + public Services.Directory getDirectory() { + String directoryLocation = configuration.get("directoryLocation"); + if (directoryLocation != null) { + return connections.getDirectory(directoryLocation); + } else { + return null; } - return directory; } public VariableFactory createVariableFactory() { diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 6f5e5d8..93b2e6a 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -4,7 +4,7 @@ import java.util.Map; import java.util.HashMap; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; +import com.orbekk.same.Services.Directory; /** * This class is used in test. @@ -16,8 +16,8 @@ public class TestConnectionManager implements ConnectionManager { new HashMap<String, MasterService>(); public Map<String, PaxosService> paxosMap = new HashMap<String, PaxosService>(); - public Map<String, DirectoryService> directoryMap = - new HashMap<String, DirectoryService>(); + public Map<String, Services.Directory> directoryMap = + new HashMap<String, Services.Directory>(); public TestConnectionManager() { } @@ -38,7 +38,7 @@ public class TestConnectionManager implements ConnectionManager { } @Override - public DirectoryService getDirectory(String url) { - return directoryMap.get(url); + public Directory getDirectory(String location) { + return directoryMap.get(location); } } diff --git a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java index 1fad871..6108630 100644 --- a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java +++ b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java @@ -81,7 +81,7 @@ public class ClientBenchmark { String host = args[0]; int port = Integer.valueOf(args[1]); try { - benchmark(host, port, 1000, 5000); + benchmark(host, port, 2000, 10000); } catch (InterruptedException e) { System.out.println("Benchmark failed."); } diff --git a/same/src/main/java/com/orbekk/same/discovery/DirectoryService.java b/same/src/main/java/com/orbekk/same/discovery/DirectoryService.java deleted file mode 100644 index fae2bf6..0000000 --- a/same/src/main/java/com/orbekk/same/discovery/DirectoryService.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.orbekk.same.discovery; - -import java.util.List; - -/** - * Maintains a registry of available networks. - * - * The discovery service is only meant to be used for debugging. - */ -public interface DirectoryService { - /** - * Returns a list of network names and master urls interleaved, i.e., - * - * [NetworkName1, MasterUrl1, ...] - */ - List<String> getNetworks() throws Exception; - - /** - * Register a network. - */ - void registerNetwork(String networkName, String masterUrl) throws Exception; -} |