From 0c1fe5d3169b069bc88c1de7375658c1c885487c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 12 Mar 2012 19:40:18 +0100 Subject: Client begins in disconnected state. --- same/src/main/java/com/orbekk/same/Client.java | 34 ++++++++++++++++++++-- .../main/java/com/orbekk/same/ClientService.java | 9 ++++++ 2 files changed, 40 insertions(+), 3 deletions(-) (limited to 'same/src/main/java/com/orbekk') diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 693a1ba..35e3f0e 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -5,10 +5,10 @@ import static com.orbekk.same.StackTraceUtil.throwableToString; import java.util.ArrayList; import java.util.List; -import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.orbekk.paxos.MasterProposer; import com.orbekk.same.State.Component; import com.orbekk.same.discovery.DiscoveryListener; import com.orbekk.util.DelayedOperation; @@ -21,6 +21,9 @@ public class Client implements DiscoveryListener { private ConnectionManager connections; private State state; private String myUrl; + private String masterUrl; + private int masterId = -1; + private List stateListeners = new ArrayList(); private NetworkNotificationListener networkListener; @@ -41,7 +44,11 @@ public class Client implements DiscoveryListener { @Override public DelayedOperation set(Component component) { DelayedOperation op = new DelayedOperation(); - String masterUrl = state.getDataOf(".masterUrl"); + if (connectionState != ConnectionState.STABLE) { + op.complete(DelayedOperation.Status.createError( + "Not connected to master: " + connectionState)); + return op; + } MasterService master = connections.getMaster(masterUrl); try { boolean success = master.updateStateRequest( @@ -83,7 +90,6 @@ public class Client implements DiscoveryListener { private ClientService serviceImpl = new ClientService() { @Override public void setState(String component, String data, long revision) throws Exception { - connectionState = ConnectionState.STABLE; boolean status = state.update(component, data, revision); if (status) { for (StateChangedListener listener : stateListeners) { @@ -108,6 +114,13 @@ public class Client implements DiscoveryListener { public void discoveryRequest(String remoteUrl) { discoveryThread.add(remoteUrl); } + + @Override + public void masterTakeover(String masterUrl, String networkName, + int masterId) throws Exception { + Client.this.masterUrl = masterUrl; + connectionState = ConnectionState.STABLE; + } }; private WorkQueue discoveryThread = new WorkQueue() { @@ -207,4 +220,19 @@ public class Client implements DiscoveryListener { public ClientService getService() { return serviceImpl; } + + private List getPaxosUrls() { + List paxosUrls = new ArrayList(); + for (String participant : state.getList(".participants")) { + paxosUrls.add(participant.replace("ClientService", "PaxosService")); + } + return paxosUrls; + } + + private void startMasterElection() { + List paxosUrls = getPaxosUrls(); + MasterProposer proposer = new MasterProposer(getUrl(), paxosUrls, + connections); + // TODO: Run election. + } } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index 04f423d..453dfef 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -7,4 +7,13 @@ public interface ClientService { // Manual discovery request by client. void discoveryRequest(String remoteUrl) throws Exception; + + /** A new master takes over. + * + * @param masterUrl The new master URL. + * @param masterId The ID of the new master. Only accept if this is higher + * than the current master. + */ + void masterTakeover(String masterUrl, String networkName, + int masterId) throws Exception; } -- cgit v1.2.3