From a4414f27a1076dbf53224c22765c4dc64c5b7bd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Apr 2012 14:53:23 +0200 Subject: Remove old RpcChannel. Fix tests with NewRpcChannel. --- .../java/com/orbekk/example/ExampleClient.java | 6 +- .../java/com/orbekk/protobuf/NewRpcChannel.java | 20 +- src/main/java/com/orbekk/protobuf/RpcChannel.java | 216 --------------------- .../orbekk/protobuf/ProtobufFunctionalTest.java | 12 +- 4 files changed, 29 insertions(+), 225 deletions(-) delete mode 100644 src/main/java/com/orbekk/protobuf/RpcChannel.java diff --git a/src/main/java/com/orbekk/example/ExampleClient.java b/src/main/java/com/orbekk/example/ExampleClient.java index 3590182..e047ad9 100644 --- a/src/main/java/com/orbekk/example/ExampleClient.java +++ b/src/main/java/com/orbekk/example/ExampleClient.java @@ -19,14 +19,14 @@ import java.util.concurrent.CountDownLatch; import com.google.protobuf.RpcCallback; import com.orbekk.example.Example.FortuneReply; +import com.orbekk.protobuf.NewRpcChannel; import com.orbekk.protobuf.Rpc; -import com.orbekk.protobuf.RpcChannel; public class ExampleClient { public void runClient(int port) { - RpcChannel channel = null; + NewRpcChannel channel = null; try { - channel = RpcChannel.create("localhost", port); + channel = NewRpcChannel.createOrNull("localhost", port); Example.FortuneService service = Example.FortuneService.newStub(channel); printFortune(service); diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index 82e3dd2..12f1433 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -24,14 +24,14 @@ import com.google.protobuf.RpcController; public class NewRpcChannel implements com.google.protobuf.RpcChannel { public static int NUM_CONCURRENT_REQUESTS = 5; private static final Logger logger = - Logger.getLogger(RpcChannel.class.getName()); + Logger.getLogger(NewRpcChannel.class.getName()); private final String host; private final int port; private final AtomicLong nextId = new AtomicLong(0); private final ExecutorService responseHandlerPool = Executors.newSingleThreadExecutor(); private final BlockingQueue requestQueue = - new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS); + new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS); private volatile Socket socket = null; private final ConcurrentHashMap ongoingRequests = new ConcurrentHashMap(); @@ -124,6 +124,9 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { for (;;) { Data.Response response = Data.Response .parseDelimitedFrom(inputStream); + if (response == null) { + throw new IOException("Could not read response."); + } responseHandlerPool.execute(new ResponseHandler(response)); } } catch (IOException e) { @@ -139,6 +142,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } } + public static NewRpcChannel createOrNull(String host, int port) { + try { + return create(host, port); + } catch (UnknownHostException e) { + logger.log(Level.WARNING, "Unable to create RPC channel.", e); + return null; + } catch (IOException e) { + logger.log(Level.WARNING, "Unable to create RPC channel.", e); + return null; + } + } + public static NewRpcChannel create(String host, int port) throws UnknownHostException, IOException { NewRpcChannel channel = new NewRpcChannel(host, port); @@ -173,6 +188,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { private void tryCloseSocket(Socket socket) { try { socket.close(); + cancelAllRequests("channel closed"); } catch (IOException e1) { logger.log(Level.WARNING, "Unable to close socket " + socket, diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java deleted file mode 100644 index 8e24d0c..0000000 --- a/src/main/java/com/orbekk/protobuf/RpcChannel.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Copyright 2012 Kjetil Ørbekk - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.orbekk.protobuf; - -import java.io.Closeable; -import java.io.IOException; -import java.net.Socket; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -public class RpcChannel extends Thread implements - com.google.protobuf.RpcChannel { - static final Logger logger = - Logger.getLogger(RpcChannel.class.getName()); - private final String host; - private final int port; - private volatile Socket socket = null; - private AtomicLong nextId = new AtomicLong(0); - private Map rpcs = - Collections.synchronizedMap( - new HashMap()); - private BlockingQueue sockets = new LinkedBlockingQueue(); - - private static class OngoingRequest implements Closeable { - long id; - Rpc rpc; - RpcCallback done; - Message responsePrototype; - Map rpcs; - - public OngoingRequest(long id, Rpc rpc, - RpcCallback done, Message responsePrototype, - Map rpcs) { - this.id = id; - this.rpc = rpc; - this.done = done; - this.responsePrototype = responsePrototype; - this.rpcs = rpcs; - } - - @Override - public void close() throws IOException { - throw new AssertionError("Not implemented"); - } - } - - public static RpcChannel create(String host, int port) { - RpcChannel channel = new RpcChannel(host, port); - channel.start(); - return channel; - } - - private RpcChannel(String host, int port) { - this.host = host; - this.port = port; - } - - private Socket getSocket() { - if (socket == null || socket.isClosed()) { - try { - logger.info("Creating new socket to " + host + ":" + port); - socket = new Socket(host, port); - sockets.add(socket); - } catch (UnknownHostException e) { - return null; - } catch (IOException e) { - logger.log(Level.WARNING, - "Could not establish connection.", e); - return null; - } - } - return socket; - } - - private Data.Request createRequest(Descriptors.MethodDescriptor method, - RpcController controller, - Message requestMessage, - Message responsePrototype, - RpcCallback done) { - long id = nextId.incrementAndGet(); - Rpc rpc = (Rpc)controller; - OngoingRequest ongoingRequest = new OngoingRequest(id, rpc, - done, responsePrototype, rpcs); - rpcs.put(id, ongoingRequest); - - Data.Request request = Data.Request.newBuilder() - .setRequestId(id) - .setFullServiceName(method.getService().getFullName()) - .setMethodName(method.getName()) - .setRequestProto(requestMessage.toByteString()) - .build(); - - return request; - } - - private void finishRequest(Data.Response response) { - OngoingRequest ongoingRequest = rpcs.remove(response.getRequestId()); - if (ongoingRequest != null) { - try { - Message responsePb = ongoingRequest.responsePrototype.toBuilder() - .mergeFrom(response.getResponseProto()).build(); - ongoingRequest.rpc.readFrom(response); - ongoingRequest.done.run(responsePb); - } catch (InvalidProtocolBufferException e) { - throw new AssertionError("Should fail here."); - } - } - } - - @Override public void callMethod( - Descriptors.MethodDescriptor method, - RpcController controller, - Message requestMessage, - Message responsePrototype, - RpcCallback done) { - try { - Data.Request request = createRequest(method, controller, - requestMessage, responsePrototype, done); - Socket socket = getSocket(); - if (socket == null) { - cancelAllRpcs(); - } else { - request.writeDelimitedTo(socket.getOutputStream()); - } - } catch (IOException e) { - throw new AssertionError("Should return error."); - } - } - - private void handleResponses(Socket socket) { - try { - logger.info("Handling responses to socket " + socket); - while (!socket.isClosed()) { - Data.Response response; - response = Data.Response.parseDelimitedFrom( - socket.getInputStream()); - if (response == null) { - throw new IOException("No response."); - } - finishRequest(response); - } - } catch (IOException e) { - if (!rpcs.isEmpty()) { - logger.log(Level.WARNING, "IO Error. Canceling " + - rpcs.size() + " requests.", e); - cancelAllRpcs(); - } - } finally { - if (socket != null && !socket.isClosed()) { - try { - socket.close(); - } catch (IOException e) { - // Socket is closed. - } - } - } - } - - private void cancelAllRpcs() { - synchronized (rpcs) { - for (OngoingRequest request : rpcs.values()) { - request.rpc.setFailed("connection closed"); - request.done.run(null); - } - rpcs.clear(); - } - } - - public void run() { - while (!Thread.interrupted()) { - try { - Socket socket = sockets.take(); - handleResponses(socket); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - public void close() { - if (socket != null) { - try { - this.interrupt(); - socket.close(); - } catch (IOException e) { - logger.info("Error closing socket."); - } - } - } -} \ No newline at end of file diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index e0757d4..1f1bc46 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -18,6 +18,8 @@ package com.orbekk.protobuf; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import java.io.IOException; +import java.net.UnknownHostException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,11 +35,13 @@ public class ProtobufFunctionalTest { CountDownLatch returnC = new CountDownLatch(1); SimpleProtobufServer server = SimpleProtobufServer.create(0, 50, 1); int serverport = server.getPort(); - RpcChannel channel = RpcChannel.create("localhost", serverport); + NewRpcChannel channel; TestService directService = new TestService(); - Test.Service service = Test.Service.newStub(channel); + Test.Service service; - @Before public void setUp() { + @Before public void setUp() throws Exception { + channel = NewRpcChannel.create("localhost", serverport); + service = Test.Service.newStub(channel); server.start(); server.registerService(directService); } @@ -150,7 +154,7 @@ public class ProtobufFunctionalTest { service.testC(rpc, request, new RpcCallback() { @Override public void run(Type2 result) { assertThat(rpc.failed(), is(true)); - assertThat(rpc.errorText(), is("connection closed")); + assertThat(rpc.errorText(), is("channel closed")); stop.countDown(); } }); -- cgit v1.2.3