From cecb501245bb7d93b2e4c55acb783958b158f637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 28 Mar 2012 09:50:34 +0200 Subject: Support very simple serving. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Not asynchronous at this point. – Not well-tested. --- .../com/orbekk/protobuf/SimpleProtobufServer.java | 79 +++++++++++++++++++--- 1 file changed, 71 insertions(+), 8 deletions(-) (limited to 'src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java') diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java index 57afae9..0a20883 100644 --- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java +++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java @@ -2,16 +2,26 @@ package com.orbekk.protobuf; import java.util.logging.Level; import java.util.logging.Logger; +import java.io.OutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.RpcController; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.Descriptors; +import java.util.Map; +import java.util.HashMap; public class SimpleProtobufServer extends Thread { private static Logger logger = Logger.getLogger( SimpleProtobufServer.class.getName()); - ServerSocket serverSocket; + private ServerSocket serverSocket; + private Map registeredServices = + new HashMap(); public static SimpleProtobufServer create(int port) { try { @@ -30,16 +40,52 @@ public class SimpleProtobufServer extends Thread { this.serverSocket = serverSocket; } + public synchronized void registerService(Service service) { + String serviceName = service.getDescriptorForType().getFullName(); + if (registeredServices.containsKey(serviceName)) { + logger.warning("Already registered service with this name."); + } + logger.info("Registering service: " + serviceName); + registeredServices.put(serviceName, service); + } + + public void handleRequest(Rpc.Request request, OutputStream out) + throws IOException { + Service service = registeredServices.get(request.getFullServiceName()); + final Rpc.Response.Builder response = Rpc.Response.newBuilder(); + if (service == null) { + response.setError(Rpc.Response.Error.UNKNOWN_SERVICE); + response.build().writeDelimitedTo(out); + return; + } + Descriptors.MethodDescriptor method = service.getDescriptorForType() + .findMethodByName(request.getMethodName()); + if (method == null) { + response.setError(Rpc.Response.Error.UNKNOWN_METHOD); + response.build().writeDelimitedTo(out); + return; + } + RpcCallback doneCallback = new RpcCallback() { + @Override public void run(Message responseMessage) { + response.setResponseProto(responseMessage.toByteString()); + } + }; + Message requestMessage = service.getRequestPrototype(method) + .toBuilder() + .mergeFrom(request.getRequestProto()) + .build(); + service.callMethod(method, null, requestMessage, doneCallback); + } + private void handleConnection(final Socket connection) { new Thread(new Runnable() { @Override public void run() { try { - Rpc.Request r1 = Rpc.Request.parseDelimitedFrom( - connection.getInputStream()); - Rpc.Request r2 = Rpc.Request.parseDelimitedFrom( - connection.getInputStream()); - System.out.println(r1); - System.out.println(r2); + while (true) { + Rpc.Request r1 = Rpc.Request.parseDelimitedFrom( + connection.getInputStream()); + handleRequest(r1, connection.getOutputStream()); + } } catch (IOException e) { logger.info("Closed connection: " + connection); } finally { @@ -66,6 +112,23 @@ public class SimpleProtobufServer extends Thread { } public static void main(String[] args) { - SimpleProtobufServer.create(10000).start(); + SimpleProtobufServer server = SimpleProtobufServer.create(10000); + Test.TestService testService = new Test.TestService() { + @Override public void run(RpcController controller, + Test.TestRequest request, + RpcCallback done) { + System.out.println("Hello from TestService!"); + done.run(Test.TestResponse.newBuilder() + .setId("Hello from server.") + .build()); + } + }; + server.registerService(testService); + server.start(); + try { + server.join(); + } catch (InterruptedException e) { + System.out.println("Stopped."); + } } } -- cgit v1.2.3