summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/RpcChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/RpcChannel.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/RpcChannel.java33
1 files changed, 31 insertions, 2 deletions
diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java
index d0534a4..a558228 100644
--- a/src/main/java/com/orbekk/protobuf/RpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java
@@ -6,6 +6,8 @@ import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -15,6 +17,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.junit.runner.Request;
+
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
@@ -28,6 +32,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
private final String host;
private final int port;
private final AtomicLong nextId = new AtomicLong(0);
+ private final Timer timer;
private final ExecutorService responseHandlerPool =
Executors.newSingleThreadExecutor();
private final BlockingQueue<Data.Request> requestQueue =
@@ -65,6 +70,20 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ class CancelRequestTask extends TimerTask {
+ public final long id;
+
+ public CancelRequestTask(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+ RequestMetadata request = ongoingRequests.remove(id);
+ cancelRequest(request, "timeout");
+ }
+ }
+
class OutgoingHandler extends Thread {
private final Socket socket;
private final BlockingQueue<Data.Request> requests;
@@ -156,12 +175,14 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
public static RpcChannel create(String host, int port)
throws UnknownHostException, IOException {
- RpcChannel channel = new RpcChannel(host, port);
+ Timer timer = new Timer();
+ RpcChannel channel = new RpcChannel(timer, host, port);
channel.start();
return channel;
}
- RpcChannel(String host, int port) {
+ RpcChannel(Timer timer, String host, int port) {
+ this.timer = timer;
this.host = host;
this.port = port;
}
@@ -196,6 +217,13 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
}
}
+ private void addTimeoutHandler(RequestMetadata request) {
+ long timeout = request.rpc.getTimout();
+ if (timeout > 0) {
+ timer.schedule(new CancelRequestTask(request.id), timeout);
+ }
+ }
+
@Override
public void callMethod(MethodDescriptor method,
RpcController rpc, Message requestMessage,
@@ -205,6 +233,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {
Rpc rpc_ = (Rpc) rpc;
RequestMetadata request_ = new RequestMetadata(id, rpc_, done,
responsePrototype);
+ addTimeoutHandler(request_);
ongoingRequests.put(id, request_);
Data.Request requestData = Data.Request.newBuilder()