summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-08 10:20:39 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-08 10:20:39 +0200
commit7bc7fe45b23f048a6bd609b7fbd01270b498a20b (patch)
tree69bb11e40dbc9c119f7461c430804d3ae9ef86af /same/src/main/java/com/orbekk/same
parent561b6ab936f1b60d364e81b08322899a931ecc2e (diff)
Get rid of queues in Master.
– Remove WorkQueue code. – Remove delayed operations in master. (Handled by RPC instead)
Diffstat (limited to 'same/src/main/java/com/orbekk/same')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java1
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java108
-rw-r--r--same/src/main/java/com/orbekk/same/ServicesPbConversion.java14
3 files changed, 47 insertions, 76 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index bbf9ca2..f632241 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -32,7 +32,6 @@ import com.orbekk.paxos.MasterProposer;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.FullStateResponse;
-import com.orbekk.same.Services.FullStateResponse.Builder;
import com.orbekk.same.Services.MasterState;
import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index b32fedd..90601df 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -15,7 +15,6 @@
*/
package com.orbekk.same;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,7 +28,6 @@ import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
-import com.orbekk.util.WorkQueue;
public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
@@ -97,8 +95,8 @@ public class Master {
private Services.Master newMasterImpl = new Services.Master() {
@Override public void joinNetworkRequest(RpcController controller,
ClientState request, RpcCallback<Empty> done) {
- logger.info("joinNetworkRequest({})", request);
- sendFullStateThread.add(request.getLocation());
+ sendInitialMasterTakeover(request.getLocation());
+ sendFullState(request.getLocation());
addParticipant(request.getLocation());
done.run(Empty.getDefaultInstance());
}
@@ -112,31 +110,35 @@ public class Master {
success = true;
long newRevision = revision.incrementAndGet();
state.forceUpdate(request.getId(), request.getData(), newRevision);
- updateStateRequestThread.add(request.getId());
+ sendStateToClients(state.getComponent(request.getId()));
}
done.run(Services.UpdateComponentResponse.newBuilder()
.setSuccess(success).build());
}
};
- WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- List<Component> updatedComponents = new ArrayList<Component>();
- for (String component : pending) {
- updatedComponents.add(state.getComponent(component));
- }
-
- logger.info("updateStateRequestThread: Updated state: {}",
- pending);
- for (String clientLocation : state.getList(
- com.orbekk.same.State.PARTICIPANTS)) {
- sendComponents(clientLocation, updatedComponents);
- }
+ private void sendStateToClients(State.Component component) {
+ for (String clientLocation : state.getList(
+ com.orbekk.same.State.PARTICIPANTS)) {
+ sendComponent(clientLocation, component);
+ }
+ }
+
+ private void sendComponent(String clientLocation, Component component) {
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
}
- };
- public void sendComponents(String clientLocation,
+ Services.Component componentProto = ServicesPbConversion.componentToPb(component);
+ Rpc rpc = rpcf.create();
+ RpcCallback<Empty> done =
+ new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
+ rpc);
+ client.setState(rpc, componentProto, done);
+ }
+
+ private void sendComponents(String clientLocation,
List<Component> components) {
Services.Client client = connections.getClient0(clientLocation);
if (client == null) {
@@ -144,7 +146,7 @@ public class Master {
}
for (Component component : components) {
- Services.Component componentProto = componentToProto(component);
+ Services.Component componentProto = ServicesPbConversion.componentToPb(component);
Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
@@ -153,51 +155,27 @@ public class Master {
}
}
- WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("Sending full state to {}", pending);
- final List<Component> components = state.getComponents();
- for (String clientLocation : pending) {
- Services.Client client = connections.getClient0(clientLocation);
- if (client == null) {
- removeParticipant(clientLocation);
- continue;
- }
-
- { // Send masterTakeover().
- Rpc rpc = rpcf.create();
- RpcCallback<MasterTakeoverResponse> done =
- new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
- clientLocation, rpc);
- client.masterTakeover(rpc, getMasterInfo(), done);
- }
- sendComponents(clientLocation, components);
- }
- }
- };
-
- private Services.Component componentToProto(State.Component component) {
- return Services.Component.newBuilder()
- .setId(component.getName())
- .setData(component.getData())
- .setRevision(component.getRevision())
- .build();
+ private void sendFullState(String clientLocation) {
+ List<Component> components = state.getComponents();
+ sendComponents(clientLocation, components);
+ }
+
+ private void sendInitialMasterTakeover(String clientLocation) {
+ Services.Client client = connections.getClient0(clientLocation);
+ Rpc rpc = rpcf.create();
+ RpcCallback<MasterTakeoverResponse> done =
+ new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
+ clientLocation, rpc);
+ client.masterTakeover(rpc, getMasterInfo(), done);
}
void performWork() {
- sendFullStateThread.performWork();
- updateStateRequestThread.performWork();
}
public void start() {
- sendFullStateThread.start();
- updateStateRequestThread.start();
}
public void interrupt() {
- sendFullStateThread.interrupt();
- updateStateRequestThread.interrupt();
}
public Services.Master getNewService() {
@@ -210,29 +188,19 @@ public class Master {
participants.add(location);
state.updateFromObject(State.PARTICIPANTS, participants,
state.getRevision(State.PARTICIPANTS) + 1);
- updateStateRequestThread.add(State.PARTICIPANTS);
+ sendStateToClients(state.getComponent(State.PARTICIPANTS));
}
}
private synchronized void removeParticipant(String url) {
- /** TODO: Remove this code. */
- List<String> participants = state.getList(".participants");
- if (participants.contains(url)) {
- logger.info("removeParticipant({})", url);
- participants.remove(url);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- updateStateRequestThread.add(".participants");
- }
-
List<String> participants0 = state.getList(State.PARTICIPANTS);
if (participants0.contains(url)) {
logger.info("removeParticipant({})", url);
participants0.remove(url);
state.updateFromObject(State.PARTICIPANTS, participants0,
state.getRevision(State.PARTICIPANTS) + 1);
- updateStateRequestThread.add(State.PARTICIPANTS);
+ sendStateToClients(state.getComponent(State.PARTICIPANTS));
}
}
diff --git a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java
index 5638e1a..7fc52f0 100644
--- a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java
+++ b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java
@@ -7,12 +7,16 @@ public class ServicesPbConversion {
public static List<Services.Component> componentsToPb(List<State.Component> components) {
List<Services.Component> results = new ArrayList<Services.Component>();
for (State.Component c : components) {
- results.add(Services.Component.newBuilder()
- .setId(c.getName())
- .setRevision(c.getRevision())
- .setData(c.getData())
- .build());
+ results.add(componentToPb(c));
}
return results;
}
+
+ public static Services.Component componentToPb(State.Component component) {
+ return Services.Component.newBuilder()
+ .setId(component.getName())
+ .setRevision(component.getRevision())
+ .setData(component.getData())
+ .build();
+ }
}