From 5120f84996f8ff8d54c5c119034b9251d07bc07b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Apr 2012 11:23:25 +0200 Subject: Implement done signal in Rpc. Clients can wait for an Rpc to complete with Rpc.await(). --- src/main/java/com/orbekk/protobuf/NewRpcChannel.java | 8 ++++++-- .../java/com/orbekk/protobuf/RequestDispatcher.java | 2 +- src/main/java/com/orbekk/protobuf/Rpc.java | 13 +++++++++++++ .../com/orbekk/protobuf/ProtobufFunctionalTest.java | 19 ++++++++++++++++++- 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index aa18b37..82e3dd2 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -22,7 +22,7 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; public class NewRpcChannel implements com.google.protobuf.RpcChannel { - public static int NUM_CONCURRENT_REQUESTS = 50; + public static int NUM_CONCURRENT_REQUESTS = 5; private static final Logger logger = Logger.getLogger(RpcChannel.class.getName()); private final String host; @@ -212,7 +212,10 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } private void cancelRequest(RequestMetadata request, String reason) { - throw new IllegalStateException("Not implemented"); + request.rpc.setFailed(reason); + request.rpc.cancel(); + request.done.run(null); + request.rpc.complete(); } private void handleResponse(Data.Response response) { @@ -227,6 +230,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { .mergeFrom(response.getResponseProto()).build(); request.rpc.readFrom(response); request.done.run(responsePb); + request.rpc.complete(); } catch (InvalidProtocolBufferException e) { cancelRequest(request, "invalid response from server"); } diff --git a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java index 65239af..3076d7e 100644 --- a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java +++ b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java @@ -11,7 +11,7 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.Service; public class RequestDispatcher extends Thread { - public static int DEFAULT_QUEUE_SIZE = 50; + public static int DEFAULT_QUEUE_SIZE = 5; private volatile boolean isStopped = false; private final BlockingQueue output; private final ServiceHolder services; diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java index c208380..0f6b6cb 100644 --- a/src/main/java/com/orbekk/protobuf/Rpc.java +++ b/src/main/java/com/orbekk/protobuf/Rpc.java @@ -24,6 +24,7 @@ import com.google.protobuf.RpcController; public class Rpc implements RpcController { private String errorText = ""; + private CountDownLatch done = new CountDownLatch(1); private boolean hasFailed; private boolean canceled; private List> cancelNotificationListeners = null; @@ -64,6 +65,18 @@ public class Rpc implements RpcController { return errorText; } + public boolean isDone() { + return done.getCount() == 0; + } + + public void await() throws InterruptedException { + done.await(); + } + + void complete() { + done.countDown(); + } + public boolean isOk() { return !hasFailed && !canceled; } diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index a7249a1..e0757d4 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Ignore; @@ -78,10 +79,10 @@ public class ProtobufFunctionalTest { Test.Service service = Test.Service.newStub(channel); Test.Type1 request = Test.Type1.newBuilder().build(); int count = 10000; - final Rpc rpc = new Rpc(); final CountDownLatch stop = new CountDownLatch(count); long startTime = System.currentTimeMillis(); for (int i = 0; i < count; i++) { + final Rpc rpc = new Rpc(); service.testA(rpc, request, new RpcCallback() { @Override public void run(Type2 result) { stop.countDown(); @@ -94,6 +95,22 @@ public class ProtobufFunctionalTest { elapsedTime + ". " + count/elapsedTime*1000 + "r/s"); } + @org.junit.Test public void testDoneSignal() throws Exception { + NewRpcChannel channel = NewRpcChannel.create("localhost", serverport); + Test.Service service = Test.Service.newStub(channel); + Test.Type1 request = Test.Type1.newBuilder().build(); + + final AtomicBoolean callbackFinished = new AtomicBoolean(false); + Rpc rpc = new Rpc(); + service.testA(rpc, request, new RpcCallback() { + @Override public void run(Type2 result) { + callbackFinished.set(true); + } + }); + rpc.await(); + assertThat(callbackFinished.get(), is(true)); + } + @org.junit.Test public void respondsNormally() throws Exception { Test.Type1 request = Test.Type1.newBuilder().build(); int count = 10; -- cgit v1.2.3