summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 17:39:50 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-24 17:39:50 +0200
commit5229954adf0d51afd2d376ed4581ae6d1bd059fa (patch)
tree35cb679e4aa11f43175a3f1a9556285d312e3f1f
parent6794f9be9a55fdf01179a344b68626d1126caa8a (diff)
Add timout support in Rpc.
Use with Rpc.setTimeout().
-rw-r--r--src/main/java/com/orbekk/protobuf/Rpc.java13
-rw-r--r--src/main/java/com/orbekk/protobuf/RpcChannel.java33
-rw-r--r--src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java18
3 files changed, 62 insertions, 2 deletions
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java
index 34cdb31..f22bbc3 100644
--- a/src/main/java/com/orbekk/protobuf/Rpc.java
+++ b/src/main/java/com/orbekk/protobuf/Rpc.java
@@ -28,6 +28,7 @@ public class Rpc implements RpcController {
private volatile String errorText = "";
private volatile boolean hasFailed;
private volatile boolean canceled;
+ private volatile long timeoutMillis = 0;
private volatile List<RpcCallback<Object>> cancelNotificationListeners = null;
public Rpc() {
@@ -122,6 +123,18 @@ public class Rpc implements RpcController {
}
}
}
+
+ public long getTimout() {
+ return timeoutMillis;
+ }
+
+ /** Set the timeout in number of milliseconds.
+ *
+ * The default timeout is 0, i.e. never time out.
+ */
+ public void setTimout(long milliseconds) {
+ timeoutMillis = milliseconds;
+ }
@Override
public void startCancel() {
diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java
index d0534a4..a558228 100644
--- a/src/main/java/com/orbekk/protobuf/RpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java
@@ -6,6 +6,8 @@ import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -15,6 +17,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.junit.runner.Request;
+
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
@@ -28,6 +32,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
private final String host;
private final int port;
private final AtomicLong nextId = new AtomicLong(0);
+ private final Timer timer;
private final ExecutorService responseHandlerPool =
Executors.newSingleThreadExecutor();
private final BlockingQueue<Data.Request> requestQueue =
@@ -65,6 +70,20 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ class CancelRequestTask extends TimerTask {
+ public final long id;
+
+ public CancelRequestTask(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+ RequestMetadata request = ongoingRequests.remove(id);
+ cancelRequest(request, "timeout");
+ }
+ }
+
class OutgoingHandler extends Thread {
private final Socket socket;
private final BlockingQueue<Data.Request> requests;
@@ -156,12 +175,14 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
public static RpcChannel create(String host, int port)
throws UnknownHostException, IOException {
- RpcChannel channel = new RpcChannel(host, port);
+ Timer timer = new Timer();
+ RpcChannel channel = new RpcChannel(timer, host, port);
channel.start();
return channel;
}
- RpcChannel(String host, int port) {
+ RpcChannel(Timer timer, String host, int port) {
+ this.timer = timer;
this.host = host;
this.port = port;
}
@@ -196,6 +217,13 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ private void addTimeoutHandler(RequestMetadata request) {
+ long timeout = request.rpc.getTimout();
+ if (timeout > 0) {
+ timer.schedule(new CancelRequestTask(request.id), timeout);
+ }
+ }
+
@Override
public void callMethod(MethodDescriptor method,
RpcController rpc, Message requestMessage,
@@ -205,6 +233,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
Rpc rpc_ = (Rpc) rpc;
RequestMetadata request_ = new RequestMetadata(id, rpc_, done,
responsePrototype);
+ addTimeoutHandler(request_);
ongoingRequests.put(id, request_);
Data.Request requestData = Data.Request.newBuilder()
diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
index bf30f59..a1661d2 100644
--- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
+++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
@@ -161,4 +161,22 @@ public class ProtobufFunctionalTest {
server.interrupt();
stop.await();
}
+
+ @org.junit.Test public void testTimout() throws Exception {
+ Test.Type1 request = Test.Type1.newBuilder().build();
+ final CountDownLatch stop = new CountDownLatch(1);
+
+ final Rpc rpc = new Rpc();
+ rpc.setTimout(1);
+ service.testC(rpc, request, new RpcCallback<Type2>() {
+ @Override public void run(Type2 result) {
+ stop.countDown();
+ }
+ });
+ rpc.await();
+ assertThat(rpc.failed(), is(true));
+ assertThat(rpc.errorText(), is("timeout"));
+ returnC.countDown();
+ stop.await();
+ }
}