diff options
| author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-24 17:39:50 +0200 | 
|---|---|---|
| committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-24 17:39:50 +0200 | 
| commit | 5229954adf0d51afd2d376ed4581ae6d1bd059fa (patch) | |
| tree | 35cb679e4aa11f43175a3f1a9556285d312e3f1f /src | |
| parent | 6794f9be9a55fdf01179a344b68626d1126caa8a (diff) | |
Add timout support in Rpc.
Use with Rpc.setTimeout().
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/Rpc.java | 13 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/RpcChannel.java | 33 | ||||
| -rw-r--r-- | src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java | 18 | 
3 files changed, 62 insertions, 2 deletions
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java index 34cdb31..f22bbc3 100644 --- a/src/main/java/com/orbekk/protobuf/Rpc.java +++ b/src/main/java/com/orbekk/protobuf/Rpc.java @@ -28,6 +28,7 @@ public class Rpc implements RpcController {      private volatile String errorText = "";      private volatile boolean hasFailed;      private volatile boolean canceled; +    private volatile long timeoutMillis = 0;      private volatile List<RpcCallback<Object>> cancelNotificationListeners = null;      public Rpc() { @@ -122,6 +123,18 @@ public class Rpc implements RpcController {              }          }      } +     +    public long getTimout() { +        return timeoutMillis; +    } +     +    /** Set the timeout in number of milliseconds. +     *  +     * The default timeout is 0, i.e. never time out. +     */ +    public void setTimout(long milliseconds) { +        timeoutMillis = milliseconds; +    }      @Override      public void startCancel() { diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java index d0534a4..a558228 100644 --- a/src/main/java/com/orbekk/protobuf/RpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java @@ -6,6 +6,8 @@ import java.io.IOException;  import java.net.Socket;  import java.net.UnknownHostException;  import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask;  import java.util.concurrent.ArrayBlockingQueue;  import java.util.concurrent.BlockingQueue;  import java.util.concurrent.ConcurrentHashMap; @@ -15,6 +17,8 @@ import java.util.concurrent.atomic.AtomicLong;  import java.util.logging.Level;  import java.util.logging.Logger; +import org.junit.runner.Request; +  import com.google.protobuf.Descriptors.MethodDescriptor;  import com.google.protobuf.InvalidProtocolBufferException;  import com.google.protobuf.Message; @@ -28,6 +32,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {      private final String host;      private final int port;      private final AtomicLong nextId = new AtomicLong(0); +    private final Timer timer;      private final ExecutorService responseHandlerPool =              Executors.newSingleThreadExecutor();      private final BlockingQueue<Data.Request> requestQueue = @@ -65,6 +70,20 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {          }      } +    class CancelRequestTask extends TimerTask { +        public final long id; +         +        public CancelRequestTask(long id) { +            this.id = id; +        } + +        @Override +        public void run() { +            RequestMetadata request = ongoingRequests.remove(id); +            cancelRequest(request, "timeout"); +        } +    } +          class OutgoingHandler extends Thread {          private final Socket socket;          private final BlockingQueue<Data.Request> requests; @@ -156,12 +175,14 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {      public static RpcChannel create(String host, int port)              throws UnknownHostException, IOException { -        RpcChannel channel = new RpcChannel(host, port); +        Timer timer = new Timer(); +        RpcChannel channel = new RpcChannel(timer, host, port);          channel.start();          return channel;      } -    RpcChannel(String host, int port) { +    RpcChannel(Timer timer, String host, int port) { +        this.timer = timer;          this.host = host;          this.port = port;      } @@ -196,6 +217,13 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {          }      } +    private void addTimeoutHandler(RequestMetadata request) { +        long timeout = request.rpc.getTimout(); +        if (timeout > 0) { +            timer.schedule(new CancelRequestTask(request.id), timeout); +        } +    } +          @Override      public void callMethod(MethodDescriptor method,              RpcController rpc, Message requestMessage, @@ -205,6 +233,7 @@ public class RpcChannel implements com.google.protobuf.RpcChannel {          Rpc rpc_ = (Rpc) rpc;          RequestMetadata request_ = new RequestMetadata(id, rpc_, done,                  responsePrototype); +        addTimeoutHandler(request_);          ongoingRequests.put(id, request_);          Data.Request requestData = Data.Request.newBuilder() diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index bf30f59..a1661d2 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -161,4 +161,22 @@ public class ProtobufFunctionalTest {          server.interrupt();          stop.await();      } +     +    @org.junit.Test public void testTimout() throws Exception { +        Test.Type1 request = Test.Type1.newBuilder().build(); +        final CountDownLatch stop = new CountDownLatch(1); + +        final Rpc rpc = new Rpc(); +        rpc.setTimout(1); +        service.testC(rpc, request, new RpcCallback<Type2>() { +            @Override public void run(Type2 result) { +                stop.countDown(); +            } +        }); +        rpc.await(); +        assertThat(rpc.failed(), is(true)); +        assertThat(rpc.errorText(), is("timeout")); +        returnC.countDown(); +        stop.await(); +    }  }  | 
