summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java8
-rw-r--r--src/main/java/com/orbekk/protobuf/RequestDispatcher.java2
-rw-r--r--src/main/java/com/orbekk/protobuf/Rpc.java13
3 files changed, 20 insertions, 3 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index aa18b37..82e3dd2 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -22,7 +22,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
public class NewRpcChannel implements com.google.protobuf.RpcChannel {
- public static int NUM_CONCURRENT_REQUESTS = 50;
+ public static int NUM_CONCURRENT_REQUESTS = 5;
private static final Logger logger =
Logger.getLogger(RpcChannel.class.getName());
private final String host;
@@ -212,7 +212,10 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
private void cancelRequest(RequestMetadata request, String reason) {
- throw new IllegalStateException("Not implemented");
+ request.rpc.setFailed(reason);
+ request.rpc.cancel();
+ request.done.run(null);
+ request.rpc.complete();
}
private void handleResponse(Data.Response response) {
@@ -227,6 +230,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
.mergeFrom(response.getResponseProto()).build();
request.rpc.readFrom(response);
request.done.run(responsePb);
+ request.rpc.complete();
} catch (InvalidProtocolBufferException e) {
cancelRequest(request, "invalid response from server");
}
diff --git a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java
index 65239af..3076d7e 100644
--- a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java
+++ b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java
@@ -11,7 +11,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
public class RequestDispatcher extends Thread {
- public static int DEFAULT_QUEUE_SIZE = 50;
+ public static int DEFAULT_QUEUE_SIZE = 5;
private volatile boolean isStopped = false;
private final BlockingQueue<Data.Response> output;
private final ServiceHolder services;
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java
index c208380..0f6b6cb 100644
--- a/src/main/java/com/orbekk/protobuf/Rpc.java
+++ b/src/main/java/com/orbekk/protobuf/Rpc.java
@@ -24,6 +24,7 @@ import com.google.protobuf.RpcController;
public class Rpc implements RpcController {
private String errorText = "";
+ private CountDownLatch done = new CountDownLatch(1);
private boolean hasFailed;
private boolean canceled;
private List<RpcCallback<Object>> cancelNotificationListeners = null;
@@ -64,6 +65,18 @@ public class Rpc implements RpcController {
return errorText;
}
+ public boolean isDone() {
+ return done.getCount() == 0;
+ }
+
+ public void await() throws InterruptedException {
+ done.await();
+ }
+
+ void complete() {
+ done.countDown();
+ }
+
public boolean isOk() {
return !hasFailed && !canceled;
}