diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-01-13 10:18:02 +0100 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-01-13 10:18:02 +0100 |
commit | ba7132f3ce5629cff2cc4857fff7bd672511bee8 (patch) | |
tree | dc6f7d1adafb8351004f2de6f3f775ebfe596fc7 /same | |
parent | 85a93de08694f25bd049c5236f11f06b8d8e4ff7 (diff) |
Rename projects.
– jsonrpc → same
– master → same-android
Diffstat (limited to 'same')
-rw-r--r-- | same/pom.xml | 95 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/net/BroadcastListener.java | 53 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/net/Broadcaster.java | 77 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/net/HttpUtil.java | 26 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/rpc/App.java | 47 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/rpc/Client.java | 66 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/rpc/RpcHandler.java | 52 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/ConnectionManager.java | 10 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java | 31 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/SameService.java | 40 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/SameServiceImpl.java | 60 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/SameState.java | 233 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/State.java | 78 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/UrlReceiver.java | 10 | ||||
-rw-r--r-- | same/src/main/resources/log4j.properties | 7 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/same/SameStateTest.java | 105 |
16 files changed, 990 insertions, 0 deletions
diff --git a/same/pom.xml b/same/pom.xml new file mode 100644 index 0000000..6d3d5f9 --- /dev/null +++ b/same/pom.xml @@ -0,0 +1,95 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.orbekk</groupId> + <artifactId>same</artifactId> + <version>0.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>same</name> + <url>http://github.com/orbekk/master</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <repositories> + <repository> + <id>jsonrpc3j-webdav-maven-repo</id> + <name>jsonrpc4j maven repository</name> + <url>http://jsonrpc4j.googlecode.com/svn/maven/repo/</url> + <layout>default</layout> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>com.googlecode</groupId> + <artifactId>jsonrpc4j</artifactId> + <version>0.18</version> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.7.5</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.6.4</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.6.4</version> + </dependency> + + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + <version>2.5</version> + </dependency> + + <dependency> + <groupId>javax.portlet</groupId> + <artifactId>portlet-api</artifactId> + <version>2.0</version> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>8.0.0.M3</version> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>8.0.0.M3</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java new file mode 100644 index 0000000..c0b66e0 --- /dev/null +++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java @@ -0,0 +1,53 @@ +package com.orbekk.net; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BroadcastListener { + private int port; + private Logger logger = LoggerFactory.getLogger(getClass()); + + public BroadcastListener(int port) { + this.port = port; + } + + public DatagramPacket listen() { + logger.debug("Waiting for broadcast on port " + port); + DatagramSocket socket; + try { + socket = new DatagramSocket(port); + } catch (SocketException e) { + logger.warn("Failed to create socket.", e.fillInStackTrace()); + return null; + } + try { + socket.setBroadcast(true); + } catch (SocketException e) { + logger.warn("Exception: {}", e); + } + byte[] buffer = new byte[2048]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + socket.receive(packet); + } catch (IOException e) { + logger.warn("Exception when listening for broadcast: {}", e); + return null; + } + + String address = packet.getAddress().getHostAddress(); + logger.debug("Received broadcast from " + address + + ": " + new String(packet.getData())); + return packet; + } + + public static void main(String[] args) { + int port = Integer.parseInt(args[0]); + BroadcastListener listener = new BroadcastListener(port); + System.out.println("Received broadcast: " + listener.listen()); + } +} diff --git a/same/src/main/java/com/orbekk/net/Broadcaster.java b/same/src/main/java/com/orbekk/net/Broadcaster.java new file mode 100644 index 0000000..95e279c --- /dev/null +++ b/same/src/main/java/com/orbekk/net/Broadcaster.java @@ -0,0 +1,77 @@ +package com.orbekk.net; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Broadcaster { + private Logger logger = LoggerFactory.getLogger(getClass()); + + public List<InetAddress> getBroadcastAddresses() { + List<InetAddress> broadcastAddresses = new LinkedList<InetAddress>(); + + Enumeration<NetworkInterface> interfaces; + try { + interfaces = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e) { + logger.warn("Network problem?", e.fillInStackTrace()); + return broadcastAddresses; + } + while (interfaces.hasMoreElements()) { + NetworkInterface iface = interfaces.nextElement(); + try { + if (iface.isLoopback()) { + logger.debug("Ignoring looback device " + iface.getName()); + continue; + } + for (InterfaceAddress address : iface.getInterfaceAddresses()) { + InetAddress broadcast = address.getBroadcast(); + if (broadcast != null) { + broadcastAddresses.add(broadcast); + } + } + } catch (SocketException e) { + logger.info("Ignoring interface " + iface.getName(), e.fillInStackTrace()); + } + } + return broadcastAddresses; + } + + public boolean sendBroadcast(int port, byte[] data) { + boolean successful = false; + for (InetAddress broadcastAddress : getBroadcastAddresses()) { + try { + DatagramSocket socket = new DatagramSocket(); + socket.setBroadcast(true); + DatagramPacket packet = new DatagramPacket(data, data.length, broadcastAddress, port); + socket.send(packet); + successful = true; + } catch (SocketException e) { + logger.warn("Failed to send broadcast to " + broadcastAddress + + ". ", e.fillInStackTrace()); + } catch (IOException e) { + logger.warn("Error when sending broadcast to " + + broadcastAddress + ".", e.fillInStackTrace()); + } + } + return successful; + } + + public static void main(String[] args) { + int port = Integer.parseInt(args[0]); + Broadcaster broadcaster = new Broadcaster(); + String message = "Broadcast from Java broadcaster."; + broadcaster.sendBroadcast(port, message.getBytes()); + } +} diff --git a/same/src/main/java/com/orbekk/net/HttpUtil.java b/same/src/main/java/com/orbekk/net/HttpUtil.java new file mode 100644 index 0000000..b4bb887 --- /dev/null +++ b/same/src/main/java/com/orbekk/net/HttpUtil.java @@ -0,0 +1,26 @@ +package com.orbekk.net; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpUtil { + private static final Logger logger = + LoggerFactory.getLogger(HttpUtil.class); + + public static void sendHttpRequest(String url) { + try { + URL pingUrl = new URL(url); + pingUrl.openStream(); + // URLConnection connection = pingUrl.openConnection(); + // connection.connect(); + } catch (MalformedURLException e) { + logger.warn("Unable to send ping to {}: {}.", url, e); + } catch (IOException e) { + logger.warn("Error when sending ping: {}", e); + } + } +} diff --git a/same/src/main/java/com/orbekk/rpc/App.java b/same/src/main/java/com/orbekk/rpc/App.java new file mode 100644 index 0000000..0e9dce3 --- /dev/null +++ b/same/src/main/java/com/orbekk/rpc/App.java @@ -0,0 +1,47 @@ +package com.orbekk.rpc; + +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.ConnectionManagerImpl; +import com.orbekk.same.SameState; +import com.orbekk.same.SameService; +import com.orbekk.same.SameServiceImpl; +import org.eclipse.jetty.server.Server; + +public class App { + public static void main(String[] args) { + if (args.length < 3) { + System.err.println("Arguments: port networkName clientId"); + System.exit(1); + } + int port = Integer.parseInt(args[0]); + String networkName = args[1]; + String clientId = args[2]; + + ConnectionManagerImpl connections = new ConnectionManagerImpl(); + + SameState sameState = new SameState(networkName, clientId, + connections); + sameState.start(); + + SameServiceImpl service = new SameServiceImpl(sameState); + JsonRpcServer jsonServer = new JsonRpcServer(service, + SameService.class); + + Server server = new Server(port); + RpcHandler rpcHandler = new RpcHandler(jsonServer, sameState); + server.setHandler(rpcHandler); + + try { + server.start(); + } catch (Exception e) { + System.out.println("Could not start jetty server."); + e.printStackTrace(); + } + + try { + server.join(); + } catch (InterruptedException e) { + System.out.println("Interrupt"); + } + } +} diff --git a/same/src/main/java/com/orbekk/rpc/Client.java b/same/src/main/java/com/orbekk/rpc/Client.java new file mode 100644 index 0000000..225347a --- /dev/null +++ b/same/src/main/java/com/orbekk/rpc/Client.java @@ -0,0 +1,66 @@ +package com.orbekk.rpc; + +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.ConnectionManagerImpl; +import com.orbekk.same.SameState; +import com.orbekk.same.SameService; +import com.orbekk.same.SameServiceImpl; +import com.orbekk.net.HttpUtil; +import org.eclipse.jetty.server.Server; + +public class Client { + + public static void main(String[] args) { + if (args.length < 4) { + System.err.println("Arguments: port clientId thisNetworkName " + + "remoteNetworkAddr"); + System.exit(1); + } + int port = Integer.parseInt(args[0]); + String clientId = args[1]; + String networkName = args[2]; + String remoteAddr = args[3]; + + ConnectionManagerImpl connections = new ConnectionManagerImpl(); + + SameState sameState = new SameState(networkName, clientId, + connections); + sameState.start(); + + SameServiceImpl service = new SameServiceImpl(sameState); + JsonRpcServer jsonServer = new JsonRpcServer(service, + SameService.class); + + Server server = new Server(port); + RpcHandler rpcHandler = new RpcHandler(jsonServer, sameState); + server.setHandler(rpcHandler); + + try { + server.start(); + } catch (Exception e) { + System.out.println("Could not start jetty server."); + e.printStackTrace(); + } + + while (sameState.getUrl() == null) { + HttpUtil.sendHttpRequest(remoteAddr + "ping?port=" + port); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // Ignore interrupt in wait loop. + } + } + + SameService remoteService = connections.getConnection(remoteAddr); + remoteService.notifyNetwork("NoNetwork"); + remoteService.participateNetwork("FirstNetwork", + sameState.getClientId(), sameState.getUrl()); + + try { + server.join(); + } catch (InterruptedException e) { + System.out.println("Interrupt"); + } + + } +} diff --git a/same/src/main/java/com/orbekk/rpc/RpcHandler.java b/same/src/main/java/com/orbekk/rpc/RpcHandler.java new file mode 100644 index 0000000..0bafad4 --- /dev/null +++ b/same/src/main/java/com/orbekk/rpc/RpcHandler.java @@ -0,0 +1,52 @@ +package com.orbekk.rpc; + +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.net.HttpUtil; +import com.orbekk.same.UrlReceiver; +import java.io.IOException; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RpcHandler extends AbstractHandler { + private Logger logger = LoggerFactory.getLogger(getClass()); + private JsonRpcServer rpcServer; + private UrlReceiver urlReceiver; + + public RpcHandler(JsonRpcServer rpcServer, + UrlReceiver urlReceiver) { + this.rpcServer = rpcServer; + this.urlReceiver = urlReceiver; + } + + @Override + public synchronized void handle(String target, Request baseRequest, + HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + logger.info("Handling request to target: " + target); + + if (urlReceiver != null) { + String sameServiceUrl = "http://" + request.getLocalAddr() + + ":" + request.getLocalPort() + "/SameService.json"; + urlReceiver.setUrl(sameServiceUrl); + urlReceiver = null; + } + + if (target.equals("/ping")) { + int remotePort = Integer.parseInt(request.getParameter("port")); + String pongUrl = "http://" + request.getRemoteAddr() + ":" + + remotePort + "/pong"; + logger.info("Got ping. Sending pong to {}", pongUrl); + HttpUtil.sendHttpRequest(pongUrl); + } else if (target.equals("/pong")) { + logger.info("Received pong from {}", request.getRemoteAddr()); + } else { + rpcServer.handle(request, response); + } + baseRequest.setHandled(true); + } +} diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java new file mode 100644 index 0000000..985f6f0 --- /dev/null +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -0,0 +1,10 @@ +package com.orbekk.same; + +/** + * An interface that returns a connection for a participant. + * + * When testing, this interface can be mocked to use local participants only. + */ +public interface ConnectionManager { + SameService getConnection(String url); +} diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java new file mode 100644 index 0000000..841d5fa --- /dev/null +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -0,0 +1,31 @@ +package com.orbekk.same; + +import com.googlecode.jsonrpc4j.JsonRpcHttpClient; +import com.googlecode.jsonrpc4j.ProxyUtil; +import java.net.MalformedURLException; +import java.net.URL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionManagerImpl implements ConnectionManager { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + public ConnectionManagerImpl() { + } + + @Override + public SameService getConnection(String url) { + SameService service = null; + try { + JsonRpcHttpClient client = new JsonRpcHttpClient(new URL(url)); + service = ProxyUtil.createProxy( + this.getClass().getClassLoader(), + SameService.class, + client); + } catch (MalformedURLException e) { + logger.warn("Unable to create client for {}, {}", url, e); + } + return service; + } +} diff --git a/same/src/main/java/com/orbekk/same/SameService.java b/same/src/main/java/com/orbekk/same/SameService.java new file mode 100644 index 0000000..8f239da --- /dev/null +++ b/same/src/main/java/com/orbekk/same/SameService.java @@ -0,0 +1,40 @@ +package com.orbekk.same; + +import java.util.Map; + +public interface SameService { + /** + * A notification that 'networkName' exists. + * + * This is called by any participant of a network after a broadcast + * has been performed. + */ + void notifyNetwork(String networkName); + + /** + * A request from the callee to participate in 'networkName'. + */ + void participateNetwork(String networkName, String clientId, String url); + + /** + * Notification of participation in network. + */ + void notifyParticipation(String networkName, String masterId); + + /** + * New state. + * + * When sent to a non-master from the master, use 'newState' as the + * current state. + * + * When sent to a master, broadcast the new state to all clients. + */ + void setState(String newState); + + /** + * Notify all nodes of network participants. + * + * Only sent from master to non-master. + */ + void setParticipants(Map<String, String> participants); +} diff --git a/same/src/main/java/com/orbekk/same/SameServiceImpl.java b/same/src/main/java/com/orbekk/same/SameServiceImpl.java new file mode 100644 index 0000000..27579b5 --- /dev/null +++ b/same/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -0,0 +1,60 @@ +package com.orbekk.same; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SameServiceImpl implements SameService { + private Logger logger = LoggerFactory.getLogger(getClass()); + private SameState sameState; + + public SameServiceImpl(SameState sameState) { + this.sameState = sameState; + } + + @Override + public void notifyNetwork(String networkName) { + logger.info("Notification from network " + networkName); + } + + @Override + public void participateNetwork(String networkName, String clientId, + String url) { + if (!networkName.equals(sameState.getNetworkName())) { + logger.warn("Client tried to join {}, but network name is {}.", + networkName, sameState.getNetworkName()); + return; + } + if (clientId.equals("") || url.equals("")) { + logger.warn("Missing client info: ClientId({}), URL({}).", + clientId, url); + return; + } + sameState.addParticipant(clientId, url); + } + + @Override + public void notifyParticipation(String networkName, String masterId) { + logger.info("Joining network {}. Master is {}", networkName, masterId); + // int i = 1; + // for (Map.Entry<String, String> e : participants.entrySet()) { + // String clientId = e.getKey(); + // String url = e.getValue(); + // logger.info(" {} participant {}: {}, {}", + // new Object[]{networkName, i, clientId, url}); + // i++; + // } + sameState.joinNetwork(networkName, masterId); + } + + @Override + public void setParticipants(Map<String, String> participants) { + sameState.setParticipants(participants); + } + + @Override + public void setState(String newState) { + sameState.setState(newState); + } +} diff --git a/same/src/main/java/com/orbekk/same/SameState.java b/same/src/main/java/com/orbekk/same/SameState.java new file mode 100644 index 0000000..c9cd216 --- /dev/null +++ b/same/src/main/java/com/orbekk/same/SameState.java @@ -0,0 +1,233 @@ +package com.orbekk.same; + +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.HashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The implementation of a 'Same' state. + * + * This class manages the current state of the Same protocol. + */ +public class SameState extends Thread implements UrlReceiver { + private Logger logger = LoggerFactory.getLogger(getClass()); + private ConnectionManager connections; + + // TODO: Change the name of State. + private com.orbekk.same.State state = + new com.orbekk.same.State(); + + /** + * The client id of this participant. + */ + private String clientId; + + /** + * Stopping condition for this thread. + */ + private boolean stopped = false; + + private String _setState = null; + private Map<String, String> _setParticipants = null; + + private Map<String, String> pendingParticipants = + new HashMap<String, String>(); + + public SameState(String networkName, String clientId, + ConnectionManager connections) { + state.setNetworkName(networkName); + this.clientId = clientId; + this.connections = connections; + state.setMasterId(clientId); + state.getParticipants().put(clientId, null); + } + + public String getMasterId() { + return state.getMasterId(); + } + + public synchronized Map<String, String> getParticipants() { + return state.getParticipants(); + } + + /** + * Reset this SameService to an initial state. + * + * TODO: Implement fully. + */ + private synchronized void resetState() { + state = new com.orbekk.same.State(); + pendingParticipants.clear(); + } + + public synchronized void joinNetwork(String networkName, String masterId) { + resetState(); + state.setNetworkName(networkName); + state.setMasterId(masterId); + logger.info("Joined network {}.", networkName); + } + + public String getClientId() { + return clientId; + } + + public String getNetworkName() { + return state.getNetworkName(); + } + + public String getCurrentState() { + return state.getData(); + } + + /** + * TODO: Move to a separate library. + */ + public void librarySetNewState(String newState) { + connections.getConnection( + state.getParticipants().get(state.getMasterId())) + .setState(newState); + } + + public String getUrl() { + return state.getParticipants().get(clientId); + } + + @Override + public void setUrl(String url) { + logger.info("My URL is {}", url); + state.getParticipants().put(clientId, url); + } + + public synchronized void addParticipant(String clientId, String url) { + logger.info("PendingParticipant.add: {} ({})", clientId, url); + pendingParticipants.put(clientId, url); + notifyAll(); + } + + public synchronized void setParticipants(Map<String, String> participants) { + logger.info("Pending operation: _setParticipants"); + _setParticipants = participants; + notifyAll(); + } + + public synchronized void setState(String newState) { + logger.info("Pending operation: _setState"); + _setState = newState; + notifyAll(); + } + + private synchronized void handleSetParticipants() { + if (_setParticipants != null) { + if (isMaster()) { + logger.error("{}: Master received setParticipants.", clientId); + } else { + logger.info("{}: New participants committed.", clientId); + state.getParticipants().clear(); + state.getParticipants().putAll(_setParticipants); + } + } + _setParticipants = null; + } + + public synchronized void handleSetState() { + if (_setState != null) { + if (isMaster()) { + broadcast(new ServiceOperation() { + @Override void run(SameService service) { + service.setState(_setState); + } + }); + } + state.setData(_setState); + _setState = null; + } + } + + private boolean isMaster() { + return state.getMasterId().equals(clientId); + } + + private synchronized void handleNewParticipants() { + if (!isMaster()) { + for (Map.Entry<String, String> e : pendingParticipants.entrySet()) { + SameService master = connections.getConnection( + state.getParticipants().get(state.getMasterId())); + logger.info("Redirecting participant request to {}", + state.getMasterId()); + String clientId = e.getKey(); + String url = e.getValue(); + master.participateNetwork(state.getNetworkName(), clientId, + url); + } + } else { + state.getParticipants().putAll(pendingParticipants); + for (Map.Entry<String, String> e : + pendingParticipants.entrySet()) { + String clientId = e.getKey(); + String url = e.getValue(); + logger.info("New participant: {} URL({})", clientId, url); + SameService remoteService = connections.getConnection(url); + remoteService.notifyParticipation(state.getNetworkName(), + state.getMasterId()); + broadcast(new ServiceOperation(){ + @Override void run(SameService service) { + service.setParticipants(state.getParticipants()); + } + }); + } + } + pendingParticipants.clear(); + } + + /** + * This method runs the pending commands to SameState. + * + * It should be called by the worker thread, but can be called directly + * for testing purposes to avoid threading in unit tests. + */ + synchronized void internalRun() { + handleNewParticipants(); + handleSetState(); + handleSetParticipants(); + } + + public synchronized void run() { + while (!stopped) { + internalRun(); + try { + wait(1000); + } catch (InterruptedException e) { + // Ignore interrupt in wait loop. + } + } + } + + public synchronized void stopSame() { + try { + stopped = true; + notifyAll(); + this.join(); + } catch (InterruptedException e) { + logger.warn("Got InterruptedException while waiting for SameState " + + "to finish. Ignoring."); + } + } + + public abstract static class ServiceOperation { + abstract void run(SameService service); + } + + public synchronized void broadcast(ServiceOperation operation) { + for (Map.Entry<String, String> e : + state.getParticipants().entrySet()) { + String clientId = e.getKey(); + String url = e.getValue(); + if (!clientId.equals(this.clientId)) { + operation.run(connections.getConnection(url)); + } + } + } +} diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java new file mode 100644 index 0000000..70ffa83 --- /dev/null +++ b/same/src/main/java/com/orbekk/same/State.java @@ -0,0 +1,78 @@ +package com.orbekk.same; + +import java.util.Map; +import java.util.HashMap; + +public class State { + private long stateIteration = 0; + private Map<String, String> participants = new HashMap<String, String>(); + private String networkName = ""; + private String masterId = ""; + private String data = ""; + + public long getStateIteration() { + return stateIteration; + } + + public void setStateIteration(long stateIteration) { + this.stateIteration = stateIteration; + } + + public Map<String, String> getParticipants() { + return participants; + } + + public String getNetworkName() { + return networkName; + } + + public void setNetworkName(String networkName) { + this.networkName = networkName; + } + + public String getMasterId() { + return masterId; + } + + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + + @Override + public String toString() { + StringBuilder participantsString = new StringBuilder(); + participantsString.append("["); + boolean first = true; + for (Map.Entry<String, String> e : participants.entrySet()) { + if (!first) { + participantsString.append(", "); + } + first = false; + participantsString.append(e.getKey()) + .append("(") + .append(e.getValue()) + .append(")"); + String clientId = e.getKey(); + String url = e.getValue(); + } + participantsString.append("]"); + + return String.format( + "State( \n" + + " stateIteration = %d,\n" + + " networkName = %s,\n" + + " masterId = %s,\n" + + " data = %s,\n" + + " participants = %s\n" + + ")", stateIteration, networkName, masterId, data, + participantsString); + } +} diff --git a/same/src/main/java/com/orbekk/same/UrlReceiver.java b/same/src/main/java/com/orbekk/same/UrlReceiver.java new file mode 100644 index 0000000..31a0276 --- /dev/null +++ b/same/src/main/java/com/orbekk/same/UrlReceiver.java @@ -0,0 +1,10 @@ +package com.orbekk.same; + +/** + * An interface to get notified of the URL to this computer. + * + * This interface is used to reliably obtain the URL of this host. + */ +public interface UrlReceiver { + void setUrl(String url); +} diff --git a/same/src/main/resources/log4j.properties b/same/src/main/resources/log4j.properties new file mode 100644 index 0000000..70f8a5f --- /dev/null +++ b/same/src/main/resources/log4j.properties @@ -0,0 +1,7 @@ +log4j.rootLogger=INFO, A1 +log4j.com.orbekk=DEBUG, A1 + +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] - %m%n + diff --git a/same/src/test/java/com/orbekk/same/SameStateTest.java b/same/src/test/java/com/orbekk/same/SameStateTest.java new file mode 100644 index 0000000..b16f5f9 --- /dev/null +++ b/same/src/test/java/com/orbekk/same/SameStateTest.java @@ -0,0 +1,105 @@ +package com.orbekk.same; + +import static org.junit.Assert.*; + +import java.util.Map; +import java.util.HashMap; +import org.junit.Test; +import org.junit.Before; + +public class SameStateTest { + private MockConnectionManager connections; + private SameState state1, state2, state3; + private SameService service1, service2, service3; + + public static class MockConnectionManager implements ConnectionManager { + public Map<String, SameService> connections = + new HashMap<String, SameService>(); + + @Override + public SameService getConnection(String url) { + return connections.get(url); + } + } + + public SameStateTest() { + } + + @Before public void setUp() { + connections = new MockConnectionManager(); + + state1 = new SameState("Network1", "Client1", connections); + state1.setUrl("test://client1"); + service1 = new SameServiceImpl(state1); + state2 = new SameState("Network2", "Client2", connections); + state2.setUrl("test://client2"); + service2 = new SameServiceImpl(state2); + state3 = new SameState("Network3", "Client3", connections); + state3.setUrl("test://client3"); + service3 = new SameServiceImpl(state3); + + connections.connections.put(state1.getUrl(), service1); + connections.connections.put(state2.getUrl(), service2); + connections.connections.put(state3.getUrl(), service3); + } + + public void joinNetwork() { + connections.getConnection(state1.getUrl()). + participateNetwork("Network1", state2.getClientId(), + state2.getUrl()); + connections.getConnection(state1.getUrl()). + participateNetwork("Network1", state3.getClientId(), + state3.getUrl()); + state1.internalRun(); + state2.internalRun(); + state3.internalRun(); + + assertTrue(state1.getParticipants().size() == 3); + assertTrue(state2.getParticipants().size() == 3); + assertTrue(state3.getParticipants().size() == 3); + } + + @Test public void testJoinNetwork() { + connections.getConnection(state1.getUrl()). + participateNetwork("Network1", state2.getClientId(), + state2.getUrl()); + assertTrue(state1.getParticipants().size() == 1); + assertTrue(state2.getParticipants().size() == 1); + + state1.internalRun(); + state2.internalRun(); + + assertTrue(state1.getParticipants().size() == 2); + assertTrue(state2.getParticipants().size() == 2); + assertEquals(state1.getNetworkName(), state2.getNetworkName()); + + connections.getConnection(state2.getUrl()). + participateNetwork("Network1", state3.getClientId(), + state3.getUrl()); + state2.internalRun(); + state1.internalRun(); + state3.internalRun(); + state2.internalRun(); + + assertTrue(state1.getParticipants().size() == 3); + assertTrue(state2.getParticipants().size() == 3); + assertTrue(state3.getParticipants().size() == 3); + assertEquals(state1.getNetworkName(), state2.getNetworkName()); + assertEquals(state2.getNetworkName(), state3.getNetworkName()); + + assertEquals("Client1", state1.getMasterId()); + assertEquals("Client1", state2.getMasterId()); + assertEquals("Client1", state3.getMasterId()); + } + + @Test public void setState() { + joinNetwork(); + state1.librarySetNewState("New state1"); + state1.internalRun(); + state2.internalRun(); + state3.internalRun(); + assertEquals("New state1", state1.getCurrentState()); + assertEquals("New state1", state2.getCurrentState()); + assertEquals("New state1", state2.getCurrentState()); + } +} |