summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java20
1 files changed, 18 insertions, 2 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index 82e3dd2..12f1433 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -24,14 +24,14 @@ import com.google.protobuf.RpcController;
public class NewRpcChannel implements com.google.protobuf.RpcChannel {
public static int NUM_CONCURRENT_REQUESTS = 5;
private static final Logger logger =
- Logger.getLogger(RpcChannel.class.getName());
+ Logger.getLogger(NewRpcChannel.class.getName());
private final String host;
private final int port;
private final AtomicLong nextId = new AtomicLong(0);
private final ExecutorService responseHandlerPool =
Executors.newSingleThreadExecutor();
private final BlockingQueue<Data.Request> requestQueue =
- new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS);
+ new ArrayBlockingQueue<Data.Request>(NUM_CONCURRENT_REQUESTS);
private volatile Socket socket = null;
private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests =
new ConcurrentHashMap<Long, RequestMetadata>();
@@ -124,6 +124,9 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
for (;;) {
Data.Response response = Data.Response
.parseDelimitedFrom(inputStream);
+ if (response == null) {
+ throw new IOException("Could not read response.");
+ }
responseHandlerPool.execute(new ResponseHandler(response));
}
} catch (IOException e) {
@@ -139,6 +142,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ public static NewRpcChannel createOrNull(String host, int port) {
+ try {
+ return create(host, port);
+ } catch (UnknownHostException e) {
+ logger.log(Level.WARNING, "Unable to create RPC channel.", e);
+ return null;
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "Unable to create RPC channel.", e);
+ return null;
+ }
+ }
+
public static NewRpcChannel create(String host, int port)
throws UnknownHostException, IOException {
NewRpcChannel channel = new NewRpcChannel(host, port);
@@ -173,6 +188,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
private void tryCloseSocket(Socket socket) {
try {
socket.close();
+ cancelAllRequests("channel closed");
} catch (IOException e1) {
logger.log(Level.WARNING,
"Unable to close socket " + socket,