summaryrefslogtreecommitdiff
path: root/same
diff options
context:
space:
mode:
Diffstat (limited to 'same')
-rw-r--r--same/pom.xml95
-rw-r--r--same/src/main/java/com/orbekk/net/BroadcastListener.java53
-rw-r--r--same/src/main/java/com/orbekk/net/Broadcaster.java77
-rw-r--r--same/src/main/java/com/orbekk/net/HttpUtil.java26
-rw-r--r--same/src/main/java/com/orbekk/rpc/App.java47
-rw-r--r--same/src/main/java/com/orbekk/rpc/Client.java66
-rw-r--r--same/src/main/java/com/orbekk/rpc/RpcHandler.java52
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManager.java10
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java31
-rw-r--r--same/src/main/java/com/orbekk/same/SameService.java40
-rw-r--r--same/src/main/java/com/orbekk/same/SameServiceImpl.java60
-rw-r--r--same/src/main/java/com/orbekk/same/SameState.java233
-rw-r--r--same/src/main/java/com/orbekk/same/State.java78
-rw-r--r--same/src/main/java/com/orbekk/same/UrlReceiver.java10
-rw-r--r--same/src/main/resources/log4j.properties7
-rw-r--r--same/src/test/java/com/orbekk/same/SameStateTest.java105
16 files changed, 990 insertions, 0 deletions
diff --git a/same/pom.xml b/same/pom.xml
new file mode 100644
index 0000000..6d3d5f9
--- /dev/null
+++ b/same/pom.xml
@@ -0,0 +1,95 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.orbekk</groupId>
+ <artifactId>same</artifactId>
+ <version>0.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>same</name>
+ <url>http://github.com/orbekk/master</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>jsonrpc3j-webdav-maven-repo</id>
+ <name>jsonrpc4j maven repository</name>
+ <url>http://jsonrpc4j.googlecode.com/svn/maven/repo/</url>
+ <layout>default</layout>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.googlecode</groupId>
+ <artifactId>jsonrpc4j</artifactId>
+ <version>0.18</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.portlet</groupId>
+ <artifactId>portlet-api</artifactId>
+ <version>2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.0.0.M3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.0.0.M3</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java
new file mode 100644
index 0000000..c0b66e0
--- /dev/null
+++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java
@@ -0,0 +1,53 @@
+package com.orbekk.net;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BroadcastListener {
+ private int port;
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ public BroadcastListener(int port) {
+ this.port = port;
+ }
+
+ public DatagramPacket listen() {
+ logger.debug("Waiting for broadcast on port " + port);
+ DatagramSocket socket;
+ try {
+ socket = new DatagramSocket(port);
+ } catch (SocketException e) {
+ logger.warn("Failed to create socket.", e.fillInStackTrace());
+ return null;
+ }
+ try {
+ socket.setBroadcast(true);
+ } catch (SocketException e) {
+ logger.warn("Exception: {}", e);
+ }
+ byte[] buffer = new byte[2048];
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+ try {
+ socket.receive(packet);
+ } catch (IOException e) {
+ logger.warn("Exception when listening for broadcast: {}", e);
+ return null;
+ }
+
+ String address = packet.getAddress().getHostAddress();
+ logger.debug("Received broadcast from " + address +
+ ": " + new String(packet.getData()));
+ return packet;
+ }
+
+ public static void main(String[] args) {
+ int port = Integer.parseInt(args[0]);
+ BroadcastListener listener = new BroadcastListener(port);
+ System.out.println("Received broadcast: " + listener.listen());
+ }
+}
diff --git a/same/src/main/java/com/orbekk/net/Broadcaster.java b/same/src/main/java/com/orbekk/net/Broadcaster.java
new file mode 100644
index 0000000..95e279c
--- /dev/null
+++ b/same/src/main/java/com/orbekk/net/Broadcaster.java
@@ -0,0 +1,77 @@
+package com.orbekk.net;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Broadcaster {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ public List<InetAddress> getBroadcastAddresses() {
+ List<InetAddress> broadcastAddresses = new LinkedList<InetAddress>();
+
+ Enumeration<NetworkInterface> interfaces;
+ try {
+ interfaces = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e) {
+ logger.warn("Network problem?", e.fillInStackTrace());
+ return broadcastAddresses;
+ }
+ while (interfaces.hasMoreElements()) {
+ NetworkInterface iface = interfaces.nextElement();
+ try {
+ if (iface.isLoopback()) {
+ logger.debug("Ignoring looback device " + iface.getName());
+ continue;
+ }
+ for (InterfaceAddress address : iface.getInterfaceAddresses()) {
+ InetAddress broadcast = address.getBroadcast();
+ if (broadcast != null) {
+ broadcastAddresses.add(broadcast);
+ }
+ }
+ } catch (SocketException e) {
+ logger.info("Ignoring interface " + iface.getName(), e.fillInStackTrace());
+ }
+ }
+ return broadcastAddresses;
+ }
+
+ public boolean sendBroadcast(int port, byte[] data) {
+ boolean successful = false;
+ for (InetAddress broadcastAddress : getBroadcastAddresses()) {
+ try {
+ DatagramSocket socket = new DatagramSocket();
+ socket.setBroadcast(true);
+ DatagramPacket packet = new DatagramPacket(data, data.length, broadcastAddress, port);
+ socket.send(packet);
+ successful = true;
+ } catch (SocketException e) {
+ logger.warn("Failed to send broadcast to " + broadcastAddress +
+ ". ", e.fillInStackTrace());
+ } catch (IOException e) {
+ logger.warn("Error when sending broadcast to " +
+ broadcastAddress + ".", e.fillInStackTrace());
+ }
+ }
+ return successful;
+ }
+
+ public static void main(String[] args) {
+ int port = Integer.parseInt(args[0]);
+ Broadcaster broadcaster = new Broadcaster();
+ String message = "Broadcast from Java broadcaster.";
+ broadcaster.sendBroadcast(port, message.getBytes());
+ }
+}
diff --git a/same/src/main/java/com/orbekk/net/HttpUtil.java b/same/src/main/java/com/orbekk/net/HttpUtil.java
new file mode 100644
index 0000000..b4bb887
--- /dev/null
+++ b/same/src/main/java/com/orbekk/net/HttpUtil.java
@@ -0,0 +1,26 @@
+package com.orbekk.net;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HttpUtil {
+ private static final Logger logger =
+ LoggerFactory.getLogger(HttpUtil.class);
+
+ public static void sendHttpRequest(String url) {
+ try {
+ URL pingUrl = new URL(url);
+ pingUrl.openStream();
+ // URLConnection connection = pingUrl.openConnection();
+ // connection.connect();
+ } catch (MalformedURLException e) {
+ logger.warn("Unable to send ping to {}: {}.", url, e);
+ } catch (IOException e) {
+ logger.warn("Error when sending ping: {}", e);
+ }
+ }
+}
diff --git a/same/src/main/java/com/orbekk/rpc/App.java b/same/src/main/java/com/orbekk/rpc/App.java
new file mode 100644
index 0000000..0e9dce3
--- /dev/null
+++ b/same/src/main/java/com/orbekk/rpc/App.java
@@ -0,0 +1,47 @@
+package com.orbekk.rpc;
+
+import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.ConnectionManagerImpl;
+import com.orbekk.same.SameState;
+import com.orbekk.same.SameService;
+import com.orbekk.same.SameServiceImpl;
+import org.eclipse.jetty.server.Server;
+
+public class App {
+ public static void main(String[] args) {
+ if (args.length < 3) {
+ System.err.println("Arguments: port networkName clientId");
+ System.exit(1);
+ }
+ int port = Integer.parseInt(args[0]);
+ String networkName = args[1];
+ String clientId = args[2];
+
+ ConnectionManagerImpl connections = new ConnectionManagerImpl();
+
+ SameState sameState = new SameState(networkName, clientId,
+ connections);
+ sameState.start();
+
+ SameServiceImpl service = new SameServiceImpl(sameState);
+ JsonRpcServer jsonServer = new JsonRpcServer(service,
+ SameService.class);
+
+ Server server = new Server(port);
+ RpcHandler rpcHandler = new RpcHandler(jsonServer, sameState);
+ server.setHandler(rpcHandler);
+
+ try {
+ server.start();
+ } catch (Exception e) {
+ System.out.println("Could not start jetty server.");
+ e.printStackTrace();
+ }
+
+ try {
+ server.join();
+ } catch (InterruptedException e) {
+ System.out.println("Interrupt");
+ }
+ }
+}
diff --git a/same/src/main/java/com/orbekk/rpc/Client.java b/same/src/main/java/com/orbekk/rpc/Client.java
new file mode 100644
index 0000000..225347a
--- /dev/null
+++ b/same/src/main/java/com/orbekk/rpc/Client.java
@@ -0,0 +1,66 @@
+package com.orbekk.rpc;
+
+import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.ConnectionManagerImpl;
+import com.orbekk.same.SameState;
+import com.orbekk.same.SameService;
+import com.orbekk.same.SameServiceImpl;
+import com.orbekk.net.HttpUtil;
+import org.eclipse.jetty.server.Server;
+
+public class Client {
+
+ public static void main(String[] args) {
+ if (args.length < 4) {
+ System.err.println("Arguments: port clientId thisNetworkName " +
+ "remoteNetworkAddr");
+ System.exit(1);
+ }
+ int port = Integer.parseInt(args[0]);
+ String clientId = args[1];
+ String networkName = args[2];
+ String remoteAddr = args[3];
+
+ ConnectionManagerImpl connections = new ConnectionManagerImpl();
+
+ SameState sameState = new SameState(networkName, clientId,
+ connections);
+ sameState.start();
+
+ SameServiceImpl service = new SameServiceImpl(sameState);
+ JsonRpcServer jsonServer = new JsonRpcServer(service,
+ SameService.class);
+
+ Server server = new Server(port);
+ RpcHandler rpcHandler = new RpcHandler(jsonServer, sameState);
+ server.setHandler(rpcHandler);
+
+ try {
+ server.start();
+ } catch (Exception e) {
+ System.out.println("Could not start jetty server.");
+ e.printStackTrace();
+ }
+
+ while (sameState.getUrl() == null) {
+ HttpUtil.sendHttpRequest(remoteAddr + "ping?port=" + port);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // Ignore interrupt in wait loop.
+ }
+ }
+
+ SameService remoteService = connections.getConnection(remoteAddr);
+ remoteService.notifyNetwork("NoNetwork");
+ remoteService.participateNetwork("FirstNetwork",
+ sameState.getClientId(), sameState.getUrl());
+
+ try {
+ server.join();
+ } catch (InterruptedException e) {
+ System.out.println("Interrupt");
+ }
+
+ }
+}
diff --git a/same/src/main/java/com/orbekk/rpc/RpcHandler.java b/same/src/main/java/com/orbekk/rpc/RpcHandler.java
new file mode 100644
index 0000000..0bafad4
--- /dev/null
+++ b/same/src/main/java/com/orbekk/rpc/RpcHandler.java
@@ -0,0 +1,52 @@
+package com.orbekk.rpc;
+
+import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.net.HttpUtil;
+import com.orbekk.same.UrlReceiver;
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RpcHandler extends AbstractHandler {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+ private JsonRpcServer rpcServer;
+ private UrlReceiver urlReceiver;
+
+ public RpcHandler(JsonRpcServer rpcServer,
+ UrlReceiver urlReceiver) {
+ this.rpcServer = rpcServer;
+ this.urlReceiver = urlReceiver;
+ }
+
+ @Override
+ public synchronized void handle(String target, Request baseRequest,
+ HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ logger.info("Handling request to target: " + target);
+
+ if (urlReceiver != null) {
+ String sameServiceUrl = "http://" + request.getLocalAddr() +
+ ":" + request.getLocalPort() + "/SameService.json";
+ urlReceiver.setUrl(sameServiceUrl);
+ urlReceiver = null;
+ }
+
+ if (target.equals("/ping")) {
+ int remotePort = Integer.parseInt(request.getParameter("port"));
+ String pongUrl = "http://" + request.getRemoteAddr() + ":" +
+ remotePort + "/pong";
+ logger.info("Got ping. Sending pong to {}", pongUrl);
+ HttpUtil.sendHttpRequest(pongUrl);
+ } else if (target.equals("/pong")) {
+ logger.info("Received pong from {}", request.getRemoteAddr());
+ } else {
+ rpcServer.handle(request, response);
+ }
+ baseRequest.setHandled(true);
+ }
+}
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java
new file mode 100644
index 0000000..985f6f0
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java
@@ -0,0 +1,10 @@
+package com.orbekk.same;
+
+/**
+ * An interface that returns a connection for a participant.
+ *
+ * When testing, this interface can be mocked to use local participants only.
+ */
+public interface ConnectionManager {
+ SameService getConnection(String url);
+}
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
new file mode 100644
index 0000000..841d5fa
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -0,0 +1,31 @@
+package com.orbekk.same;
+
+import com.googlecode.jsonrpc4j.JsonRpcHttpClient;
+import com.googlecode.jsonrpc4j.ProxyUtil;
+import java.net.MalformedURLException;
+import java.net.URL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionManagerImpl implements ConnectionManager {
+
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ public ConnectionManagerImpl() {
+ }
+
+ @Override
+ public SameService getConnection(String url) {
+ SameService service = null;
+ try {
+ JsonRpcHttpClient client = new JsonRpcHttpClient(new URL(url));
+ service = ProxyUtil.createProxy(
+ this.getClass().getClassLoader(),
+ SameService.class,
+ client);
+ } catch (MalformedURLException e) {
+ logger.warn("Unable to create client for {}, {}", url, e);
+ }
+ return service;
+ }
+}
diff --git a/same/src/main/java/com/orbekk/same/SameService.java b/same/src/main/java/com/orbekk/same/SameService.java
new file mode 100644
index 0000000..8f239da
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/SameService.java
@@ -0,0 +1,40 @@
+package com.orbekk.same;
+
+import java.util.Map;
+
+public interface SameService {
+ /**
+ * A notification that 'networkName' exists.
+ *
+ * This is called by any participant of a network after a broadcast
+ * has been performed.
+ */
+ void notifyNetwork(String networkName);
+
+ /**
+ * A request from the callee to participate in 'networkName'.
+ */
+ void participateNetwork(String networkName, String clientId, String url);
+
+ /**
+ * Notification of participation in network.
+ */
+ void notifyParticipation(String networkName, String masterId);
+
+ /**
+ * New state.
+ *
+ * When sent to a non-master from the master, use 'newState' as the
+ * current state.
+ *
+ * When sent to a master, broadcast the new state to all clients.
+ */
+ void setState(String newState);
+
+ /**
+ * Notify all nodes of network participants.
+ *
+ * Only sent from master to non-master.
+ */
+ void setParticipants(Map<String, String> participants);
+}
diff --git a/same/src/main/java/com/orbekk/same/SameServiceImpl.java b/same/src/main/java/com/orbekk/same/SameServiceImpl.java
new file mode 100644
index 0000000..27579b5
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/SameServiceImpl.java
@@ -0,0 +1,60 @@
+package com.orbekk.same;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SameServiceImpl implements SameService {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+ private SameState sameState;
+
+ public SameServiceImpl(SameState sameState) {
+ this.sameState = sameState;
+ }
+
+ @Override
+ public void notifyNetwork(String networkName) {
+ logger.info("Notification from network " + networkName);
+ }
+
+ @Override
+ public void participateNetwork(String networkName, String clientId,
+ String url) {
+ if (!networkName.equals(sameState.getNetworkName())) {
+ logger.warn("Client tried to join {}, but network name is {}.",
+ networkName, sameState.getNetworkName());
+ return;
+ }
+ if (clientId.equals("") || url.equals("")) {
+ logger.warn("Missing client info: ClientId({}), URL({}).",
+ clientId, url);
+ return;
+ }
+ sameState.addParticipant(clientId, url);
+ }
+
+ @Override
+ public void notifyParticipation(String networkName, String masterId) {
+ logger.info("Joining network {}. Master is {}", networkName, masterId);
+ // int i = 1;
+ // for (Map.Entry<String, String> e : participants.entrySet()) {
+ // String clientId = e.getKey();
+ // String url = e.getValue();
+ // logger.info(" {} participant {}: {}, {}",
+ // new Object[]{networkName, i, clientId, url});
+ // i++;
+ // }
+ sameState.joinNetwork(networkName, masterId);
+ }
+
+ @Override
+ public void setParticipants(Map<String, String> participants) {
+ sameState.setParticipants(participants);
+ }
+
+ @Override
+ public void setState(String newState) {
+ sameState.setState(newState);
+ }
+}
diff --git a/same/src/main/java/com/orbekk/same/SameState.java b/same/src/main/java/com/orbekk/same/SameState.java
new file mode 100644
index 0000000..c9cd216
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/SameState.java
@@ -0,0 +1,233 @@
+package com.orbekk.same;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The implementation of a 'Same' state.
+ *
+ * This class manages the current state of the Same protocol.
+ */
+public class SameState extends Thread implements UrlReceiver {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+ private ConnectionManager connections;
+
+ // TODO: Change the name of State.
+ private com.orbekk.same.State state =
+ new com.orbekk.same.State();
+
+ /**
+ * The client id of this participant.
+ */
+ private String clientId;
+
+ /**
+ * Stopping condition for this thread.
+ */
+ private boolean stopped = false;
+
+ private String _setState = null;
+ private Map<String, String> _setParticipants = null;
+
+ private Map<String, String> pendingParticipants =
+ new HashMap<String, String>();
+
+ public SameState(String networkName, String clientId,
+ ConnectionManager connections) {
+ state.setNetworkName(networkName);
+ this.clientId = clientId;
+ this.connections = connections;
+ state.setMasterId(clientId);
+ state.getParticipants().put(clientId, null);
+ }
+
+ public String getMasterId() {
+ return state.getMasterId();
+ }
+
+ public synchronized Map<String, String> getParticipants() {
+ return state.getParticipants();
+ }
+
+ /**
+ * Reset this SameService to an initial state.
+ *
+ * TODO: Implement fully.
+ */
+ private synchronized void resetState() {
+ state = new com.orbekk.same.State();
+ pendingParticipants.clear();
+ }
+
+ public synchronized void joinNetwork(String networkName, String masterId) {
+ resetState();
+ state.setNetworkName(networkName);
+ state.setMasterId(masterId);
+ logger.info("Joined network {}.", networkName);
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getNetworkName() {
+ return state.getNetworkName();
+ }
+
+ public String getCurrentState() {
+ return state.getData();
+ }
+
+ /**
+ * TODO: Move to a separate library.
+ */
+ public void librarySetNewState(String newState) {
+ connections.getConnection(
+ state.getParticipants().get(state.getMasterId()))
+ .setState(newState);
+ }
+
+ public String getUrl() {
+ return state.getParticipants().get(clientId);
+ }
+
+ @Override
+ public void setUrl(String url) {
+ logger.info("My URL is {}", url);
+ state.getParticipants().put(clientId, url);
+ }
+
+ public synchronized void addParticipant(String clientId, String url) {
+ logger.info("PendingParticipant.add: {} ({})", clientId, url);
+ pendingParticipants.put(clientId, url);
+ notifyAll();
+ }
+
+ public synchronized void setParticipants(Map<String, String> participants) {
+ logger.info("Pending operation: _setParticipants");
+ _setParticipants = participants;
+ notifyAll();
+ }
+
+ public synchronized void setState(String newState) {
+ logger.info("Pending operation: _setState");
+ _setState = newState;
+ notifyAll();
+ }
+
+ private synchronized void handleSetParticipants() {
+ if (_setParticipants != null) {
+ if (isMaster()) {
+ logger.error("{}: Master received setParticipants.", clientId);
+ } else {
+ logger.info("{}: New participants committed.", clientId);
+ state.getParticipants().clear();
+ state.getParticipants().putAll(_setParticipants);
+ }
+ }
+ _setParticipants = null;
+ }
+
+ public synchronized void handleSetState() {
+ if (_setState != null) {
+ if (isMaster()) {
+ broadcast(new ServiceOperation() {
+ @Override void run(SameService service) {
+ service.setState(_setState);
+ }
+ });
+ }
+ state.setData(_setState);
+ _setState = null;
+ }
+ }
+
+ private boolean isMaster() {
+ return state.getMasterId().equals(clientId);
+ }
+
+ private synchronized void handleNewParticipants() {
+ if (!isMaster()) {
+ for (Map.Entry<String, String> e : pendingParticipants.entrySet()) {
+ SameService master = connections.getConnection(
+ state.getParticipants().get(state.getMasterId()));
+ logger.info("Redirecting participant request to {}",
+ state.getMasterId());
+ String clientId = e.getKey();
+ String url = e.getValue();
+ master.participateNetwork(state.getNetworkName(), clientId,
+ url);
+ }
+ } else {
+ state.getParticipants().putAll(pendingParticipants);
+ for (Map.Entry<String, String> e :
+ pendingParticipants.entrySet()) {
+ String clientId = e.getKey();
+ String url = e.getValue();
+ logger.info("New participant: {} URL({})", clientId, url);
+ SameService remoteService = connections.getConnection(url);
+ remoteService.notifyParticipation(state.getNetworkName(),
+ state.getMasterId());
+ broadcast(new ServiceOperation(){
+ @Override void run(SameService service) {
+ service.setParticipants(state.getParticipants());
+ }
+ });
+ }
+ }
+ pendingParticipants.clear();
+ }
+
+ /**
+ * This method runs the pending commands to SameState.
+ *
+ * It should be called by the worker thread, but can be called directly
+ * for testing purposes to avoid threading in unit tests.
+ */
+ synchronized void internalRun() {
+ handleNewParticipants();
+ handleSetState();
+ handleSetParticipants();
+ }
+
+ public synchronized void run() {
+ while (!stopped) {
+ internalRun();
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // Ignore interrupt in wait loop.
+ }
+ }
+ }
+
+ public synchronized void stopSame() {
+ try {
+ stopped = true;
+ notifyAll();
+ this.join();
+ } catch (InterruptedException e) {
+ logger.warn("Got InterruptedException while waiting for SameState " +
+ "to finish. Ignoring.");
+ }
+ }
+
+ public abstract static class ServiceOperation {
+ abstract void run(SameService service);
+ }
+
+ public synchronized void broadcast(ServiceOperation operation) {
+ for (Map.Entry<String, String> e :
+ state.getParticipants().entrySet()) {
+ String clientId = e.getKey();
+ String url = e.getValue();
+ if (!clientId.equals(this.clientId)) {
+ operation.run(connections.getConnection(url));
+ }
+ }
+ }
+}
diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java
new file mode 100644
index 0000000..70ffa83
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/State.java
@@ -0,0 +1,78 @@
+package com.orbekk.same;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class State {
+ private long stateIteration = 0;
+ private Map<String, String> participants = new HashMap<String, String>();
+ private String networkName = "";
+ private String masterId = "";
+ private String data = "";
+
+ public long getStateIteration() {
+ return stateIteration;
+ }
+
+ public void setStateIteration(long stateIteration) {
+ this.stateIteration = stateIteration;
+ }
+
+ public Map<String, String> getParticipants() {
+ return participants;
+ }
+
+ public String getNetworkName() {
+ return networkName;
+ }
+
+ public void setNetworkName(String networkName) {
+ this.networkName = networkName;
+ }
+
+ public String getMasterId() {
+ return masterId;
+ }
+
+ public void setMasterId(String masterId) {
+ this.masterId = masterId;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder participantsString = new StringBuilder();
+ participantsString.append("[");
+ boolean first = true;
+ for (Map.Entry<String, String> e : participants.entrySet()) {
+ if (!first) {
+ participantsString.append(", ");
+ }
+ first = false;
+ participantsString.append(e.getKey())
+ .append("(")
+ .append(e.getValue())
+ .append(")");
+ String clientId = e.getKey();
+ String url = e.getValue();
+ }
+ participantsString.append("]");
+
+ return String.format(
+ "State( \n" +
+ " stateIteration = %d,\n" +
+ " networkName = %s,\n" +
+ " masterId = %s,\n" +
+ " data = %s,\n" +
+ " participants = %s\n" +
+ ")", stateIteration, networkName, masterId, data,
+ participantsString);
+ }
+}
diff --git a/same/src/main/java/com/orbekk/same/UrlReceiver.java b/same/src/main/java/com/orbekk/same/UrlReceiver.java
new file mode 100644
index 0000000..31a0276
--- /dev/null
+++ b/same/src/main/java/com/orbekk/same/UrlReceiver.java
@@ -0,0 +1,10 @@
+package com.orbekk.same;
+
+/**
+ * An interface to get notified of the URL to this computer.
+ *
+ * This interface is used to reliably obtain the URL of this host.
+ */
+public interface UrlReceiver {
+ void setUrl(String url);
+}
diff --git a/same/src/main/resources/log4j.properties b/same/src/main/resources/log4j.properties
new file mode 100644
index 0000000..70f8a5f
--- /dev/null
+++ b/same/src/main/resources/log4j.properties
@@ -0,0 +1,7 @@
+log4j.rootLogger=INFO, A1
+log4j.com.orbekk=DEBUG, A1
+
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] - %m%n
+
diff --git a/same/src/test/java/com/orbekk/same/SameStateTest.java b/same/src/test/java/com/orbekk/same/SameStateTest.java
new file mode 100644
index 0000000..b16f5f9
--- /dev/null
+++ b/same/src/test/java/com/orbekk/same/SameStateTest.java
@@ -0,0 +1,105 @@
+package com.orbekk.same;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.HashMap;
+import org.junit.Test;
+import org.junit.Before;
+
+public class SameStateTest {
+ private MockConnectionManager connections;
+ private SameState state1, state2, state3;
+ private SameService service1, service2, service3;
+
+ public static class MockConnectionManager implements ConnectionManager {
+ public Map<String, SameService> connections =
+ new HashMap<String, SameService>();
+
+ @Override
+ public SameService getConnection(String url) {
+ return connections.get(url);
+ }
+ }
+
+ public SameStateTest() {
+ }
+
+ @Before public void setUp() {
+ connections = new MockConnectionManager();
+
+ state1 = new SameState("Network1", "Client1", connections);
+ state1.setUrl("test://client1");
+ service1 = new SameServiceImpl(state1);
+ state2 = new SameState("Network2", "Client2", connections);
+ state2.setUrl("test://client2");
+ service2 = new SameServiceImpl(state2);
+ state3 = new SameState("Network3", "Client3", connections);
+ state3.setUrl("test://client3");
+ service3 = new SameServiceImpl(state3);
+
+ connections.connections.put(state1.getUrl(), service1);
+ connections.connections.put(state2.getUrl(), service2);
+ connections.connections.put(state3.getUrl(), service3);
+ }
+
+ public void joinNetwork() {
+ connections.getConnection(state1.getUrl()).
+ participateNetwork("Network1", state2.getClientId(),
+ state2.getUrl());
+ connections.getConnection(state1.getUrl()).
+ participateNetwork("Network1", state3.getClientId(),
+ state3.getUrl());
+ state1.internalRun();
+ state2.internalRun();
+ state3.internalRun();
+
+ assertTrue(state1.getParticipants().size() == 3);
+ assertTrue(state2.getParticipants().size() == 3);
+ assertTrue(state3.getParticipants().size() == 3);
+ }
+
+ @Test public void testJoinNetwork() {
+ connections.getConnection(state1.getUrl()).
+ participateNetwork("Network1", state2.getClientId(),
+ state2.getUrl());
+ assertTrue(state1.getParticipants().size() == 1);
+ assertTrue(state2.getParticipants().size() == 1);
+
+ state1.internalRun();
+ state2.internalRun();
+
+ assertTrue(state1.getParticipants().size() == 2);
+ assertTrue(state2.getParticipants().size() == 2);
+ assertEquals(state1.getNetworkName(), state2.getNetworkName());
+
+ connections.getConnection(state2.getUrl()).
+ participateNetwork("Network1", state3.getClientId(),
+ state3.getUrl());
+ state2.internalRun();
+ state1.internalRun();
+ state3.internalRun();
+ state2.internalRun();
+
+ assertTrue(state1.getParticipants().size() == 3);
+ assertTrue(state2.getParticipants().size() == 3);
+ assertTrue(state3.getParticipants().size() == 3);
+ assertEquals(state1.getNetworkName(), state2.getNetworkName());
+ assertEquals(state2.getNetworkName(), state3.getNetworkName());
+
+ assertEquals("Client1", state1.getMasterId());
+ assertEquals("Client1", state2.getMasterId());
+ assertEquals("Client1", state3.getMasterId());
+ }
+
+ @Test public void setState() {
+ joinNetwork();
+ state1.librarySetNewState("New state1");
+ state1.internalRun();
+ state2.internalRun();
+ state3.internalRun();
+ assertEquals("New state1", state1.getCurrentState());
+ assertEquals("New state1", state2.getCurrentState());
+ assertEquals("New state1", state2.getCurrentState());
+ }
+}