diff options
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java')
-rw-r--r-- | src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java | 150 |
1 files changed, 51 insertions, 99 deletions
diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java index 4fc9808..14887d8 100644 --- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java +++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java @@ -16,54 +16,63 @@ package com.orbekk.protobuf; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; import com.google.protobuf.Service; public class SimpleProtobufServer extends Thread { - private volatile boolean isStopped = false; + private static int DEFAULT_NUM_HANDLERS = 20; + private static int DEFAULT_CONCURRENT_REQUESTS = 1; + private final static Logger logger = Logger.getLogger( SimpleProtobufServer.class.getName()); private final ServerSocket serverSocket; - private final Set<Socket> activeClientSockets = - Collections.synchronizedSet(new HashSet<Socket>()); - private final Map<String, Service> registeredServices = - Collections.synchronizedMap(new HashMap<String, Service>()); - private final ExecutorService pool; - - public static SimpleProtobufServer create(int port, int maxNumHandlers) { + private final ExecutorService incomingHandlerPool; + private final ExecutorService outgoingHandlerPool = + Executors.newCachedThreadPool(); + private final ExecutorService requestHandlerPool; + private final ServiceHolder services = new ServiceHolder(); + private final Set<ConnectionHandler> activeConnections = + Collections.synchronizedSet(new HashSet<ConnectionHandler>()); + + public static SimpleProtobufServer create(int port) { + return create(port, DEFAULT_NUM_HANDLERS, DEFAULT_CONCURRENT_REQUESTS); + } + + public static SimpleProtobufServer create(int port, int maxNumHandlers, + int maxConcurrentRequests) { try { InetSocketAddress address = new InetSocketAddress(port); ServerSocket serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); serverSocket.bind(address); - ExecutorService pool = + ExecutorService incomingHandlerPool = Executors.newFixedThreadPool(maxNumHandlers); - return new SimpleProtobufServer(serverSocket, pool); + ExecutorService requestHandlerPool = + Executors.newFixedThreadPool(maxConcurrentRequests); + return new SimpleProtobufServer(serverSocket, incomingHandlerPool, + requestHandlerPool); } catch (IOException e) { logger.log(Level.WARNING, "Could not create server. ", e); return null; } } - public SimpleProtobufServer(ServerSocket serverSocket, ExecutorService pool) { + public SimpleProtobufServer(ServerSocket serverSocket, + ExecutorService incomingHandlerPool, + ExecutorService requestHandlerPool) { this.serverSocket = serverSocket; - this.pool = pool; + this.incomingHandlerPool = incomingHandlerPool; + this.requestHandlerPool = requestHandlerPool; } public int getPort() { @@ -71,97 +80,40 @@ public class SimpleProtobufServer extends Thread { } public void registerService(Service service) { - String serviceName = service.getDescriptorForType().getFullName(); - if (registeredServices.containsKey(serviceName)) { - logger.warning("Already registered service with this name."); - } - logger.info("Registering service: " + serviceName); - registeredServices.put(serviceName, service); + services.registerService(service); } - public void handleRequest(Data.Request request, OutputStream out) - throws IOException { - final Service service = registeredServices.get(request.getFullServiceName()); - Rpc rpc = new Rpc(); - final Data.Response.Builder response = Data.Response.newBuilder(); - response.setRequestId(request.getRequestId()); - if (service == null) { - response.setError(Data.Response.RpcError.UNKNOWN_SERVICE); - response.build().writeDelimitedTo(out); + private synchronized void handleConnection(Socket connection) { + if (serverSocket.isClosed()) { return; } - final Descriptors.MethodDescriptor method = service.getDescriptorForType() - .findMethodByName(request.getMethodName()); - if (method == null) { - response.setError(Data.Response.RpcError.UNKNOWN_METHOD); - response.build().writeDelimitedTo(out); - return; - } - RpcCallback<Message> doneCallback = new RpcCallback<Message>() { - @Override public void run(Message responseMessage) { - if (responseMessage == null) { - responseMessage = service - .getResponsePrototype(method) - .toBuilder().build(); - } - response.setResponseProto(responseMessage.toByteString()); - } - }; - Message requestMessage = service.getRequestPrototype(method) - .toBuilder() - .mergeFrom(request.getRequestProto()) - .build(); - service.callMethod(method, rpc, requestMessage, doneCallback); - rpc.writeTo(response); - response.build().writeDelimitedTo(out); - } - - private void handleConnection(final Socket connection) { - if (isStopped) { - return; - } - Runnable handler = new Runnable() { + + final ConnectionHandler handler = ConnectionHandler.create( + connection, requestHandlerPool, services); + activeConnections.add(handler); + final Runnable realIncomingHandler = handler.createIncomingHandler(); + + class HelperHandler implements Runnable { @Override public void run() { - logger.info("Handling client connection " + connection); - activeClientSockets.add(connection); + activeConnections.add(handler); try { - while (true) { - Data.Request r1 = Data.Request.parseDelimitedFrom( - connection.getInputStream()); - if (r1 == null) { - try { - connection.close(); - } catch (IOException e) { - // Connection is closed. - } - } - handleRequest(r1, connection.getOutputStream()); - } - } catch (IOException e) { - logger.info("Closed connection: " + connection); + realIncomingHandler.run(); } finally { - try { - connection.close(); - } catch (IOException e) { - } - activeClientSockets.remove(connection); + activeConnections.remove(handler); } } - }; - pool.execute(handler); + } + + incomingHandlerPool.execute(new HelperHandler()); + outgoingHandlerPool.execute(handler.createOutgoingHandler()); } - @Override public void interrupt() { + @Override public synchronized void interrupt() { super.interrupt(); - isStopped = true; - for (Socket socket : activeClientSockets) { - try { - socket.close(); - } catch (IOException e) { - logger.log(Level.WARNING, "Error closing socket.", e); - } + for (ConnectionHandler handler : activeConnections) { + handler.closeConnection(); } - + try { serverSocket.close(); } catch (IOException e) { |