diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-05-08 10:20:39 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-05-08 10:20:39 +0200 |
commit | 7bc7fe45b23f048a6bd609b7fbd01270b498a20b (patch) | |
tree | 69bb11e40dbc9c119f7461c430804d3ae9ef86af /same/src/main/java/com/orbekk/same | |
parent | 561b6ab936f1b60d364e81b08322899a931ecc2e (diff) |
Get rid of queues in Master.
– Remove WorkQueue code.
– Remove delayed operations in master.
(Handled by RPC instead)
Diffstat (limited to 'same/src/main/java/com/orbekk/same')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 1 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 108 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/ServicesPbConversion.java | 14 |
3 files changed, 47 insertions, 76 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index bbf9ca2..f632241 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -32,7 +32,6 @@ import com.orbekk.paxos.MasterProposer; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.FullStateResponse; -import com.orbekk.same.Services.FullStateResponse.Builder; import com.orbekk.same.Services.MasterState; import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index b32fedd..90601df 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -15,7 +15,6 @@ */ package com.orbekk.same; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -29,7 +28,6 @@ import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; -import com.orbekk.util.WorkQueue; public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -97,8 +95,8 @@ public class Master { private Services.Master newMasterImpl = new Services.Master() { @Override public void joinNetworkRequest(RpcController controller, ClientState request, RpcCallback<Empty> done) { - logger.info("joinNetworkRequest({})", request); - sendFullStateThread.add(request.getLocation()); + sendInitialMasterTakeover(request.getLocation()); + sendFullState(request.getLocation()); addParticipant(request.getLocation()); done.run(Empty.getDefaultInstance()); } @@ -112,31 +110,35 @@ public class Master { success = true; long newRevision = revision.incrementAndGet(); state.forceUpdate(request.getId(), request.getData(), newRevision); - updateStateRequestThread.add(request.getId()); + sendStateToClients(state.getComponent(request.getId())); } done.run(Services.UpdateComponentResponse.newBuilder() .setSuccess(success).build()); } }; - WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - List<Component> updatedComponents = new ArrayList<Component>(); - for (String component : pending) { - updatedComponents.add(state.getComponent(component)); - } - - logger.info("updateStateRequestThread: Updated state: {}", - pending); - for (String clientLocation : state.getList( - com.orbekk.same.State.PARTICIPANTS)) { - sendComponents(clientLocation, updatedComponents); - } + private void sendStateToClients(State.Component component) { + for (String clientLocation : state.getList( + com.orbekk.same.State.PARTICIPANTS)) { + sendComponent(clientLocation, component); + } + } + + private void sendComponent(String clientLocation, Component component) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); } - }; - public void sendComponents(String clientLocation, + Services.Component componentProto = ServicesPbConversion.componentToPb(component); + Rpc rpc = rpcf.create(); + RpcCallback<Empty> done = + new RemoveParticipantIfFailsCallback<Empty>(clientLocation, + rpc); + client.setState(rpc, componentProto, done); + } + + private void sendComponents(String clientLocation, List<Component> components) { Services.Client client = connections.getClient0(clientLocation); if (client == null) { @@ -144,7 +146,7 @@ public class Master { } for (Component component : components) { - Services.Component componentProto = componentToProto(component); + Services.Component componentProto = ServicesPbConversion.componentToPb(component); Rpc rpc = rpcf.create(); RpcCallback<Empty> done = new RemoveParticipantIfFailsCallback<Empty>(clientLocation, @@ -153,51 +155,27 @@ public class Master { } } - WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - logger.info("Sending full state to {}", pending); - final List<Component> components = state.getComponents(); - for (String clientLocation : pending) { - Services.Client client = connections.getClient0(clientLocation); - if (client == null) { - removeParticipant(clientLocation); - continue; - } - - { // Send masterTakeover(). - Rpc rpc = rpcf.create(); - RpcCallback<MasterTakeoverResponse> done = - new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>( - clientLocation, rpc); - client.masterTakeover(rpc, getMasterInfo(), done); - } - sendComponents(clientLocation, components); - } - } - }; - - private Services.Component componentToProto(State.Component component) { - return Services.Component.newBuilder() - .setId(component.getName()) - .setData(component.getData()) - .setRevision(component.getRevision()) - .build(); + private void sendFullState(String clientLocation) { + List<Component> components = state.getComponents(); + sendComponents(clientLocation, components); + } + + private void sendInitialMasterTakeover(String clientLocation) { + Services.Client client = connections.getClient0(clientLocation); + Rpc rpc = rpcf.create(); + RpcCallback<MasterTakeoverResponse> done = + new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>( + clientLocation, rpc); + client.masterTakeover(rpc, getMasterInfo(), done); } void performWork() { - sendFullStateThread.performWork(); - updateStateRequestThread.performWork(); } public void start() { - sendFullStateThread.start(); - updateStateRequestThread.start(); } public void interrupt() { - sendFullStateThread.interrupt(); - updateStateRequestThread.interrupt(); } public Services.Master getNewService() { @@ -210,29 +188,19 @@ public class Master { participants.add(location); state.updateFromObject(State.PARTICIPANTS, participants, state.getRevision(State.PARTICIPANTS) + 1); - updateStateRequestThread.add(State.PARTICIPANTS); + sendStateToClients(state.getComponent(State.PARTICIPANTS)); } } private synchronized void removeParticipant(String url) { - /** TODO: Remove this code. */ - List<String> participants = state.getList(".participants"); - if (participants.contains(url)) { - logger.info("removeParticipant({})", url); - participants.remove(url); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - updateStateRequestThread.add(".participants"); - } - List<String> participants0 = state.getList(State.PARTICIPANTS); if (participants0.contains(url)) { logger.info("removeParticipant({})", url); participants0.remove(url); state.updateFromObject(State.PARTICIPANTS, participants0, state.getRevision(State.PARTICIPANTS) + 1); - updateStateRequestThread.add(State.PARTICIPANTS); + sendStateToClients(state.getComponent(State.PARTICIPANTS)); } } diff --git a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java index 5638e1a..7fc52f0 100644 --- a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java +++ b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java @@ -7,12 +7,16 @@ public class ServicesPbConversion { public static List<Services.Component> componentsToPb(List<State.Component> components) { List<Services.Component> results = new ArrayList<Services.Component>(); for (State.Component c : components) { - results.add(Services.Component.newBuilder() - .setId(c.getName()) - .setRevision(c.getRevision()) - .setData(c.getData()) - .build()); + results.add(componentToPb(c)); } return results; } + + public static Services.Component componentToPb(State.Component component) { + return Services.Component.newBuilder() + .setId(component.getName()) + .setRevision(component.getRevision()) + .setData(component.getData()) + .build(); + } } |