summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Master.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java108
1 files changed, 38 insertions, 70 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index b32fedd..90601df 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -15,7 +15,6 @@
*/
package com.orbekk.same;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,7 +28,6 @@ import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
-import com.orbekk.util.WorkQueue;
public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
@@ -97,8 +95,8 @@ public class Master {
private Services.Master newMasterImpl = new Services.Master() {
@Override public void joinNetworkRequest(RpcController controller,
ClientState request, RpcCallback<Empty> done) {
- logger.info("joinNetworkRequest({})", request);
- sendFullStateThread.add(request.getLocation());
+ sendInitialMasterTakeover(request.getLocation());
+ sendFullState(request.getLocation());
addParticipant(request.getLocation());
done.run(Empty.getDefaultInstance());
}
@@ -112,31 +110,35 @@ public class Master {
success = true;
long newRevision = revision.incrementAndGet();
state.forceUpdate(request.getId(), request.getData(), newRevision);
- updateStateRequestThread.add(request.getId());
+ sendStateToClients(state.getComponent(request.getId()));
}
done.run(Services.UpdateComponentResponse.newBuilder()
.setSuccess(success).build());
}
};
- WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- List<Component> updatedComponents = new ArrayList<Component>();
- for (String component : pending) {
- updatedComponents.add(state.getComponent(component));
- }
-
- logger.info("updateStateRequestThread: Updated state: {}",
- pending);
- for (String clientLocation : state.getList(
- com.orbekk.same.State.PARTICIPANTS)) {
- sendComponents(clientLocation, updatedComponents);
- }
+ private void sendStateToClients(State.Component component) {
+ for (String clientLocation : state.getList(
+ com.orbekk.same.State.PARTICIPANTS)) {
+ sendComponent(clientLocation, component);
+ }
+ }
+
+ private void sendComponent(String clientLocation, Component component) {
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
}
- };
- public void sendComponents(String clientLocation,
+ Services.Component componentProto = ServicesPbConversion.componentToPb(component);
+ Rpc rpc = rpcf.create();
+ RpcCallback<Empty> done =
+ new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
+ rpc);
+ client.setState(rpc, componentProto, done);
+ }
+
+ private void sendComponents(String clientLocation,
List<Component> components) {
Services.Client client = connections.getClient0(clientLocation);
if (client == null) {
@@ -144,7 +146,7 @@ public class Master {
}
for (Component component : components) {
- Services.Component componentProto = componentToProto(component);
+ Services.Component componentProto = ServicesPbConversion.componentToPb(component);
Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
@@ -153,51 +155,27 @@ public class Master {
}
}
- WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("Sending full state to {}", pending);
- final List<Component> components = state.getComponents();
- for (String clientLocation : pending) {
- Services.Client client = connections.getClient0(clientLocation);
- if (client == null) {
- removeParticipant(clientLocation);
- continue;
- }
-
- { // Send masterTakeover().
- Rpc rpc = rpcf.create();
- RpcCallback<MasterTakeoverResponse> done =
- new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
- clientLocation, rpc);
- client.masterTakeover(rpc, getMasterInfo(), done);
- }
- sendComponents(clientLocation, components);
- }
- }
- };
-
- private Services.Component componentToProto(State.Component component) {
- return Services.Component.newBuilder()
- .setId(component.getName())
- .setData(component.getData())
- .setRevision(component.getRevision())
- .build();
+ private void sendFullState(String clientLocation) {
+ List<Component> components = state.getComponents();
+ sendComponents(clientLocation, components);
+ }
+
+ private void sendInitialMasterTakeover(String clientLocation) {
+ Services.Client client = connections.getClient0(clientLocation);
+ Rpc rpc = rpcf.create();
+ RpcCallback<MasterTakeoverResponse> done =
+ new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
+ clientLocation, rpc);
+ client.masterTakeover(rpc, getMasterInfo(), done);
}
void performWork() {
- sendFullStateThread.performWork();
- updateStateRequestThread.performWork();
}
public void start() {
- sendFullStateThread.start();
- updateStateRequestThread.start();
}
public void interrupt() {
- sendFullStateThread.interrupt();
- updateStateRequestThread.interrupt();
}
public Services.Master getNewService() {
@@ -210,29 +188,19 @@ public class Master {
participants.add(location);
state.updateFromObject(State.PARTICIPANTS, participants,
state.getRevision(State.PARTICIPANTS) + 1);
- updateStateRequestThread.add(State.PARTICIPANTS);
+ sendStateToClients(state.getComponent(State.PARTICIPANTS));
}
}
private synchronized void removeParticipant(String url) {
- /** TODO: Remove this code. */
- List<String> participants = state.getList(".participants");
- if (participants.contains(url)) {
- logger.info("removeParticipant({})", url);
- participants.remove(url);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- updateStateRequestThread.add(".participants");
- }
-
List<String> participants0 = state.getList(State.PARTICIPANTS);
if (participants0.contains(url)) {
logger.info("removeParticipant({})", url);
participants0.remove(url);
state.updateFromObject(State.PARTICIPANTS, participants0,
state.getRevision(State.PARTICIPANTS) + 1);
- updateStateRequestThread.add(State.PARTICIPANTS);
+ sendStateToClients(state.getComponent(State.PARTICIPANTS));
}
}