summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 14:53:23 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 14:53:23 +0200
commita4414f27a1076dbf53224c22765c4dc64c5b7bd8 (patch)
tree609d14afce4d17eb57d2996696d94576af70192c
parent6f414b99c94f5e1562785f9f2b95940a50a70a88 (diff)
Remove old RpcChannel.
Fix tests with NewRpcChannel.
-rw-r--r--src/main/java/com/orbekk/example/ExampleClient.java6
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java20
-rw-r--r--src/main/java/com/orbekk/protobuf/RpcChannel.java216
-rw-r--r--src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java12
4 files changed, 29 insertions, 225 deletions
diff --git a/src/main/java/com/orbekk/example/ExampleClient.java b/src/main/java/com/orbekk/example/ExampleClient.java
index 3590182..e047ad9 100644
--- a/src/main/java/com/orbekk/example/ExampleClient.java
+++ b/src/main/java/com/orbekk/example/ExampleClient.java
@@ -19,14 +19,14 @@ import java.util.concurrent.CountDownLatch;
import com.google.protobuf.RpcCallback;
import com.orbekk.example.Example.FortuneReply;
+import com.orbekk.protobuf.NewRpcChannel;
import com.orbekk.protobuf.Rpc;
-import com.orbekk.protobuf.RpcChannel;
public class ExampleClient {
public void runClient(int port) {
- RpcChannel channel = null;
+ NewRpcChannel channel = null;
try {
- channel = RpcChannel.create("localhost", port);
+ channel = NewRpcChannel.createOrNull("localhost", port);
Example.FortuneService service =
Example.FortuneService.newStub(channel);
printFortune(service);
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index 82e3dd2..12f1433 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -24,14 +24,14 @@ import com.google.protobuf.RpcController;
public class NewRpcChannel implements com.google.protobuf.RpcChannel {
public static int NUM_CONCURRENT_REQUESTS = 5;
private static final Logger logger =
- Logger.getLogger(RpcChannel.class.getName());
+ Logger.getLogger(NewRpcChannel.class.getName());
private final String host;
private final int port;
private final AtomicLong nextId = new AtomicLong(0);
private final ExecutorService responseHandlerPool =
Executors.newSingleThreadExecutor();
private final BlockingQueue<Data.Request> requestQueue =
- new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS);
+ new ArrayBlockingQueue<Data.Request>(NUM_CONCURRENT_REQUESTS);
private volatile Socket socket = null;
private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests =
new ConcurrentHashMap<Long, RequestMetadata>();
@@ -124,6 +124,9 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
for (;;) {
Data.Response response = Data.Response
.parseDelimitedFrom(inputStream);
+ if (response == null) {
+ throw new IOException("Could not read response.");
+ }
responseHandlerPool.execute(new ResponseHandler(response));
}
} catch (IOException e) {
@@ -139,6 +142,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ public static NewRpcChannel createOrNull(String host, int port) {
+ try {
+ return create(host, port);
+ } catch (UnknownHostException e) {
+ logger.log(Level.WARNING, "Unable to create RPC channel.", e);
+ return null;
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "Unable to create RPC channel.", e);
+ return null;
+ }
+ }
+
public static NewRpcChannel create(String host, int port)
throws UnknownHostException, IOException {
NewRpcChannel channel = new NewRpcChannel(host, port);
@@ -173,6 +188,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
private void tryCloseSocket(Socket socket) {
try {
socket.close();
+ cancelAllRequests("channel closed");
} catch (IOException e1) {
logger.log(Level.WARNING,
"Unable to close socket " + socket,
diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java
deleted file mode 100644
index 8e24d0c..0000000
--- a/src/main/java/com/orbekk/protobuf/RpcChannel.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Copyright 2012 Kjetil Ørbekk <kjetil.orbekk@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.orbekk.protobuf;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-public class RpcChannel extends Thread implements
- com.google.protobuf.RpcChannel {
- static final Logger logger =
- Logger.getLogger(RpcChannel.class.getName());
- private final String host;
- private final int port;
- private volatile Socket socket = null;
- private AtomicLong nextId = new AtomicLong(0);
- private Map<Long, RpcChannel.OngoingRequest> rpcs =
- Collections.synchronizedMap(
- new HashMap<Long, RpcChannel.OngoingRequest>());
- private BlockingQueue<Socket> sockets = new LinkedBlockingQueue<Socket>();
-
- private static class OngoingRequest implements Closeable {
- long id;
- Rpc rpc;
- RpcCallback<Message> done;
- Message responsePrototype;
- Map<Long, RpcChannel.OngoingRequest> rpcs;
-
- public OngoingRequest(long id, Rpc rpc,
- RpcCallback<Message> done, Message responsePrototype,
- Map<Long, RpcChannel.OngoingRequest> rpcs) {
- this.id = id;
- this.rpc = rpc;
- this.done = done;
- this.responsePrototype = responsePrototype;
- this.rpcs = rpcs;
- }
-
- @Override
- public void close() throws IOException {
- throw new AssertionError("Not implemented");
- }
- }
-
- public static RpcChannel create(String host, int port) {
- RpcChannel channel = new RpcChannel(host, port);
- channel.start();
- return channel;
- }
-
- private RpcChannel(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- private Socket getSocket() {
- if (socket == null || socket.isClosed()) {
- try {
- logger.info("Creating new socket to " + host + ":" + port);
- socket = new Socket(host, port);
- sockets.add(socket);
- } catch (UnknownHostException e) {
- return null;
- } catch (IOException e) {
- logger.log(Level.WARNING,
- "Could not establish connection.", e);
- return null;
- }
- }
- return socket;
- }
-
- private Data.Request createRequest(Descriptors.MethodDescriptor method,
- RpcController controller,
- Message requestMessage,
- Message responsePrototype,
- RpcCallback<Message> done) {
- long id = nextId.incrementAndGet();
- Rpc rpc = (Rpc)controller;
- OngoingRequest ongoingRequest = new OngoingRequest(id, rpc,
- done, responsePrototype, rpcs);
- rpcs.put(id, ongoingRequest);
-
- Data.Request request = Data.Request.newBuilder()
- .setRequestId(id)
- .setFullServiceName(method.getService().getFullName())
- .setMethodName(method.getName())
- .setRequestProto(requestMessage.toByteString())
- .build();
-
- return request;
- }
-
- private void finishRequest(Data.Response response) {
- OngoingRequest ongoingRequest = rpcs.remove(response.getRequestId());
- if (ongoingRequest != null) {
- try {
- Message responsePb = ongoingRequest.responsePrototype.toBuilder()
- .mergeFrom(response.getResponseProto()).build();
- ongoingRequest.rpc.readFrom(response);
- ongoingRequest.done.run(responsePb);
- } catch (InvalidProtocolBufferException e) {
- throw new AssertionError("Should fail here.");
- }
- }
- }
-
- @Override public void callMethod(
- Descriptors.MethodDescriptor method,
- RpcController controller,
- Message requestMessage,
- Message responsePrototype,
- RpcCallback<Message> done) {
- try {
- Data.Request request = createRequest(method, controller,
- requestMessage, responsePrototype, done);
- Socket socket = getSocket();
- if (socket == null) {
- cancelAllRpcs();
- } else {
- request.writeDelimitedTo(socket.getOutputStream());
- }
- } catch (IOException e) {
- throw new AssertionError("Should return error.");
- }
- }
-
- private void handleResponses(Socket socket) {
- try {
- logger.info("Handling responses to socket " + socket);
- while (!socket.isClosed()) {
- Data.Response response;
- response = Data.Response.parseDelimitedFrom(
- socket.getInputStream());
- if (response == null) {
- throw new IOException("No response.");
- }
- finishRequest(response);
- }
- } catch (IOException e) {
- if (!rpcs.isEmpty()) {
- logger.log(Level.WARNING, "IO Error. Canceling " +
- rpcs.size() + " requests.", e);
- cancelAllRpcs();
- }
- } finally {
- if (socket != null && !socket.isClosed()) {
- try {
- socket.close();
- } catch (IOException e) {
- // Socket is closed.
- }
- }
- }
- }
-
- private void cancelAllRpcs() {
- synchronized (rpcs) {
- for (OngoingRequest request : rpcs.values()) {
- request.rpc.setFailed("connection closed");
- request.done.run(null);
- }
- rpcs.clear();
- }
- }
-
- public void run() {
- while (!Thread.interrupted()) {
- try {
- Socket socket = sockets.take();
- handleResponses(socket);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void close() {
- if (socket != null) {
- try {
- this.interrupt();
- socket.close();
- } catch (IOException e) {
- logger.info("Error closing socket.");
- }
- }
- }
-} \ No newline at end of file
diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
index e0757d4..1f1bc46 100644
--- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
+++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
@@ -18,6 +18,8 @@ package com.orbekk.protobuf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
+import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,11 +35,13 @@ public class ProtobufFunctionalTest {
CountDownLatch returnC = new CountDownLatch(1);
SimpleProtobufServer server = SimpleProtobufServer.create(0, 50, 1);
int serverport = server.getPort();
- RpcChannel channel = RpcChannel.create("localhost", serverport);
+ NewRpcChannel channel;
TestService directService = new TestService();
- Test.Service service = Test.Service.newStub(channel);
+ Test.Service service;
- @Before public void setUp() {
+ @Before public void setUp() throws Exception {
+ channel = NewRpcChannel.create("localhost", serverport);
+ service = Test.Service.newStub(channel);
server.start();
server.registerService(directService);
}
@@ -150,7 +154,7 @@ public class ProtobufFunctionalTest {
service.testC(rpc, request, new RpcCallback<Type2>() {
@Override public void run(Type2 result) {
assertThat(rpc.failed(), is(true));
- assertThat(rpc.errorText(), is("connection closed"));
+ assertThat(rpc.errorText(), is("channel closed"));
stop.countDown();
}
});