summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-10 19:04:05 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-10 19:04:05 +0200
commite2bcbb117f2ac03196da34217671f1bc48395598 (patch)
tree990f92f4271d3929fa4fcc12305c4a3fd3d58111
parentbb4dc8012467d47ed21a8fa1cf11fca27a1fafd7 (diff)
Change in buffering.
-rw-r--r--src/main/java/com/orbekk/protobuf/ConnectionHandler.java14
-rw-r--r--src/main/java/com/orbekk/protobuf/NewRpcChannel.java49
-rw-r--r--src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java6
3 files changed, 41 insertions, 28 deletions
diff --git a/src/main/java/com/orbekk/protobuf/ConnectionHandler.java b/src/main/java/com/orbekk/protobuf/ConnectionHandler.java
index d9bc6da..db5a582 100644
--- a/src/main/java/com/orbekk/protobuf/ConnectionHandler.java
+++ b/src/main/java/com/orbekk/protobuf/ConnectionHandler.java
@@ -4,6 +4,7 @@ import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
+import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -45,13 +46,16 @@ public class ConnectionHandler {
try {
BufferedOutputStream output = new BufferedOutputStream(
connection.getOutputStream());
+ LinkedList<Data.Response> buffer =
+ new LinkedList<Data.Response>();
while (!connection.isClosed()) {
- Data.Response response = dispatcherOutput.take();
- try {
- response.writeDelimitedTo(connection.getOutputStream());
- } catch (IOException e) {
- tryCloseConnection();
+ buffer.clear();
+ buffer.add(dispatcherOutput.take());
+ dispatcherOutput.drainTo(buffer);
+ for (Data.Response response : buffer) {
+ response.writeDelimitedTo(output);
}
+ output.flush();
}
} catch (InterruptedException e) {
tryCloseConnection();
diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
index afb94af..aa18b37 100644
--- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
+++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java
@@ -1,9 +1,11 @@
package com.orbekk.protobuf;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
-import java.util.Map;
+import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -75,17 +77,26 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
@Override public void run() {
- for (;;) {
- try {
- Data.Request request = requests.take();
- request.writeDelimitedTo(socket.getOutputStream());
- } catch (InterruptedException e) {
- tryCloseSocket(socket);
- return;
- } catch (IOException e) {
- tryCloseSocket(socket);
- return;
+ try {
+ BufferedOutputStream outputStream = new BufferedOutputStream(
+ socket.getOutputStream());
+ LinkedList<Data.Request> buffer =
+ new LinkedList<Data.Request>();
+ for (;;) {
+ buffer.clear();
+ buffer.add(requests.take());
+ requests.drainTo(buffer);
+ for (Data.Request request : buffer) {
+ request.writeDelimitedTo(outputStream);
+ }
+ outputStream.flush();
}
+ } catch (InterruptedException e) {
+ tryCloseSocket(socket);
+ return;
+ } catch (IOException e) {
+ tryCloseSocket(socket);
+ return;
}
}
@@ -107,16 +118,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {
}
@Override public void run() {
- for (;;) {
- try {
+ try {
+ BufferedInputStream inputStream = new BufferedInputStream(
+ socket.getInputStream());
+ for (;;) {
Data.Response response = Data.Response
- .parseDelimitedFrom(socket.getInputStream());
+ .parseDelimitedFrom(inputStream);
responseHandlerPool.execute(new ResponseHandler(response));
- } catch (IOException e) {
- responseHandlerPool.shutdown();
- tryCloseSocket(socket);
- return;
}
+ } catch (IOException e) {
+ responseHandlerPool.shutdown();
+ tryCloseSocket(socket);
+ return;
}
}
diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
index b3875e3..a7249a1 100644
--- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
+++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
@@ -73,15 +73,11 @@ public class ProtobufFunctionalTest {
}
}
- @org.junit.Test public void testConnectionRefused() throws Exception {
- }
-
- @Ignore
@org.junit.Test public void testNewRpcChannel() throws Exception {
NewRpcChannel channel = NewRpcChannel.create("localhost", serverport);
Test.Service service = Test.Service.newStub(channel);
Test.Type1 request = Test.Type1.newBuilder().build();
- int count = 5000000;
+ int count = 10000;
final Rpc rpc = new Rpc();
final CountDownLatch stop = new CountDownLatch(count);
long startTime = System.currentTimeMillis();