From 1e8fd072ee765e70770a89970d164809e9d31a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 6 Feb 2012 21:00:19 +0100 Subject: Refactor: Rename classes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename Client and Master classes, because the classes contain more than just the service interfaces. – ClientServiceImpl => Client. – MasterServiceImpl => Master. --- same/src/main/java/com/orbekk/same/Client.java | 158 ++++++++++++++++++++ .../java/com/orbekk/same/ClientServiceImpl.java | 158 -------------------- same/src/main/java/com/orbekk/same/Master.java | 163 +++++++++++++++++++++ .../java/com/orbekk/same/MasterServiceImpl.java | 163 --------------------- same/src/main/java/com/orbekk/same/Same.java | 6 +- .../main/java/com/orbekk/same/SameController.java | 16 +- .../com/orbekk/same/ClientServiceImplTest.java | 2 +- .../com/orbekk/same/MasterServiceImplTest.java | 12 +- 8 files changed, 339 insertions(+), 339 deletions(-) create mode 100644 same/src/main/java/com/orbekk/same/Client.java delete mode 100644 same/src/main/java/com/orbekk/same/ClientServiceImpl.java create mode 100644 same/src/main/java/com/orbekk/same/Master.java delete mode 100644 same/src/main/java/com/orbekk/same/MasterServiceImpl.java (limited to 'same/src') diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java new file mode 100644 index 0000000..ce3947c --- /dev/null +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -0,0 +1,158 @@ +package com.orbekk.same; + +import static com.orbekk.same.StackTraceUtil.throwableToString; + +import java.util.ArrayList; +import java.util.List; + +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbekk.util.WorkQueue; + +public class Client implements DiscoveryListener { + private Logger logger = LoggerFactory.getLogger(getClass()); + private ConnectionManager connections; + private State state; + private String myUrl; + private StateChangedListener stateListener; + private NetworkNotificationListener networkListener; + + private ClientService serviceImpl = new ClientService() { + @Override + public void setState(String component, String data, long revision) throws Exception { + boolean status = state.update(component, data, revision); + if (status) { + if (stateListener != null) { + stateListener.stateChanged(component, data); + } + } else { + logger.warn("Ignoring update: {}", + new State.Component(component, revision, data)); + } + } + + @Override + public void notifyNetwork(String networkName, String masterUrl) throws Exception { + logger.info("NotifyNetwork(networkName={}, masterUrl={})", + networkName, masterUrl); + if (networkListener != null) { + networkListener.notifyNetwork(networkName, masterUrl); + } + } + + @Override + public void discoveryRequest(String remoteUrl) { + discoveryThread.add(remoteUrl); + } + }; + + private WorkQueue discoveryThread = new WorkQueue() { + @Override protected void onChange() { + List pending = getAndClear(); + for (String url : pending) { + discover(url); + } + } + }; + + public Client(State state, ConnectionManager connections, + String myUrl) { + this.state = state; + this.connections = connections; + this.myUrl = myUrl; + } + + public void start() { + discoveryThread.start(); + } + + public void interrupt() { + discoveryThread.interrupt(); + } + + public String getUrl() { + return myUrl; + } + + public void joinNetwork(String masterUrl) { + logger.info("joinNetwork({})", masterUrl); + MasterService master = connections.getMaster(masterUrl); + state.clear(); + try { + master.joinNetworkRequest(myUrl); + } catch (Exception e) { + logger.error("Unable to connect to master.", e); + } + } + + String lib_get(String name) { + return state.getDataOf(name); + } + + T lib_get(String name, TypeReference type) { + return state.getParsedData(name, type); + } + + void lib_set(String name, String data) throws UpdateConflict { + String masterUrl = state.getDataOf(".masterUrl"); + long revision = state.getRevision(name) + 1; + MasterService master = connections.getMaster(masterUrl); + try { + boolean success = master.updateStateRequest(name, data, + revision); + if (!success) { + throw new UpdateConflict("State update conflict when " + + "updating " + name); + } + } catch (Exception e) { + logger.error("Unable to contact master. Update fails.", e); + throw new UpdateConflict("Unable to contact master. Update fails."); + } + } + + public State.Component getState(String name) { + return state.getComponent(name); + } + + State testGetState() { + return state; + } + + public void setStateChangedListener(StateChangedListener listener) { + this.stateListener = listener; + } + + public void setNetworkListener(NetworkNotificationListener listener) { + this.networkListener = listener; + } + + public void sendDiscoveryRequest(String url) { + try { + connections.getClient(url) + .discoveryRequest(myUrl); + } catch (Exception e) { + logger.warn("Failed to send discovery request: {}", + throwableToString(e)); + } + } + + @Override + public void discover(String url) { + if (!url.equals(myUrl)) { + try { + connections.getClient(url) + .notifyNetwork(state.getDataOf(".networkName"), + state.getDataOf(".masterUrl")); + } catch (Exception e) { + logger.warn("Failed to contact new client {}: {}", url, + throwableToString(e)); + } + } + } + + public ClientService getService() { + return serviceImpl; + } +} diff --git a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java b/same/src/main/java/com/orbekk/same/ClientServiceImpl.java deleted file mode 100644 index c503919..0000000 --- a/same/src/main/java/com/orbekk/same/ClientServiceImpl.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.orbekk.same; - -import static com.orbekk.same.StackTraceUtil.throwableToString; - -import java.util.ArrayList; -import java.util.List; - -import org.codehaus.jackson.type.TypeReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.orbekk.util.WorkQueue; - -public class ClientServiceImpl implements DiscoveryListener { - private Logger logger = LoggerFactory.getLogger(getClass()); - private ConnectionManager connections; - private State state; - private String myUrl; - private StateChangedListener stateListener; - private NetworkNotificationListener networkListener; - - private ClientService serviceImpl = new ClientService() { - @Override - public void setState(String component, String data, long revision) throws Exception { - boolean status = state.update(component, data, revision); - if (status) { - if (stateListener != null) { - stateListener.stateChanged(component, data); - } - } else { - logger.warn("Ignoring update: {}", - new State.Component(component, revision, data)); - } - } - - @Override - public void notifyNetwork(String networkName, String masterUrl) throws Exception { - logger.info("NotifyNetwork(networkName={}, masterUrl={})", - networkName, masterUrl); - if (networkListener != null) { - networkListener.notifyNetwork(networkName, masterUrl); - } - } - - @Override - public void discoveryRequest(String remoteUrl) { - discoveryThread.add(remoteUrl); - } - }; - - private WorkQueue discoveryThread = new WorkQueue() { - @Override protected void onChange() { - List pending = getAndClear(); - for (String url : pending) { - discover(url); - } - } - }; - - public ClientServiceImpl(State state, ConnectionManager connections, - String myUrl) { - this.state = state; - this.connections = connections; - this.myUrl = myUrl; - } - - public void start() { - discoveryThread.start(); - } - - public void interrupt() { - discoveryThread.interrupt(); - } - - public String getUrl() { - return myUrl; - } - - public void joinNetwork(String masterUrl) { - logger.info("joinNetwork({})", masterUrl); - MasterService master = connections.getMaster(masterUrl); - state.clear(); - try { - master.joinNetworkRequest(myUrl); - } catch (Exception e) { - logger.error("Unable to connect to master.", e); - } - } - - String lib_get(String name) { - return state.getDataOf(name); - } - - T lib_get(String name, TypeReference type) { - return state.getParsedData(name, type); - } - - void lib_set(String name, String data) throws UpdateConflict { - String masterUrl = state.getDataOf(".masterUrl"); - long revision = state.getRevision(name) + 1; - MasterService master = connections.getMaster(masterUrl); - try { - boolean success = master.updateStateRequest(name, data, - revision); - if (!success) { - throw new UpdateConflict("State update conflict when " + - "updating " + name); - } - } catch (Exception e) { - logger.error("Unable to contact master. Update fails.", e); - throw new UpdateConflict("Unable to contact master. Update fails."); - } - } - - public State.Component getState(String name) { - return state.getComponent(name); - } - - State testGetState() { - return state; - } - - public void setStateChangedListener(StateChangedListener listener) { - this.stateListener = listener; - } - - public void setNetworkListener(NetworkNotificationListener listener) { - this.networkListener = listener; - } - - public void sendDiscoveryRequest(String url) { - try { - connections.getClient(url) - .discoveryRequest(myUrl); - } catch (Exception e) { - logger.warn("Failed to send discovery request: {}", - throwableToString(e)); - } - } - - @Override - public void discover(String url) { - if (!url.equals(myUrl)) { - try { - connections.getClient(url) - .notifyNetwork(state.getDataOf(".networkName"), - state.getDataOf(".masterUrl")); - } catch (Exception e) { - logger.warn("Failed to contact new client {}: {}", url, - throwableToString(e)); - } - } - } - - public ClientService getService() { - return serviceImpl; - } -} diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java new file mode 100644 index 0000000..eaf0a8f --- /dev/null +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -0,0 +1,163 @@ +package com.orbekk.same; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbekk.same.State.Component; + +public class Master implements MasterService, Runnable { + private Logger logger = LoggerFactory.getLogger(getClass()); + private final ConnectionManager connections; + private State state; + private boolean stopped = false; + private Broadcaster broadcaster; + private List _fullStateReceivers = new ArrayList(); + private Thread workerThread = null; + + public static Master create(ConnectionManager connections, + Broadcaster broadcaster, String myUrl) { + State state = new State("DefaultMaster"); + state.update(".masterUrl", myUrl, 1); + return new Master(state, connections, broadcaster); + } + + /** Constructor for internal use. + */ + Master(State initialState, ConnectionManager connections, + Broadcaster broadcaster) { + this.state = initialState; + this.connections = connections; + this.broadcaster = broadcaster; + } + + @Override + public synchronized void joinNetworkRequest(String clientUrl) { + logger.info("JoinNetworkRequest({})", clientUrl); + List participants = participants(); + if (!participants.contains(clientUrl)) { + participants.add(clientUrl); + _fullStateReceivers.add(clientUrl); + state.updateFromObject(".participants", participants, + state.getRevision(".participants") + 1); + notifyAll(); + } else { + logger.warn("Client {} already part of network. " + + "Ignoring participation request", clientUrl); + } + } + + boolean _sendUpdatedComponents() { + boolean worked = false; + for (final Component component : state.getAndClearUpdatedComponents()) { + logger.info("Broadcasting new component {}", component); + broadcastNewComponents(participants(), listWrap(component)); + worked = true; + } + return worked; + } + + private List listWrap(T o) { + List list = new ArrayList(); + list.add(o); + return list; + } + + synchronized boolean _sendFullState() { + boolean hasWork = _fullStateReceivers.size() != 0; + if (hasWork) { + logger.info("Sending full state to new participants."); + final List components = state.getComponents(); + broadcastNewComponents(_fullStateReceivers, components); + _fullStateReceivers.clear(); + } + return hasWork; + } + + private synchronized void removeParticipant(String url) { + List participants = participants(); + if (participants.contains(url)) { + logger.warn("RemoveParticipant({})", url); + participants.remove(url); + state.updateFromObject(".participants", participants, + state.getRevision(".participants") + 1); + notifyAll(); + } + } + + private void broadcastNewComponents(List destinations, + final List components) { + broadcaster.broadcast(destinations, new ServiceOperation() { + @Override public void run(String url) { + ClientService client = connections.getClient(url); + try { + for (Component c : components) { + client.setState(c.getName(), c.getData(), + c.getRevision()); + } + } catch (Exception e) { + logger.info("Client {} failed to receive state update.", url); + removeParticipant(url); + } + } + }); + } + + private List participants() { + return state.getList(".participants"); + } + + + @Override + public synchronized boolean updateStateRequest(String component, + String newData, long revision) { + boolean updated = state.update(component, newData, revision + 1); + if (updated) { + notifyAll(); + } + return updated; + } + + boolean _performWork() { + boolean worked = false; + worked |= _sendUpdatedComponents(); + worked |= _sendFullState(); + return worked; + } + + @Override + public void run() { + while (!stopped) { + if (!_performWork()) { + synchronized (this) { + try { + wait(500); + } catch (InterruptedException e) { + stopped = true; + } + } + } + if (Thread.interrupted()) { + stopped = true; + } + } + } + + public void start() { + if (workerThread == null) { + workerThread = new Thread(this); + workerThread.start(); + logger.info("Master thread started. {}", state); + } + } + + public void join() throws InterruptedException { + workerThread.join(); + } + + public void interrupt() { + workerThread.interrupt(); + } +} diff --git a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java b/same/src/main/java/com/orbekk/same/MasterServiceImpl.java deleted file mode 100644 index 7e25fc1..0000000 --- a/same/src/main/java/com/orbekk/same/MasterServiceImpl.java +++ /dev/null @@ -1,163 +0,0 @@ -package com.orbekk.same; - -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.orbekk.same.State.Component; - -public class MasterServiceImpl implements MasterService, Runnable { - private Logger logger = LoggerFactory.getLogger(getClass()); - private final ConnectionManager connections; - private State state; - private boolean stopped = false; - private Broadcaster broadcaster; - private List _fullStateReceivers = new ArrayList(); - private Thread workerThread = null; - - public static MasterServiceImpl create(ConnectionManager connections, - Broadcaster broadcaster, String myUrl) { - State state = new State("DefaultMaster"); - state.update(".masterUrl", myUrl, 1); - return new MasterServiceImpl(state, connections, broadcaster); - } - - /** Constructor for internal use. - */ - MasterServiceImpl(State initialState, ConnectionManager connections, - Broadcaster broadcaster) { - this.state = initialState; - this.connections = connections; - this.broadcaster = broadcaster; - } - - @Override - public synchronized void joinNetworkRequest(String clientUrl) { - logger.info("JoinNetworkRequest({})", clientUrl); - List participants = participants(); - if (!participants.contains(clientUrl)) { - participants.add(clientUrl); - _fullStateReceivers.add(clientUrl); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - notifyAll(); - } else { - logger.warn("Client {} already part of network. " + - "Ignoring participation request", clientUrl); - } - } - - boolean _sendUpdatedComponents() { - boolean worked = false; - for (final Component component : state.getAndClearUpdatedComponents()) { - logger.info("Broadcasting new component {}", component); - broadcastNewComponents(participants(), listWrap(component)); - worked = true; - } - return worked; - } - - private List listWrap(T o) { - List list = new ArrayList(); - list.add(o); - return list; - } - - synchronized boolean _sendFullState() { - boolean hasWork = _fullStateReceivers.size() != 0; - if (hasWork) { - logger.info("Sending full state to new participants."); - final List components = state.getComponents(); - broadcastNewComponents(_fullStateReceivers, components); - _fullStateReceivers.clear(); - } - return hasWork; - } - - private synchronized void removeParticipant(String url) { - List participants = participants(); - if (participants.contains(url)) { - logger.warn("RemoveParticipant({})", url); - participants.remove(url); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - notifyAll(); - } - } - - private void broadcastNewComponents(List destinations, - final List components) { - broadcaster.broadcast(destinations, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } - } - }); - } - - private List participants() { - return state.getList(".participants"); - } - - - @Override - public synchronized boolean updateStateRequest(String component, - String newData, long revision) { - boolean updated = state.update(component, newData, revision + 1); - if (updated) { - notifyAll(); - } - return updated; - } - - boolean _performWork() { - boolean worked = false; - worked |= _sendUpdatedComponents(); - worked |= _sendFullState(); - return worked; - } - - @Override - public void run() { - while (!stopped) { - if (!_performWork()) { - synchronized (this) { - try { - wait(500); - } catch (InterruptedException e) { - stopped = true; - } - } - } - if (Thread.interrupted()) { - stopped = true; - } - } - } - - public void start() { - if (workerThread == null) { - workerThread = new Thread(this); - workerThread.start(); - logger.info("Master thread started. {}", state); - } - } - - public void join() throws InterruptedException { - workerThread.join(); - } - - public void interrupt() { - workerThread.interrupt(); - } -} diff --git a/same/src/main/java/com/orbekk/same/Same.java b/same/src/main/java/com/orbekk/same/Same.java index 69e1a0f..b152be4 100644 --- a/same/src/main/java/com/orbekk/same/Same.java +++ b/same/src/main/java/com/orbekk/same/Same.java @@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory; @Deprecated public class Same implements SameInterface { private Logger logger = LoggerFactory.getLogger(getClass()); - private ClientServiceImpl client; + private Client client; private StateChangedProxy stateChangedProxy = new StateChangedProxy(); private class StateChangedProxy implements StateChangedListener { @@ -25,13 +25,13 @@ public class Same implements SameInterface { } } - public static Same createSame(ClientServiceImpl client) { + public static Same createSame(Client client) { Same same = new Same(client); client.setStateChangedListener(same.stateChangedProxy); return same; } - Same(ClientServiceImpl client) { + Same(Client client) { this.client = client; } diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 44e1721..abf97c1 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -15,8 +15,8 @@ public class SameController { private Logger logger = LoggerFactory.getLogger(getClass()); private int port; private ServerContainer server; - private MasterServiceImpl master; - private ClientServiceImpl client; + private Master master; + private Client client; private PaxosServiceImpl paxos; /** @@ -37,10 +37,10 @@ public class SameController { String masterUrl = baseUrl + "MasterService.json"; String clientUrl = baseUrl + "ClientService.json"; - MasterServiceImpl master = MasterServiceImpl.create( + Master master = Master.create( connections, broadcaster, masterUrl); - ClientServiceImpl client = new ClientServiceImpl(state, connections, + Client client = new Client(state, connections, clientUrl); PaxosServiceImpl paxos = new PaxosServiceImpl(""); @@ -59,8 +59,8 @@ public class SameController { public SameController( int port, ServerContainer server, - MasterServiceImpl master, - ClientServiceImpl client, + Master master, + Client client, PaxosServiceImpl paxos) { this.port = port; this.server = server; @@ -103,11 +103,11 @@ public class SameController { client.joinNetwork(url); } - public ClientServiceImpl getClient() { + public Client getClient() { return client; } - public MasterServiceImpl getMaster() { + public Master getMaster() { return master; } } diff --git a/same/src/test/java/com/orbekk/same/ClientServiceImplTest.java b/same/src/test/java/com/orbekk/same/ClientServiceImplTest.java index 2c5604a..4dfc54a 100644 --- a/same/src/test/java/com/orbekk/same/ClientServiceImplTest.java +++ b/same/src/test/java/com/orbekk/same/ClientServiceImplTest.java @@ -8,7 +8,7 @@ import static org.mockito.Mockito.*; public class ClientServiceImplTest { private State state = new State("ClientNetwork"); private TestConnectionManager connections = new TestConnectionManager(); - private ClientServiceImpl client = new ClientServiceImpl(state, connections, + private Client client = new Client(state, connections, "http://client/ClientService.json"); private ClientService clientS = client.getService(); diff --git a/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java b/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java index cc89054..0e7c81d 100644 --- a/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java +++ b/same/src/test/java/com/orbekk/same/MasterServiceImplTest.java @@ -12,7 +12,7 @@ public class MasterServiceImplTest { private State state = new State("TestNetwork"); private TestConnectionManager connections = new TestConnectionManager(); private TestBroadcaster broadcaster = new TestBroadcaster(); - private MasterServiceImpl master; + private Master master; public static class UnreachableClient implements ClientService { @Override @@ -36,7 +36,7 @@ public class MasterServiceImplTest { @Before public void setUp() { state.update(".masterUrl", "http://master/MasterService.json", 1); - master = new MasterServiceImpl(state, connections, broadcaster); + master = new Master(state, connections, broadcaster); connections.masterMap.put("http://master/MasterService.json", master); } @@ -66,7 +66,7 @@ public class MasterServiceImplTest { @Test public void clientJoin() { - ClientServiceImpl client = new ClientServiceImpl( + Client client = new Client( new State("ClientNetwork"), connections, "http://client/ClientService.json"); ClientService clientS = client.getService(); @@ -79,12 +79,12 @@ public class MasterServiceImplTest { @Test public void validStateRequest() { - ClientServiceImpl client1 = new ClientServiceImpl( + Client client1 = new Client( new State("ClientNetwork"), connections, "http://client/ClientService.json"); ClientService client1S = client1.getService(); connections.clientMap.put("http://client/ClientService.json", client1S); - ClientServiceImpl client2 = new ClientServiceImpl( + Client client2 = new Client( new State("ClientNetwork"), connections, "http://client2/ClientService.json"); ClientService client2S = client2.getService(); @@ -114,7 +114,7 @@ public class MasterServiceImplTest { @Test public void masterRemovesParticipant() { - ClientServiceImpl client = new ClientServiceImpl( + Client client = new Client( new State("ClientNetwork"), connections, "http://client/ClientService.json"); ClientService clientS = client.getService(); -- cgit v1.2.3