diff options
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 108 |
1 files changed, 38 insertions, 70 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index b32fedd..90601df 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -15,7 +15,6 @@ */ package com.orbekk.same; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -29,7 +28,6 @@ import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; -import com.orbekk.util.WorkQueue; public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -97,8 +95,8 @@ public class Master { private Services.Master newMasterImpl = new Services.Master() { @Override public void joinNetworkRequest(RpcController controller, ClientState request, RpcCallback<Empty> done) { - logger.info("joinNetworkRequest({})", request); - sendFullStateThread.add(request.getLocation()); + sendInitialMasterTakeover(request.getLocation()); + sendFullState(request.getLocation()); addParticipant(request.getLocation()); done.run(Empty.getDefaultInstance()); } @@ -112,31 +110,35 @@ public class Master { success = true; long newRevision = revision.incrementAndGet(); state.forceUpdate(request.getId(), request.getData(), newRevision); - updateStateRequestThread.add(request.getId()); + sendStateToClients(state.getComponent(request.getId())); } done.run(Services.UpdateComponentResponse.newBuilder() .setSuccess(success).build()); } }; - WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - List<Component> updatedComponents = new ArrayList<Component>(); - for (String component : pending) { - updatedComponents.add(state.getComponent(component)); - } - - logger.info("updateStateRequestThread: Updated state: {}", - pending); - for (String clientLocation : state.getList( - com.orbekk.same.State.PARTICIPANTS)) { - sendComponents(clientLocation, updatedComponents); - } + private void sendStateToClients(State.Component component) { + for (String clientLocation : state.getList( + com.orbekk.same.State.PARTICIPANTS)) { + sendComponent(clientLocation, component); + } + } + + private void sendComponent(String clientLocation, Component component) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); } - }; - public void sendComponents(String clientLocation, + Services.Component componentProto = ServicesPbConversion.componentToPb(component); + Rpc rpc = rpcf.create(); + RpcCallback<Empty> done = + new RemoveParticipantIfFailsCallback<Empty>(clientLocation, + rpc); + client.setState(rpc, componentProto, done); + } + + private void sendComponents(String clientLocation, List<Component> components) { Services.Client client = connections.getClient0(clientLocation); if (client == null) { @@ -144,7 +146,7 @@ public class Master { } for (Component component : components) { - Services.Component componentProto = componentToProto(component); + Services.Component componentProto = ServicesPbConversion.componentToPb(component); Rpc rpc = rpcf.create(); RpcCallback<Empty> done = new RemoveParticipantIfFailsCallback<Empty>(clientLocation, @@ -153,51 +155,27 @@ public class Master { } } - WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - logger.info("Sending full state to {}", pending); - final List<Component> components = state.getComponents(); - for (String clientLocation : pending) { - Services.Client client = connections.getClient0(clientLocation); - if (client == null) { - removeParticipant(clientLocation); - continue; - } - - { // Send masterTakeover(). - Rpc rpc = rpcf.create(); - RpcCallback<MasterTakeoverResponse> done = - new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>( - clientLocation, rpc); - client.masterTakeover(rpc, getMasterInfo(), done); - } - sendComponents(clientLocation, components); - } - } - }; - - private Services.Component componentToProto(State.Component component) { - return Services.Component.newBuilder() - .setId(component.getName()) - .setData(component.getData()) - .setRevision(component.getRevision()) - .build(); + private void sendFullState(String clientLocation) { + List<Component> components = state.getComponents(); + sendComponents(clientLocation, components); + } + + private void sendInitialMasterTakeover(String clientLocation) { + Services.Client client = connections.getClient0(clientLocation); + Rpc rpc = rpcf.create(); + RpcCallback<MasterTakeoverResponse> done = + new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>( + clientLocation, rpc); + client.masterTakeover(rpc, getMasterInfo(), done); } void performWork() { - sendFullStateThread.performWork(); - updateStateRequestThread.performWork(); } public void start() { - sendFullStateThread.start(); - updateStateRequestThread.start(); } public void interrupt() { - sendFullStateThread.interrupt(); - updateStateRequestThread.interrupt(); } public Services.Master getNewService() { @@ -210,29 +188,19 @@ public class Master { participants.add(location); state.updateFromObject(State.PARTICIPANTS, participants, state.getRevision(State.PARTICIPANTS) + 1); - updateStateRequestThread.add(State.PARTICIPANTS); + sendStateToClients(state.getComponent(State.PARTICIPANTS)); } } private synchronized void removeParticipant(String url) { - /** TODO: Remove this code. */ - List<String> participants = state.getList(".participants"); - if (participants.contains(url)) { - logger.info("removeParticipant({})", url); - participants.remove(url); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - updateStateRequestThread.add(".participants"); - } - List<String> participants0 = state.getList(State.PARTICIPANTS); if (participants0.contains(url)) { logger.info("removeParticipant({})", url); participants0.remove(url); state.updateFromObject(State.PARTICIPANTS, participants0, state.getRevision(State.PARTICIPANTS) + 1); - updateStateRequestThread.add(State.PARTICIPANTS); + sendStateToClients(state.getComponent(State.PARTICIPANTS)); } } |