diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-05-08 13:58:47 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-05-08 13:58:47 +0200 |
commit | 904aacc87e71b60a039f7c8332f0866f472c9aa0 (patch) | |
tree | ccfbeb83e8a1048c4da0d192056cdf7f2051f997 /same/src/main/java/com/orbekk/same/Master.java | |
parent | acacdc5a43a7b7c268a425122cd0830e13d96ac9 (diff) |
Master takeover protocol.
Implement the takeover protocol that guarantees correct after master
takeover.
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Master.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 209 |
1 files changed, 193 insertions, 16 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 678baf2..0571896 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -15,7 +15,10 @@ */ package com.orbekk.same; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -26,8 +29,11 @@ import com.google.protobuf.RpcController; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; +import com.orbekk.same.Services.FullStateResponse; +import com.orbekk.same.Services.MasterState; import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; +import com.orbekk.util.RpcList; public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -57,6 +63,163 @@ public class Master { } } + private class MasterTakeover implements Runnable { + final List<Services.ClientState> clientStates = new ArrayList<Services.ClientState>(); + final AtomicBoolean aborted = new AtomicBoolean(false); + final List<String> clients = new CopyOnWriteArrayList<String>(); + final MasterState newMaster; + + private class TakeoverCallback implements RpcCallback<MasterTakeoverResponse> { + final String client; + + public TakeoverCallback(String client) { + this.client = client; + } + + @Override public void run(MasterTakeoverResponse response) { + if (response == null) { + clients.remove(client); + } else if (!response.getSuccess()) { + aborted.set(true); + } else { + clientStates.add(response.getClientState()); + } + } + } + + private class FullStateCallback implements RpcCallback<FullStateResponse> { + @Override public void run(FullStateResponse response) { + if (response != null) { + for (Services.Component componentPb : response.getComponentList()) { + state.update(componentPb.getId(), componentPb.getData(), + componentPb.getRevision()); + updateRevision(componentPb.getRevision()); + } + } + } + } + + private class RemoveClientCallback<T> implements RpcCallback<T> { + final String client; + + public RemoveClientCallback(String client) { + this.client = client; + } + + @Override public void run(T response) { + if (response == null) { + clients.remove(client); + } + } + } + + public MasterTakeover(List<String> clients, MasterState newMaster) { + this.clients.addAll(clients); + this.newMaster = newMaster; + } + + private void sendTakeovers() throws InterruptedException { + RpcList rpcs = new RpcList(); + for (String location : clients) { + Services.Client client = connections.getClient0(location); + if (client == null) { + clients.remove(location); + } else { + Rpc rpc = rpcf.create(); + client.masterTakeover(rpc, newMaster, new TakeoverCallback(location)); + rpcs.add(rpc); + } + } + rpcs.awaitAll(); + } + + private ClientState getBestClient(List<ClientState> clients) { + if (clients.isEmpty()) { + return null; + } + ClientState best = clients.get(0); + for (ClientState client : clients) { + if (client.getRevision() > best.getRevision()) { + best = client; + } + } + return best; + } + + private void getMostRecentState() throws InterruptedException { + boolean successful = false; + while (!successful && !aborted.get()) { + ClientState bestClient = getBestClient(clientStates); + if (bestClient == null) { + successful = true; + continue; + } + Services.Client client = connections.getClient0(bestClient.getLocation()); + if (client == null) { + clients.remove(bestClient.getLocation()); + continue; + } + + Rpc rpc = rpcf.create(); + FullStateCallback done = new FullStateCallback(); + client.getFullState(rpc, Empty.getDefaultInstance(), done); + rpc.await(); + successful = rpc.isOk(); + + if (!successful) { + clients.remove(bestClient.getLocation()); + } + } + } + + private void sendFullState() throws InterruptedException { + RpcList rpcs = new RpcList(); + for (String location : clients) { + Services.Client client = connections.getClient0(location); + if (client == null) { + clients.remove(client); + continue; + } + RemoveClientCallback<Empty> done = new RemoveClientCallback<Empty>(location); + for (Component component : state.getComponents()) { + Services.Component componentPb = ServicesPbConversion.componentToPb(component); + Rpc rpc = rpcf.create(); + client.setState(rpc, componentPb, done); + rpcs.add(rpc); + } + } + rpcs.awaitAll(); + } + + private void finishTakeover() throws InterruptedException { + RpcList rpcs = new RpcList(); + for (String location : clients) { + Services.Client client = connections.getClient0(location); + if (client == null) { + clients.remove(client); + continue; + } + RemoveClientCallback<Empty> done = new RemoveClientCallback<Empty>(location); + Rpc rpc = rpcf.create(); + client.masterTakeoverFinished(rpc, newMaster, done); + rpcs.add(rpc); + } + rpcs.awaitAll(); + } + + @Override public void run() { + try { + sendTakeovers(); + getMostRecentState(); + sendFullState(); + finishTakeover(); + } catch (InterruptedException e) { + // Abort master takeover. + aborted.set(true); + } + } + } + public static Master create(ConnectionManager connections, String myUrl, String networkName, String myLocation, RpcFactory rpcf) { @@ -94,7 +257,6 @@ public class Master { @Override public void joinNetworkRequest(RpcController controller, ClientState request, RpcCallback<Empty> done) { sendInitialMasterTakeover(request.getLocation()); - sendFullState(request.getLocation()); addParticipant(request.getLocation()); done.run(Empty.getDefaultInstance()); } @@ -158,13 +320,24 @@ public class Master { sendComponents(clientLocation, components); } - private void sendInitialMasterTakeover(String clientLocation) { + private synchronized void sendInitialMasterTakeover(String clientLocation) { Services.Client client = connections.getClient0(clientLocation); - Rpc rpc = rpcf.create(); - RpcCallback<MasterTakeoverResponse> done = + + // Step 1: Send takeover. + Rpc rpc1 = rpcf.create(); + RpcCallback<MasterTakeoverResponse> done1 = new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>( - clientLocation, rpc); - client.masterTakeover(rpc, getMasterInfo(), done); + clientLocation, rpc1); + client.masterTakeover(rpc1, getMasterInfo(), done1); + + // Step 2: Send all state. + sendFullState(clientLocation); + + // Step 3: Finish takeover. + Rpc rpc2 = rpcf.create(); + RpcCallback<Empty> done2 = new RemoveParticipantIfFailsCallback<Empty>( + clientLocation, rpc2); + client.masterTakeoverFinished(rpc2, getMasterInfo(), done2); } void performWork() { @@ -204,18 +377,22 @@ public class Master { /** This master should take over from an earlier master. */ public void resumeFrom(State lastKnownState, final int masterId) { + for (Component c : lastKnownState.getComponents()) { + updateRevision(c.getRevision()); + } state = lastKnownState; this.masterId = masterId; - - for (final String location : state.getList(State.PARTICIPANTS)) { - Services.Client client = connections.getClient0(location); - final Rpc rpc = rpcf.create(); - RpcCallback<MasterTakeoverResponse> done = new RemoveParticipantIfFailsCallback<Services.MasterTakeoverResponse>(location, rpc); - if (client == null) { - removeParticipant(location); - continue; - } - client.masterTakeover(rpc, getMasterInfo(), done); + MasterTakeover takeover = new MasterTakeover( + state.getList(State.PARTICIPANTS), getMasterInfo()); + takeover.run(); + } + + public void updateRevision(long newRevision) { + boolean updated = false; + while (!updated) { + long expected = revision.get(); + long update = Math.max(expected, newRevision); + updated = revision.compareAndSet(expected, update); } } } |