diff options
Diffstat (limited to 'src')
4 files changed, 38 insertions, 4 deletions
| diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index aa18b37..82e3dd2 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -22,7 +22,7 @@ import com.google.protobuf.RpcCallback;  import com.google.protobuf.RpcController;  public class NewRpcChannel implements com.google.protobuf.RpcChannel { -    public static int NUM_CONCURRENT_REQUESTS = 50; +    public static int NUM_CONCURRENT_REQUESTS = 5;      private static final Logger logger =              Logger.getLogger(RpcChannel.class.getName());      private final String host; @@ -212,7 +212,10 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {      }      private void cancelRequest(RequestMetadata request, String reason) { -        throw new IllegalStateException("Not implemented"); +        request.rpc.setFailed(reason); +        request.rpc.cancel(); +        request.done.run(null); +        request.rpc.complete();      }      private void handleResponse(Data.Response response) { @@ -227,6 +230,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {                      .mergeFrom(response.getResponseProto()).build();              request.rpc.readFrom(response);              request.done.run(responsePb); +            request.rpc.complete();          } catch (InvalidProtocolBufferException e) {              cancelRequest(request, "invalid response from server");          } diff --git a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java index 65239af..3076d7e 100644 --- a/src/main/java/com/orbekk/protobuf/RequestDispatcher.java +++ b/src/main/java/com/orbekk/protobuf/RequestDispatcher.java @@ -11,7 +11,7 @@ import com.google.protobuf.RpcCallback;  import com.google.protobuf.Service;  public class RequestDispatcher extends Thread { -    public static int DEFAULT_QUEUE_SIZE = 50; +    public static int DEFAULT_QUEUE_SIZE = 5;      private volatile boolean isStopped = false;      private final BlockingQueue<Data.Response> output;      private final ServiceHolder services; diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java index c208380..0f6b6cb 100644 --- a/src/main/java/com/orbekk/protobuf/Rpc.java +++ b/src/main/java/com/orbekk/protobuf/Rpc.java @@ -24,6 +24,7 @@ import com.google.protobuf.RpcController;  public class Rpc implements RpcController {      private String errorText = ""; +    private CountDownLatch done = new CountDownLatch(1);      private boolean hasFailed;      private boolean canceled;      private List<RpcCallback<Object>> cancelNotificationListeners = null; @@ -64,6 +65,18 @@ public class Rpc implements RpcController {          return errorText;      } +    public boolean isDone() { +        return done.getCount() == 0; +    } +     +    public void await() throws InterruptedException { +        done.await(); +    } +     +    void complete() { +        done.countDown(); +    } +          public boolean isOk() {          return !hasFailed && !canceled;      } diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index a7249a1..e0757d4 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is;  import static org.junit.Assert.assertThat;  import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean;  import org.junit.Before;  import org.junit.Ignore; @@ -78,10 +79,10 @@ public class ProtobufFunctionalTest {          Test.Service service = Test.Service.newStub(channel);          Test.Type1 request = Test.Type1.newBuilder().build();          int count = 10000; -        final Rpc rpc = new Rpc();          final CountDownLatch stop = new CountDownLatch(count);          long startTime = System.currentTimeMillis();          for (int i = 0; i < count; i++) { +            final Rpc rpc = new Rpc();              service.testA(rpc, request, new RpcCallback<Type2>() {                  @Override public void run(Type2 result) {                      stop.countDown(); @@ -94,6 +95,22 @@ public class ProtobufFunctionalTest {                  elapsedTime + ". " + count/elapsedTime*1000 + "r/s");      } +    @org.junit.Test public void testDoneSignal() throws Exception { +        NewRpcChannel channel = NewRpcChannel.create("localhost", serverport); +        Test.Service service = Test.Service.newStub(channel); +        Test.Type1 request = Test.Type1.newBuilder().build(); + +        final AtomicBoolean callbackFinished = new AtomicBoolean(false); +        Rpc rpc = new Rpc(); +        service.testA(rpc, request, new RpcCallback<Type2>() { +            @Override public void run(Type2 result) { +                callbackFinished.set(true); +            } +        }); +        rpc.await(); +        assertThat(callbackFinished.get(), is(true)); +    } +          @org.junit.Test public void respondsNormally() throws Exception {          Test.Type1 request = Test.Type1.newBuilder().build();          int count = 10; | 
