summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java150
1 files changed, 51 insertions, 99 deletions
diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
index 4fc9808..14887d8 100644
--- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
+++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
@@ -16,54 +16,63 @@
package com.orbekk.protobuf;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
public class SimpleProtobufServer extends Thread {
- private volatile boolean isStopped = false;
+ private static int DEFAULT_NUM_HANDLERS = 20;
+ private static int DEFAULT_CONCURRENT_REQUESTS = 1;
+
private final static Logger logger = Logger.getLogger(
SimpleProtobufServer.class.getName());
private final ServerSocket serverSocket;
- private final Set<Socket> activeClientSockets =
- Collections.synchronizedSet(new HashSet<Socket>());
- private final Map<String, Service> registeredServices =
- Collections.synchronizedMap(new HashMap<String, Service>());
- private final ExecutorService pool;
-
- public static SimpleProtobufServer create(int port, int maxNumHandlers) {
+ private final ExecutorService incomingHandlerPool;
+ private final ExecutorService outgoingHandlerPool =
+ Executors.newCachedThreadPool();
+ private final ExecutorService requestHandlerPool;
+ private final ServiceHolder services = new ServiceHolder();
+ private final Set<ConnectionHandler> activeConnections =
+ Collections.synchronizedSet(new HashSet<ConnectionHandler>());
+
+ public static SimpleProtobufServer create(int port) {
+ return create(port, DEFAULT_NUM_HANDLERS, DEFAULT_CONCURRENT_REQUESTS);
+ }
+
+ public static SimpleProtobufServer create(int port, int maxNumHandlers,
+ int maxConcurrentRequests) {
try {
InetSocketAddress address = new InetSocketAddress(port);
ServerSocket serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.bind(address);
- ExecutorService pool =
+ ExecutorService incomingHandlerPool =
Executors.newFixedThreadPool(maxNumHandlers);
- return new SimpleProtobufServer(serverSocket, pool);
+ ExecutorService requestHandlerPool =
+ Executors.newFixedThreadPool(maxConcurrentRequests);
+ return new SimpleProtobufServer(serverSocket, incomingHandlerPool,
+ requestHandlerPool);
} catch (IOException e) {
logger.log(Level.WARNING, "Could not create server. ", e);
return null;
}
}
- public SimpleProtobufServer(ServerSocket serverSocket, ExecutorService pool) {
+ public SimpleProtobufServer(ServerSocket serverSocket,
+ ExecutorService incomingHandlerPool,
+ ExecutorService requestHandlerPool) {
this.serverSocket = serverSocket;
- this.pool = pool;
+ this.incomingHandlerPool = incomingHandlerPool;
+ this.requestHandlerPool = requestHandlerPool;
}
public int getPort() {
@@ -71,97 +80,40 @@ public class SimpleProtobufServer extends Thread {
}
public void registerService(Service service) {
- String serviceName = service.getDescriptorForType().getFullName();
- if (registeredServices.containsKey(serviceName)) {
- logger.warning("Already registered service with this name.");
- }
- logger.info("Registering service: " + serviceName);
- registeredServices.put(serviceName, service);
+ services.registerService(service);
}
- public void handleRequest(Data.Request request, OutputStream out)
- throws IOException {
- final Service service = registeredServices.get(request.getFullServiceName());
- Rpc rpc = new Rpc();
- final Data.Response.Builder response = Data.Response.newBuilder();
- response.setRequestId(request.getRequestId());
- if (service == null) {
- response.setError(Data.Response.RpcError.UNKNOWN_SERVICE);
- response.build().writeDelimitedTo(out);
+ private synchronized void handleConnection(Socket connection) {
+ if (serverSocket.isClosed()) {
return;
}
- final Descriptors.MethodDescriptor method = service.getDescriptorForType()
- .findMethodByName(request.getMethodName());
- if (method == null) {
- response.setError(Data.Response.RpcError.UNKNOWN_METHOD);
- response.build().writeDelimitedTo(out);
- return;
- }
- RpcCallback<Message> doneCallback = new RpcCallback<Message>() {
- @Override public void run(Message responseMessage) {
- if (responseMessage == null) {
- responseMessage = service
- .getResponsePrototype(method)
- .toBuilder().build();
- }
- response.setResponseProto(responseMessage.toByteString());
- }
- };
- Message requestMessage = service.getRequestPrototype(method)
- .toBuilder()
- .mergeFrom(request.getRequestProto())
- .build();
- service.callMethod(method, rpc, requestMessage, doneCallback);
- rpc.writeTo(response);
- response.build().writeDelimitedTo(out);
- }
-
- private void handleConnection(final Socket connection) {
- if (isStopped) {
- return;
- }
- Runnable handler = new Runnable() {
+
+ final ConnectionHandler handler = ConnectionHandler.create(
+ connection, requestHandlerPool, services);
+ activeConnections.add(handler);
+ final Runnable realIncomingHandler = handler.createIncomingHandler();
+
+ class HelperHandler implements Runnable {
@Override public void run() {
- logger.info("Handling client connection " + connection);
- activeClientSockets.add(connection);
+ activeConnections.add(handler);
try {
- while (true) {
- Data.Request r1 = Data.Request.parseDelimitedFrom(
- connection.getInputStream());
- if (r1 == null) {
- try {
- connection.close();
- } catch (IOException e) {
- // Connection is closed.
- }
- }
- handleRequest(r1, connection.getOutputStream());
- }
- } catch (IOException e) {
- logger.info("Closed connection: " + connection);
+ realIncomingHandler.run();
} finally {
- try {
- connection.close();
- } catch (IOException e) {
- }
- activeClientSockets.remove(connection);
+ activeConnections.remove(handler);
}
}
- };
- pool.execute(handler);
+ }
+
+ incomingHandlerPool.execute(new HelperHandler());
+ outgoingHandlerPool.execute(handler.createOutgoingHandler());
}
- @Override public void interrupt() {
+ @Override public synchronized void interrupt() {
super.interrupt();
- isStopped = true;
- for (Socket socket : activeClientSockets) {
- try {
- socket.close();
- } catch (IOException e) {
- logger.log(Level.WARNING, "Error closing socket.", e);
- }
+ for (ConnectionHandler handler : activeConnections) {
+ handler.closeConnection();
}
-
+
try {
serverSocket.close();
} catch (IOException e) {