summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-10 22:24:01 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-10 22:33:00 +0100
commitfc7ef2ba156db641e1d193cc2e3f1352eaa74bd8 (patch)
treeb020dbcee136476eaf1cf6c8c22a7530789887d1
parent3fd156ef2cd87e0b70dc5906aad7071ab908cedf (diff)
Add SameState class to manage the state of a client.
The SameState class manages a view of the 'Same' network. Some functionality was moved from SameServiceImpl to SameState.
-rw-r--r--jsonrpc/src/main/java/com/orbekk/rpc/App.java6
-rw-r--r--jsonrpc/src/main/java/com/orbekk/rpc/Client.java7
-rw-r--r--jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java11
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java11
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameService.java19
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java38
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameState.java78
7 files changed, 152 insertions, 18 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/App.java b/jsonrpc/src/main/java/com/orbekk/rpc/App.java
index ffd316d..2162a59 100644
--- a/jsonrpc/src/main/java/com/orbekk/rpc/App.java
+++ b/jsonrpc/src/main/java/com/orbekk/rpc/App.java
@@ -1,6 +1,7 @@
package com.orbekk.rpc;
import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.SameState;
import com.orbekk.same.SameService;
import com.orbekk.same.SameServiceImpl;
import org.eclipse.jetty.server.Server;
@@ -14,12 +15,13 @@ public class App {
int port = Integer.parseInt(args[0]);
String networkName = args[1];
- SameService service = new SameServiceImpl(networkName);
+ SameState sameState = new SameState(networkName);
+ SameServiceImpl service = new SameServiceImpl(sameState);
JsonRpcServer jsonServer = new JsonRpcServer(service,
SameService.class);
Server server = new Server(port);
- RpcHandler rpcHandler = new RpcHandler(jsonServer);
+ RpcHandler rpcHandler = new RpcHandler(jsonServer, service);
server.setHandler(rpcHandler);
try {
diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java
index ba77df6..99abf93 100644
--- a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java
+++ b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java
@@ -10,11 +10,12 @@ import com.googlecode.jsonrpc4j.ProxyUtil;
public class Client {
public static void main(String[] args) {
- if (args.length < 1) {
- System.err.println("Arguments: networkAddress");
+ if (args.length < 2) {
+ System.err.println("Arguments: networkAddress port");
System.exit(1);
}
String networkAddress = args[0];
+ int port = Integer.parseInt(args[1]);
JsonRpcHttpClient client = null;
try {
client = new JsonRpcHttpClient(new URL(networkAddress));
@@ -26,6 +27,6 @@ public class Client {
SameService.class,
client);
service.notifyNetwork("NoNetwork");
- System.out.println(service.participateNetwork("FirstNetwork"));
+ service.participateNetwork("FirstNetwork", port);
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java
index bc76e4e..39676fe 100644
--- a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java
+++ b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java
@@ -10,18 +10,25 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.CallerInfoListener;
public class RpcHandler extends AbstractHandler {
private JsonRpcServer rpcServer;
+ private CallerInfoListener callerInfoListener;
- public RpcHandler(JsonRpcServer rpcServer) {
+ public RpcHandler(JsonRpcServer rpcServer,
+ CallerInfoListener callerInfoListener) {
this.rpcServer = rpcServer;
+ this.callerInfoListener = callerInfoListener;
}
@Override
- public void handle(String target, Request baseRequest,
+ public synchronized void handle(String target, Request baseRequest,
HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
+ if (callerInfoListener != null) {
+ callerInfoListener.setCaller(request.getRemoteAddr());
+ }
rpcServer.handle(request, response);
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java
new file mode 100644
index 0000000..e3ccfea
--- /dev/null
+++ b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java
@@ -0,0 +1,11 @@
+package com.orbekk.same;
+
+/**
+ * An interface to get notified of the current caller.
+ *
+ * This interface is needed because jsonrpc4j does not pass the
+ * HttpServletRequest to the service implementation.
+ */
+public interface CallerInfoListener {
+ void setCaller(String callerIp);
+}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
index dccc1e9..d18cb9d 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
@@ -1,6 +1,23 @@
package com.orbekk.same;
+import java.util.List;
+
public interface SameService {
+ /**
+ * A notification that 'networkName' exists.
+ *
+ * This is called by any participant of a network after a broadcast
+ * has been performed.
+ */
void notifyNetwork(String networkName);
- String participateNetwork(String networkName);
+
+ /**
+ * A request from the callee to participate in 'networkName'.
+ */
+ void participateNetwork(String networkName, int remotePort);
+
+ /**
+ * Notification of participation in network.
+ */
+ void notifyParticipation(String networkName, List<String> participants);
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
index 7e1de00..0052de3 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
@@ -6,13 +6,18 @@ import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SameServiceImpl implements SameService {
+public class SameServiceImpl implements SameService, CallerInfoListener {
private Logger logger = LoggerFactory.getLogger(getClass());
- private List<String> participants = new LinkedList<String>();
- private String networkName;
+ private SameState sameState;
+ private String currentCallerIp;
- public SameServiceImpl(String networkName) {
- this.networkName = networkName;
+ public SameServiceImpl(SameState sameState) {
+ this.sameState = sameState;
+ }
+
+ @Override
+ public void setCaller(String callerIp) {
+ currentCallerIp = callerIp;
}
@Override
@@ -21,12 +26,25 @@ public class SameServiceImpl implements SameService {
}
@Override
- public String participateNetwork(String networkName) {
- logger.info("Got participation request.");
- if (!networkName.equals(this.networkName)) {
+ public void participateNetwork(String networkName, int remotePort) {
+ if (!networkName.equals(sameState.getNetworkName())) {
logger.warn("Client tried to join {}, but network name is {}.",
- networkName, this.networkName);
+ networkName, sameState.getNetworkName());
+ }
+ String url = "http://" + currentCallerIp + ":" + remotePort;
+ sameState.addParticipant(url);
+ }
+
+ @Override
+ public void notifyParticipation(String networkName,
+ List<String> participants) {
+ logger.info("Joining network {}.", networkName);
+ int i = 1;
+ for (String participant : participants) {
+ logger.info(" {} participant {}: {}",
+ new Object[]{networkName, i, participant});
+ i++;
}
- return "<Not implemented>";
+ logger.warn("Joining not implemented.");
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
new file mode 100644
index 0000000..b2518c2
--- /dev/null
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
@@ -0,0 +1,78 @@
+package com.orbekk.same;
+
+import java.util.List;
+import java.util.LinkedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The implementation of a 'Same' state.
+ *
+ * This class manages the current state of the Same protocol.
+ */
+public class SameState extends Thread {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+ private List<String> participants = new LinkedList<String>();
+ private String currentState = "";
+ private String networkName;
+ private boolean stopped = false;
+
+ /**
+ * Queue for pending participants.
+ */
+ private List<String> pendingParticipants = new LinkedList<String>();
+
+ public SameState(String networkName) {
+ this.networkName = networkName;
+ }
+
+ public synchronized List<String> getParticipants() {
+ return participants;
+ }
+
+ public String getNetworkName() {
+ return networkName;
+ }
+
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ public synchronized void addParticipant(String url) {
+ synchronized(this) {
+ logger.info("Add pending participant: {}", url);
+ pendingParticipants.add(url);
+ notifyAll();
+ }
+ }
+
+ private synchronized void handleNewParticipants() {
+ for (String url : pendingParticipants) {
+ logger.info("New participant: {}", url);
+ participants.add(url);
+ }
+ pendingParticipants.clear();
+ }
+
+ public synchronized void run() {
+ while (!stopped) {
+ handleNewParticipants();
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // Ignore interrupt in wait loop.
+ }
+ }
+ }
+
+ public synchronized void stopSame() {
+ try {
+ stopped = true;
+ notifyAll();
+ this.join();
+ } catch (InterruptedException e) {
+ logger.warn("Got InterruptedException while waiting for SameState " +
+ "to finish. Ignoring.");
+ }
+ }
+}