summaryrefslogtreecommitdiff
path: root/jsonrpc/src
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src')
-rw-r--r--jsonrpc/src/main/java/com/orbekk/rpc/App.java11
-rw-r--r--jsonrpc/src/main/java/com/orbekk/rpc/Client.java60
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java23
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameService.java8
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java9
-rw-r--r--jsonrpc/src/main/java/com/orbekk/same/SameState.java24
6 files changed, 103 insertions, 32 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/App.java b/jsonrpc/src/main/java/com/orbekk/rpc/App.java
index 2f956fe..d92003e 100644
--- a/jsonrpc/src/main/java/com/orbekk/rpc/App.java
+++ b/jsonrpc/src/main/java/com/orbekk/rpc/App.java
@@ -1,6 +1,7 @@
package com.orbekk.rpc;
import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.ConnectionManagerImpl;
import com.orbekk.same.SameState;
import com.orbekk.same.SameService;
import com.orbekk.same.SameServiceImpl;
@@ -8,14 +9,18 @@ import org.eclipse.jetty.server.Server;
public class App {
public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println("Arguments: port networkName");
+ if (args.length < 3) {
+ System.err.println("Arguments: port networkName clientId");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
String networkName = args[1];
+ String clientId = args[2];
- SameState sameState = new SameState(networkName);
+ ConnectionManagerImpl connections = new ConnectionManagerImpl();
+
+ SameState sameState = new SameState(networkName, clientId,
+ connections);
sameState.start();
SameServiceImpl service = new SameServiceImpl(sameState);
diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java
index 99abf93..ea63fd7 100644
--- a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java
+++ b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java
@@ -1,32 +1,58 @@
package com.orbekk.rpc;
+import com.googlecode.jsonrpc4j.JsonRpcServer;
+import com.orbekk.same.ConnectionManagerImpl;
+import com.orbekk.same.SameState;
+import com.orbekk.same.SameService;
+import com.orbekk.same.SameServiceImpl;
import java.net.MalformedURLException;
import java.net.URL;
-
-import com.orbekk.same.SameService;
-import com.googlecode.jsonrpc4j.JsonRpcHttpClient;
-import com.googlecode.jsonrpc4j.ProxyUtil;
+import org.eclipse.jetty.server.Server;
public class Client {
public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println("Arguments: networkAddress port");
+ if (args.length < 4) {
+ System.err.println("Arguments: port clientId thisNetworkName " +
+ "remoteNetworkAddr");
System.exit(1);
}
- String networkAddress = args[0];
- int port = Integer.parseInt(args[1]);
- JsonRpcHttpClient client = null;
+ int port = Integer.parseInt(args[0]);
+ String clientId = args[1];
+ String networkName = args[2];
+ String remoteAddr = args[3];
+
+ ConnectionManagerImpl connections = new ConnectionManagerImpl();
+
+ SameState sameState = new SameState(networkName, clientId,
+ connections);
+ sameState.start();
+
+ SameServiceImpl service = new SameServiceImpl(sameState);
+ JsonRpcServer jsonServer = new JsonRpcServer(service,
+ SameService.class);
+
+ Server server = new Server(port);
+ RpcHandler rpcHandler = new RpcHandler(jsonServer, service);
+ server.setHandler(rpcHandler);
+
try {
- client = new JsonRpcHttpClient(new URL(networkAddress));
- } catch (MalformedURLException e) {
+ server.start();
+ } catch (Exception e) {
+ System.out.println("Could not start jetty server.");
e.printStackTrace();
}
- SameService service = ProxyUtil.createProxy(
- client.getClass().getClassLoader(),
- SameService.class,
- client);
- service.notifyNetwork("NoNetwork");
- service.participateNetwork("FirstNetwork", port);
+
+ SameService remoteService = connections.getConnection(remoteAddr);
+ remoteService.notifyNetwork("NoNetwork");
+ remoteService.participateNetwork("FirstNetwork",
+ sameState.getClientId(), "", port);
+
+ try {
+ server.join();
+ } catch (InterruptedException e) {
+ System.out.println("Interrupt");
+ }
+
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index e3a6928..841d5fa 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -1,16 +1,31 @@
package com.orbekk.same;
+import com.googlecode.jsonrpc4j.JsonRpcHttpClient;
+import com.googlecode.jsonrpc4j.ProxyUtil;
+import java.net.MalformedURLException;
+import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionManagerImpl {
+public class ConnectionManagerImpl implements ConnectionManager {
+
private Logger logger = LoggerFactory.getLogger(getClass());
public ConnectionManagerImpl() {
}
- SameService getConnection(String url) {
- // TODO: Implement this class.
- return null;
+ @Override
+ public SameService getConnection(String url) {
+ SameService service = null;
+ try {
+ JsonRpcHttpClient client = new JsonRpcHttpClient(new URL(url));
+ service = ProxyUtil.createProxy(
+ this.getClass().getClassLoader(),
+ SameService.class,
+ client);
+ } catch (MalformedURLException e) {
+ logger.warn("Unable to create client for {}, {}", url, e);
+ }
+ return service;
}
}
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
index d18cb9d..0a261a8 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java
@@ -13,8 +13,14 @@ public interface SameService {
/**
* A request from the callee to participate in 'networkName'.
+ *
+ * A client may not know its URL. If the url parameter is empty,
+ * use info from the HttpServletRequest.
+ *
+ * TODO: Always pass a valid URL and get rid of the port parameter.
*/
- void participateNetwork(String networkName, int remotePort);
+ void participateNetwork(String networkName, String clientId,
+ String url, int remotePort);
/**
* Notification of participation in network.
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
index 0052de3..e7484c1 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java
@@ -26,13 +26,16 @@ public class SameServiceImpl implements SameService, CallerInfoListener {
}
@Override
- public void participateNetwork(String networkName, int remotePort) {
+ public void participateNetwork(String networkName, String clientId,
+ String url, int remotePort) {
if (!networkName.equals(sameState.getNetworkName())) {
logger.warn("Client tried to join {}, but network name is {}.",
networkName, sameState.getNetworkName());
}
- String url = "http://" + currentCallerIp + ":" + remotePort;
- sameState.addParticipant(url);
+ if (url.equals("")) {
+ url = "http://" + currentCallerIp + ":" + remotePort;
+ }
+ sameState.addParticipant(clientId, url);
}
@Override
diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
index b2518c2..d7dd3dc 100644
--- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java
+++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java
@@ -12,9 +12,12 @@ import org.slf4j.LoggerFactory;
*/
public class SameState extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
+ private ConnectionManager connections;
private List<String> participants = new LinkedList<String>();
private String currentState = "";
private String networkName;
+ // The client id of this participant.
+ private String clientId;
private boolean stopped = false;
/**
@@ -22,14 +25,21 @@ public class SameState extends Thread {
*/
private List<String> pendingParticipants = new LinkedList<String>();
- public SameState(String networkName) {
+ public SameState(String networkName, String clientId,
+ ConnectionManager connections) {
this.networkName = networkName;
+ this.clientId = clientId;
+ this.connections = connections;
}
public synchronized List<String> getParticipants() {
return participants;
}
+ public String getClientId() {
+ return clientId;
+ }
+
public String getNetworkName() {
return networkName;
}
@@ -38,18 +48,24 @@ public class SameState extends Thread {
return currentState;
}
- public synchronized void addParticipant(String url) {
+ public synchronized void addParticipant(String clientId, String url) {
synchronized(this) {
- logger.info("Add pending participant: {}", url);
+ logger.info("Add pending participant: {} ({})", clientId, url);
pendingParticipants.add(url);
notifyAll();
}
}
private synchronized void handleNewParticipants() {
+ // Adding all pending participants ensures that each of the new
+ // participants is informed of all participants.
+ //
+ // TODO: Does not inform old participants.
+ participants.addAll(pendingParticipants);
for (String url : pendingParticipants) {
logger.info("New participant: {}", url);
- participants.add(url);
+ SameService remoteService = connections.getConnection(url);
+ remoteService.notifyParticipation(networkName, participants);
}
pendingParticipants.clear();
}