summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-10 17:51:59 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-10 17:51:59 +0200
commitbb4dc8012467d47ed21a8fa1cf11fca27a1fafd7 (patch)
treef0e2c8f5b78154fb294f13f559a7987b13145e5f /src/main/java/com/orbekk/protobuf/NewRpcChannel.java
parentafb415dcaf614c177176a00220f3b0418daada1f (diff)
Fix some bugs in RpcChannel and NewRpcChannel.
Use protobuf v.2.4.1.
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java46
1 files changed, 36 insertions, 10 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index 1e009fa..afb94af 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -33,6 +33,8 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
private volatile Socket socket = null;
private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests =
new ConcurrentHashMap<Long, RequestMetadata>();
+ private volatile OutgoingHandler outgoingHandler = null;
+ private volatile IncomingHandler incomingHandler = null;
class RequestMetadata {
public final long id;
@@ -61,12 +63,13 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
- class OutgoingHandler implements Runnable {
+ class OutgoingHandler extends Thread {
private final Socket socket;
private final BlockingQueue<Data.Request> requests;
public OutgoingHandler(Socket socket,
BlockingQueue<Data.Request> requests) {
+ super("OutgoingHandler");
this.socket = socket;
this.requests = requests;
}
@@ -86,15 +89,19 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
-
+ @Override public void interrupt() {
+ super.interrupt();
+ tryCloseSocket(socket);
+ }
}
- class IncomingHandler implements Runnable {
+ class IncomingHandler extends Thread {
private Socket socket;
private ExecutorService responseHandlerPool;
public IncomingHandler(Socket socket,
ExecutorService responseHandlerPool) {
+ super("IncomingHandler");
this.socket = socket;
this.responseHandlerPool = responseHandlerPool;
}
@@ -112,6 +119,11 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
}
+
+ @Override public void interrupt() {
+ super.interrupt();
+ tryCloseSocket(socket);
+ }
}
public static NewRpcChannel create(String host, int port)
@@ -127,14 +139,22 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
public void start() throws UnknownHostException, IOException {
+ if (outgoingHandler != null) {
+ throw new IllegalStateException("start() called twice.");
+ }
socket = new Socket(host, port);
- OutgoingHandler outgoing = new OutgoingHandler(socket,
- requestQueue);
- IncomingHandler incoming = new IncomingHandler(socket,
- responseHandlerPool);
-
- new Thread(outgoing, "RequestSender: " + host + ":" + port).start();
- new Thread(incoming, "RequestReceiver: " + host + ":" + port).start();
+ outgoingHandler = new OutgoingHandler(socket, requestQueue);
+ incomingHandler = new IncomingHandler(socket, responseHandlerPool);
+
+ outgoingHandler.start();
+ incomingHandler.start();
+ }
+
+ public void close() {
+ tryCloseSocket(socket);
+ outgoingHandler.interrupt();
+ incomingHandler.interrupt();
+ cancelAllRequests("channel closed.");
}
private void tryCloseSocket(Socket socket) {
@@ -172,6 +192,12 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ private void cancelAllRequests(String reason) {
+ for (RequestMetadata request : ongoingRequests.values()) {
+ cancelRequest(request, reason);
+ }
+ }
+
private void cancelRequest(RequestMetadata request, String reason) {
throw new IllegalStateException("Not implemented");
}