diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-02-06 21:00:19 +0100 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-02-06 21:00:19 +0100 |
commit | 1e8fd072ee765e70770a89970d164809e9d31a26 (patch) | |
tree | b89d1f22ec9f9511f225c22b44c705a39af10fa2 /same/src/main/java/com/orbekk/same/Client.java | |
parent | 8d19516ef4a2983f343883c29a48eacd942040ec (diff) |
Refactor: Rename classes.
Rename Client and Master classes, because the classes contain more than
just the service interfaces.
– ClientServiceImpl => Client.
– MasterServiceImpl => Master.
Diffstat (limited to 'same/src/main/java/com/orbekk/same/Client.java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java new file mode 100644 index 0000000..ce3947c --- /dev/null +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -0,0 +1,158 @@ +package com.orbekk.same; + +import static com.orbekk.same.StackTraceUtil.throwableToString; + +import java.util.ArrayList; +import java.util.List; + +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbekk.util.WorkQueue; + +public class Client implements DiscoveryListener { + private Logger logger = LoggerFactory.getLogger(getClass()); + private ConnectionManager connections; + private State state; + private String myUrl; + private StateChangedListener stateListener; + private NetworkNotificationListener networkListener; + + private ClientService serviceImpl = new ClientService() { + @Override + public void setState(String component, String data, long revision) throws Exception { + boolean status = state.update(component, data, revision); + if (status) { + if (stateListener != null) { + stateListener.stateChanged(component, data); + } + } else { + logger.warn("Ignoring update: {}", + new State.Component(component, revision, data)); + } + } + + @Override + public void notifyNetwork(String networkName, String masterUrl) throws Exception { + logger.info("NotifyNetwork(networkName={}, masterUrl={})", + networkName, masterUrl); + if (networkListener != null) { + networkListener.notifyNetwork(networkName, masterUrl); + } + } + + @Override + public void discoveryRequest(String remoteUrl) { + discoveryThread.add(remoteUrl); + } + }; + + private WorkQueue<String> discoveryThread = new WorkQueue<String>() { + @Override protected void onChange() { + List<String> pending = getAndClear(); + for (String url : pending) { + discover(url); + } + } + }; + + public Client(State state, ConnectionManager connections, + String myUrl) { + this.state = state; + this.connections = connections; + this.myUrl = myUrl; + } + + public void start() { + discoveryThread.start(); + } + + public void interrupt() { + discoveryThread.interrupt(); + } + + public String getUrl() { + return myUrl; + } + + public void joinNetwork(String masterUrl) { + logger.info("joinNetwork({})", masterUrl); + MasterService master = connections.getMaster(masterUrl); + state.clear(); + try { + master.joinNetworkRequest(myUrl); + } catch (Exception e) { + logger.error("Unable to connect to master.", e); + } + } + + String lib_get(String name) { + return state.getDataOf(name); + } + + <T> T lib_get(String name, TypeReference<T> type) { + return state.getParsedData(name, type); + } + + void lib_set(String name, String data) throws UpdateConflict { + String masterUrl = state.getDataOf(".masterUrl"); + long revision = state.getRevision(name) + 1; + MasterService master = connections.getMaster(masterUrl); + try { + boolean success = master.updateStateRequest(name, data, + revision); + if (!success) { + throw new UpdateConflict("State update conflict when " + + "updating " + name); + } + } catch (Exception e) { + logger.error("Unable to contact master. Update fails.", e); + throw new UpdateConflict("Unable to contact master. Update fails."); + } + } + + public State.Component getState(String name) { + return state.getComponent(name); + } + + State testGetState() { + return state; + } + + public void setStateChangedListener(StateChangedListener listener) { + this.stateListener = listener; + } + + public void setNetworkListener(NetworkNotificationListener listener) { + this.networkListener = listener; + } + + public void sendDiscoveryRequest(String url) { + try { + connections.getClient(url) + .discoveryRequest(myUrl); + } catch (Exception e) { + logger.warn("Failed to send discovery request: {}", + throwableToString(e)); + } + } + + @Override + public void discover(String url) { + if (!url.equals(myUrl)) { + try { + connections.getClient(url) + .notifyNetwork(state.getDataOf(".networkName"), + state.getDataOf(".masterUrl")); + } catch (Exception e) { + logger.warn("Failed to contact new client {}: {}", url, + throwableToString(e)); + } + } + } + + public ClientService getService() { + return serviceImpl; + } +} |