summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Master.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java86
1 files changed, 65 insertions, 21 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 6041cf1..86b1e06 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -1,12 +1,20 @@
package com.orbekk.same;
-import com.orbekk.same.State.Component;
import java.util.Collections;
import java.util.List;
-import com.orbekk.util.WorkQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.orbekk.same.Services.ClientState;
+import com.orbekk.same.Services.Empty;
+import com.orbekk.same.Services.UpdateComponentResponse;
+import com.orbekk.same.State.Component;
+import com.orbekk.util.WorkQueue;
+
public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
private final ConnectionManager connections;
@@ -34,26 +42,14 @@ public class Master {
return myUrl;
}
- private MasterService serviceImpl = new MasterService() {
- @Override
- public boolean updateStateRequest(String component,
- String newData, long revision) {
- logger.info("updateStateRequest({}, {}, {})",
- new Object[]{component, newData, revision});
- boolean updated = state.update(component, newData, revision + 1);
- if (updated) {
- updateStateRequestThread.add(component);
- }
- return updated;
- }
-
- @Override
- public void joinNetworkRequest(String clientUrl) {
- logger.info("joinNetworkRequest({})", clientUrl);
+ private Services.Master newMasterImpl = new Services.Master() {
+ @Override public void joinNetworkRequest(RpcController controller,
+ ClientState request, RpcCallback<Empty> done) {
+ logger.info("joinNetworkRequest({})", request);
List<String> participants = state.getList(".participants");
- sendFullStateThread.add(clientUrl);
- if (!participants.contains(clientUrl)) {
- participants.add(clientUrl);
+ sendFullStateThread.add(request.getUrl());
+ if (!participants.contains(request.getUrl())) {
+ participants.add(request.getUrl());
synchronized (Master.this) {
state.updateFromObject(".participants", participants,
state.getRevision(".participants") + 1);
@@ -62,6 +58,54 @@ public class Master {
} else {
logger.warn("Client {} joining: Already part of network");
}
+ done.run(Empty.getDefaultInstance());
+ }
+
+ @Override public void updateStateRequest(RpcController controller,
+ Services.Component request,
+ RpcCallback<Services.UpdateComponentResponse> done) {
+ logger.info("updateStateRequest({})", request);
+ boolean updated = state.update(request.getId(), request.getData(),
+ request.getRevision() + 1);
+ if (updated) {
+ updateStateRequestThread.add(request.getId());
+ }
+ done.run(Services.UpdateComponentResponse.newBuilder()
+ .setSuccess(updated).build());
+ }
+ };
+
+ private MasterService serviceImpl = new MasterService() {
+ @Override
+ public boolean updateStateRequest(String component,
+ String newData, long revision) {
+ Services.Component request = Services.Component.newBuilder()
+ .setId(component)
+ .setData(newData)
+ .setRevision(revision)
+ .build();
+ final AtomicBoolean result = new AtomicBoolean(false);
+ RpcCallback<Services.UpdateComponentResponse> done =
+ new RpcCallback<Services.UpdateComponentResponse>() {
+ @Override public void run(UpdateComponentResponse response) {
+ result.set(response.getSuccess());
+ }
+ };
+ newMasterImpl.updateStateRequest(null, request, done);
+ return result.get();
+ }
+
+ @Override
+ public void joinNetworkRequest(String clientUrl) {
+ Services.ClientState request = Services.ClientState.newBuilder()
+ .setUrl(clientUrl)
+ .build();
+ RpcCallback<Services.Empty> done =
+ new RpcCallback<Services.Empty>() {
+ @Override public void run(Empty response) {
+ }
+ };
+ newMasterImpl.joinNetworkRequest(null, request, done);
}
};