diff options
Diffstat (limited to 'jsonrpc/src/main')
8 files changed, 91 insertions, 48 deletions
diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/App.java b/jsonrpc/src/main/java/com/orbekk/rpc/App.java index d92003e..0e9dce3 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/App.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/App.java @@ -28,7 +28,7 @@ public class App { SameService.class); Server server = new Server(port); - RpcHandler rpcHandler = new RpcHandler(jsonServer, service); + RpcHandler rpcHandler = new RpcHandler(jsonServer, sameState); server.setHandler(rpcHandler); try { diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java index ea63fd7..2ec6c89 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java @@ -5,6 +5,7 @@ import com.orbekk.same.ConnectionManagerImpl; import com.orbekk.same.SameState; import com.orbekk.same.SameService; import com.orbekk.same.SameServiceImpl; +import com.orbekk.net.HttpUtil; import java.net.MalformedURLException; import java.net.URL; import org.eclipse.jetty.server.Server; @@ -33,7 +34,7 @@ public class Client { SameService.class); Server server = new Server(port); - RpcHandler rpcHandler = new RpcHandler(jsonServer, service); + RpcHandler rpcHandler = new RpcHandler(jsonServer, sameState); server.setHandler(rpcHandler); try { @@ -42,11 +43,20 @@ public class Client { System.out.println("Could not start jetty server."); e.printStackTrace(); } + + while (sameState.getUrl() == null) { + HttpUtil.sendHttpRequest(remoteAddr + "ping?port=" + port); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // Ignore interrupt in wait loop. + } + } SameService remoteService = connections.getConnection(remoteAddr); remoteService.notifyNetwork("NoNetwork"); remoteService.participateNetwork("FirstNetwork", - sameState.getClientId(), "", port); + sameState.getClientId(), sameState.getUrl()); try { server.join(); diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java index 39676fe..0bafad4 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java @@ -1,34 +1,52 @@ package com.orbekk.rpc; +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.net.HttpUtil; +import com.orbekk.same.UrlReceiver; import java.io.IOException; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; - -import com.googlecode.jsonrpc4j.JsonRpcServer; -import com.orbekk.same.CallerInfoListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RpcHandler extends AbstractHandler { + private Logger logger = LoggerFactory.getLogger(getClass()); private JsonRpcServer rpcServer; - private CallerInfoListener callerInfoListener; + private UrlReceiver urlReceiver; public RpcHandler(JsonRpcServer rpcServer, - CallerInfoListener callerInfoListener) { + UrlReceiver urlReceiver) { this.rpcServer = rpcServer; - this.callerInfoListener = callerInfoListener; + this.urlReceiver = urlReceiver; } @Override public synchronized void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - if (callerInfoListener != null) { - callerInfoListener.setCaller(request.getRemoteAddr()); + logger.info("Handling request to target: " + target); + + if (urlReceiver != null) { + String sameServiceUrl = "http://" + request.getLocalAddr() + + ":" + request.getLocalPort() + "/SameService.json"; + urlReceiver.setUrl(sameServiceUrl); + urlReceiver = null; + } + + if (target.equals("/ping")) { + int remotePort = Integer.parseInt(request.getParameter("port")); + String pongUrl = "http://" + request.getRemoteAddr() + ":" + + remotePort + "/pong"; + logger.info("Got ping. Sending pong to {}", pongUrl); + HttpUtil.sendHttpRequest(pongUrl); + } else if (target.equals("/pong")) { + logger.info("Received pong from {}", request.getRemoteAddr()); + } else { + rpcServer.handle(request, response); } - rpcServer.handle(request, response); + baseRequest.setHandled(true); } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java deleted file mode 100644 index e3ccfea..0000000 --- a/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.orbekk.same; - -/** - * An interface to get notified of the current caller. - * - * This interface is needed because jsonrpc4j does not pass the - * HttpServletRequest to the service implementation. - */ -public interface CallerInfoListener { - void setCaller(String callerIp); -} diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index 0a261a8..76e9035 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -13,14 +13,9 @@ public interface SameService { /** * A request from the callee to participate in 'networkName'. - * - * A client may not know its URL. If the url parameter is empty, - * use info from the HttpServletRequest. - * - * TODO: Always pass a valid URL and get rid of the port parameter. */ void participateNetwork(String networkName, String clientId, - String url, int remotePort); + String url); /** * Notification of participation in network. diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index e7484c1..533a055 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -6,34 +6,31 @@ import java.util.LinkedList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SameServiceImpl implements SameService, CallerInfoListener { +public class SameServiceImpl implements SameService { private Logger logger = LoggerFactory.getLogger(getClass()); private SameState sameState; - private String currentCallerIp; public SameServiceImpl(SameState sameState) { this.sameState = sameState; } @Override - public void setCaller(String callerIp) { - currentCallerIp = callerIp; - } - - @Override public void notifyNetwork(String networkName) { logger.info("Notification from network " + networkName); } @Override public void participateNetwork(String networkName, String clientId, - String url, int remotePort) { + String url) { if (!networkName.equals(sameState.getNetworkName())) { logger.warn("Client tried to join {}, but network name is {}.", networkName, sameState.getNetworkName()); + return; } - if (url.equals("")) { - url = "http://" + currentCallerIp + ":" + remotePort; + if (clientId.equals("") || url.equals("")) { + logger.warn("Missing client info: ClientId({}), URL({}).", + clientId, url); + return; } sameState.addParticipant(clientId, url); } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java index d7dd3dc..c097a0d 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -10,14 +10,30 @@ import org.slf4j.LoggerFactory; * * This class manages the current state of the Same protocol. */ -public class SameState extends Thread { +public class SameState extends Thread implements UrlReceiver { private Logger logger = LoggerFactory.getLogger(getClass()); private ConnectionManager connections; private List<String> participants = new LinkedList<String>(); private String currentState = ""; private String networkName; - // The client id of this participant. + + /** + * The client id of this participant. + */ private String clientId; + + /** + * The URL of this participant. + * + * Important note: Our URL is unknown initially. Url is null until we + * receive our first request. The URL is then taken to be the target URL + * from the request. + */ + private String url = null; + + /** + * Stopping condition for this thread. + */ private boolean stopped = false; /** @@ -48,12 +64,20 @@ public class SameState extends Thread { return currentState; } + public String getUrl() { + return url; + } + + @Override + public void setUrl(String url) { + logger.info("My URL is {}", url); + this.url = url; + } + public synchronized void addParticipant(String clientId, String url) { - synchronized(this) { - logger.info("Add pending participant: {} ({})", clientId, url); - pendingParticipants.add(url); - notifyAll(); - } + logger.info("PendingParticipant.add: {} ({})", clientId, url); + pendingParticipants.add(url); + notifyAll(); } private synchronized void handleNewParticipants() { diff --git a/jsonrpc/src/main/java/com/orbekk/same/UrlReceiver.java b/jsonrpc/src/main/java/com/orbekk/same/UrlReceiver.java new file mode 100644 index 0000000..31a0276 --- /dev/null +++ b/jsonrpc/src/main/java/com/orbekk/same/UrlReceiver.java @@ -0,0 +1,10 @@ +package com.orbekk.same; + +/** + * An interface to get notified of the URL to this computer. + * + * This interface is used to reliably obtain the URL of this host. + */ +public interface UrlReceiver { + void setUrl(String url); +} |