From e2bcbb117f2ac03196da34217671f1bc48395598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 10 Apr 2012 19:04:05 +0200 Subject: Change in buffering. --- .../com/orbekk/protobuf/ConnectionHandler.java | 14 ++++--- .../java/com/orbekk/protobuf/NewRpcChannel.java | 49 ++++++++++++++-------- .../orbekk/protobuf/ProtobufFunctionalTest.java | 6 +-- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/orbekk/protobuf/ConnectionHandler.java b/src/main/java/com/orbekk/protobuf/ConnectionHandler.java index d9bc6da..db5a582 100644 --- a/src/main/java/com/orbekk/protobuf/ConnectionHandler.java +++ b/src/main/java/com/orbekk/protobuf/ConnectionHandler.java @@ -4,6 +4,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; import java.net.Socket; +import java.util.LinkedList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -45,13 +46,16 @@ public class ConnectionHandler { try { BufferedOutputStream output = new BufferedOutputStream( connection.getOutputStream()); + LinkedList buffer = + new LinkedList(); while (!connection.isClosed()) { - Data.Response response = dispatcherOutput.take(); - try { - response.writeDelimitedTo(connection.getOutputStream()); - } catch (IOException e) { - tryCloseConnection(); + buffer.clear(); + buffer.add(dispatcherOutput.take()); + dispatcherOutput.drainTo(buffer); + for (Data.Response response : buffer) { + response.writeDelimitedTo(output); } + output.flush(); } } catch (InterruptedException e) { tryCloseConnection(); diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index afb94af..aa18b37 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -1,9 +1,11 @@ package com.orbekk.protobuf; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; -import java.util.Map; +import java.util.LinkedList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -75,17 +77,26 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } @Override public void run() { - for (;;) { - try { - Data.Request request = requests.take(); - request.writeDelimitedTo(socket.getOutputStream()); - } catch (InterruptedException e) { - tryCloseSocket(socket); - return; - } catch (IOException e) { - tryCloseSocket(socket); - return; + try { + BufferedOutputStream outputStream = new BufferedOutputStream( + socket.getOutputStream()); + LinkedList buffer = + new LinkedList(); + for (;;) { + buffer.clear(); + buffer.add(requests.take()); + requests.drainTo(buffer); + for (Data.Request request : buffer) { + request.writeDelimitedTo(outputStream); + } + outputStream.flush(); } + } catch (InterruptedException e) { + tryCloseSocket(socket); + return; + } catch (IOException e) { + tryCloseSocket(socket); + return; } } @@ -107,16 +118,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel { } @Override public void run() { - for (;;) { - try { + try { + BufferedInputStream inputStream = new BufferedInputStream( + socket.getInputStream()); + for (;;) { Data.Response response = Data.Response - .parseDelimitedFrom(socket.getInputStream()); + .parseDelimitedFrom(inputStream); responseHandlerPool.execute(new ResponseHandler(response)); - } catch (IOException e) { - responseHandlerPool.shutdown(); - tryCloseSocket(socket); - return; } + } catch (IOException e) { + responseHandlerPool.shutdown(); + tryCloseSocket(socket); + return; } } diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index b3875e3..a7249a1 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -73,15 +73,11 @@ public class ProtobufFunctionalTest { } } - @org.junit.Test public void testConnectionRefused() throws Exception { - } - - @Ignore @org.junit.Test public void testNewRpcChannel() throws Exception { NewRpcChannel channel = NewRpcChannel.create("localhost", serverport); Test.Service service = Test.Service.newStub(channel); Test.Type1 request = Test.Type1.newBuilder().build(); - int count = 5000000; + int count = 10000; final Rpc rpc = new Rpc(); final CountDownLatch stop = new CountDownLatch(count); long startTime = System.currentTimeMillis(); -- cgit v1.2.3