summaryrefslogtreecommitdiff
path: root/jsonrpc/src/main/java/com/orbekk/same
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-10 22:24:01 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-10 22:33:00 +0100
commitfc7ef2ba156db641e1d193cc2e3f1352eaa74bd8 (patch)
treeb020dbcee136476eaf1cf6c8c22a7530789887d1 /jsonrpc/src/main/java/com/orbekk/same
parent3fd156ef2cd87e0b70dc5906aad7071ab908cedf (diff)
Add SameState class to manage the state of a client.
The SameState class manages a view of the 'Same' network. Some functionality was moved from SameServiceImpl to SameState.
Diffstat (limited to 'jsonrpc/src/main/java/com/orbekk/same')
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java11
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameService.java19
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java38
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameState.java78
4 files changed, 135 insertions, 11 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java
new file mode 100644
index 0000000..e3ccfea
--- /dev/null
+++ b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java
@@ -0,0 +1,11 @@
+package com.orbekk.same;
+
+/**
+ * An interface to get notified of the current caller.
+ *
+ * This interface is needed because jsonrpc4j does not pass the
+ * HttpServletRequest to the service implementation.
+ */
+public interface CallerInfoListener {
+ void setCaller(String callerIp);
+}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
index dccc1e9..d18cb9d 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
@@ -1,6 +1,23 @@
package com.orbekk.same;
+import java.util.List;
+
public interface SameService {
+ /**
+ * A notification that 'networkName' exists.
+ *
+ * This is called by any participant of a network after a broadcast
+ * has been performed.
+ */
void notifyNetwork(String networkName);
- String participateNetwork(String networkName);
+
+ /**
+ * A request from the callee to participate in 'networkName'.
+ */
+ void participateNetwork(String networkName, int remotePort);
+
+ /**
+ * Notification of participation in network.
+ */
+ void notifyParticipation(String networkName, List<String> participants);
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
index 7e1de00..0052de3 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
@@ -6,13 +6,18 @@ import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SameServiceImpl implements SameService {
+public class SameServiceImpl implements SameService, CallerInfoListener {
private Logger logger = LoggerFactory.getLogger(getClass());
- private List<String> participants = new LinkedList<String>();
- private String networkName;
+ private SameState sameState;
+ private String currentCallerIp;
- public SameServiceImpl(String networkName) {
- this.networkName = networkName;
+ public SameServiceImpl(SameState sameState) {
+ this.sameState = sameState;
+ }
+
+ @Override
+ public void setCaller(String callerIp) {
+ currentCallerIp = callerIp;
}
@Override
@@ -21,12 +26,25 @@ public class SameServiceImpl implements SameService {
}
@Override
- public String participateNetwork(String networkName) {
- logger.info("Got participation request.");
- if (!networkName.equals(this.networkName)) {
+ public void participateNetwork(String networkName, int remotePort) {
+ if (!networkName.equals(sameState.getNetworkName())) {
logger.warn("Client tried to join {}, but network name is {}.",
- networkName, this.networkName);
+ networkName, sameState.getNetworkName());
+ }
+ String url = "http://" + currentCallerIp + ":" + remotePort;
+ sameState.addParticipant(url);
+ }
+
+ @Override
+ public void notifyParticipation(String networkName,
+ List<String> participants) {
+ logger.info("Joining network {}.", networkName);
+ int i = 1;
+ for (String participant : participants) {
+ logger.info(" {} participant {}: {}",
+ new Object[]{networkName, i, participant});
+ i++;
}
- return "<Not implemented>";
+ logger.warn("Joining not implemented.");
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
new file mode 100644
index 0000000..b2518c2
--- /dev/null
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
@@ -0,0 +1,78 @@
+package com.orbekk.same;
+
+import java.util.List;
+import java.util.LinkedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The implementation of a 'Same' state.
+ *
+ * This class manages the current state of the Same protocol.
+ */
+public class SameState extends Thread {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+ private List<String> participants = new LinkedList<String>();
+ private String currentState = "";
+ private String networkName;
+ private boolean stopped = false;
+
+ /**
+ * Queue for pending participants.
+ */
+ private List<String> pendingParticipants = new LinkedList<String>();
+
+ public SameState(String networkName) {
+ this.networkName = networkName;
+ }
+
+ public synchronized List<String> getParticipants() {
+ return participants;
+ }
+
+ public String getNetworkName() {
+ return networkName;
+ }
+
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ public synchronized void addParticipant(String url) {
+ synchronized(this) {
+ logger.info("Add pending participant: {}", url);
+ pendingParticipants.add(url);
+ notifyAll();
+ }
+ }
+
+ private synchronized void handleNewParticipants() {
+ for (String url : pendingParticipants) {
+ logger.info("New participant: {}", url);
+ participants.add(url);
+ }
+ pendingParticipants.clear();
+ }
+
+ public synchronized void run() {
+ while (!stopped) {
+ handleNewParticipants();
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // Ignore interrupt in wait loop.
+ }
+ }
+ }
+
+ public synchronized void stopSame() {
+ try {
+ stopped = true;
+ notifyAll();
+ this.join();
+ } catch (InterruptedException e) {
+ logger.warn("Got InterruptedException while waiting for SameState " +
+ "to finish. Ignoring.");
+ }
+ }
+}