diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java | 62 |
1 files changed, 58 insertions, 4 deletions
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index ade469e..a57bbdf 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -1,24 +1,35 @@ package com.orbekk.same; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcChannel; import com.googlecode.jsonrpc4j.ProxyUtil; import com.orbekk.net.MyJsonRpcHttpClient; import com.orbekk.paxos.PaxosService; -import com.orbekk.same.discovery.DirectoryService; +import com.orbekk.protobuf.NewRpcChannel; public class ConnectionManagerImpl implements ConnectionManager { private int connectionTimeout; private int readTimeout; private Map<String, MyJsonRpcHttpClient> connectionCache = new HashMap<String, MyJsonRpcHttpClient>(); - + private ConcurrentMap<String, Future<NewRpcChannel>> channels = + new ConcurrentHashMap<String, Future<NewRpcChannel>>(); + private Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -52,6 +63,44 @@ public class ConnectionManagerImpl implements ConnectionManager { } return service; } + + private NewRpcChannel getChannel(String location) { + Future<NewRpcChannel> channel = channels.get(location); + if (channel == null) { + final String hostname = location.split(":")[0]; + final int port = Integer.valueOf(location.split(":")[1]); + Callable<NewRpcChannel> channelFactory = + new Callable<NewRpcChannel>() { + @Override public NewRpcChannel call() { + try { + return NewRpcChannel.create(hostname, port); + } catch (UnknownHostException e) { + logger.error("Could not connect: ", e); + return null; + } catch (IOException e) { + logger.error("Could not connect: ", e); + return null; + } + } + }; + FutureTask<NewRpcChannel> task = + new FutureTask<NewRpcChannel>(channelFactory); + channel = channels.putIfAbsent(location, task); + if (channel == null) { + task.run(); + channel = task; + } + } + try { + return channel.get(); + } catch (ExecutionException e) { + logger.error("Could not connect: ", e); + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } @Override public ClientService getClient(String url) { @@ -69,7 +118,12 @@ public class ConnectionManagerImpl implements ConnectionManager { } @Override - public DirectoryService getDirectory(String url) { - return getClassProxy(url, DirectoryService.class); + public Services.Directory getDirectory(String location) { + RpcChannel channel = getChannel(location); + if (channel != null) { + return Services.Directory.newStub(channel); + } else { + return null; + } } } |