diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-03-20 15:54:17 +0100 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-03-20 15:54:17 +0100 |
commit | 810c7c1587b5c44c150eb4ba893874555802fc6b (patch) | |
tree | 8b34c6ca117d489241b66def1a394f26dcea2a14 | |
parent | a16f9ef4b751900a412da4c72c9245f716dc29a6 (diff) |
Remove discovery code.
Use centralized discovery instead.
10 files changed, 7 insertions, 164 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index e789f2e..6d3a7d5 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -13,11 +13,10 @@ import org.slf4j.LoggerFactory; import com.orbekk.paxos.MasterProposer; import com.orbekk.same.State.Component; -import com.orbekk.same.discovery.DiscoveryListener; import com.orbekk.util.DelayedOperation; import com.orbekk.util.WorkQueue; -public class Client implements DiscoveryListener { +public class Client { public static long MASTER_TAKEOVER_TIMEOUT = 4000l; private Logger logger = LoggerFactory.getLogger(getClass()); /** TODO: Not really useful yet. Remove? */ @@ -119,11 +118,6 @@ public class Client implements DiscoveryListener { } @Override - public void discoveryRequest(String remoteUrl) { - discoveryThread.add(remoteUrl); - } - - @Override public synchronized void masterTakeover(String masterUrl, String networkName, int masterId) throws Exception { logger.info("MasterTakeover({}, {}, {})", @@ -153,15 +147,6 @@ public class Client implements DiscoveryListener { } }; - private WorkQueue<String> discoveryThread = new WorkQueue<String>() { - @Override protected void onChange() { - List<String> pending = getAndClear(); - for (String url : pending) { - discover(url); - } - } - }; - public Client(State state, ConnectionManager connections, String myUrl, Broadcaster broadcaster) { this.state = state; @@ -171,16 +156,13 @@ public class Client implements DiscoveryListener { } public void start() { - discoveryThread.start(); } public void interrupt() { connectionState = ConnectionState.DISCONNECTED; - discoveryThread.interrupt(); } void performWork() { - discoveryThread.performWork(); } public String getUrl() { @@ -228,39 +210,6 @@ public class Client implements DiscoveryListener { this.networkListener = listener; } - public void sendDiscoveryRequest(String url) { - try { - connections.getClient(url) - .discoveryRequest(myUrl); - } catch (Exception e) { - logger.warn("Failed to send discovery request: {}", - throwableToString(e)); - } - } - - @Override - public void discover(String url) { - String networkName = state.getDataOf(".networkName"); - if (networkName.equals(".InvalidClientNetwork")) { - logger.warn("Client not joined to a network. Ignoring discovery"); - return; - } else if (networkName.equals(".Private")) { - logger.info("Ignoring broadcast to .Private network."); - return; - } - - if (!url.equals(myUrl)) { - try { - connections.getClient(url) - .notifyNetwork(state.getDataOf(".networkName"), - state.getDataOf(".masterUrl")); - } catch (Exception e) { - logger.warn("Failed to contact new client {}: {}", url, - throwableToString(e)); - } - } - } - public ClientService getService() { return serviceImpl; } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index f1247a5..8d460db 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -5,9 +5,6 @@ public interface ClientService { void setState(String component, String data, long revision) throws Exception; - // Manual discovery request by client. - void discoveryRequest(String remoteUrl) throws Exception; - /** A new master takes over. * * @param masterUrl The new master URL. diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index f3f4edb..ade469e 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -12,7 +12,6 @@ import com.googlecode.jsonrpc4j.ProxyUtil; import com.orbekk.net.MyJsonRpcHttpClient; import com.orbekk.paxos.PaxosService; import com.orbekk.same.discovery.DirectoryService; -import com.orbekk.same.discovery.DiscoveryService; public class ConnectionManagerImpl implements ConnectionManager { private int connectionTimeout; diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index e9a7916..c1c7901 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -11,7 +11,6 @@ import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; import com.orbekk.same.config.Configuration; import com.orbekk.same.discovery.DirectoryService; -import com.orbekk.same.discovery.DiscoveryService; import com.orbekk.same.http.ServerContainer; import com.orbekk.same.http.StateServlet; import com.orbekk.same.http.JettyServerBuilder; @@ -24,7 +23,6 @@ public class SameController { private Master master; private Client client; private PaxosServiceImpl paxos; - private DiscoveryService discoveryService; private BroadcasterFactory broadcasterFactory; private Configuration configuration; private ConnectionManager connections; @@ -81,13 +79,6 @@ public class SameController { clientUrl, BroadcasterImpl.getDefaultBroadcastRunner()); PaxosServiceImpl paxos = new PaxosServiceImpl(""); - DiscoveryService discoveryService = null; - if ("true".equals(configuration.get("enableDiscovery"))) { - BroadcastListener broadcastListener = new BroadcastListener( - configuration.getInt("discoveryPort")); - discoveryService = new DiscoveryService(client, broadcastListener); - } - StateServlet stateServlet = new StateServlet(client.getInterface(), new VariableFactory(client.getInterface())); @@ -100,7 +91,7 @@ public class SameController { SameController controller = new SameController( configuration, connections, server, master, client, - paxos, discoveryService, broadcaster, broadcasterFactory); + paxos, broadcaster, broadcasterFactory); return controller; } @@ -115,7 +106,6 @@ public class SameController { MasterServiceProxy master, Client client, PaxosServiceImpl paxos, - DiscoveryService discoveryService, Broadcaster serviceBroadcaster, BroadcasterFactory broadcasterFactory) { this.configuration = configuration; @@ -124,7 +114,6 @@ public class SameController { this.masterService = master; this.client = client; this.paxos = paxos; - this.discoveryService = discoveryService; this.serviceBroadcaster = serviceBroadcaster; this.broadcasterFactory = broadcasterFactory; } @@ -133,9 +122,6 @@ public class SameController { server.start(); client.setMasterController(masterController); client.start(); - if (discoveryService != null) { - discoveryService.start(); - } } public void stop() { @@ -145,30 +131,16 @@ public class SameController { master.interrupt(); } server.stop(); - if (discoveryService != null) { - discoveryService.interrupt(); - } } catch (Exception e) { logger.error("Failed to stop webserver", e); } } public void join() { - try { - server.join(); - client.interrupt(); - if (master != null) { - master.interrupt(); - } - if (discoveryService != null) { - discoveryService.join(); - } - } catch (InterruptedException e) { - try { - server.stop(); - } catch (Exception e1) { - logger.error("Failed to stop server", e); - } + server.join(); + client.interrupt(); + if (master != null) { + master.interrupt(); } } diff --git a/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java b/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java deleted file mode 100644 index e006c77..0000000 --- a/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.orbekk.same.discovery; - -public interface DiscoveryListener { - void discover(String url); -} diff --git a/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java b/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java deleted file mode 100644 index a1147cc..0000000 --- a/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.orbekk.same.discovery; - -import java.net.DatagramPacket; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.orbekk.net.BroadcastListener; - -public class DiscoveryService extends Thread { - private Logger logger = LoggerFactory.getLogger(getClass()); - BroadcastListener broadcastListener; - DiscoveryListener listener; - - public DiscoveryService(DiscoveryListener listener, - BroadcastListener broadcastListener) { - this.listener = listener; - this.broadcastListener = broadcastListener; - } - - @Override - public void run() { - logger.info("DiscoveryService starting."); - while (!Thread.interrupted()) { - DatagramPacket packet = broadcastListener.listen(); - if (packet == null) { - // An error or interrupt occurred. - continue; - } - String content = new String(packet.getData(), 0, packet.getLength()); - String[] words = content.split(" "); - - if (!content.startsWith("Discover") || words.length < 2) { - logger.warn("Invalid discovery message: {}", content); - continue; - } - - String url = words[1]; - logger.info("Received discovery from {}", url); - if (listener != null) { - listener.discover(url); - } - } - logger.info("DiscoveryService stopped."); - } - - @Override public void interrupt() { - logger.info("Interrupt()"); - super.interrupt(); - broadcastListener.interrupt(); - } -} diff --git a/same/src/main/resources/client.properties.example b/same/src/main/resources/client.properties.example index 5c13a0a..cb5acdf 100644 --- a/same/src/main/resources/client.properties.example +++ b/same/src/main/resources/client.properties.example @@ -2,5 +2,3 @@ port=10011 localIp=10.0.0.6 baseUrl=http://10.0.0.6:10011/ masterUrl=http://10.0.0.6:10010/MasterService.json -enableDiscovery=true -discoveryPort=15066 diff --git a/same/src/main/resources/master.properties.example b/same/src/main/resources/master.properties.example index edcf323..cf197fb 100644 --- a/same/src/main/resources/master.properties.example +++ b/same/src/main/resources/master.properties.example @@ -2,6 +2,5 @@ port=10010 localIp=10.0.0.6 baseUrl=http://10.0.0.6:10010/ masterUrl=http://10.0.0.6:10010/MasterService.json -enableDiscovery=true -discoveryPort=15066 networkName=KlatreplanteNetwork +isMaster=true diff --git a/same/src/test/java/com/orbekk/same/ClientTest.java b/same/src/test/java/com/orbekk/same/ClientTest.java index a94c039..0e2a278 100644 --- a/same/src/test/java/com/orbekk/same/ClientTest.java +++ b/same/src/test/java/com/orbekk/same/ClientTest.java @@ -57,15 +57,6 @@ public class ClientTest { verify(listener).notifyNetwork("MyNetwork", "MasterUrl"); } - @Test public void discover() throws Exception { - clientS.setState(".masterUrl", "master", 1); - ClientService mockClient = mock(ClientService.class); - connections.clientMap.put("mockClient/ClientService.json", - mockClient); - client.discover("mockClient/ClientService.json"); - verify(mockClient).notifyNetwork("ClientNetwork", "master"); - } - @Test public void stateListenerReceivesUpdate() throws Exception { StateChangedListener listener = mock(StateChangedListener.class); client.getInterface().addStateListener(listener); diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index e2cbadd..f369d06 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -30,11 +30,6 @@ public class MasterTest { } @Override - public void discoveryRequest(String remoteUrl) throws Exception { - throw new Exception("Unreachable client"); - } - - @Override public void masterTakeover(String masterUrl, String networkName, int masterId) throws Exception { throw new Exception("Unreachable client"); |