summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 11:23:25 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-11 11:23:25 +0200
commit5120f84996f8ff8d54c5c119034b9251d07bc07b (patch)
treef4bc30a1f23543904bc20865358bfa55776334ad
parente2bcbb117f2ac03196da34217671f1bc48395598 (diff)
Implement done signal in Rpc.
Clients can wait for an Rpc to complete with Rpc.await().
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java8
-rw-r--r--src/main/java/com/orbekk/protobuf/RequestDispatcher.java2
-rw-r--r--src/main/java/com/orbekk/protobuf/Rpc.java13
-rw-r--r--src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java19
4 files changed, 38 insertions, 4 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index aa18b37..82e3dd2 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -22,7 +22,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
public class NewRpcChannel implements com.google.protobuf.RpcChannel {
- public static int NUM_CONCURRENT_REQUESTS = 50;
+ public static int NUM_CONCURRENT_REQUESTS = 5;
private static final Logger logger =
Logger.getLogger(RpcChannel.class.getName());
private final String host;
@@ -212,7 +212,10 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
private void cancelRequest(RequestMetadata request, String reason) {
- throw new IllegalStateException("Not implemented");
+ request.rpc.setFailed(reason);
+ request.rpc.cancel();
+ request.done.run(null);
+ request.rpc.complete();
}
private void handleResponse(Data.Response response) {
@@ -227,6 +230,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
.mergeFrom(response.getResponseProto()).build();
request.rpc.readFrom(response);
request.done.run(responsePb);
+ request.rpc.complete();
} catch (InvalidProtocolBufferException e) {
cancelRequest(request, "invalid response from server");
}
diff --git a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java
index 65239af..3076d7e 100644
--- a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java
+++ b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java
@@ -11,7 +11,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
public class RequestDispatcher extends Thread {
- public static int DEFAULT_QUEUE_SIZE = 50;
+ public static int DEFAULT_QUEUE_SIZE = 5;
private volatile boolean isStopped = false;
private final BlockingQueue<Data.Response> output;
private final ServiceHolder services;
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java
index c208380..0f6b6cb 100644
--- a/src/main/java/com/orbekk/protobuf/Rpc.java
+++ b/src/main/java/com/orbekk/protobuf/Rpc.java
@@ -24,6 +24,7 @@ import com.google.protobuf.RpcController;
public class Rpc implements RpcController {
private String errorText = "";
+ private CountDownLatch done = new CountDownLatch(1);
private boolean hasFailed;
private boolean canceled;
private List<RpcCallback<Object>> cancelNotificationListeners = null;
@@ -64,6 +65,18 @@ public class Rpc implements RpcController {
return errorText;
}
+ public boolean isDone() {
+ return done.getCount() == 0;
+ }
+
+ public void await() throws InterruptedException {
+ done.await();
+ }
+
+ void complete() {
+ done.countDown();
+ }
+
public boolean isOk() {
return !hasFailed && !canceled;
}
diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
index a7249a1..e0757d4 100644
--- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
+++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
@@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Ignore;
@@ -78,10 +79,10 @@ public class ProtobufFunctionalTest {
Test.Service service = Test.Service.newStub(channel);
Test.Type1 request = Test.Type1.newBuilder().build();
int count = 10000;
- final Rpc rpc = new Rpc();
final CountDownLatch stop = new CountDownLatch(count);
long startTime = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
+ final Rpc rpc = new Rpc();
service.testA(rpc, request, new RpcCallback<Type2>() {
@Override public void run(Type2 result) {
stop.countDown();
@@ -94,6 +95,22 @@ public class ProtobufFunctionalTest {
elapsedTime + ". " + count/elapsedTime*1000 + "r/s");
}
+ @org.junit.Test public void testDoneSignal() throws Exception {
+ NewRpcChannel channel = NewRpcChannel.create("localhost", serverport);
+ Test.Service service = Test.Service.newStub(channel);
+ Test.Type1 request = Test.Type1.newBuilder().build();
+
+ final AtomicBoolean callbackFinished = new AtomicBoolean(false);
+ Rpc rpc = new Rpc();
+ service.testA(rpc, request, new RpcCallback<Type2>() {
+ @Override public void run(Type2 result) {
+ callbackFinished.set(true);
+ }
+ });
+ rpc.await();
+ assertThat(callbackFinished.get(), is(true));
+ }
+
@org.junit.Test public void respondsNormally() throws Exception {
Test.Type1 request = Test.Type1.newBuilder().build();
int count = 10;