From 5229954adf0d51afd2d376ed4581ae6d1bd059fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 24 Apr 2012 17:39:50 +0200 Subject: Add timout support in Rpc. Use with Rpc.setTimeout(). --- src/main/java/com/orbekk/protobuf/Rpc.java | 13 +++++++++ src/main/java/com/orbekk/protobuf/RpcChannel.java | 33 ++++++++++++++++++++-- .../orbekk/protobuf/ProtobufFunctionalTest.java | 18 ++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java index 34cdb31..f22bbc3 100644 --- a/src/main/java/com/orbekk/protobuf/Rpc.java +++ b/src/main/java/com/orbekk/protobuf/Rpc.java @@ -28,6 +28,7 @@ public class Rpc implements RpcController { private volatile String errorText = ""; private volatile boolean hasFailed; private volatile boolean canceled; + private volatile long timeoutMillis = 0; private volatile List> cancelNotificationListeners = null; public Rpc() { @@ -122,6 +123,18 @@ public class Rpc implements RpcController { } } } + + public long getTimout() { + return timeoutMillis; + } + + /** Set the timeout in number of milliseconds. + * + * The default timeout is 0, i.e. never time out. + */ + public void setTimout(long milliseconds) { + timeoutMillis = milliseconds; + } @Override public void startCancel() { diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java index d0534a4..a558228 100644 --- a/src/main/java/com/orbekk/protobuf/RpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java @@ -6,6 +6,8 @@ import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -15,6 +17,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.runner.Request; + import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; @@ -28,6 +32,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { private final String host; private final int port; private final AtomicLong nextId = new AtomicLong(0); + private final Timer timer; private final ExecutorService responseHandlerPool = Executors.newSingleThreadExecutor(); private final BlockingQueue requestQueue = @@ -65,6 +70,20 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { } } + class CancelRequestTask extends TimerTask { + public final long id; + + public CancelRequestTask(long id) { + this.id = id; + } + + @Override + public void run() { + RequestMetadata request = ongoingRequests.remove(id); + cancelRequest(request, "timeout"); + } + } + class OutgoingHandler extends Thread { private final Socket socket; private final BlockingQueue requests; @@ -156,12 +175,14 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { public static RpcChannel create(String host, int port) throws UnknownHostException, IOException { - RpcChannel channel = new RpcChannel(host, port); + Timer timer = new Timer(); + RpcChannel channel = new RpcChannel(timer, host, port); channel.start(); return channel; } - RpcChannel(String host, int port) { + RpcChannel(Timer timer, String host, int port) { + this.timer = timer; this.host = host; this.port = port; } @@ -196,6 +217,13 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { } } + private void addTimeoutHandler(RequestMetadata request) { + long timeout = request.rpc.getTimout(); + if (timeout > 0) { + timer.schedule(new CancelRequestTask(request.id), timeout); + } + } + @Override public void callMethod(MethodDescriptor method, RpcController rpc, Message requestMessage, @@ -205,6 +233,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { Rpc rpc_ = (Rpc) rpc; RequestMetadata request_ = new RequestMetadata(id, rpc_, done, responsePrototype); + addTimeoutHandler(request_); ongoingRequests.put(id, request_); Data.Request requestData = Data.Request.newBuilder() diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index bf30f59..a1661d2 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -161,4 +161,22 @@ public class ProtobufFunctionalTest { server.interrupt(); stop.await(); } + + @org.junit.Test public void testTimout() throws Exception { + Test.Type1 request = Test.Type1.newBuilder().build(); + final CountDownLatch stop = new CountDownLatch(1); + + final Rpc rpc = new Rpc(); + rpc.setTimout(1); + service.testC(rpc, request, new RpcCallback() { + @Override public void run(Type2 result) { + stop.countDown(); + } + }); + rpc.await(); + assertThat(rpc.failed(), is(true)); + assertThat(rpc.errorText(), is("timeout")); + returnC.countDown(); + stop.await(); + } } -- cgit v1.2.3