diff options
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/RpcChannel.java')
-rw-r--r-- | src/main/java/com/orbekk/protobuf/RpcChannel.java | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java index d0534a4..a558228 100644 --- a/src/main/java/com/orbekk/protobuf/RpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java @@ -6,6 +6,8 @@ import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -15,6 +17,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.runner.Request; + import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; @@ -28,6 +32,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { private final String host; private final int port; private final AtomicLong nextId = new AtomicLong(0); + private final Timer timer; private final ExecutorService responseHandlerPool = Executors.newSingleThreadExecutor(); private final BlockingQueue<Data.Request> requestQueue = @@ -65,6 +70,20 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { } } + class CancelRequestTask extends TimerTask { + public final long id; + + public CancelRequestTask(long id) { + this.id = id; + } + + @Override + public void run() { + RequestMetadata request = ongoingRequests.remove(id); + cancelRequest(request, "timeout"); + } + } + class OutgoingHandler extends Thread { private final Socket socket; private final BlockingQueue<Data.Request> requests; @@ -156,12 +175,14 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { public static RpcChannel create(String host, int port) throws UnknownHostException, IOException { - RpcChannel channel = new RpcChannel(host, port); + Timer timer = new Timer(); + RpcChannel channel = new RpcChannel(timer, host, port); channel.start(); return channel; } - RpcChannel(String host, int port) { + RpcChannel(Timer timer, String host, int port) { + this.timer = timer; this.host = host; this.port = port; } @@ -196,6 +217,13 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { } } + private void addTimeoutHandler(RequestMetadata request) { + long timeout = request.rpc.getTimout(); + if (timeout > 0) { + timer.schedule(new CancelRequestTask(request.id), timeout); + } + } + @Override public void callMethod(MethodDescriptor method, RpcController rpc, Message requestMessage, @@ -205,6 +233,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel { Rpc rpc_ = (Rpc) rpc; RequestMetadata request_ = new RequestMetadata(id, rpc_, done, responsePrototype); + addTimeoutHandler(request_); ongoingRequests.put(id, request_); Data.Request requestData = Data.Request.newBuilder() |