From e100caf587fb7e9db626cf59c6f9dbf9114d72f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 10 Apr 2012 15:19:19 +0200 Subject: Add protobuf Master service. --- same/src/main/java/com/orbekk/same/Master.java | 86 +++++++++++++++++++------- 1 file changed, 65 insertions(+), 21 deletions(-) (limited to 'same/src/main/java/com/orbekk/same/Master.java') diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 6041cf1..86b1e06 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -1,12 +1,20 @@ package com.orbekk.same; -import com.orbekk.same.State.Component; import java.util.Collections; import java.util.List; -import com.orbekk.util.WorkQueue; +import java.util.concurrent.atomic.AtomicBoolean; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.orbekk.same.Services.ClientState; +import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.UpdateComponentResponse; +import com.orbekk.same.State.Component; +import com.orbekk.util.WorkQueue; + public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); private final ConnectionManager connections; @@ -34,26 +42,14 @@ public class Master { return myUrl; } - private MasterService serviceImpl = new MasterService() { - @Override - public boolean updateStateRequest(String component, - String newData, long revision) { - logger.info("updateStateRequest({}, {}, {})", - new Object[]{component, newData, revision}); - boolean updated = state.update(component, newData, revision + 1); - if (updated) { - updateStateRequestThread.add(component); - } - return updated; - } - - @Override - public void joinNetworkRequest(String clientUrl) { - logger.info("joinNetworkRequest({})", clientUrl); + private Services.Master newMasterImpl = new Services.Master() { + @Override public void joinNetworkRequest(RpcController controller, + ClientState request, RpcCallback done) { + logger.info("joinNetworkRequest({})", request); List participants = state.getList(".participants"); - sendFullStateThread.add(clientUrl); - if (!participants.contains(clientUrl)) { - participants.add(clientUrl); + sendFullStateThread.add(request.getUrl()); + if (!participants.contains(request.getUrl())) { + participants.add(request.getUrl()); synchronized (Master.this) { state.updateFromObject(".participants", participants, state.getRevision(".participants") + 1); @@ -62,6 +58,54 @@ public class Master { } else { logger.warn("Client {} joining: Already part of network"); } + done.run(Empty.getDefaultInstance()); + } + + @Override public void updateStateRequest(RpcController controller, + Services.Component request, + RpcCallback done) { + logger.info("updateStateRequest({})", request); + boolean updated = state.update(request.getId(), request.getData(), + request.getRevision() + 1); + if (updated) { + updateStateRequestThread.add(request.getId()); + } + done.run(Services.UpdateComponentResponse.newBuilder() + .setSuccess(updated).build()); + } + }; + + private MasterService serviceImpl = new MasterService() { + @Override + public boolean updateStateRequest(String component, + String newData, long revision) { + Services.Component request = Services.Component.newBuilder() + .setId(component) + .setData(newData) + .setRevision(revision) + .build(); + final AtomicBoolean result = new AtomicBoolean(false); + RpcCallback done = + new RpcCallback() { + @Override public void run(UpdateComponentResponse response) { + result.set(response.getSuccess()); + } + }; + newMasterImpl.updateStateRequest(null, request, done); + return result.get(); + } + + @Override + public void joinNetworkRequest(String clientUrl) { + Services.ClientState request = Services.ClientState.newBuilder() + .setUrl(clientUrl) + .build(); + RpcCallback done = + new RpcCallback() { + @Override public void run(Empty response) { + } + }; + newMasterImpl.joinNetworkRequest(null, request, done); } }; -- cgit v1.2.3