From 9c982cf3c2c83edd3fdeecf57579a3076109e73c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 2 Apr 2012 00:43:04 +0200 Subject: Use thread pool in SimpleProtobufServer. --- .../com/orbekk/protobuf/SimpleProtobufServer.java | 34 +++++++++++++++------- .../orbekk/protobuf/ProtobufFunctionalTest.java | 2 +- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java index 294a2e1..4fc9808 100644 --- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java +++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java @@ -27,6 +27,8 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import java.util.HashSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; @@ -34,30 +36,34 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.Service; public class SimpleProtobufServer extends Thread { - private static Logger logger = Logger.getLogger( + private volatile boolean isStopped = false; + private final static Logger logger = Logger.getLogger( SimpleProtobufServer.class.getName()); - private ServerSocket serverSocket; - private Set activeClientSockets = + private final ServerSocket serverSocket; + private final Set activeClientSockets = Collections.synchronizedSet(new HashSet()); - private Map registeredServices = - Collections.synchronizedMap( - new HashMap()); + private final Map registeredServices = + Collections.synchronizedMap(new HashMap()); + private final ExecutorService pool; - public static SimpleProtobufServer create(int port) { + public static SimpleProtobufServer create(int port, int maxNumHandlers) { try { InetSocketAddress address = new InetSocketAddress(port); ServerSocket serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); serverSocket.bind(address); - return new SimpleProtobufServer(serverSocket); + ExecutorService pool = + Executors.newFixedThreadPool(maxNumHandlers); + return new SimpleProtobufServer(serverSocket, pool); } catch (IOException e) { logger.log(Level.WARNING, "Could not create server. ", e); return null; } } - public SimpleProtobufServer(ServerSocket serverSocket) { + public SimpleProtobufServer(ServerSocket serverSocket, ExecutorService pool) { this.serverSocket = serverSocket; + this.pool = pool; } public int getPort() { @@ -111,8 +117,12 @@ public class SimpleProtobufServer extends Thread { } private void handleConnection(final Socket connection) { - new Thread(new Runnable() { + if (isStopped) { + return; + } + Runnable handler = new Runnable() { @Override public void run() { + logger.info("Handling client connection " + connection); activeClientSockets.add(connection); try { while (true) { @@ -137,11 +147,13 @@ public class SimpleProtobufServer extends Thread { activeClientSockets.remove(connection); } } - }).start(); + }; + pool.execute(handler); } @Override public void interrupt() { super.interrupt(); + isStopped = true; for (Socket socket : activeClientSockets) { try { socket.close(); diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java index 22b5e31..cb1cf0d 100644 --- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java +++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java @@ -28,7 +28,7 @@ import com.orbekk.protobuf.Test.Type2; public class ProtobufFunctionalTest { CountDownLatch returnC = new CountDownLatch(1); - SimpleProtobufServer server = SimpleProtobufServer.create(0); + SimpleProtobufServer server = SimpleProtobufServer.create(0, 50); int serverport = server.getPort(); RpcChannel channel = RpcChannel.create("localhost", serverport); TestService directService = new TestService(); -- cgit v1.2.3