summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/NewRpcChannel.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java49
1 files changed, 31 insertions, 18 deletions
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index afb94af..aa18b37 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -1,9 +1,11 @@
package com.orbekk.protobuf;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
-import java.util.Map;
+import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -75,17 +77,26 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
@Override public void run() {
- for (;;) {
- try {
- Data.Request request = requests.take();
- request.writeDelimitedTo(socket.getOutputStream());
- } catch (InterruptedException e) {
- tryCloseSocket(socket);
- return;
- } catch (IOException e) {
- tryCloseSocket(socket);
- return;
+ try {
+ BufferedOutputStream outputStream = new BufferedOutputStream(
+ socket.getOutputStream());
+ LinkedList<Data.Request> buffer =
+ new LinkedList<Data.Request>();
+ for (;;) {
+ buffer.clear();
+ buffer.add(requests.take());
+ requests.drainTo(buffer);
+ for (Data.Request request : buffer) {
+ request.writeDelimitedTo(outputStream);
+ }
+ outputStream.flush();
}
+ } catch (InterruptedException e) {
+ tryCloseSocket(socket);
+ return;
+ } catch (IOException e) {
+ tryCloseSocket(socket);
+ return;
}
}
@@ -107,16 +118,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
@Override public void run() {
- for (;;) {
- try {
+ try {
+ BufferedInputStream inputStream = new BufferedInputStream(
+ socket.getInputStream());
+ for (;;) {
Data.Response response = Data.Response
- .parseDelimitedFrom(socket.getInputStream());
+ .parseDelimitedFrom(inputStream);
responseHandlerPool.execute(new ResponseHandler(response));
- } catch (IOException e) {
- responseHandlerPool.shutdown();
- tryCloseSocket(socket);
- return;
}
+ } catch (IOException e) {
+ responseHandlerPool.shutdown();
+ tryCloseSocket(socket);
+ return;
}
}