diff options
Diffstat (limited to 'same/src/main/java/com')
6 files changed, 71 insertions, 41 deletions
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index 4637617..8a6d3a0 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -1,7 +1,6 @@ package com.orbekk.same; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; /** * An interface that returns a connection for a participant. @@ -12,5 +11,5 @@ public interface ConnectionManager { ClientService getClient(String url); MasterService getMaster(String url); PaxosService getPaxos(String url); - DirectoryService getDirectory(String url); + Services.Directory getDirectory(String location); } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index ade469e..a57bbdf 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -1,24 +1,35 @@ package com.orbekk.same; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcChannel; import com.googlecode.jsonrpc4j.ProxyUtil; import com.orbekk.net.MyJsonRpcHttpClient; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; +import com.orbekk.protobuf.NewRpcChannel; public class ConnectionManagerImpl implements ConnectionManager { private int connectionTimeout; private int readTimeout; private Map<String, MyJsonRpcHttpClient> connectionCache = new HashMap<String, MyJsonRpcHttpClient>(); - + private ConcurrentMap<String, Future<NewRpcChannel>> channels = + new ConcurrentHashMap<String, Future<NewRpcChannel>>(); + private Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -52,6 +63,44 @@ public class ConnectionManagerImpl implements ConnectionManager { } return service; } + + private NewRpcChannel getChannel(String location) { + Future<NewRpcChannel> channel = channels.get(location); + if (channel == null) { + final String hostname = location.split(":")[0]; + final int port = Integer.valueOf(location.split(":")[1]); + Callable<NewRpcChannel> channelFactory = + new Callable<NewRpcChannel>() { + @Override public NewRpcChannel call() { + try { + return NewRpcChannel.create(hostname, port); + } catch (UnknownHostException e) { + logger.error("Could not connect: ", e); + return null; + } catch (IOException e) { + logger.error("Could not connect: ", e); + return null; + } + } + }; + FutureTask<NewRpcChannel> task = + new FutureTask<NewRpcChannel>(channelFactory); + channel = channels.putIfAbsent(location, task); + if (channel == null) { + task.run(); + channel = task; + } + } + try { + return channel.get(); + } catch (ExecutionException e) { + logger.error("Could not connect: ", e); + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } @Override public ClientService getClient(String url) { @@ -69,7 +118,12 @@ public class ConnectionManagerImpl implements ConnectionManager { } @Override - public DirectoryService getDirectory(String url) { - return getClassProxy(url, DirectoryService.class); + public Services.Directory getDirectory(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Directory.newStub(channel); + } else { + return null; + } } } diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 8c17807..855daa7 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -6,7 +6,6 @@ import org.slf4j.LoggerFactory; import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; import com.orbekk.same.config.Configuration; -import com.orbekk.same.discovery.DirectoryService; import com.orbekk.same.http.ServerContainer; import com.orbekk.same.http.StateServlet; import com.orbekk.same.http.JettyServerBuilder; @@ -143,13 +142,13 @@ public class SameController { return master; } - public DirectoryService getDirectory() { - String directoryUrl = configuration.get("directoryUrl"); - DirectoryService directory = null; - if (directoryUrl != null) { - directory = connections.getDirectory(directoryUrl); + public Services.Directory getDirectory() { + String directoryLocation = configuration.get("directoryLocation"); + if (directoryLocation != null) { + return connections.getDirectory(directoryLocation); + } else { + return null; } - return directory; } public VariableFactory createVariableFactory() { diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 6f5e5d8..93b2e6a 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -4,7 +4,7 @@ import java.util.Map; import java.util.HashMap; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; +import com.orbekk.same.Services.Directory; /** * This class is used in test. @@ -16,8 +16,8 @@ public class TestConnectionManager implements ConnectionManager { new HashMap<String, MasterService>(); public Map<String, PaxosService> paxosMap = new HashMap<String, PaxosService>(); - public Map<String, DirectoryService> directoryMap = - new HashMap<String, DirectoryService>(); + public Map<String, Services.Directory> directoryMap = + new HashMap<String, Services.Directory>(); public TestConnectionManager() { } @@ -38,7 +38,7 @@ public class TestConnectionManager implements ConnectionManager { } @Override - public DirectoryService getDirectory(String url) { - return directoryMap.get(url); + public Directory getDirectory(String location) { + return directoryMap.get(location); } } diff --git a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java index 1fad871..6108630 100644 --- a/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java +++ b/same/src/main/java/com/orbekk/same/benchmark/ClientBenchmark.java @@ -81,7 +81,7 @@ public class ClientBenchmark { String host = args[0]; int port = Integer.valueOf(args[1]); try { - benchmark(host, port, 1000, 5000); + benchmark(host, port, 2000, 10000); } catch (InterruptedException e) { System.out.println("Benchmark failed."); } diff --git a/same/src/main/java/com/orbekk/same/discovery/DirectoryService.java b/same/src/main/java/com/orbekk/same/discovery/DirectoryService.java deleted file mode 100644 index fae2bf6..0000000 --- a/same/src/main/java/com/orbekk/same/discovery/DirectoryService.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.orbekk.same.discovery; - -import java.util.List; - -/** - * Maintains a registry of available networks. - * - * The discovery service is only meant to be used for debugging. - */ -public interface DirectoryService { - /** - * Returns a list of network names and master urls interleaved, i.e., - * - * [NetworkName1, MasterUrl1, ...] - */ - List<String> getNetworks() throws Exception; - - /** - * Register a network. - */ - void registerNetwork(String networkName, String masterUrl) throws Exception; -} |