summaryrefslogtreecommitdiff
path: root/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java')
-rw-r--r--src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java79
1 files changed, 71 insertions, 8 deletions
diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
index 57afae9..0a20883 100644
--- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
+++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
@@ -2,16 +2,26 @@ package com.orbekk.protobuf;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.io.OutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Descriptors;
+import java.util.Map;
+import java.util.HashMap;
public class SimpleProtobufServer extends Thread {
private static Logger logger = Logger.getLogger(
SimpleProtobufServer.class.getName());
- ServerSocket serverSocket;
+ private ServerSocket serverSocket;
+ private Map<String, Service> registeredServices =
+ new HashMap<String, Service>();
public static SimpleProtobufServer create(int port) {
try {
@@ -30,16 +40,52 @@ public class SimpleProtobufServer extends Thread {
this.serverSocket = serverSocket;
}
+ public synchronized void registerService(Service service) {
+ String serviceName = service.getDescriptorForType().getFullName();
+ if (registeredServices.containsKey(serviceName)) {
+ logger.warning("Already registered service with this name.");
+ }
+ logger.info("Registering service: " + serviceName);
+ registeredServices.put(serviceName, service);
+ }
+
+ public void handleRequest(Rpc.Request request, OutputStream out)
+ throws IOException {
+ Service service = registeredServices.get(request.getFullServiceName());
+ final Rpc.Response.Builder response = Rpc.Response.newBuilder();
+ if (service == null) {
+ response.setError(Rpc.Response.Error.UNKNOWN_SERVICE);
+ response.build().writeDelimitedTo(out);
+ return;
+ }
+ Descriptors.MethodDescriptor method = service.getDescriptorForType()
+ .findMethodByName(request.getMethodName());
+ if (method == null) {
+ response.setError(Rpc.Response.Error.UNKNOWN_METHOD);
+ response.build().writeDelimitedTo(out);
+ return;
+ }
+ RpcCallback<Message> doneCallback = new RpcCallback<Message>() {
+ @Override public void run(Message responseMessage) {
+ response.setResponseProto(responseMessage.toByteString());
+ }
+ };
+ Message requestMessage = service.getRequestPrototype(method)
+ .toBuilder()
+ .mergeFrom(request.getRequestProto())
+ .build();
+ service.callMethod(method, null, requestMessage, doneCallback);
+ }
+
private void handleConnection(final Socket connection) {
new Thread(new Runnable() {
@Override public void run() {
try {
- Rpc.Request r1 = Rpc.Request.parseDelimitedFrom(
- connection.getInputStream());
- Rpc.Request r2 = Rpc.Request.parseDelimitedFrom(
- connection.getInputStream());
- System.out.println(r1);
- System.out.println(r2);
+ while (true) {
+ Rpc.Request r1 = Rpc.Request.parseDelimitedFrom(
+ connection.getInputStream());
+ handleRequest(r1, connection.getOutputStream());
+ }
} catch (IOException e) {
logger.info("Closed connection: " + connection);
} finally {
@@ -66,6 +112,23 @@ public class SimpleProtobufServer extends Thread {
}
public static void main(String[] args) {
- SimpleProtobufServer.create(10000).start();
+ SimpleProtobufServer server = SimpleProtobufServer.create(10000);
+ Test.TestService testService = new Test.TestService() {
+ @Override public void run(RpcController controller,
+ Test.TestRequest request,
+ RpcCallback<Test.TestResponse> done) {
+ System.out.println("Hello from TestService!");
+ done.run(Test.TestResponse.newBuilder()
+ .setId("Hello from server.")
+ .build());
+ }
+ };
+ server.registerService(testService);
+ server.start();
+ try {
+ server.join();
+ } catch (InterruptedException e) {
+ System.out.println("Stopped.");
+ }
}
}