diff options
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r-- | src/main/java/com/orbekk/protobuf/NewRpcChannel.java | 20 |
1 files changed, 18 insertions, 2 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index 82e3dd2..12f1433 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -24,14 +24,14 @@ import com.google.protobuf.RpcController; public class NewRpcChannel implements com.google.protobuf.RpcChannel { public static int NUM_CONCURRENT_REQUESTS = 5; private static final Logger logger = - Logger.getLogger(RpcChannel.class.getName()); + Logger.getLogger(NewRpcChannel.class.getName()); private final String host; private final int port; private final AtomicLong nextId = new AtomicLong(0); private final ExecutorService responseHandlerPool = Executors.newSingleThreadExecutor(); private final BlockingQueue<Data.Request> requestQueue = - new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS); + new ArrayBlockingQueue<Data.Request>(NUM_CONCURRENT_REQUESTS); private volatile Socket socket = null; private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests = new ConcurrentHashMap<Long, RequestMetadata>(); @@ -124,6 +124,9 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { for (;;) { Data.Response response = Data.Response .parseDelimitedFrom(inputStream); + if (response == null) { + throw new IOException("Could not read response."); + } responseHandlerPool.execute(new ResponseHandler(response)); } } catch (IOException e) { @@ -139,6 +142,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } + public static NewRpcChannel createOrNull(String host, int port) { + try { + return create(host, port); + } catch (UnknownHostException e) { + logger.log(Level.WARNING, "Unable to create RPC channel.", e); + return null; + } catch (IOException e) { + logger.log(Level.WARNING, "Unable to create RPC channel.", e); + return null; + } + } + public static NewRpcChannel create(String host, int port) throws UnknownHostException, IOException { NewRpcChannel channel = new NewRpcChannel(host, port); @@ -173,6 +188,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { private void tryCloseSocket(Socket socket) { try { socket.close(); + cancelAllRequests("channel closed"); } catch (IOException e1) { logger.log(Level.WARNING, "Unable to close socket " + socket, |