diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-10 17:51:59 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-10 17:51:59 +0200 |
commit | bb4dc8012467d47ed21a8fa1cf11fca27a1fafd7 (patch) | |
tree | f0e2c8f5b78154fb294f13f559a7987b13145e5f /src/main/java/com/orbekk/protobuf/NewRpcChannel.java | |
parent | afb415dcaf614c177176a00220f3b0418daada1f (diff) |
Fix some bugs in RpcChannel and NewRpcChannel.
Use protobuf v.2.4.1.
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r-- | src/main/java/com/orbekk/protobuf/NewRpcChannel.java | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index 1e009fa..afb94af 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -33,6 +33,8 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { private volatile Socket socket = null; private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests = new ConcurrentHashMap<Long, RequestMetadata>(); + private volatile OutgoingHandler outgoingHandler = null; + private volatile IncomingHandler incomingHandler = null; class RequestMetadata { public final long id; @@ -61,12 +63,13 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } - class OutgoingHandler implements Runnable { + class OutgoingHandler extends Thread { private final Socket socket; private final BlockingQueue<Data.Request> requests; public OutgoingHandler(Socket socket, BlockingQueue<Data.Request> requests) { + super("OutgoingHandler"); this.socket = socket; this.requests = requests; } @@ -86,15 +89,19 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } - + @Override public void interrupt() { + super.interrupt(); + tryCloseSocket(socket); + } } - class IncomingHandler implements Runnable { + class IncomingHandler extends Thread { private Socket socket; private ExecutorService responseHandlerPool; public IncomingHandler(Socket socket, ExecutorService responseHandlerPool) { + super("IncomingHandler"); this.socket = socket; this.responseHandlerPool = responseHandlerPool; } @@ -112,6 +119,11 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } } + + @Override public void interrupt() { + super.interrupt(); + tryCloseSocket(socket); + } } public static NewRpcChannel create(String host, int port) @@ -127,14 +139,22 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } public void start() throws UnknownHostException, IOException { + if (outgoingHandler != null) { + throw new IllegalStateException("start() called twice."); + } socket = new Socket(host, port); - OutgoingHandler outgoing = new OutgoingHandler(socket, - requestQueue); - IncomingHandler incoming = new IncomingHandler(socket, - responseHandlerPool); - - new Thread(outgoing, "RequestSender: " + host + ":" + port).start(); - new Thread(incoming, "RequestReceiver: " + host + ":" + port).start(); + outgoingHandler = new OutgoingHandler(socket, requestQueue); + incomingHandler = new IncomingHandler(socket, responseHandlerPool); + + outgoingHandler.start(); + incomingHandler.start(); + } + + public void close() { + tryCloseSocket(socket); + outgoingHandler.interrupt(); + incomingHandler.interrupt(); + cancelAllRequests("channel closed."); } private void tryCloseSocket(Socket socket) { @@ -172,6 +192,12 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } + private void cancelAllRequests(String reason) { + for (RequestMetadata request : ongoingRequests.values()) { + cancelRequest(request, reason); + } + } + private void cancelRequest(RequestMetadata request, String reason) { throw new IllegalStateException("Not implemented"); } |