summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-02 00:43:04 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-02 00:43:04 +0200
commit9c982cf3c2c83edd3fdeecf57579a3076109e73c (patch)
tree6ac02d8651c41eef67b12530ba61d7db6b121346
parent79cf815533940217d748c0a34078678598b4a417 (diff)
Use thread pool in SimpleProtobufServer.
-rw-r--r--src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java34
-rw-r--r--src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java2
2 files changed, 24 insertions, 12 deletions
diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
index 294a2e1..4fc9808 100644
--- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
+++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
@@ -27,6 +27,8 @@ import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
@@ -34,30 +36,34 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
public class SimpleProtobufServer extends Thread {
- private static Logger logger = Logger.getLogger(
+ private volatile boolean isStopped = false;
+ private final static Logger logger = Logger.getLogger(
SimpleProtobufServer.class.getName());
- private ServerSocket serverSocket;
- private Set<Socket> activeClientSockets =
+ private final ServerSocket serverSocket;
+ private final Set<Socket> activeClientSockets =
Collections.synchronizedSet(new HashSet<Socket>());
- private Map<String, Service> registeredServices =
- Collections.synchronizedMap(
- new HashMap<String, Service>());
+ private final Map<String, Service> registeredServices =
+ Collections.synchronizedMap(new HashMap<String, Service>());
+ private final ExecutorService pool;
- public static SimpleProtobufServer create(int port) {
+ public static SimpleProtobufServer create(int port, int maxNumHandlers) {
try {
InetSocketAddress address = new InetSocketAddress(port);
ServerSocket serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.bind(address);
- return new SimpleProtobufServer(serverSocket);
+ ExecutorService pool =
+ Executors.newFixedThreadPool(maxNumHandlers);
+ return new SimpleProtobufServer(serverSocket, pool);
} catch (IOException e) {
logger.log(Level.WARNING, "Could not create server. ", e);
return null;
}
}
- public SimpleProtobufServer(ServerSocket serverSocket) {
+ public SimpleProtobufServer(ServerSocket serverSocket, ExecutorService pool) {
this.serverSocket = serverSocket;
+ this.pool = pool;
}
public int getPort() {
@@ -111,8 +117,12 @@ public class SimpleProtobufServer extends Thread {
}
private void handleConnection(final Socket connection) {
- new Thread(new Runnable() {
+ if (isStopped) {
+ return;
+ }
+ Runnable handler = new Runnable() {
@Override public void run() {
+ logger.info("Handling client connection " + connection);
activeClientSockets.add(connection);
try {
while (true) {
@@ -137,11 +147,13 @@ public class SimpleProtobufServer extends Thread {
activeClientSockets.remove(connection);
}
}
- }).start();
+ };
+ pool.execute(handler);
}
@Override public void interrupt() {
super.interrupt();
+ isStopped = true;
for (Socket socket : activeClientSockets) {
try {
socket.close();
diff --git a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
index 22b5e31..cb1cf0d 100644
--- a/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
+++ b/src/test/java/com/orbekk/protobuf/ProtobufFunctionalTest.java
@@ -28,7 +28,7 @@ import com.orbekk.protobuf.Test.Type2;
public class ProtobufFunctionalTest {
CountDownLatch returnC = new CountDownLatch(1);
- SimpleProtobufServer server = SimpleProtobufServer.create(0);
+ SimpleProtobufServer server = SimpleProtobufServer.create(0, 50);
int serverport = server.getPort();
RpcChannel channel = RpcChannel.create("localhost", serverport);
TestService directService = new TestService();