summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java46
1 files changed, 36 insertions, 10 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index 1e009fa..afb94af 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -33,6 +33,8 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
private volatile Socket socket = null;
private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests =
new ConcurrentHashMap<Long, RequestMetadata>();
+ private volatile OutgoingHandler outgoingHandler = null;
+ private volatile IncomingHandler incomingHandler = null;
class RequestMetadata {
public final long id;
@@ -61,12 +63,13 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
- class OutgoingHandler implements Runnable {
+ class OutgoingHandler extends Thread {
private final Socket socket;
private final BlockingQueue<Data.Request> requests;
public OutgoingHandler(Socket socket,
BlockingQueue<Data.Request> requests) {
+ super("OutgoingHandler");
this.socket = socket;
this.requests = requests;
}
@@ -86,15 +89,19 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
-
+ @Override public void interrupt() {
+ super.interrupt();
+ tryCloseSocket(socket);
+ }
}
- class IncomingHandler implements Runnable {
+ class IncomingHandler extends Thread {
private Socket socket;
private ExecutorService responseHandlerPool;
public IncomingHandler(Socket socket,
ExecutorService responseHandlerPool) {
+ super("IncomingHandler");
this.socket = socket;
this.responseHandlerPool = responseHandlerPool;
}
@@ -112,6 +119,11 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
}
+
+ @Override public void interrupt() {
+ super.interrupt();
+ tryCloseSocket(socket);
+ }
}
public static NewRpcChannel create(String host, int port)
@@ -127,14 +139,22 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
public void start() throws UnknownHostException, IOException {
+ if (outgoingHandler != null) {
+ throw new IllegalStateException("start() called twice.");
+ }
socket = new Socket(host, port);
- OutgoingHandler outgoing = new OutgoingHandler(socket,
- requestQueue);
- IncomingHandler incoming = new IncomingHandler(socket,
- responseHandlerPool);
-
- new Thread(outgoing, "RequestSender: " + host + ":" + port).start();
- new Thread(incoming, "RequestReceiver: " + host + ":" + port).start();
+ outgoingHandler = new OutgoingHandler(socket, requestQueue);
+ incomingHandler = new IncomingHandler(socket, responseHandlerPool);
+
+ outgoingHandler.start();
+ incomingHandler.start();
+ }
+
+ public void close() {
+ tryCloseSocket(socket);
+ outgoingHandler.interrupt();
+ incomingHandler.interrupt();
+ cancelAllRequests("channel closed.");
}
private void tryCloseSocket(Socket socket) {
@@ -172,6 +192,12 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ private void cancelAllRequests(String reason) {
+ for (RequestMetadata request : ongoingRequests.values()) {
+ cancelRequest(request, reason);
+ }
+ }
+
private void cancelRequest(RequestMetadata request, String reason) {
throw new IllegalStateException("Not implemented");
}