From 585f0a27869c230917c220962e0faf954a283e81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 13 Feb 2012 18:02:02 +0100 Subject: Fix broadcast code. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Send broadcast from SameController. – Correct usage of SO_REUSEADDR. – BroadcasterInterface for Android code. --- .../java/com/orbekk/net/BroadcastListener.java | 6 ++-- same/src/main/java/com/orbekk/net/Broadcaster.java | 2 +- .../java/com/orbekk/net/BroadcasterFactory.java | 5 +++ .../java/com/orbekk/net/BroadcasterInterface.java | 5 +++ .../com/orbekk/net/DefaultBroadcasterFactory.java | 8 +++++ same/src/main/java/com/orbekk/same/App.java | 1 + .../main/java/com/orbekk/same/SameController.java | 41 ++++++++++++++++------ 7 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 same/src/main/java/com/orbekk/net/BroadcasterFactory.java create mode 100644 same/src/main/java/com/orbekk/net/BroadcasterInterface.java create mode 100644 same/src/main/java/com/orbekk/net/DefaultBroadcasterFactory.java (limited to 'same/src/main') diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java index 59a1022..3fdfd23 100644 --- a/same/src/main/java/com/orbekk/net/BroadcastListener.java +++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java @@ -3,6 +3,7 @@ package com.orbekk.net; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.InetSocketAddress; import java.net.SocketException; import org.slf4j.Logger; @@ -21,7 +22,9 @@ public class BroadcastListener { logger.debug("Waiting for broadcast on port " + port); try { if (socket == null) { - socket = new DatagramSocket(port); + socket = new DatagramSocket(null); + socket.setReuseAddress(true); + socket.bind(new InetSocketAddress(port)); } } catch (SocketException e) { logger.warn("Failed to create socket.", e.fillInStackTrace()); @@ -29,7 +32,6 @@ public class BroadcastListener { } try { socket.setBroadcast(true); - socket.setReuseAddress(true); } catch (SocketException e) { logger.warn("Exception: {}", e); } diff --git a/same/src/main/java/com/orbekk/net/Broadcaster.java b/same/src/main/java/com/orbekk/net/Broadcaster.java index 95e279c..b3e4860 100644 --- a/same/src/main/java/com/orbekk/net/Broadcaster.java +++ b/same/src/main/java/com/orbekk/net/Broadcaster.java @@ -15,7 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Broadcaster { +public class Broadcaster implements BroadcasterInterface { private Logger logger = LoggerFactory.getLogger(getClass()); public List getBroadcastAddresses() { diff --git a/same/src/main/java/com/orbekk/net/BroadcasterFactory.java b/same/src/main/java/com/orbekk/net/BroadcasterFactory.java new file mode 100644 index 0000000..30dd0ae --- /dev/null +++ b/same/src/main/java/com/orbekk/net/BroadcasterFactory.java @@ -0,0 +1,5 @@ +package com.orbekk.net; + +public interface BroadcasterFactory { + BroadcasterInterface create(); +} diff --git a/same/src/main/java/com/orbekk/net/BroadcasterInterface.java b/same/src/main/java/com/orbekk/net/BroadcasterInterface.java new file mode 100644 index 0000000..08a7e80 --- /dev/null +++ b/same/src/main/java/com/orbekk/net/BroadcasterInterface.java @@ -0,0 +1,5 @@ +package com.orbekk.net; + +public interface BroadcasterInterface { + boolean sendBroadcast(int port, byte[] data); +} diff --git a/same/src/main/java/com/orbekk/net/DefaultBroadcasterFactory.java b/same/src/main/java/com/orbekk/net/DefaultBroadcasterFactory.java new file mode 100644 index 0000000..018d334 --- /dev/null +++ b/same/src/main/java/com/orbekk/net/DefaultBroadcasterFactory.java @@ -0,0 +1,8 @@ +package com.orbekk.net; + +public class DefaultBroadcasterFactory implements BroadcasterFactory { + @Override + public BroadcasterInterface create() { + return new Broadcaster(); + } +} diff --git a/same/src/main/java/com/orbekk/same/App.java b/same/src/main/java/com/orbekk/same/App.java index 2955311..35e408c 100644 --- a/same/src/main/java/com/orbekk/same/App.java +++ b/same/src/main/java/com/orbekk/same/App.java @@ -13,6 +13,7 @@ public class App { SameController controller = SameController.create(configuration); try { controller.start(); + controller.searchNetworks(); controller.joinNetwork(configuration.get("masterUrl")); controller.join(); } catch (Exception e) { diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 5fc319d..d012d64 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -3,7 +3,10 @@ package com.orbekk.same; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.orbekk.net.BroadcasterInterface; import com.orbekk.net.BroadcastListener; +import com.orbekk.net.BroadcasterFactory; +import com.orbekk.net.DefaultBroadcasterFactory; import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; import com.orbekk.same.config.Configuration; @@ -18,13 +21,16 @@ public class SameController { private Client client; private PaxosServiceImpl paxos; private DiscoveryService discoveryService; + private BroadcasterFactory broadcasterFactory; + private Configuration configuration; /** * Timeout for remote operations in milliseconds. */ private static final int timeout = 10000; - public static SameController create(Configuration configuration) { + public static SameController create(BroadcasterFactory broadcasterFactory, + Configuration configuration) { int port = configuration.getInt("port"); ConnectionManagerImpl connections = new ConnectionManagerImpl( timeout, timeout); @@ -52,29 +58,37 @@ public class SameController { } ServerContainer server = new ServerBuilder(port) - .withServlet(new StateServlet(client.getInterface()), "/_/state") - .withService(client.getService(), ClientService.class) - .withService(master, MasterService.class) - .withService(paxos, PaxosService.class) - .build(); + .withServlet(new StateServlet(client.getInterface()), "/_/state") + .withService(client.getService(), ClientService.class) + .withService(master, MasterService.class) + .withService(paxos, PaxosService.class) + .build(); - SameController controller = new SameController(port, server, master, client, - paxos, discoveryService); + SameController controller = new SameController( + configuration, server, master, client, + paxos, discoveryService, broadcasterFactory); return controller; } + public static SameController create(Configuration configuration) { + return create(new DefaultBroadcasterFactory(), configuration); + } + public SameController( - int port, + Configuration configuration, ServerContainer server, Master master, Client client, PaxosServiceImpl paxos, - DiscoveryService discoveryService) { + DiscoveryService discoveryService, + BroadcasterFactory broadcasterFactory) { + this.configuration = configuration; this.server = server; this.master = master; this.client = client; this.paxos = paxos; this.discoveryService = discoveryService; + this.broadcasterFactory = broadcasterFactory; } public void start() throws Exception { @@ -116,6 +130,13 @@ public class SameController { } } + public void searchNetworks() { + BroadcasterInterface broadcaster = broadcasterFactory.create(); + String message = "Discover " + client.getUrl(); + broadcaster.sendBroadcast(configuration.getInt("discoveryPort"), + message.getBytes()); + } + public void joinNetwork(String url) { client.joinNetwork(url); } -- cgit v1.2.3