summaryrefslogtreecommitdiff
path: root/same/src/main
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-16 15:43:43 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-16 15:43:43 +0100
commit25d3d6d1ff6f57ad354f03a0b6cc4f1a897bb342 (patch)
tree208c70014add61dc9b5de25d6a8f502ed6b3ccc0 /same/src/main
parent9444d8b16c93581bbab7928304cb14622050f691 (diff)
Implement all MasterService operations in NewMaster.
Tested with tests ported from old MasterImplTest.
Diffstat (limited to 'same/src/main')
-rw-r--r--same/src/main/java/com/orbekk/same/NewMaster.java71
1 files changed, 69 insertions, 2 deletions
diff --git a/same/src/main/java/com/orbekk/same/NewMaster.java b/same/src/main/java/com/orbekk/same/NewMaster.java
index 161586f..c2b2d9b 100644
--- a/same/src/main/java/com/orbekk/same/NewMaster.java
+++ b/same/src/main/java/com/orbekk/same/NewMaster.java
@@ -1,5 +1,7 @@
package com.orbekk.same;
+import com.orbekk.same.State.Component;
+import java.util.Collections;
import java.util.List;
import com.orbekk.util.WorkQueue;
import org.slf4j.Logger;
@@ -29,36 +31,101 @@ public class NewMaster {
@Override
public boolean updateStateRequest(String component,
String newData, long revision) {
- return false;
+ logger.info("updateStateRequest({}, {}, {})",
+ new Object[]{component, newData, revision});
+ boolean updated = state.update(component, newData, revision + 1);
+ if (updated) {
+ updateStateRequestThread.add(component);
+ }
+ return updated;
}
@Override
public void joinNetworkRequest(String clientUrl) {
+ logger.info("joinNetworkRequest({})", clientUrl);
+ List<String> participants = state.getList(".participants");
+ sendFullStateThread.add(clientUrl);
+ if (!participants.contains(clientUrl)) {
+ participants.add(clientUrl);
+ synchronized (NewMaster.this) {
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ }
+ updateStateRequestThread.add(".participants");
+ } else {
+ logger.warn("Client {} joining: Already part of network");
+ }
}
};
WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
@Override protected void onChange() {
List<String> pending = getAndClear();
+ logger.info("updateStateRequestThread: Updated state: {}",
+ pending);
for (String componentName : pending) {
- logger.info("Component updated: {}", componentName);
+ Component component = state.getComponent(componentName);
+ List<String> participants = state.getList(".participants");
+ broadcastNewComponents(participants,
+ Collections.singletonList(component));
}
}
};
+ WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
+ @Override protected void onChange() {
+ List<String> pending = getAndClear();
+ logger.info("Sending full state to {}", pending);
+ final List<Component> components = state.getComponents();
+ broadcastNewComponents(pending, components);
+ }
+ };
+
void performWork() {
+ sendFullStateThread.performWork();
updateStateRequestThread.performWork();
}
public void start() {
+ sendFullStateThread.start();
updateStateRequestThread.start();
}
public void interrupt() {
+ sendFullStateThread.interrupt();
updateStateRequestThread.interrupt();
}
public MasterService getService() {
return serviceImpl;
}
+
+ private synchronized void removeParticipant(String url) {
+ List<String> participants = state.getList(".participants");
+ if (participants.contains(url)) {
+ logger.info("removeParticipant({})", url);
+ participants.remove(url);
+ state.updateFromObject(".participants", participants,
+ state.getRevision(".participants") + 1);
+ updateStateRequestThread.add(".participants");
+ }
+ }
+
+ private void broadcastNewComponents(List<String> destinations,
+ final List<State.Component> components) {
+ broadcaster.broadcast(destinations, new ServiceOperation() {
+ @Override public void run(String url) {
+ ClientService client = connections.getClient(url);
+ try {
+ for (Component c : components) {
+ client.setState(c.getName(), c.getData(),
+ c.getRevision());
+ }
+ } catch (Exception e) {
+ logger.info("Client {} failed to receive state update.", url);
+ removeParticipant(url);
+ }
+ }
+ });
+ }
}