From cfdde6b86b243cf412c1bb90e4e39de882d1c2e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Thu, 5 Apr 2012 16:36:03 +0200 Subject: Better server design. Proper handling of asynchronous requests on the server. --- .../com/orbekk/protobuf/ConnectionHandler.java | 94 ++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 src/main/java/com/orbekk/protobuf/ConnectionHandler.java (limited to 'src/main/java/com/orbekk/protobuf/ConnectionHandler.java') diff --git a/src/main/java/com/orbekk/protobuf/ConnectionHandler.java b/src/main/java/com/orbekk/protobuf/ConnectionHandler.java new file mode 100644 index 0000000..1c93a05 --- /dev/null +++ b/src/main/java/com/orbekk/protobuf/ConnectionHandler.java @@ -0,0 +1,94 @@ +package com.orbekk.protobuf; + +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; + +public class ConnectionHandler { + private final Socket connection; + private final BlockingQueue dispatcherOutput; + private final RequestDispatcher dispatcher; + + private class IncomingHandler implements Runnable { + @Override public void run() { + dispatcher.start(); + while (!connection.isClosed()) { + try { + Data.Request r = Data.Request.parseDelimitedFrom( + connection.getInputStream()); + if (r == null) { + tryCloseConnection(); + } else { + try { + dispatcher.handleRequest(r); + } catch (InterruptedException e) { + tryCloseConnection(); + return; + } + } + } catch (IOException e) { + tryCloseConnection(); + } + } + dispatcher.interrupt(); + } + } + + private class OutgoingHandler implements Runnable { + @Override public void run() { + while (!connection.isClosed()) { + try { + Data.Response response = dispatcherOutput.take(); + try { + response.writeDelimitedTo(connection.getOutputStream()); + } catch (IOException e) { + tryCloseConnection(); + } + } catch (InterruptedException e) { + tryCloseConnection(); + } + } + dispatcher.interrupt(); + } + } + + public static ConnectionHandler create(Socket connection, + ExecutorService requestPool, ServiceHolder services) { + BlockingQueue dispatcherOutput = + new ArrayBlockingQueue(RequestDispatcher.DEFAULT_QUEUE_SIZE); + RequestDispatcher dispatcher = new RequestDispatcher( + requestPool, dispatcherOutput, services); + return new ConnectionHandler(connection, dispatcherOutput, + dispatcher); + } + + ConnectionHandler(Socket connection, + BlockingQueue dispatcherOutput, + RequestDispatcher dispatcher) { + this.connection = connection; + this.dispatcherOutput = dispatcherOutput; + this.dispatcher = dispatcher; + } + + public void closeConnection() { + tryCloseConnection(); + } + + private void tryCloseConnection() { + try { + connection.close(); + } catch (IOException e) { + // Assume connection is closed. + } + } + + public Runnable createIncomingHandler() { + return new IncomingHandler(); + } + + public Runnable createOutgoingHandler() { + return new OutgoingHandler(); + } +} -- cgit v1.2.3