summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-03-20 15:54:17 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-03-20 15:54:17 +0100
commit810c7c1587b5c44c150eb4ba893874555802fc6b (patch)
tree8b34c6ca117d489241b66def1a394f26dcea2a14
parenta16f9ef4b751900a412da4c72c9245f716dc29a6 (diff)
Remove discovery code.
Use centralized discovery instead.
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java53
-rw-r--r--same/src/main/java/com/orbekk/same/ClientService.java3
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java1
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java38
-rw-r--r--same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java5
-rw-r--r--same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java52
-rw-r--r--same/src/main/resources/client.properties.example2
-rw-r--r--same/src/main/resources/master.properties.example3
-rw-r--r--same/src/test/java/com/orbekk/same/ClientTest.java9
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java5
10 files changed, 7 insertions, 164 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index e789f2e..6d3a7d5 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -13,11 +13,10 @@ import org.slf4j.LoggerFactory;
import com.orbekk.paxos.MasterProposer;
import com.orbekk.same.State.Component;
-import com.orbekk.same.discovery.DiscoveryListener;
import com.orbekk.util.DelayedOperation;
import com.orbekk.util.WorkQueue;
-public class Client implements DiscoveryListener {
+public class Client {
public static long MASTER_TAKEOVER_TIMEOUT = 4000l;
private Logger logger = LoggerFactory.getLogger(getClass());
/** TODO: Not really useful yet. Remove? */
@@ -119,11 +118,6 @@ public class Client implements DiscoveryListener {
}
@Override
- public void discoveryRequest(String remoteUrl) {
- discoveryThread.add(remoteUrl);
- }
-
- @Override
public synchronized void masterTakeover(String masterUrl, String networkName,
int masterId) throws Exception {
logger.info("MasterTakeover({}, {}, {})",
@@ -153,15 +147,6 @@ public class Client implements DiscoveryListener {
}
};
- private WorkQueue<String> discoveryThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- for (String url : pending) {
- discover(url);
- }
- }
- };
-
public Client(State state, ConnectionManager connections,
String myUrl, Broadcaster broadcaster) {
this.state = state;
@@ -171,16 +156,13 @@ public class Client implements DiscoveryListener {
}
public void start() {
- discoveryThread.start();
}
public void interrupt() {
connectionState = ConnectionState.DISCONNECTED;
- discoveryThread.interrupt();
}
void performWork() {
- discoveryThread.performWork();
}
public String getUrl() {
@@ -228,39 +210,6 @@ public class Client implements DiscoveryListener {
this.networkListener = listener;
}
- public void sendDiscoveryRequest(String url) {
- try {
- connections.getClient(url)
- .discoveryRequest(myUrl);
- } catch (Exception e) {
- logger.warn("Failed to send discovery request: {}",
- throwableToString(e));
- }
- }
-
- @Override
- public void discover(String url) {
- String networkName = state.getDataOf(".networkName");
- if (networkName.equals(".InvalidClientNetwork")) {
- logger.warn("Client not joined to a network. Ignoring discovery");
- return;
- } else if (networkName.equals(".Private")) {
- logger.info("Ignoring broadcast to .Private network.");
- return;
- }
-
- if (!url.equals(myUrl)) {
- try {
- connections.getClient(url)
- .notifyNetwork(state.getDataOf(".networkName"),
- state.getDataOf(".masterUrl"));
- } catch (Exception e) {
- logger.warn("Failed to contact new client {}: {}", url,
- throwableToString(e));
- }
- }
- }
-
public ClientService getService() {
return serviceImpl;
}
diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java
index f1247a5..8d460db 100644
--- a/same/src/main/java/com/orbekk/same/ClientService.java
+++ b/same/src/main/java/com/orbekk/same/ClientService.java
@@ -5,9 +5,6 @@ public interface ClientService {
void setState(String component, String data, long revision) throws Exception;
- // Manual discovery request by client.
- void discoveryRequest(String remoteUrl) throws Exception;
-
/** A new master takes over.
*
* @param masterUrl The new master URL.
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index f3f4edb..ade469e 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -12,7 +12,6 @@ import com.googlecode.jsonrpc4j.ProxyUtil;
import com.orbekk.net.MyJsonRpcHttpClient;
import com.orbekk.paxos.PaxosService;
import com.orbekk.same.discovery.DirectoryService;
-import com.orbekk.same.discovery.DiscoveryService;
public class ConnectionManagerImpl implements ConnectionManager {
private int connectionTimeout;
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index e9a7916..c1c7901 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -11,7 +11,6 @@ import com.orbekk.paxos.PaxosService;
import com.orbekk.paxos.PaxosServiceImpl;
import com.orbekk.same.config.Configuration;
import com.orbekk.same.discovery.DirectoryService;
-import com.orbekk.same.discovery.DiscoveryService;
import com.orbekk.same.http.ServerContainer;
import com.orbekk.same.http.StateServlet;
import com.orbekk.same.http.JettyServerBuilder;
@@ -24,7 +23,6 @@ public class SameController {
private Master master;
private Client client;
private PaxosServiceImpl paxos;
- private DiscoveryService discoveryService;
private BroadcasterFactory broadcasterFactory;
private Configuration configuration;
private ConnectionManager connections;
@@ -81,13 +79,6 @@ public class SameController {
clientUrl, BroadcasterImpl.getDefaultBroadcastRunner());
PaxosServiceImpl paxos = new PaxosServiceImpl("");
- DiscoveryService discoveryService = null;
- if ("true".equals(configuration.get("enableDiscovery"))) {
- BroadcastListener broadcastListener = new BroadcastListener(
- configuration.getInt("discoveryPort"));
- discoveryService = new DiscoveryService(client, broadcastListener);
- }
-
StateServlet stateServlet = new StateServlet(client.getInterface(),
new VariableFactory(client.getInterface()));
@@ -100,7 +91,7 @@ public class SameController {
SameController controller = new SameController(
configuration, connections, server, master, client,
- paxos, discoveryService, broadcaster, broadcasterFactory);
+ paxos, broadcaster, broadcasterFactory);
return controller;
}
@@ -115,7 +106,6 @@ public class SameController {
MasterServiceProxy master,
Client client,
PaxosServiceImpl paxos,
- DiscoveryService discoveryService,
Broadcaster serviceBroadcaster,
BroadcasterFactory broadcasterFactory) {
this.configuration = configuration;
@@ -124,7 +114,6 @@ public class SameController {
this.masterService = master;
this.client = client;
this.paxos = paxos;
- this.discoveryService = discoveryService;
this.serviceBroadcaster = serviceBroadcaster;
this.broadcasterFactory = broadcasterFactory;
}
@@ -133,9 +122,6 @@ public class SameController {
server.start();
client.setMasterController(masterController);
client.start();
- if (discoveryService != null) {
- discoveryService.start();
- }
}
public void stop() {
@@ -145,30 +131,16 @@ public class SameController {
master.interrupt();
}
server.stop();
- if (discoveryService != null) {
- discoveryService.interrupt();
- }
} catch (Exception e) {
logger.error("Failed to stop webserver", e);
}
}
public void join() {
- try {
- server.join();
- client.interrupt();
- if (master != null) {
- master.interrupt();
- }
- if (discoveryService != null) {
- discoveryService.join();
- }
- } catch (InterruptedException e) {
- try {
- server.stop();
- } catch (Exception e1) {
- logger.error("Failed to stop server", e);
- }
+ server.join();
+ client.interrupt();
+ if (master != null) {
+ master.interrupt();
}
}
diff --git a/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java b/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java
deleted file mode 100644
index e006c77..0000000
--- a/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.orbekk.same.discovery;
-
-public interface DiscoveryListener {
- void discover(String url);
-}
diff --git a/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java b/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java
deleted file mode 100644
index a1147cc..0000000
--- a/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.orbekk.same.discovery;
-
-import java.net.DatagramPacket;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.orbekk.net.BroadcastListener;
-
-public class DiscoveryService extends Thread {
- private Logger logger = LoggerFactory.getLogger(getClass());
- BroadcastListener broadcastListener;
- DiscoveryListener listener;
-
- public DiscoveryService(DiscoveryListener listener,
- BroadcastListener broadcastListener) {
- this.listener = listener;
- this.broadcastListener = broadcastListener;
- }
-
- @Override
- public void run() {
- logger.info("DiscoveryService starting.");
- while (!Thread.interrupted()) {
- DatagramPacket packet = broadcastListener.listen();
- if (packet == null) {
- // An error or interrupt occurred.
- continue;
- }
- String content = new String(packet.getData(), 0, packet.getLength());
- String[] words = content.split(" ");
-
- if (!content.startsWith("Discover") || words.length < 2) {
- logger.warn("Invalid discovery message: {}", content);
- continue;
- }
-
- String url = words[1];
- logger.info("Received discovery from {}", url);
- if (listener != null) {
- listener.discover(url);
- }
- }
- logger.info("DiscoveryService stopped.");
- }
-
- @Override public void interrupt() {
- logger.info("Interrupt()");
- super.interrupt();
- broadcastListener.interrupt();
- }
-}
diff --git a/same/src/main/resources/client.properties.example b/same/src/main/resources/client.properties.example
index 5c13a0a..cb5acdf 100644
--- a/same/src/main/resources/client.properties.example
+++ b/same/src/main/resources/client.properties.example
@@ -2,5 +2,3 @@ port=10011
localIp=10.0.0.6
baseUrl=http://10.0.0.6:10011/
masterUrl=http://10.0.0.6:10010/MasterService.json
-enableDiscovery=true
-discoveryPort=15066
diff --git a/same/src/main/resources/master.properties.example b/same/src/main/resources/master.properties.example
index edcf323..cf197fb 100644
--- a/same/src/main/resources/master.properties.example
+++ b/same/src/main/resources/master.properties.example
@@ -2,6 +2,5 @@ port=10010
localIp=10.0.0.6
baseUrl=http://10.0.0.6:10010/
masterUrl=http://10.0.0.6:10010/MasterService.json
-enableDiscovery=true
-discoveryPort=15066
networkName=KlatreplanteNetwork
+isMaster=true
diff --git a/same/src/test/java/com/orbekk/same/ClientTest.java b/same/src/test/java/com/orbekk/same/ClientTest.java
index a94c039..0e2a278 100644
--- a/same/src/test/java/com/orbekk/same/ClientTest.java
+++ b/same/src/test/java/com/orbekk/same/ClientTest.java
@@ -57,15 +57,6 @@ public class ClientTest {
verify(listener).notifyNetwork("MyNetwork", "MasterUrl");
}
- @Test public void discover() throws Exception {
- clientS.setState(".masterUrl", "master", 1);
- ClientService mockClient = mock(ClientService.class);
- connections.clientMap.put("mockClient/ClientService.json",
- mockClient);
- client.discover("mockClient/ClientService.json");
- verify(mockClient).notifyNetwork("ClientNetwork", "master");
- }
-
@Test public void stateListenerReceivesUpdate() throws Exception {
StateChangedListener listener = mock(StateChangedListener.class);
client.getInterface().addStateListener(listener);
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index e2cbadd..f369d06 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -30,11 +30,6 @@ public class MasterTest {
}
@Override
- public void discoveryRequest(String remoteUrl) throws Exception {
- throw new Exception("Unreachable client");
- }
-
- @Override
public void masterTakeover(String masterUrl, String networkName, int masterId)
throws Exception {
throw new Exception("Unreachable client");