summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/Master.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java209
1 files changed, 193 insertions, 16 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 678baf2..0571896 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -15,7 +15,10 @@
*/
package com.orbekk.same;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -26,8 +29,11 @@ import com.google.protobuf.RpcController;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
+import com.orbekk.same.Services.FullStateResponse;
+import com.orbekk.same.Services.MasterState;
import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
+import com.orbekk.util.RpcList;
public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
@@ -57,6 +63,163 @@ public class Master {
}
}
+ private class MasterTakeover implements Runnable {
+ final List<Services.ClientState> clientStates = new ArrayList<Services.ClientState>();
+ final AtomicBoolean aborted = new AtomicBoolean(false);
+ final List<String> clients = new CopyOnWriteArrayList<String>();
+ final MasterState newMaster;
+
+ private class TakeoverCallback implements RpcCallback<MasterTakeoverResponse> {
+ final String client;
+
+ public TakeoverCallback(String client) {
+ this.client = client;
+ }
+
+ @Override public void run(MasterTakeoverResponse response) {
+ if (response == null) {
+ clients.remove(client);
+ } else if (!response.getSuccess()) {
+ aborted.set(true);
+ } else {
+ clientStates.add(response.getClientState());
+ }
+ }
+ }
+
+ private class FullStateCallback implements RpcCallback<FullStateResponse> {
+ @Override public void run(FullStateResponse response) {
+ if (response != null) {
+ for (Services.Component componentPb : response.getComponentList()) {
+ state.update(componentPb.getId(), componentPb.getData(),
+ componentPb.getRevision());
+ updateRevision(componentPb.getRevision());
+ }
+ }
+ }
+ }
+
+ private class RemoveClientCallback<T> implements RpcCallback<T> {
+ final String client;
+
+ public RemoveClientCallback(String client) {
+ this.client = client;
+ }
+
+ @Override public void run(T response) {
+ if (response == null) {
+ clients.remove(client);
+ }
+ }
+ }
+
+ public MasterTakeover(List<String> clients, MasterState newMaster) {
+ this.clients.addAll(clients);
+ this.newMaster = newMaster;
+ }
+
+ private void sendTakeovers() throws InterruptedException {
+ RpcList rpcs = new RpcList();
+ for (String location : clients) {
+ Services.Client client = connections.getClient0(location);
+ if (client == null) {
+ clients.remove(location);
+ } else {
+ Rpc rpc = rpcf.create();
+ client.masterTakeover(rpc, newMaster, new TakeoverCallback(location));
+ rpcs.add(rpc);
+ }
+ }
+ rpcs.awaitAll();
+ }
+
+ private ClientState getBestClient(List<ClientState> clients) {
+ if (clients.isEmpty()) {
+ return null;
+ }
+ ClientState best = clients.get(0);
+ for (ClientState client : clients) {
+ if (client.getRevision() > best.getRevision()) {
+ best = client;
+ }
+ }
+ return best;
+ }
+
+ private void getMostRecentState() throws InterruptedException {
+ boolean successful = false;
+ while (!successful && !aborted.get()) {
+ ClientState bestClient = getBestClient(clientStates);
+ if (bestClient == null) {
+ successful = true;
+ continue;
+ }
+ Services.Client client = connections.getClient0(bestClient.getLocation());
+ if (client == null) {
+ clients.remove(bestClient.getLocation());
+ continue;
+ }
+
+ Rpc rpc = rpcf.create();
+ FullStateCallback done = new FullStateCallback();
+ client.getFullState(rpc, Empty.getDefaultInstance(), done);
+ rpc.await();
+ successful = rpc.isOk();
+
+ if (!successful) {
+ clients.remove(bestClient.getLocation());
+ }
+ }
+ }
+
+ private void sendFullState() throws InterruptedException {
+ RpcList rpcs = new RpcList();
+ for (String location : clients) {
+ Services.Client client = connections.getClient0(location);
+ if (client == null) {
+ clients.remove(client);
+ continue;
+ }
+ RemoveClientCallback<Empty> done = new RemoveClientCallback<Empty>(location);
+ for (Component component : state.getComponents()) {
+ Services.Component componentPb = ServicesPbConversion.componentToPb(component);
+ Rpc rpc = rpcf.create();
+ client.setState(rpc, componentPb, done);
+ rpcs.add(rpc);
+ }
+ }
+ rpcs.awaitAll();
+ }
+
+ private void finishTakeover() throws InterruptedException {
+ RpcList rpcs = new RpcList();
+ for (String location : clients) {
+ Services.Client client = connections.getClient0(location);
+ if (client == null) {
+ clients.remove(client);
+ continue;
+ }
+ RemoveClientCallback<Empty> done = new RemoveClientCallback<Empty>(location);
+ Rpc rpc = rpcf.create();
+ client.masterTakeoverFinished(rpc, newMaster, done);
+ rpcs.add(rpc);
+ }
+ rpcs.awaitAll();
+ }
+
+ @Override public void run() {
+ try {
+ sendTakeovers();
+ getMostRecentState();
+ sendFullState();
+ finishTakeover();
+ } catch (InterruptedException e) {
+ // Abort master takeover.
+ aborted.set(true);
+ }
+ }
+ }
+
public static Master create(ConnectionManager connections,
String myUrl, String networkName,
String myLocation, RpcFactory rpcf) {
@@ -94,7 +257,6 @@ public class Master {
@Override public void joinNetworkRequest(RpcController controller,
ClientState request, RpcCallback<Empty> done) {
sendInitialMasterTakeover(request.getLocation());
- sendFullState(request.getLocation());
addParticipant(request.getLocation());
done.run(Empty.getDefaultInstance());
}
@@ -158,13 +320,24 @@ public class Master {
sendComponents(clientLocation, components);
}
- private void sendInitialMasterTakeover(String clientLocation) {
+ private synchronized void sendInitialMasterTakeover(String clientLocation) {
Services.Client client = connections.getClient0(clientLocation);
- Rpc rpc = rpcf.create();
- RpcCallback<MasterTakeoverResponse> done =
+
+ // Step 1: Send takeover.
+ Rpc rpc1 = rpcf.create();
+ RpcCallback<MasterTakeoverResponse> done1 =
new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
- clientLocation, rpc);
- client.masterTakeover(rpc, getMasterInfo(), done);
+ clientLocation, rpc1);
+ client.masterTakeover(rpc1, getMasterInfo(), done1);
+
+ // Step 2: Send all state.
+ sendFullState(clientLocation);
+
+ // Step 3: Finish takeover.
+ Rpc rpc2 = rpcf.create();
+ RpcCallback<Empty> done2 = new RemoveParticipantIfFailsCallback<Empty>(
+ clientLocation, rpc2);
+ client.masterTakeoverFinished(rpc2, getMasterInfo(), done2);
}
void performWork() {
@@ -204,18 +377,22 @@ public class Master {
/** This master should take over from an earlier master. */
public void resumeFrom(State lastKnownState, final int masterId) {
+ for (Component c : lastKnownState.getComponents()) {
+ updateRevision(c.getRevision());
+ }
state = lastKnownState;
this.masterId = masterId;
-
- for (final String location : state.getList(State.PARTICIPANTS)) {
- Services.Client client = connections.getClient0(location);
- final Rpc rpc = rpcf.create();
- RpcCallback<MasterTakeoverResponse> done = new RemoveParticipantIfFailsCallback<Services.MasterTakeoverResponse>(location, rpc);
- if (client == null) {
- removeParticipant(location);
- continue;
- }
- client.masterTakeover(rpc, getMasterInfo(), done);
+ MasterTakeover takeover = new MasterTakeover(
+ state.getList(State.PARTICIPANTS), getMasterInfo());
+ takeover.run();
+ }
+
+ public void updateRevision(long newRevision) {
+ boolean updated = false;
+ while (!updated) {
+ long expected = revision.get();
+ long update = Math.max(expected, newRevision);
+ updated = revision.compareAndSet(expected, update);
}
}
}