summaryrefslogtreecommitdiff
path: root/same/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java')
-rw-r--r--same/src/main/java/com/orbekk/net/BroadcastListener.java5
-rw-r--r--same/src/main/java/com/orbekk/same/DiscoveryService.java5
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java24
3 files changed, 30 insertions, 4 deletions
diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java
index 8c59300..59a1022 100644
--- a/same/src/main/java/com/orbekk/net/BroadcastListener.java
+++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java
@@ -20,13 +20,16 @@ public class BroadcastListener {
public synchronized DatagramPacket listen() {
logger.debug("Waiting for broadcast on port " + port);
try {
- socket = new DatagramSocket(port);
+ if (socket == null) {
+ socket = new DatagramSocket(port);
+ }
} catch (SocketException e) {
logger.warn("Failed to create socket.", e.fillInStackTrace());
return null;
}
try {
socket.setBroadcast(true);
+ socket.setReuseAddress(true);
} catch (SocketException e) {
logger.warn("Exception: {}", e);
}
diff --git a/same/src/main/java/com/orbekk/same/DiscoveryService.java b/same/src/main/java/com/orbekk/same/DiscoveryService.java
index bc28a58..0415316 100644
--- a/same/src/main/java/com/orbekk/same/DiscoveryService.java
+++ b/same/src/main/java/com/orbekk/same/DiscoveryService.java
@@ -12,13 +12,14 @@ public class DiscoveryService extends Thread {
BroadcastListener broadcastListener;
DiscoveryListener listener;
- public DiscoveryService(int port, DiscoveryListener listener,
+ public DiscoveryService(DiscoveryListener listener,
BroadcastListener broadcastListener) {
this.listener = listener;
this.broadcastListener = broadcastListener;
}
public void run() {
+ logger.info("DiscoveryService starting.");
while (!Thread.interrupted()) {
DatagramPacket packet = broadcastListener.listen();
String content = new String(packet.getData(), 0, packet.getLength());
@@ -35,9 +36,11 @@ public class DiscoveryService extends Thread {
listener.discover(url);
}
}
+ logger.info("DiscoveryService stopped.");
}
@Override public void interrupt() {
+ logger.info("Interrupt()");
super.interrupt();
broadcastListener.interrupt();
}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 6b18115..16c4d80 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -3,6 +3,7 @@ package com.orbekk.same;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.orbekk.net.BroadcastListener;
import com.orbekk.net.HttpUtil;
import com.orbekk.paxos.PaxosService;
import com.orbekk.paxos.PaxosServiceImpl;
@@ -18,6 +19,7 @@ public class SameController {
private Master master;
private Client client;
private PaxosServiceImpl paxos;
+ private DiscoveryService discoveryService;
/**
* Timeout for remote operations in milliseconds.
@@ -44,6 +46,13 @@ public class SameController {
clientUrl);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
+ DiscoveryService discoveryService = null;
+ if ("true".equals(configuration.get("enableDiscovery"))) {
+ BroadcastListener broadcastListener = new BroadcastListener(
+ configuration.getInt("discoveryPort"));
+ discoveryService = new DiscoveryService(client, broadcastListener);
+ }
+
ServerContainer server = new ServerBuilder(port)
.withServlet(new StateServlet(client.getInterface()), "/_/state")
.withService(client.getService(), ClientService.class)
@@ -52,7 +61,7 @@ public class SameController {
.build();
SameController controller = new SameController(port, server, master, client,
- paxos);
+ paxos, discoveryService);
return controller;
}
@@ -61,18 +70,23 @@ public class SameController {
ServerContainer server,
Master master,
Client client,
- PaxosServiceImpl paxos) {
+ PaxosServiceImpl paxos,
+ DiscoveryService discoveryService) {
this.port = port;
this.server = server;
this.master = master;
this.client = client;
this.paxos = paxos;
+ this.discoveryService = discoveryService;
}
public void start() throws Exception {
server.start();
master.start();
client.start();
+ if (discoveryService != null) {
+ discoveryService.start();
+ }
}
public void stop() {
@@ -80,6 +94,9 @@ public class SameController {
client.interrupt();
master.interrupt();
server.stop();
+ if (discoveryService != null) {
+ discoveryService.interrupt();
+ }
} catch (Exception e) {
logger.error("Failed to stop webserver", e);
}
@@ -89,6 +106,9 @@ public class SameController {
try {
server.join();
master.join();
+ if (discoveryService != null) {
+ discoveryService.join();
+ }
} catch (InterruptedException e) {
master.interrupt();
try {