diff options
| author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-03-28 16:48:44 +0200 | 
|---|---|---|
| committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-03-28 16:48:44 +0200 | 
| commit | d317a85a7b76ee94ebc31c218dc877bfadd54902 (patch) | |
| tree | 59e58757df7bea1b153dcdb79c863759733eb843 /src/main/java/com/orbekk/protobuf | |
| parent | 4ac1dadfcd211640a4b52db240f6f72138bd8e45 (diff) | |
Client support for asynchronous requests.
Diffstat (limited to 'src/main/java/com/orbekk/protobuf')
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/Rpc.java | 167 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/Rpc.proto | 6 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/RpcChannel.java | 182 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java | 43 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java | 9 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/Test.java | 28 | 
6 files changed, 380 insertions, 55 deletions
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java index c74e48a..38ff1ea 100644 --- a/src/main/java/com/orbekk/protobuf/Rpc.java +++ b/src/main/java/com/orbekk/protobuf/Rpc.java @@ -22,6 +22,10 @@ public final class Rpc {      // optional bytes request_proto = 3;      boolean hasRequestProto();      com.google.protobuf.ByteString getRequestProto(); +     +    // optional int64 request_id = 4; +    boolean hasRequestId(); +    long getRequestId();    }    public static final class Request extends        com.google.protobuf.GeneratedMessage @@ -54,12 +58,12 @@ public final class Rpc {      private int bitField0_;      // optional string full_service_name = 1;      public static final int FULL_SERVICE_NAME_FIELD_NUMBER = 1; -    private Object fullServiceName_; +    private java.lang.Object fullServiceName_;      public boolean hasFullServiceName() {        return ((bitField0_ & 0x00000001) == 0x00000001);      }      public String getFullServiceName() { -      Object ref = fullServiceName_; +      java.lang.Object ref = fullServiceName_;        if (ref instanceof String) {          return (String) ref;        } else { @@ -73,7 +77,7 @@ public final class Rpc {        }      }      private com.google.protobuf.ByteString getFullServiceNameBytes() { -      Object ref = fullServiceName_; +      java.lang.Object ref = fullServiceName_;        if (ref instanceof String) {          com.google.protobuf.ByteString b =               com.google.protobuf.ByteString.copyFromUtf8((String) ref); @@ -86,12 +90,12 @@ public final class Rpc {      // optional string method_name = 2;      public static final int METHOD_NAME_FIELD_NUMBER = 2; -    private Object methodName_; +    private java.lang.Object methodName_;      public boolean hasMethodName() {        return ((bitField0_ & 0x00000002) == 0x00000002);      }      public String getMethodName() { -      Object ref = methodName_; +      java.lang.Object ref = methodName_;        if (ref instanceof String) {          return (String) ref;        } else { @@ -105,7 +109,7 @@ public final class Rpc {        }      }      private com.google.protobuf.ByteString getMethodNameBytes() { -      Object ref = methodName_; +      java.lang.Object ref = methodName_;        if (ref instanceof String) {          com.google.protobuf.ByteString b =               com.google.protobuf.ByteString.copyFromUtf8((String) ref); @@ -126,10 +130,21 @@ public final class Rpc {        return requestProto_;      } +    // optional int64 request_id = 4; +    public static final int REQUEST_ID_FIELD_NUMBER = 4; +    private long requestId_; +    public boolean hasRequestId() { +      return ((bitField0_ & 0x00000008) == 0x00000008); +    } +    public long getRequestId() { +      return requestId_; +    } +          private void initFields() {        fullServiceName_ = "";        methodName_ = "";        requestProto_ = com.google.protobuf.ByteString.EMPTY; +      requestId_ = 0L;      }      private byte memoizedIsInitialized = -1;      public final boolean isInitialized() { @@ -152,6 +167,9 @@ public final class Rpc {        if (((bitField0_ & 0x00000004) == 0x00000004)) {          output.writeBytes(3, requestProto_);        } +      if (((bitField0_ & 0x00000008) == 0x00000008)) { +        output.writeInt64(4, requestId_); +      }        getUnknownFields().writeTo(output);      } @@ -173,13 +191,19 @@ public final class Rpc {          size += com.google.protobuf.CodedOutputStream            .computeBytesSize(3, requestProto_);        } +      if (((bitField0_ & 0x00000008) == 0x00000008)) { +        size += com.google.protobuf.CodedOutputStream +          .computeInt64Size(4, requestId_); +      }        size += getUnknownFields().getSerializedSize();        memoizedSerializedSize = size;        return size;      } +    private static final long serialVersionUID = 0L;      @java.lang.Override -    protected Object writeReplace() throws java.io.ObjectStreamException { +    protected java.lang.Object writeReplace() +        throws java.io.ObjectStreamException {        return super.writeReplace();      } @@ -301,6 +325,8 @@ public final class Rpc {          bitField0_ = (bitField0_ & ~0x00000002);          requestProto_ = com.google.protobuf.ByteString.EMPTY;          bitField0_ = (bitField0_ & ~0x00000004); +        requestId_ = 0L; +        bitField0_ = (bitField0_ & ~0x00000008);          return this;        } @@ -351,6 +377,10 @@ public final class Rpc {            to_bitField0_ |= 0x00000004;          }          result.requestProto_ = requestProto_; +        if (((from_bitField0_ & 0x00000008) == 0x00000008)) { +          to_bitField0_ |= 0x00000008; +        } +        result.requestId_ = requestId_;          result.bitField0_ = to_bitField0_;          onBuilt();          return result; @@ -376,6 +406,9 @@ public final class Rpc {          if (other.hasRequestProto()) {            setRequestProto(other.getRequestProto());          } +        if (other.hasRequestId()) { +          setRequestId(other.getRequestId()); +        }          this.mergeUnknownFields(other.getUnknownFields());          return this;        } @@ -422,6 +455,11 @@ public final class Rpc {                requestProto_ = input.readBytes();                break;              } +            case 32: { +              bitField0_ |= 0x00000008; +              requestId_ = input.readInt64(); +              break; +            }            }          }        } @@ -429,12 +467,12 @@ public final class Rpc {        private int bitField0_;        // optional string full_service_name = 1; -      private Object fullServiceName_ = ""; +      private java.lang.Object fullServiceName_ = "";        public boolean hasFullServiceName() {          return ((bitField0_ & 0x00000001) == 0x00000001);        }        public String getFullServiceName() { -        Object ref = fullServiceName_; +        java.lang.Object ref = fullServiceName_;          if (!(ref instanceof String)) {            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();            fullServiceName_ = s; @@ -465,12 +503,12 @@ public final class Rpc {        }        // optional string method_name = 2; -      private Object methodName_ = ""; +      private java.lang.Object methodName_ = "";        public boolean hasMethodName() {          return ((bitField0_ & 0x00000002) == 0x00000002);        }        public String getMethodName() { -        Object ref = methodName_; +        java.lang.Object ref = methodName_;          if (!(ref instanceof String)) {            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();            methodName_ = s; @@ -524,6 +562,27 @@ public final class Rpc {          return this;        } +      // optional int64 request_id = 4; +      private long requestId_ ; +      public boolean hasRequestId() { +        return ((bitField0_ & 0x00000008) == 0x00000008); +      } +      public long getRequestId() { +        return requestId_; +      } +      public Builder setRequestId(long value) { +        bitField0_ |= 0x00000008; +        requestId_ = value; +        onChanged(); +        return this; +      } +      public Builder clearRequestId() { +        bitField0_ = (bitField0_ & ~0x00000008); +        requestId_ = 0L; +        onChanged(); +        return this; +      } +              // @@protoc_insertion_point(builder_scope:com.orbekk.protobuf.Request)      } @@ -553,6 +612,10 @@ public final class Rpc {      // optional string error_message = 3;      boolean hasErrorMessage();      String getErrorMessage(); +     +    // optional int64 request_id = 5; +    boolean hasRequestId(); +    long getRequestId();    }    public static final class Response extends        com.google.protobuf.GeneratedMessage @@ -690,12 +753,12 @@ public final class Rpc {      // optional string error_message = 3;      public static final int ERROR_MESSAGE_FIELD_NUMBER = 3; -    private Object errorMessage_; +    private java.lang.Object errorMessage_;      public boolean hasErrorMessage() {        return ((bitField0_ & 0x00000008) == 0x00000008);      }      public String getErrorMessage() { -      Object ref = errorMessage_; +      java.lang.Object ref = errorMessage_;        if (ref instanceof String) {          return (String) ref;        } else { @@ -709,7 +772,7 @@ public final class Rpc {        }      }      private com.google.protobuf.ByteString getErrorMessageBytes() { -      Object ref = errorMessage_; +      java.lang.Object ref = errorMessage_;        if (ref instanceof String) {          com.google.protobuf.ByteString b =               com.google.protobuf.ByteString.copyFromUtf8((String) ref); @@ -720,11 +783,22 @@ public final class Rpc {        }      } +    // optional int64 request_id = 5; +    public static final int REQUEST_ID_FIELD_NUMBER = 5; +    private long requestId_; +    public boolean hasRequestId() { +      return ((bitField0_ & 0x00000010) == 0x00000010); +    } +    public long getRequestId() { +      return requestId_; +    } +          private void initFields() {        responseProto_ = com.google.protobuf.ByteString.EMPTY;        error_ = com.orbekk.protobuf.Rpc.Response.Error.UNKNOWN_SERVICE;        appError_ = 0;        errorMessage_ = ""; +      requestId_ = 0L;      }      private byte memoizedIsInitialized = -1;      public final boolean isInitialized() { @@ -750,6 +824,9 @@ public final class Rpc {        if (((bitField0_ & 0x00000004) == 0x00000004)) {          output.writeInt32(4, appError_);        } +      if (((bitField0_ & 0x00000010) == 0x00000010)) { +        output.writeInt64(5, requestId_); +      }        getUnknownFields().writeTo(output);      } @@ -775,13 +852,19 @@ public final class Rpc {          size += com.google.protobuf.CodedOutputStream            .computeInt32Size(4, appError_);        } +      if (((bitField0_ & 0x00000010) == 0x00000010)) { +        size += com.google.protobuf.CodedOutputStream +          .computeInt64Size(5, requestId_); +      }        size += getUnknownFields().getSerializedSize();        memoizedSerializedSize = size;        return size;      } +    private static final long serialVersionUID = 0L;      @java.lang.Override -    protected Object writeReplace() throws java.io.ObjectStreamException { +    protected java.lang.Object writeReplace() +        throws java.io.ObjectStreamException {        return super.writeReplace();      } @@ -905,6 +988,8 @@ public final class Rpc {          bitField0_ = (bitField0_ & ~0x00000004);          errorMessage_ = "";          bitField0_ = (bitField0_ & ~0x00000008); +        requestId_ = 0L; +        bitField0_ = (bitField0_ & ~0x00000010);          return this;        } @@ -959,6 +1044,10 @@ public final class Rpc {            to_bitField0_ |= 0x00000008;          }          result.errorMessage_ = errorMessage_; +        if (((from_bitField0_ & 0x00000010) == 0x00000010)) { +          to_bitField0_ |= 0x00000010; +        } +        result.requestId_ = requestId_;          result.bitField0_ = to_bitField0_;          onBuilt();          return result; @@ -987,6 +1076,9 @@ public final class Rpc {          if (other.hasErrorMessage()) {            setErrorMessage(other.getErrorMessage());          } +        if (other.hasRequestId()) { +          setRequestId(other.getRequestId()); +        }          this.mergeUnknownFields(other.getUnknownFields());          return this;        } @@ -1044,6 +1136,11 @@ public final class Rpc {                appError_ = input.readInt32();                break;              } +            case 40: { +              bitField0_ |= 0x00000010; +              requestId_ = input.readInt64(); +              break; +            }            }          }        } @@ -1120,12 +1217,12 @@ public final class Rpc {        }        // optional string error_message = 3; -      private Object errorMessage_ = ""; +      private java.lang.Object errorMessage_ = "";        public boolean hasErrorMessage() {          return ((bitField0_ & 0x00000008) == 0x00000008);        }        public String getErrorMessage() { -        Object ref = errorMessage_; +        java.lang.Object ref = errorMessage_;          if (!(ref instanceof String)) {            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();            errorMessage_ = s; @@ -1155,6 +1252,27 @@ public final class Rpc {          onChanged();        } +      // optional int64 request_id = 5; +      private long requestId_ ; +      public boolean hasRequestId() { +        return ((bitField0_ & 0x00000010) == 0x00000010); +      } +      public long getRequestId() { +        return requestId_; +      } +      public Builder setRequestId(long value) { +        bitField0_ |= 0x00000010; +        requestId_ = value; +        onChanged(); +        return this; +      } +      public Builder clearRequestId() { +        bitField0_ = (bitField0_ & ~0x00000010); +        requestId_ = 0L; +        onChanged(); +        return this; +      } +              // @@protoc_insertion_point(builder_scope:com.orbekk.protobuf.Response)      } @@ -1186,12 +1304,13 @@ public final class Rpc {    static {      java.lang.String[] descriptorData = {        "\n+src/main/java/com/orbekk/protobuf/Rpc." + -      "proto\022\023com.orbekk.protobuf\"P\n\007Request\022\031\n" + +      "proto\022\023com.orbekk.protobuf\"d\n\007Request\022\031\n" +        "\021full_service_name\030\001 \001(\t\022\023\n\013method_name\030" + -      "\002 \001(\t\022\025\n\rrequest_proto\030\003 \001(\014\"\317\001\n\010Respons" + -      "e\022\026\n\016response_proto\030\001 \001(\014\0222\n\005error\030\002 \001(\016" + -      "2#.com.orbekk.protobuf.Response.Error\022\021\n" + -      "\tapp_error\030\004 \001(\005\022\025\n\rerror_message\030\003 \001(\t\"" + +      "\002 \001(\t\022\025\n\rrequest_proto\030\003 \001(\014\022\022\n\nrequest_" + +      "id\030\004 \001(\003\"\343\001\n\010Response\022\026\n\016response_proto\030" + +      "\001 \001(\014\0222\n\005error\030\002 \001(\0162#.com.orbekk.protob" + +      "uf.Response.Error\022\021\n\tapp_error\030\004 \001(\005\022\025\n\r" + +      "error_message\030\003 \001(\t\022\022\n\nrequest_id\030\005 \001(\003\"" +        "M\n\005Error\022\023\n\017UNKNOWN_SERVICE\020\000\022\022\n\016UNKNOWN" +        "_METHOD\020\001\022\014\n\010CANCELED\020\002\022\r\n\tAPP_ERROR\020\003"      }; @@ -1205,7 +1324,7 @@ public final class Rpc {            internal_static_com_orbekk_protobuf_Request_fieldAccessorTable = new              com.google.protobuf.GeneratedMessage.FieldAccessorTable(                internal_static_com_orbekk_protobuf_Request_descriptor, -              new java.lang.String[] { "FullServiceName", "MethodName", "RequestProto", }, +              new java.lang.String[] { "FullServiceName", "MethodName", "RequestProto", "RequestId", },                com.orbekk.protobuf.Rpc.Request.class,                com.orbekk.protobuf.Rpc.Request.Builder.class);            internal_static_com_orbekk_protobuf_Response_descriptor = @@ -1213,7 +1332,7 @@ public final class Rpc {            internal_static_com_orbekk_protobuf_Response_fieldAccessorTable = new              com.google.protobuf.GeneratedMessage.FieldAccessorTable(                internal_static_com_orbekk_protobuf_Response_descriptor, -              new java.lang.String[] { "ResponseProto", "Error", "AppError", "ErrorMessage", }, +              new java.lang.String[] { "ResponseProto", "Error", "AppError", "ErrorMessage", "RequestId", },                com.orbekk.protobuf.Rpc.Response.class,                com.orbekk.protobuf.Rpc.Response.Builder.class);            return null; diff --git a/src/main/java/com/orbekk/protobuf/Rpc.proto b/src/main/java/com/orbekk/protobuf/Rpc.proto index f733140..e4865fa 100644 --- a/src/main/java/com/orbekk/protobuf/Rpc.proto +++ b/src/main/java/com/orbekk/protobuf/Rpc.proto @@ -1,13 +1,14 @@  package com.orbekk.protobuf; -// Next tag: 4 +// Next tag: 6  message Request {      optional string full_service_name = 1;      optional string method_name = 2;      optional bytes request_proto = 3; +    optional int64 request_id = 4;  } -// Next tag: 4 +// Next tag: 6  message Response {      optional bytes response_proto = 1;      enum Error { @@ -19,4 +20,5 @@ message Response {      optional Error error = 2;      optional int32 app_error = 4;      optional string error_message = 3; +    optional int64 request_id = 5;  } diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java new file mode 100644 index 0000000..56b54c2 --- /dev/null +++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java @@ -0,0 +1,182 @@ +package com.orbekk.protobuf; + +import java.io.Closeable; +import java.io.IOException; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +public class RpcChannel extends Thread implements +        com.google.protobuf.RpcChannel { +    static final Logger logger = +            Logger.getLogger(RpcChannel.class.getName()); +    private String host; +    private int port; +    private volatile Socket socket = null; +    private AtomicLong nextId = new AtomicLong(0); +    private Map<Long, RpcChannel.OngoingRequest> rpcs = +            Collections.synchronizedMap( +                    new HashMap<Long, RpcChannel.OngoingRequest>()); + +    private static class OngoingRequest implements Closeable { +        long id; +        RpcController controller; +        RpcCallback<Message> done; +        Message responsePrototype; +        Map<Long, RpcChannel.OngoingRequest> rpcs; +         +        public OngoingRequest(long id, RpcController controller, +                RpcCallback<Message> done, Message responsePrototype, +                Map<Long, RpcChannel.OngoingRequest> rpcs) { +            this.id = id; +            this.controller = controller; +            this.done = done; +            this.responsePrototype = responsePrototype; +            this.rpcs = rpcs; +        } + +        @Override +        public void close() throws IOException { +            throw new AssertionError("Not implemented"); +        } +    } +     +    public static RpcChannel create(String host, int port) { +        RpcChannel channel = new RpcChannel(host, port); +        channel.start(); +        return channel; +    } +     +    private RpcChannel(String host, int port) { +        this.host = host; +        this.port = port; +    } + +    private Socket getSocket() { +        if (socket == null || socket.isClosed()) { +            try { +                logger.info("Creating new socket to " + host + ":" + port); +                synchronized (this) { +                    socket = new Socket(host, port); +                    notify(); +                } +            } catch (UnknownHostException e) { +                return null; +            } catch (IOException e) { +                logger.log(Level.WARNING, +                        "Could not establish connection.", e); +                return null; +            } +        } +        return socket; +    } +     +    private Rpc.Request createRequest(Descriptors.MethodDescriptor method, +            RpcController controller, +            Message requestMessage, +            Message responsePrototype, +            RpcCallback<Message> done) { +        long id = nextId.incrementAndGet(); +        OngoingRequest ongoingRequest = new OngoingRequest(id, controller, +                done, responsePrototype, rpcs); +        rpcs.put(id, ongoingRequest); +         +        Rpc.Request request = Rpc.Request.newBuilder() +                .setRequestId(id) +                .setFullServiceName(method.getService().getFullName()) +                .setMethodName(method.getName()) +                .setRequestProto(requestMessage.toByteString()) +                .build(); +         +        return request; +    } +     +    private void finishRequest(Rpc.Response response) { +        OngoingRequest ongoingRequest = rpcs.remove(response.getRequestId()); +        if (ongoingRequest != null) { +            try { +                Message responsePb = ongoingRequest.responsePrototype.toBuilder() +                        .mergeFrom(response.getResponseProto()).build(); +                ongoingRequest.done.run(responsePb); +            } catch (InvalidProtocolBufferException e) { +                throw new AssertionError("Should fail here."); +            } +        } +    } + +    @Override public void callMethod( +            Descriptors.MethodDescriptor method, +            RpcController controller, +            Message requestMessage, +            Message responsePrototype, +            RpcCallback<Message> done) { +        try { +            Rpc.Request request = createRequest(method, controller, +                    requestMessage, responsePrototype, done); +            Socket socket = getSocket(); +            request.writeDelimitedTo(socket.getOutputStream()); +        } catch (IOException e) { +            throw new AssertionError("Should return error."); +        } +    } +     +    private void handleResponses(Socket socket) { +        try { +            logger.info("Handling responses to socket " + socket); +            while (!socket.isClosed()) { +                Rpc.Response response; +                response = Rpc.Response.parseDelimitedFrom( +                        socket.getInputStream()); +                finishRequest(response); +            } +        } catch (IOException e) { +            // Breaks the loop. +        } finally { +            if (socket != null && !socket.isClosed()) { +                try { +                    socket.close(); +                } catch (IOException e) { +                    // Socket is closed. +                } +            } +        } +    } +     +    public void run() { +        while (!Thread.interrupted()) { +            try { +                synchronized (this) { +                    if (socket == null) { +                        wait(); +                    } +                } +                if (socket != null) { +                    handleResponses(socket); +                } +            } catch (InterruptedException e) { +                // Interrupts handled by outer loop +            } +        } +    } + +    public void close() { +        if (socket != null) { +            try { +                socket.close(); +            } catch (IOException e) { +                logger.info("Error closing socket."); +            } +        } +    } +}
\ No newline at end of file diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java index ffaedb0..b17c0bc 100644 --- a/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java +++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java @@ -1,26 +1,35 @@  package com.orbekk.protobuf; -import java.net.Socket; -import java.net.UnknownHostException; -import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.logging.Logger; + +import com.google.protobuf.RpcCallback;  public class SimpleProtobufClient { +    static final Logger logger = +            Logger.getLogger(SimpleProtobufClient.class.getName()); +      public void run() { -        try { -            Socket socket = new Socket("localhost", 10000); -            Rpc.Request r1 = Rpc.Request.newBuilder() -                .setFullServiceName("com.orbekk.protobuf.TestService") -                .setMethodName("Run") +        RpcChannel channel = RpcChannel.create("localhost", 10000); +        Test.TestService test = Test.TestService.newStub(channel); +        Test.TestRequest request = Test.TestRequest.newBuilder() +                .setId("Hello!")                  .build(); -            Rpc.Request r2 = Rpc.Request.newBuilder() -                .setFullServiceName("Service2") -                .build(); -            r1.writeDelimitedTo(socket.getOutputStream()); -            r2.writeDelimitedTo(socket.getOutputStream()); -        } catch (UnknownHostException e) { -            e.printStackTrace(); -        } catch (IOException e) { -            e.printStackTrace(); +        int count = 10; +        final CountDownLatch stop = new CountDownLatch(count); +        for (int i = 0; i < count; i++) { +            logger.info("Sending request."); +            test.run(null, request, new RpcCallback<Test.TestResponse>() { +                @Override public void run(Test.TestResponse response) { +                    System.out.println("Response from server: " + response); +                    stop.countDown(); +                } +            }); +        } +        try { +            stop.await(); +        } catch (InterruptedException e) { +            // Stop waiting.          }      } diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java index 0a20883..a6dd7c4 100644 --- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java +++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java @@ -53,6 +53,7 @@ public class SimpleProtobufServer extends Thread {              throws IOException {          Service service = registeredServices.get(request.getFullServiceName());          final Rpc.Response.Builder response = Rpc.Response.newBuilder(); +        response.setRequestId(request.getRequestId());          if (service == null) {              response.setError(Rpc.Response.Error.UNKNOWN_SERVICE);              response.build().writeDelimitedTo(out); @@ -75,6 +76,7 @@ public class SimpleProtobufServer extends Thread {                  .mergeFrom(request.getRequestProto())                  .build();          service.callMethod(method, null,  requestMessage, doneCallback); +        response.build().writeDelimitedTo(out);      }      private void handleConnection(final Socket connection) { @@ -84,6 +86,13 @@ public class SimpleProtobufServer extends Thread {                      while (true) {                          Rpc.Request r1 = Rpc.Request.parseDelimitedFrom(                              connection.getInputStream()); +                        if (r1 == null) { +                            try { +                                connection.close(); +                            } catch (IOException e) { +                                // Connection is closed. +                            } +                        }                          handleRequest(r1, connection.getOutputStream());                      }                  } catch (IOException e) { diff --git a/src/main/java/com/orbekk/protobuf/Test.java b/src/main/java/com/orbekk/protobuf/Test.java index 8b17a74..f1be1a9 100644 --- a/src/main/java/com/orbekk/protobuf/Test.java +++ b/src/main/java/com/orbekk/protobuf/Test.java @@ -46,12 +46,12 @@ public final class Test {      private int bitField0_;      // optional string id = 1;      public static final int ID_FIELD_NUMBER = 1; -    private Object id_; +    private java.lang.Object id_;      public boolean hasId() {        return ((bitField0_ & 0x00000001) == 0x00000001);      }      public String getId() { -      Object ref = id_; +      java.lang.Object ref = id_;        if (ref instanceof String) {          return (String) ref;        } else { @@ -65,7 +65,7 @@ public final class Test {        }      }      private com.google.protobuf.ByteString getIdBytes() { -      Object ref = id_; +      java.lang.Object ref = id_;        if (ref instanceof String) {          com.google.protobuf.ByteString b =               com.google.protobuf.ByteString.copyFromUtf8((String) ref); @@ -112,8 +112,10 @@ public final class Test {        return size;      } +    private static final long serialVersionUID = 0L;      @java.lang.Override -    protected Object writeReplace() throws java.io.ObjectStreamException { +    protected java.lang.Object writeReplace() +        throws java.io.ObjectStreamException {        return super.writeReplace();      } @@ -335,12 +337,12 @@ public final class Test {        private int bitField0_;        // optional string id = 1; -      private Object id_ = ""; +      private java.lang.Object id_ = "";        public boolean hasId() {          return ((bitField0_ & 0x00000001) == 0x00000001);        }        public String getId() { -        Object ref = id_; +        java.lang.Object ref = id_;          if (!(ref instanceof String)) {            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();            id_ = s; @@ -419,12 +421,12 @@ public final class Test {      private int bitField0_;      // optional string id = 1;      public static final int ID_FIELD_NUMBER = 1; -    private Object id_; +    private java.lang.Object id_;      public boolean hasId() {        return ((bitField0_ & 0x00000001) == 0x00000001);      }      public String getId() { -      Object ref = id_; +      java.lang.Object ref = id_;        if (ref instanceof String) {          return (String) ref;        } else { @@ -438,7 +440,7 @@ public final class Test {        }      }      private com.google.protobuf.ByteString getIdBytes() { -      Object ref = id_; +      java.lang.Object ref = id_;        if (ref instanceof String) {          com.google.protobuf.ByteString b =               com.google.protobuf.ByteString.copyFromUtf8((String) ref); @@ -485,8 +487,10 @@ public final class Test {        return size;      } +    private static final long serialVersionUID = 0L;      @java.lang.Override -    protected Object writeReplace() throws java.io.ObjectStreamException { +    protected java.lang.Object writeReplace() +        throws java.io.ObjectStreamException {        return super.writeReplace();      } @@ -708,12 +712,12 @@ public final class Test {        private int bitField0_;        // optional string id = 1; -      private Object id_ = ""; +      private java.lang.Object id_ = "";        public boolean hasId() {          return ((bitField0_ & 0x00000001) == 0x00000001);        }        public String getId() { -        Object ref = id_; +        java.lang.Object ref = id_;          if (!(ref instanceof String)) {            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();            id_ = s;  | 
