diff options
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r-- | src/main/java/com/orbekk/protobuf/NewRpcChannel.java | 49 |
1 files changed, 31 insertions, 18 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index afb94af..aa18b37 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -1,9 +1,11 @@ package com.orbekk.protobuf; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; -import java.util.Map; +import java.util.LinkedList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -75,17 +77,26 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } @Override public void run() { - for (;;) { - try { - Data.Request request = requests.take(); - request.writeDelimitedTo(socket.getOutputStream()); - } catch (InterruptedException e) { - tryCloseSocket(socket); - return; - } catch (IOException e) { - tryCloseSocket(socket); - return; + try { + BufferedOutputStream outputStream = new BufferedOutputStream( + socket.getOutputStream()); + LinkedList<Data.Request> buffer = + new LinkedList<Data.Request>(); + for (;;) { + buffer.clear(); + buffer.add(requests.take()); + requests.drainTo(buffer); + for (Data.Request request : buffer) { + request.writeDelimitedTo(outputStream); + } + outputStream.flush(); } + } catch (InterruptedException e) { + tryCloseSocket(socket); + return; + } catch (IOException e) { + tryCloseSocket(socket); + return; } } @@ -107,16 +118,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } @Override public void run() { - for (;;) { - try { + try { + BufferedInputStream inputStream = new BufferedInputStream( + socket.getInputStream()); + for (;;) { Data.Response response = Data.Response - .parseDelimitedFrom(socket.getInputStream()); + .parseDelimitedFrom(inputStream); responseHandlerPool.execute(new ResponseHandler(response)); - } catch (IOException e) { - responseHandlerPool.shutdown(); - tryCloseSocket(socket); - return; } + } catch (IOException e) { + responseHandlerPool.shutdown(); + tryCloseSocket(socket); + return; } } |