From 2e0844bf70e54fd52f75e3f3babdd13ef81a8f2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 7 Feb 2012 09:42:32 +0100 Subject: Use DiscoveryService in app. --- .../java/com/orbekk/net/BroadcastListener.java | 5 ++++- .../java/com/orbekk/same/DiscoveryService.java | 5 ++++- .../main/java/com/orbekk/same/SameController.java | 24 ++++++++++++++++++++-- 3 files changed, 30 insertions(+), 4 deletions(-) (limited to 'same/src') diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java index 8c59300..59a1022 100644 --- a/same/src/main/java/com/orbekk/net/BroadcastListener.java +++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java @@ -20,13 +20,16 @@ public class BroadcastListener { public synchronized DatagramPacket listen() { logger.debug("Waiting for broadcast on port " + port); try { - socket = new DatagramSocket(port); + if (socket == null) { + socket = new DatagramSocket(port); + } } catch (SocketException e) { logger.warn("Failed to create socket.", e.fillInStackTrace()); return null; } try { socket.setBroadcast(true); + socket.setReuseAddress(true); } catch (SocketException e) { logger.warn("Exception: {}", e); } diff --git a/same/src/main/java/com/orbekk/same/DiscoveryService.java b/same/src/main/java/com/orbekk/same/DiscoveryService.java index bc28a58..0415316 100644 --- a/same/src/main/java/com/orbekk/same/DiscoveryService.java +++ b/same/src/main/java/com/orbekk/same/DiscoveryService.java @@ -12,13 +12,14 @@ public class DiscoveryService extends Thread { BroadcastListener broadcastListener; DiscoveryListener listener; - public DiscoveryService(int port, DiscoveryListener listener, + public DiscoveryService(DiscoveryListener listener, BroadcastListener broadcastListener) { this.listener = listener; this.broadcastListener = broadcastListener; } public void run() { + logger.info("DiscoveryService starting."); while (!Thread.interrupted()) { DatagramPacket packet = broadcastListener.listen(); String content = new String(packet.getData(), 0, packet.getLength()); @@ -35,9 +36,11 @@ public class DiscoveryService extends Thread { listener.discover(url); } } + logger.info("DiscoveryService stopped."); } @Override public void interrupt() { + logger.info("Interrupt()"); super.interrupt(); broadcastListener.interrupt(); } diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 6b18115..16c4d80 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -3,6 +3,7 @@ package com.orbekk.same; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.orbekk.net.BroadcastListener; import com.orbekk.net.HttpUtil; import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; @@ -18,6 +19,7 @@ public class SameController { private Master master; private Client client; private PaxosServiceImpl paxos; + private DiscoveryService discoveryService; /** * Timeout for remote operations in milliseconds. @@ -44,6 +46,13 @@ public class SameController { clientUrl); PaxosServiceImpl paxos = new PaxosServiceImpl(""); + DiscoveryService discoveryService = null; + if ("true".equals(configuration.get("enableDiscovery"))) { + BroadcastListener broadcastListener = new BroadcastListener( + configuration.getInt("discoveryPort")); + discoveryService = new DiscoveryService(client, broadcastListener); + } + ServerContainer server = new ServerBuilder(port) .withServlet(new StateServlet(client.getInterface()), "/_/state") .withService(client.getService(), ClientService.class) @@ -52,7 +61,7 @@ public class SameController { .build(); SameController controller = new SameController(port, server, master, client, - paxos); + paxos, discoveryService); return controller; } @@ -61,18 +70,23 @@ public class SameController { ServerContainer server, Master master, Client client, - PaxosServiceImpl paxos) { + PaxosServiceImpl paxos, + DiscoveryService discoveryService) { this.port = port; this.server = server; this.master = master; this.client = client; this.paxos = paxos; + this.discoveryService = discoveryService; } public void start() throws Exception { server.start(); master.start(); client.start(); + if (discoveryService != null) { + discoveryService.start(); + } } public void stop() { @@ -80,6 +94,9 @@ public class SameController { client.interrupt(); master.interrupt(); server.stop(); + if (discoveryService != null) { + discoveryService.interrupt(); + } } catch (Exception e) { logger.error("Failed to stop webserver", e); } @@ -89,6 +106,9 @@ public class SameController { try { server.join(); master.join(); + if (discoveryService != null) { + discoveryService.join(); + } } catch (InterruptedException e) { master.interrupt(); try { -- cgit v1.2.3