From 26bc8a5b5b9d1ddc5b2b3992dc818da10f582b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Apr 2012 14:54:20 +0200 Subject: Rename NewRpcChannel => RpcChannel. --- .../java/com/orbekk/example/ExampleClient.java | 6 +- .../java/com/orbekk/protobuf/NewRpcChannel.java | 255 --------------------- src/main/java/com/orbekk/protobuf/RpcChannel.java | 255 +++++++++++++++++++++ .../orbekk/protobuf/ProtobufFunctionalTest.java | 8 +- 4 files changed, 262 insertions(+), 262 deletions(-) delete mode 100644 src/main/java/com/orbekk/protobuf/NewRpcChannel.java create mode 100644 src/main/java/com/orbekk/protobuf/RpcChannel.java diff --git a/src/main/java/com/orbekk/example/ExampleClient.java b/src/main/java/com/orbekk/example/ExampleClient.java index e047ad9..4d229fe 100644 --- a/src/main/java/com/orbekk/example/ExampleClient.java +++ b/src/main/java/com/orbekk/example/ExampleClient.java @@ -19,14 +19,14 @@ import java.util.concurrent.CountDownLatch; import com.google.protobuf.RpcCallback; import com.orbekk.example.Example.FortuneReply; -import com.orbekk.protobuf.NewRpcChannel; +import com.orbekk.protobuf.RpcChannel; import com.orbekk.protobuf.Rpc; public class ExampleClient { public void runClient(int port) { - NewRpcChannel channel = null; + RpcChannel channel = null; try { - channel = NewRpcChannel.createOrNull("localhost", port); + channel = RpcChannel.createOrNull("localhost", port); Example.FortuneService service = Example.FortuneService.newStub(channel); printFortune(service); diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java deleted file mode 100644 index 12f1433..0000000 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ /dev/null @@ -1,255 +0,0 @@ -package com.orbekk.protobuf; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.net.Socket; -import java.net.UnknownHostException; -import java.util.LinkedList; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -public class NewRpcChannel implements com.google.protobuf.RpcChannel { - public static int NUM_CONCURRENT_REQUESTS = 5; - private static final Logger logger = - Logger.getLogger(NewRpcChannel.class.getName()); - private final String host; - private final int port; - private final AtomicLong nextId = new AtomicLong(0); - private final ExecutorService responseHandlerPool = - Executors.newSingleThreadExecutor(); - private final BlockingQueue requestQueue = - new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS); - private volatile Socket socket = null; - private final ConcurrentHashMap ongoingRequests = - new ConcurrentHashMap(); - private volatile OutgoingHandler outgoingHandler = null; - private volatile IncomingHandler incomingHandler = null; - - class RequestMetadata { - public final long id; - public final Rpc rpc; - public final RpcCallback done; - public final Message responsePrototype; - - public RequestMetadata(long id, Rpc rpc, RpcCallback done, - Message responsePrototype) { - this.id = id; - this.rpc = rpc; - this.done = done; - this.responsePrototype = responsePrototype; - } - } - - class ResponseHandler implements Runnable { - private final Data.Response response; - - public ResponseHandler(Data.Response response) { - this.response = response; - } - - @Override public void run() { - handleResponse(response); - } - } - - class OutgoingHandler extends Thread { - private final Socket socket; - private final BlockingQueue requests; - - public OutgoingHandler(Socket socket, - BlockingQueue requests) { - super("OutgoingHandler"); - this.socket = socket; - this.requests = requests; - } - - @Override public void run() { - try { - BufferedOutputStream outputStream = new BufferedOutputStream( - socket.getOutputStream()); - LinkedList buffer = - new LinkedList(); - for (;;) { - buffer.clear(); - buffer.add(requests.take()); - requests.drainTo(buffer); - for (Data.Request request : buffer) { - request.writeDelimitedTo(outputStream); - } - outputStream.flush(); - } - } catch (InterruptedException e) { - tryCloseSocket(socket); - return; - } catch (IOException e) { - tryCloseSocket(socket); - return; - } - } - - @Override public void interrupt() { - super.interrupt(); - tryCloseSocket(socket); - } - } - - class IncomingHandler extends Thread { - private Socket socket; - private ExecutorService responseHandlerPool; - - public IncomingHandler(Socket socket, - ExecutorService responseHandlerPool) { - super("IncomingHandler"); - this.socket = socket; - this.responseHandlerPool = responseHandlerPool; - } - - @Override public void run() { - try { - BufferedInputStream inputStream = new BufferedInputStream( - socket.getInputStream()); - for (;;) { - Data.Response response = Data.Response - .parseDelimitedFrom(inputStream); - if (response == null) { - throw new IOException("Could not read response."); - } - responseHandlerPool.execute(new ResponseHandler(response)); - } - } catch (IOException e) { - responseHandlerPool.shutdown(); - tryCloseSocket(socket); - return; - } - } - - @Override public void interrupt() { - super.interrupt(); - tryCloseSocket(socket); - } - } - - public static NewRpcChannel createOrNull(String host, int port) { - try { - return create(host, port); - } catch (UnknownHostException e) { - logger.log(Level.WARNING, "Unable to create RPC channel.", e); - return null; - } catch (IOException e) { - logger.log(Level.WARNING, "Unable to create RPC channel.", e); - return null; - } - } - - public static NewRpcChannel create(String host, int port) - throws UnknownHostException, IOException { - NewRpcChannel channel = new NewRpcChannel(host, port); - channel.start(); - return channel; - } - - NewRpcChannel(String host, int port) { - this.host = host; - this.port = port; - } - - public void start() throws UnknownHostException, IOException { - if (outgoingHandler != null) { - throw new IllegalStateException("start() called twice."); - } - socket = new Socket(host, port); - outgoingHandler = new OutgoingHandler(socket, requestQueue); - incomingHandler = new IncomingHandler(socket, responseHandlerPool); - - outgoingHandler.start(); - incomingHandler.start(); - } - - public void close() { - tryCloseSocket(socket); - outgoingHandler.interrupt(); - incomingHandler.interrupt(); - cancelAllRequests("channel closed."); - } - - private void tryCloseSocket(Socket socket) { - try { - socket.close(); - cancelAllRequests("channel closed"); - } catch (IOException e1) { - logger.log(Level.WARNING, - "Unable to close socket " + socket, - e1); - } - } - - @Override - public void callMethod(MethodDescriptor method, - RpcController rpc, Message requestMessage, - Message responsePrototype, - RpcCallback done) { - long id = nextId.incrementAndGet(); - Rpc rpc_ = (Rpc) rpc; - RequestMetadata request_ = new RequestMetadata(id, rpc_, done, - responsePrototype); - ongoingRequests.put(id, request_); - - Data.Request requestData = Data.Request.newBuilder() - .setRequestId(id) - .setFullServiceName(method.getService().getFullName()) - .setMethodName(method.getName()) - .setRequestProto(requestMessage.toByteString()) - .build(); - - try { - requestQueue.put(requestData); - } catch (InterruptedException e) { - cancelRequest(request_, "channel closed"); - } - } - - private void cancelAllRequests(String reason) { - for (RequestMetadata request : ongoingRequests.values()) { - cancelRequest(request, reason); - } - } - - private void cancelRequest(RequestMetadata request, String reason) { - request.rpc.setFailed(reason); - request.rpc.cancel(); - request.done.run(null); - request.rpc.complete(); - } - - private void handleResponse(Data.Response response) { - RequestMetadata request = - ongoingRequests.remove(response.getRequestId()); - if (request == null) { - logger.info("Unknown request. Possible timeout?" + response); - return; - } - try { - Message responsePb = request.responsePrototype.toBuilder() - .mergeFrom(response.getResponseProto()).build(); - request.rpc.readFrom(response); - request.done.run(responsePb); - request.rpc.complete(); - } catch (InvalidProtocolBufferException e) { - cancelRequest(request, "invalid response from server"); - } - } -} - diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java new file mode 100644 index 0000000..402ff17 --- /dev/null +++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java @@ -0,0 +1,255 @@ +package com.orbekk.protobuf; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.LinkedList; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +public class RpcChannel implements com.google.protobuf.RpcChannel { + public static int NUM_CONCURRENT_REQUESTS = 5; + private static final Logger logger = + Logger.getLogger(RpcChannel.class.getName()); + private final String host; + private final int port; + private final AtomicLong nextId = new AtomicLong(0); + private final ExecutorService responseHandlerPool = + Executors.newSingleThreadExecutor(); + private final BlockingQueue requestQueue = + new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS); + private volatile Socket socket = null; + private final ConcurrentHashMap ongoingRequests = + new ConcurrentHashMap(); + private volatile OutgoingHandler outgoingHandler = null; + private volatile IncomingHandler incomingHandler = null; + + class RequestMetadata { + public final long id; + public final Rpc rpc; + public final RpcCallback done; + public final Message responsePrototype; + + public RequestMetadata(long id, Rpc rpc, RpcCallback done, + Message responsePrototype) { + this.id = id; + this.rpc = rpc; + this.done = done; + this.responsePrototype = responsePrototype; + } + } + + class ResponseHandler implements Runnable { + private final Data.Response response; + + public ResponseHandler(Data.Response response) { + this.response = response; + } + + @Override public void run() { + handleResponse(response); + } + } + + class OutgoingHandler extends Thread { + private final Socket socket; + private final BlockingQueue requests; + + public OutgoingHandler(Socket socket, + BlockingQueue requests) { + super("OutgoingHandler"); + this.socket = socket; + this.requests = requests; + } + + @Override public void run() { + try { + BufferedOutputStream outputStream = new BufferedOutputStream( + socket.getOutputStream()); + LinkedList buffer = + new LinkedList(); + for (;;) { + buffer.clear(); + buffer.add(requests.take()); + requests.drainTo(buffer); + for (Data.Request request : buffer) { + request.writeDelimitedTo(outputStream); + } + outputStream.flush(); + } + } catch (InterruptedException e) { + tryCloseSocket(socket); + return; + } catch (IOException e) { + tryCloseSocket(socket); + return; + } + } + + @Override public void interrupt() { + super.interrupt(); + tryCloseSocket(socket); + } + } + + class IncomingHandler extends Thread { + private Socket socket; + private ExecutorService responseHandlerPool; + + public IncomingHandler(Socket socket, + ExecutorService responseHandlerPool) { + super("IncomingHandler"); + this.socket = socket; + this.responseHandlerPool = responseHandlerPool; + } + + @Override public void run() { + try { + BufferedInputStream inputStream = new BufferedInputStream( + socket.getInputStream()); + for (;;) { + Data.Response response = Data.Response + .parseDelimitedFrom(inputStream); + if (response == null) { + throw new IOException("Could not read response."); + } + responseHandlerPool.execute(new ResponseHandler(response)); + } + } catch (IOException e) { + responseHandlerPool.shutdown(); + tryCloseSocket(socket); + return; + } + } + + @Override public void interrupt() { + super.interrupt(); + tryCloseSocket(socket); + } + } + + public static RpcChannel createOrNull(String host, int port) { + try { + return create(host, port); + } catch (UnknownHostException e) { + logger.log(Level.WARNING, "Unable to create RPC channel.", e); + return null; + } catch (IOException e) { + logger.log(Level.WARNING, "Unable to create RPC channel.", e); + return null; + } + } + + public static RpcChannel create(String host, int port) + throws UnknownHostException, IOException { + RpcChannel channel = new RpcChannel(host, port); + channel.start(); + return channel; + } + + RpcChannel(String host, int port) { + this.host = host; + this.port = port; + } + + public void start() throws UnknownHostException, IOException { + if (outgoingHandler != null) { + throw new IllegalStateException("start() called twice."); + } + socket = new Socket(host, port); + outgoingHandler = new OutgoingHandler(socket, requestQueue); + incomingHandler = new IncomingHandler(socket, responseHandlerPool); + + outgoingHandler.start(); + incomingHandler.start(); + } + + public void close() { + tryCloseSocket(socket); + outgoingHandler.interrupt(); + incomingHandler.interrupt(); + cancelAllRequests("channel closed."); + } + + private void tryCloseSocket(Socket socket) { + try { + socket.close(); + cancelAllRequests("channel closed"); + } catch (IOException e1) { + logger.log(Level.WARNING, + "Unable to close socket " + socket, + e1); + } + } + + @Override + public void callMethod(MethodDescriptor method, + RpcController rpc, Message requestMessage, + Message responsePrototype, + RpcCallback done) { + long id = nextId.incrementAndGet(); + Rpc rpc_ = (Rpc) rpc; + RequestMetadata request_ = new RequestMetadata(id, rpc_, done, + responsePrototype); + ongoingRequests.put(id, request_); + + Data.Request requestData = Data.Request.newBuilder() + .setRequestId(id) + .setFullServiceName(method.getService().getFullName()) + .setMethodName(method.getName()) + .setRequestProto(requestMessage.toByteString()) + .build(); + + try { + requestQueue.put(requestData); + } catch (InterruptedException e) { + cancelRequest(request_, "channel closed"); + } + } + + private void cancelAllRequests(String reason) { + for (RequestMetadata request : ongoingRequests.values()) { + cancelRequest(request, reason); + } + } + + private void cancelRequest(RequestMetadata request, String reason) { + request.rpc.setFailed(reason); + request.rpc.cancel(); + request.done.run(null); + request.rpc.complete(); + } + + private void handleResponse(Data.Response response) { + RequestMetadata request = + ongoingRequests.remove(response.getRequestId()); + if (request == null) { + logger.info("Unknown request. Possible timeout?" + response); + return; + } + try { + Message responsePb = request.responsePrototype.toBuilder() + .mergeFrom(response.getResponseProto()).build(); + request.rpc.readFrom(response); + request.done.run(responsePb); + request.rpc.complete(); + } catch (InvalidProtocolBufferException e) { + cancelRequest(request, "invalid response from server"); + } + } +} + diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index 1f1bc46..bf30f59 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -35,12 +35,12 @@ public class ProtobufFunctionalTest { CountDownLatch returnC = new CountDownLatch(1); SimpleProtobufServer server = SimpleProtobufServer.create(0, 50, 1); int serverport = server.getPort(); - NewRpcChannel channel; + RpcChannel channel; TestService directService = new TestService(); Test.Service service; @Before public void setUp() throws Exception { - channel = NewRpcChannel.create("localhost", serverport); + channel = RpcChannel.create("localhost", serverport); service = Test.Service.newStub(channel); server.start(); server.registerService(directService); @@ -79,7 +79,7 @@ public class ProtobufFunctionalTest { } @org.junit.Test public void testNewRpcChannel() throws Exception { - NewRpcChannel channel = NewRpcChannel.create("localhost", serverport); + RpcChannel channel = RpcChannel.create("localhost", serverport); Test.Service service = Test.Service.newStub(channel); Test.Type1 request = Test.Type1.newBuilder().build(); int count = 10000; @@ -100,7 +100,7 @@ public class ProtobufFunctionalTest { } @org.junit.Test public void testDoneSignal() throws Exception { - NewRpcChannel channel = NewRpcChannel.create("localhost", serverport); + RpcChannel channel = RpcChannel.create("localhost", serverport); Test.Service service = Test.Service.newStub(channel); Test.Type1 request = Test.Type1.newBuilder().build(); -- cgit v1.2.3