summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-03-28 16:48:44 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-03-28 16:48:44 +0200
commitd317a85a7b76ee94ebc31c218dc877bfadd54902 (patch)
tree59e58757df7bea1b153dcdb79c863759733eb843
parent4ac1dadfcd211640a4b52db240f6f72138bd8e45 (diff)
Client support for asynchronous requests.
-rw-r--r--pom.xml14
-rw-r--r--src/main/java/com/orbekk/protobuf/Rpc.java167
-rw-r--r--src/main/java/com/orbekk/protobuf/Rpc.proto6
-rw-r--r--src/main/java/com/orbekk/protobuf/RpcChannel.java182
-rw-r--r--src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java43
-rw-r--r--src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java9
-rw-r--r--src/main/java/com/orbekk/protobuf/Test.java28
7 files changed, 393 insertions, 56 deletions
diff --git a/pom.xml b/pom.xml
index 137d1a8..3319230 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,19 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.1</version>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.java b/src/main/java/com/orbekk/protobuf/Rpc.java
index c74e48a..38ff1ea 100644
--- a/src/main/java/com/orbekk/protobuf/Rpc.java
+++ b/src/main/java/com/orbekk/protobuf/Rpc.java
@@ -22,6 +22,10 @@ public final class Rpc {
// optional bytes request_proto = 3;
boolean hasRequestProto();
com.google.protobuf.ByteString getRequestProto();
+
+ // optional int64 request_id = 4;
+ boolean hasRequestId();
+ long getRequestId();
}
public static final class Request extends
com.google.protobuf.GeneratedMessage
@@ -54,12 +58,12 @@ public final class Rpc {
private int bitField0_;
// optional string full_service_name = 1;
public static final int FULL_SERVICE_NAME_FIELD_NUMBER = 1;
- private Object fullServiceName_;
+ private java.lang.Object fullServiceName_;
public boolean hasFullServiceName() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getFullServiceName() {
- Object ref = fullServiceName_;
+ java.lang.Object ref = fullServiceName_;
if (ref instanceof String) {
return (String) ref;
} else {
@@ -73,7 +77,7 @@ public final class Rpc {
}
}
private com.google.protobuf.ByteString getFullServiceNameBytes() {
- Object ref = fullServiceName_;
+ java.lang.Object ref = fullServiceName_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
@@ -86,12 +90,12 @@ public final class Rpc {
// optional string method_name = 2;
public static final int METHOD_NAME_FIELD_NUMBER = 2;
- private Object methodName_;
+ private java.lang.Object methodName_;
public boolean hasMethodName() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public String getMethodName() {
- Object ref = methodName_;
+ java.lang.Object ref = methodName_;
if (ref instanceof String) {
return (String) ref;
} else {
@@ -105,7 +109,7 @@ public final class Rpc {
}
}
private com.google.protobuf.ByteString getMethodNameBytes() {
- Object ref = methodName_;
+ java.lang.Object ref = methodName_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
@@ -126,10 +130,21 @@ public final class Rpc {
return requestProto_;
}
+ // optional int64 request_id = 4;
+ public static final int REQUEST_ID_FIELD_NUMBER = 4;
+ private long requestId_;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+
private void initFields() {
fullServiceName_ = "";
methodName_ = "";
requestProto_ = com.google.protobuf.ByteString.EMPTY;
+ requestId_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -152,6 +167,9 @@ public final class Rpc {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, requestProto_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt64(4, requestId_);
+ }
getUnknownFields().writeTo(output);
}
@@ -173,13 +191,19 @@ public final class Rpc {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, requestProto_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(4, requestId_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
+ private static final long serialVersionUID = 0L;
@java.lang.Override
- protected Object writeReplace() throws java.io.ObjectStreamException {
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
return super.writeReplace();
}
@@ -301,6 +325,8 @@ public final class Rpc {
bitField0_ = (bitField0_ & ~0x00000002);
requestProto_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
+ requestId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -351,6 +377,10 @@ public final class Rpc {
to_bitField0_ |= 0x00000004;
}
result.requestProto_ = requestProto_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.requestId_ = requestId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -376,6 +406,9 @@ public final class Rpc {
if (other.hasRequestProto()) {
setRequestProto(other.getRequestProto());
}
+ if (other.hasRequestId()) {
+ setRequestId(other.getRequestId());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -422,6 +455,11 @@ public final class Rpc {
requestProto_ = input.readBytes();
break;
}
+ case 32: {
+ bitField0_ |= 0x00000008;
+ requestId_ = input.readInt64();
+ break;
+ }
}
}
}
@@ -429,12 +467,12 @@ public final class Rpc {
private int bitField0_;
// optional string full_service_name = 1;
- private Object fullServiceName_ = "";
+ private java.lang.Object fullServiceName_ = "";
public boolean hasFullServiceName() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getFullServiceName() {
- Object ref = fullServiceName_;
+ java.lang.Object ref = fullServiceName_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
fullServiceName_ = s;
@@ -465,12 +503,12 @@ public final class Rpc {
}
// optional string method_name = 2;
- private Object methodName_ = "";
+ private java.lang.Object methodName_ = "";
public boolean hasMethodName() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public String getMethodName() {
- Object ref = methodName_;
+ java.lang.Object ref = methodName_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
methodName_ = s;
@@ -524,6 +562,27 @@ public final class Rpc {
return this;
}
+ // optional int64 request_id = 4;
+ private long requestId_ ;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+ public Builder setRequestId(long value) {
+ bitField0_ |= 0x00000008;
+ requestId_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearRequestId() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ requestId_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:com.orbekk.protobuf.Request)
}
@@ -553,6 +612,10 @@ public final class Rpc {
// optional string error_message = 3;
boolean hasErrorMessage();
String getErrorMessage();
+
+ // optional int64 request_id = 5;
+ boolean hasRequestId();
+ long getRequestId();
}
public static final class Response extends
com.google.protobuf.GeneratedMessage
@@ -690,12 +753,12 @@ public final class Rpc {
// optional string error_message = 3;
public static final int ERROR_MESSAGE_FIELD_NUMBER = 3;
- private Object errorMessage_;
+ private java.lang.Object errorMessage_;
public boolean hasErrorMessage() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public String getErrorMessage() {
- Object ref = errorMessage_;
+ java.lang.Object ref = errorMessage_;
if (ref instanceof String) {
return (String) ref;
} else {
@@ -709,7 +772,7 @@ public final class Rpc {
}
}
private com.google.protobuf.ByteString getErrorMessageBytes() {
- Object ref = errorMessage_;
+ java.lang.Object ref = errorMessage_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
@@ -720,11 +783,22 @@ public final class Rpc {
}
}
+ // optional int64 request_id = 5;
+ public static final int REQUEST_ID_FIELD_NUMBER = 5;
+ private long requestId_;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+
private void initFields() {
responseProto_ = com.google.protobuf.ByteString.EMPTY;
error_ = com.orbekk.protobuf.Rpc.Response.Error.UNKNOWN_SERVICE;
appError_ = 0;
errorMessage_ = "";
+ requestId_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -750,6 +824,9 @@ public final class Rpc {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt32(4, appError_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeInt64(5, requestId_);
+ }
getUnknownFields().writeTo(output);
}
@@ -775,13 +852,19 @@ public final class Rpc {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(4, appError_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(5, requestId_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
+ private static final long serialVersionUID = 0L;
@java.lang.Override
- protected Object writeReplace() throws java.io.ObjectStreamException {
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
return super.writeReplace();
}
@@ -905,6 +988,8 @@ public final class Rpc {
bitField0_ = (bitField0_ & ~0x00000004);
errorMessage_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
+ requestId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -959,6 +1044,10 @@ public final class Rpc {
to_bitField0_ |= 0x00000008;
}
result.errorMessage_ = errorMessage_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.requestId_ = requestId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -987,6 +1076,9 @@ public final class Rpc {
if (other.hasErrorMessage()) {
setErrorMessage(other.getErrorMessage());
}
+ if (other.hasRequestId()) {
+ setRequestId(other.getRequestId());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1044,6 +1136,11 @@ public final class Rpc {
appError_ = input.readInt32();
break;
}
+ case 40: {
+ bitField0_ |= 0x00000010;
+ requestId_ = input.readInt64();
+ break;
+ }
}
}
}
@@ -1120,12 +1217,12 @@ public final class Rpc {
}
// optional string error_message = 3;
- private Object errorMessage_ = "";
+ private java.lang.Object errorMessage_ = "";
public boolean hasErrorMessage() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public String getErrorMessage() {
- Object ref = errorMessage_;
+ java.lang.Object ref = errorMessage_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
errorMessage_ = s;
@@ -1155,6 +1252,27 @@ public final class Rpc {
onChanged();
}
+ // optional int64 request_id = 5;
+ private long requestId_ ;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+ public Builder setRequestId(long value) {
+ bitField0_ |= 0x00000010;
+ requestId_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearRequestId() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ requestId_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:com.orbekk.protobuf.Response)
}
@@ -1186,12 +1304,13 @@ public final class Rpc {
static {
java.lang.String[] descriptorData = {
"\n+src/main/java/com/orbekk/protobuf/Rpc." +
- "proto\022\023com.orbekk.protobuf\"P\n\007Request\022\031\n" +
+ "proto\022\023com.orbekk.protobuf\"d\n\007Request\022\031\n" +
"\021full_service_name\030\001 \001(\t\022\023\n\013method_name\030" +
- "\002 \001(\t\022\025\n\rrequest_proto\030\003 \001(\014\"\317\001\n\010Respons" +
- "e\022\026\n\016response_proto\030\001 \001(\014\0222\n\005error\030\002 \001(\016" +
- "2#.com.orbekk.protobuf.Response.Error\022\021\n" +
- "\tapp_error\030\004 \001(\005\022\025\n\rerror_message\030\003 \001(\t\"" +
+ "\002 \001(\t\022\025\n\rrequest_proto\030\003 \001(\014\022\022\n\nrequest_" +
+ "id\030\004 \001(\003\"\343\001\n\010Response\022\026\n\016response_proto\030" +
+ "\001 \001(\014\0222\n\005error\030\002 \001(\0162#.com.orbekk.protob" +
+ "uf.Response.Error\022\021\n\tapp_error\030\004 \001(\005\022\025\n\r" +
+ "error_message\030\003 \001(\t\022\022\n\nrequest_id\030\005 \001(\003\"" +
"M\n\005Error\022\023\n\017UNKNOWN_SERVICE\020\000\022\022\n\016UNKNOWN" +
"_METHOD\020\001\022\014\n\010CANCELED\020\002\022\r\n\tAPP_ERROR\020\003"
};
@@ -1205,7 +1324,7 @@ public final class Rpc {
internal_static_com_orbekk_protobuf_Request_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_com_orbekk_protobuf_Request_descriptor,
- new java.lang.String[] { "FullServiceName", "MethodName", "RequestProto", },
+ new java.lang.String[] { "FullServiceName", "MethodName", "RequestProto", "RequestId", },
com.orbekk.protobuf.Rpc.Request.class,
com.orbekk.protobuf.Rpc.Request.Builder.class);
internal_static_com_orbekk_protobuf_Response_descriptor =
@@ -1213,7 +1332,7 @@ public final class Rpc {
internal_static_com_orbekk_protobuf_Response_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_com_orbekk_protobuf_Response_descriptor,
- new java.lang.String[] { "ResponseProto", "Error", "AppError", "ErrorMessage", },
+ new java.lang.String[] { "ResponseProto", "Error", "AppError", "ErrorMessage", "RequestId", },
com.orbekk.protobuf.Rpc.Response.class,
com.orbekk.protobuf.Rpc.Response.Builder.class);
return null;
diff --git a/src/main/java/com/orbekk/protobuf/Rpc.proto b/src/main/java/com/orbekk/protobuf/Rpc.proto
index f733140..e4865fa 100644
--- a/src/main/java/com/orbekk/protobuf/Rpc.proto
+++ b/src/main/java/com/orbekk/protobuf/Rpc.proto
@@ -1,13 +1,14 @@
package com.orbekk.protobuf;
-// Next tag: 4
+// Next tag: 6
message Request {
optional string full_service_name = 1;
optional string method_name = 2;
optional bytes request_proto = 3;
+ optional int64 request_id = 4;
}
-// Next tag: 4
+// Next tag: 6
message Response {
optional bytes response_proto = 1;
enum Error {
@@ -19,4 +20,5 @@ message Response {
optional Error error = 2;
optional int32 app_error = 4;
optional string error_message = 3;
+ optional int64 request_id = 5;
}
diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java
new file mode 100644
index 0000000..56b54c2
--- /dev/null
+++ b/src/main/java/com/orbekk/protobuf/RpcChannel.java
@@ -0,0 +1,182 @@
+package com.orbekk.protobuf;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+public class RpcChannel extends Thread implements
+ com.google.protobuf.RpcChannel {
+ static final Logger logger =
+ Logger.getLogger(RpcChannel.class.getName());
+ private String host;
+ private int port;
+ private volatile Socket socket = null;
+ private AtomicLong nextId = new AtomicLong(0);
+ private Map<Long, RpcChannel.OngoingRequest> rpcs =
+ Collections.synchronizedMap(
+ new HashMap<Long, RpcChannel.OngoingRequest>());
+
+ private static class OngoingRequest implements Closeable {
+ long id;
+ RpcController controller;
+ RpcCallback<Message> done;
+ Message responsePrototype;
+ Map<Long, RpcChannel.OngoingRequest> rpcs;
+
+ public OngoingRequest(long id, RpcController controller,
+ RpcCallback<Message> done, Message responsePrototype,
+ Map<Long, RpcChannel.OngoingRequest> rpcs) {
+ this.id = id;
+ this.controller = controller;
+ this.done = done;
+ this.responsePrototype = responsePrototype;
+ this.rpcs = rpcs;
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new AssertionError("Not implemented");
+ }
+ }
+
+ public static RpcChannel create(String host, int port) {
+ RpcChannel channel = new RpcChannel(host, port);
+ channel.start();
+ return channel;
+ }
+
+ private RpcChannel(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ private Socket getSocket() {
+ if (socket == null || socket.isClosed()) {
+ try {
+ logger.info("Creating new socket to " + host + ":" + port);
+ synchronized (this) {
+ socket = new Socket(host, port);
+ notify();
+ }
+ } catch (UnknownHostException e) {
+ return null;
+ } catch (IOException e) {
+ logger.log(Level.WARNING,
+ "Could not establish connection.", e);
+ return null;
+ }
+ }
+ return socket;
+ }
+
+ private Rpc.Request createRequest(Descriptors.MethodDescriptor method,
+ RpcController controller,
+ Message requestMessage,
+ Message responsePrototype,
+ RpcCallback<Message> done) {
+ long id = nextId.incrementAndGet();
+ OngoingRequest ongoingRequest = new OngoingRequest(id, controller,
+ done, responsePrototype, rpcs);
+ rpcs.put(id, ongoingRequest);
+
+ Rpc.Request request = Rpc.Request.newBuilder()
+ .setRequestId(id)
+ .setFullServiceName(method.getService().getFullName())
+ .setMethodName(method.getName())
+ .setRequestProto(requestMessage.toByteString())
+ .build();
+
+ return request;
+ }
+
+ private void finishRequest(Rpc.Response response) {
+ OngoingRequest ongoingRequest = rpcs.remove(response.getRequestId());
+ if (ongoingRequest != null) {
+ try {
+ Message responsePb = ongoingRequest.responsePrototype.toBuilder()
+ .mergeFrom(response.getResponseProto()).build();
+ ongoingRequest.done.run(responsePb);
+ } catch (InvalidProtocolBufferException e) {
+ throw new AssertionError("Should fail here.");
+ }
+ }
+ }
+
+ @Override public void callMethod(
+ Descriptors.MethodDescriptor method,
+ RpcController controller,
+ Message requestMessage,
+ Message responsePrototype,
+ RpcCallback<Message> done) {
+ try {
+ Rpc.Request request = createRequest(method, controller,
+ requestMessage, responsePrototype, done);
+ Socket socket = getSocket();
+ request.writeDelimitedTo(socket.getOutputStream());
+ } catch (IOException e) {
+ throw new AssertionError("Should return error.");
+ }
+ }
+
+ private void handleResponses(Socket socket) {
+ try {
+ logger.info("Handling responses to socket " + socket);
+ while (!socket.isClosed()) {
+ Rpc.Response response;
+ response = Rpc.Response.parseDelimitedFrom(
+ socket.getInputStream());
+ finishRequest(response);
+ }
+ } catch (IOException e) {
+ // Breaks the loop.
+ } finally {
+ if (socket != null && !socket.isClosed()) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ // Socket is closed.
+ }
+ }
+ }
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ synchronized (this) {
+ if (socket == null) {
+ wait();
+ }
+ }
+ if (socket != null) {
+ handleResponses(socket);
+ }
+ } catch (InterruptedException e) {
+ // Interrupts handled by outer loop
+ }
+ }
+ }
+
+ public void close() {
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ logger.info("Error closing socket.");
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java
index ffaedb0..b17c0bc 100644
--- a/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java
+++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufClient.java
@@ -1,26 +1,35 @@
package com.orbekk.protobuf;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.logging.Logger;
+
+import com.google.protobuf.RpcCallback;
public class SimpleProtobufClient {
+ static final Logger logger =
+ Logger.getLogger(SimpleProtobufClient.class.getName());
+
public void run() {
- try {
- Socket socket = new Socket("localhost", 10000);
- Rpc.Request r1 = Rpc.Request.newBuilder()
- .setFullServiceName("com.orbekk.protobuf.TestService")
- .setMethodName("Run")
+ RpcChannel channel = RpcChannel.create("localhost", 10000);
+ Test.TestService test = Test.TestService.newStub(channel);
+ Test.TestRequest request = Test.TestRequest.newBuilder()
+ .setId("Hello!")
.build();
- Rpc.Request r2 = Rpc.Request.newBuilder()
- .setFullServiceName("Service2")
- .build();
- r1.writeDelimitedTo(socket.getOutputStream());
- r2.writeDelimitedTo(socket.getOutputStream());
- } catch (UnknownHostException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
+ int count = 10;
+ final CountDownLatch stop = new CountDownLatch(count);
+ for (int i = 0; i < count; i++) {
+ logger.info("Sending request.");
+ test.run(null, request, new RpcCallback<Test.TestResponse>() {
+ @Override public void run(Test.TestResponse response) {
+ System.out.println("Response from server: " + response);
+ stop.countDown();
+ }
+ });
+ }
+ try {
+ stop.await();
+ } catch (InterruptedException e) {
+ // Stop waiting.
}
}
diff --git a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
index 0a20883..a6dd7c4 100644
--- a/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
+++ b/src/main/java/com/orbekk/protobuf/SimpleProtobufServer.java
@@ -53,6 +53,7 @@ public class SimpleProtobufServer extends Thread {
throws IOException {
Service service = registeredServices.get(request.getFullServiceName());
final Rpc.Response.Builder response = Rpc.Response.newBuilder();
+ response.setRequestId(request.getRequestId());
if (service == null) {
response.setError(Rpc.Response.Error.UNKNOWN_SERVICE);
response.build().writeDelimitedTo(out);
@@ -75,6 +76,7 @@ public class SimpleProtobufServer extends Thread {
.mergeFrom(request.getRequestProto())
.build();
service.callMethod(method, null, requestMessage, doneCallback);
+ response.build().writeDelimitedTo(out);
}
private void handleConnection(final Socket connection) {
@@ -84,6 +86,13 @@ public class SimpleProtobufServer extends Thread {
while (true) {
Rpc.Request r1 = Rpc.Request.parseDelimitedFrom(
connection.getInputStream());
+ if (r1 == null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ // Connection is closed.
+ }
+ }
handleRequest(r1, connection.getOutputStream());
}
} catch (IOException e) {
diff --git a/src/main/java/com/orbekk/protobuf/Test.java b/src/main/java/com/orbekk/protobuf/Test.java
index 8b17a74..f1be1a9 100644
--- a/src/main/java/com/orbekk/protobuf/Test.java
+++ b/src/main/java/com/orbekk/protobuf/Test.java
@@ -46,12 +46,12 @@ public final class Test {
private int bitField0_;
// optional string id = 1;
public static final int ID_FIELD_NUMBER = 1;
- private Object id_;
+ private java.lang.Object id_;
public boolean hasId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getId() {
- Object ref = id_;
+ java.lang.Object ref = id_;
if (ref instanceof String) {
return (String) ref;
} else {
@@ -65,7 +65,7 @@ public final class Test {
}
}
private com.google.protobuf.ByteString getIdBytes() {
- Object ref = id_;
+ java.lang.Object ref = id_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
@@ -112,8 +112,10 @@ public final class Test {
return size;
}
+ private static final long serialVersionUID = 0L;
@java.lang.Override
- protected Object writeReplace() throws java.io.ObjectStreamException {
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
return super.writeReplace();
}
@@ -335,12 +337,12 @@ public final class Test {
private int bitField0_;
// optional string id = 1;
- private Object id_ = "";
+ private java.lang.Object id_ = "";
public boolean hasId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getId() {
- Object ref = id_;
+ java.lang.Object ref = id_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
id_ = s;
@@ -419,12 +421,12 @@ public final class Test {
private int bitField0_;
// optional string id = 1;
public static final int ID_FIELD_NUMBER = 1;
- private Object id_;
+ private java.lang.Object id_;
public boolean hasId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getId() {
- Object ref = id_;
+ java.lang.Object ref = id_;
if (ref instanceof String) {
return (String) ref;
} else {
@@ -438,7 +440,7 @@ public final class Test {
}
}
private com.google.protobuf.ByteString getIdBytes() {
- Object ref = id_;
+ java.lang.Object ref = id_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
@@ -485,8 +487,10 @@ public final class Test {
return size;
}
+ private static final long serialVersionUID = 0L;
@java.lang.Override
- protected Object writeReplace() throws java.io.ObjectStreamException {
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
return super.writeReplace();
}
@@ -708,12 +712,12 @@ public final class Test {
private int bitField0_;
// optional string id = 1;
- private Object id_ = "";
+ private java.lang.Object id_ = "";
public boolean hasId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getId() {
- Object ref = id_;
+ java.lang.Object ref = id_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
id_ = s;