diff options
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r-- | src/main/java/com/orbekk/protobuf/NewRpcChannel.java | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index 1e009fa..afb94af 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -33,6 +33,8 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { private volatile Socket socket = null; private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests = new ConcurrentHashMap<Long, RequestMetadata>(); + private volatile OutgoingHandler outgoingHandler = null; + private volatile IncomingHandler incomingHandler = null; class RequestMetadata { public final long id; @@ -61,12 +63,13 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } - class OutgoingHandler implements Runnable { + class OutgoingHandler extends Thread { private final Socket socket; private final BlockingQueue<Data.Request> requests; public OutgoingHandler(Socket socket, BlockingQueue<Data.Request> requests) { + super("OutgoingHandler"); this.socket = socket; this.requests = requests; } @@ -86,15 +89,19 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } - + @Override public void interrupt() { + super.interrupt(); + tryCloseSocket(socket); + } } - class IncomingHandler implements Runnable { + class IncomingHandler extends Thread { private Socket socket; private ExecutorService responseHandlerPool; public IncomingHandler(Socket socket, ExecutorService responseHandlerPool) { + super("IncomingHandler"); this.socket = socket; this.responseHandlerPool = responseHandlerPool; } @@ -112,6 +119,11 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } } + + @Override public void interrupt() { + super.interrupt(); + tryCloseSocket(socket); + } } public static NewRpcChannel create(String host, int port) @@ -127,14 +139,22 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } public void start() throws UnknownHostException, IOException { + if (outgoingHandler != null) { + throw new IllegalStateException("start() called twice."); + } socket = new Socket(host, port); - OutgoingHandler outgoing = new OutgoingHandler(socket, - requestQueue); - IncomingHandler incoming = new IncomingHandler(socket, - responseHandlerPool); - - new Thread(outgoing, "RequestSender: " + host + ":" + port).start(); - new Thread(incoming, "RequestReceiver: " + host + ":" + port).start(); + outgoingHandler = new OutgoingHandler(socket, requestQueue); + incomingHandler = new IncomingHandler(socket, responseHandlerPool); + + outgoingHandler.start(); + incomingHandler.start(); + } + + public void close() { + tryCloseSocket(socket); + outgoingHandler.interrupt(); + incomingHandler.interrupt(); + cancelAllRequests("channel closed."); } private void tryCloseSocket(Socket socket) { @@ -172,6 +192,12 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } + private void cancelAllRequests(String reason) { + for (RequestMetadata request : ongoingRequests.values()) { + cancelRequest(request, reason); + } + } + private void cancelRequest(RequestMetadata request, String reason) { throw new IllegalStateException("Not implemented"); } |