From fc7ef2ba156db641e1d193cc2e3f1352eaa74bd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 10 Jan 2012 22:24:01 +0100 Subject: Add SameState class to manage the state of a client. The SameState class manages a view of the 'Same' network. Some functionality was moved from SameServiceImpl to SameState. --- jsonrpc/src/main/java/com/orbekk/rpc/App.java | 6 +- jsonrpc/src/main/java/com/orbekk/rpc/Client.java | 7 +- .../src/main/java/com/orbekk/rpc/RpcHandler.java | 11 ++- .../java/com/orbekk/same/CallerInfoListener.java | 11 +++ .../src/main/java/com/orbekk/same/SameService.java | 19 +++++- .../main/java/com/orbekk/same/SameServiceImpl.java | 38 ++++++++--- .../src/main/java/com/orbekk/same/SameState.java | 78 ++++++++++++++++++++++ 7 files changed, 152 insertions(+), 18 deletions(-) create mode 100644 jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java create mode 100644 jsonrpc/src/main/java/com/orbekk/same/SameState.java diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/App.java b/jsonrpc/src/main/java/com/orbekk/rpc/App.java index ffd316d..2162a59 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/App.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/App.java @@ -1,6 +1,7 @@ package com.orbekk.rpc; import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.SameState; import com.orbekk.same.SameService; import com.orbekk.same.SameServiceImpl; import org.eclipse.jetty.server.Server; @@ -14,12 +15,13 @@ public class App { int port = Integer.parseInt(args[0]); String networkName = args[1]; - SameService service = new SameServiceImpl(networkName); + SameState sameState = new SameState(networkName); + SameServiceImpl service = new SameServiceImpl(sameState); JsonRpcServer jsonServer = new JsonRpcServer(service, SameService.class); Server server = new Server(port); - RpcHandler rpcHandler = new RpcHandler(jsonServer); + RpcHandler rpcHandler = new RpcHandler(jsonServer, service); server.setHandler(rpcHandler); try { diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java index ba77df6..99abf93 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java @@ -10,11 +10,12 @@ import com.googlecode.jsonrpc4j.ProxyUtil; public class Client { public static void main(String[] args) { - if (args.length < 1) { - System.err.println("Arguments: networkAddress"); + if (args.length < 2) { + System.err.println("Arguments: networkAddress port"); System.exit(1); } String networkAddress = args[0]; + int port = Integer.parseInt(args[1]); JsonRpcHttpClient client = null; try { client = new JsonRpcHttpClient(new URL(networkAddress)); @@ -26,6 +27,6 @@ public class Client { SameService.class, client); service.notifyNetwork("NoNetwork"); - System.out.println(service.participateNetwork("FirstNetwork")); + service.participateNetwork("FirstNetwork", port); } } diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java index bc76e4e..39676fe 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java @@ -10,18 +10,25 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.CallerInfoListener; public class RpcHandler extends AbstractHandler { private JsonRpcServer rpcServer; + private CallerInfoListener callerInfoListener; - public RpcHandler(JsonRpcServer rpcServer) { + public RpcHandler(JsonRpcServer rpcServer, + CallerInfoListener callerInfoListener) { this.rpcServer = rpcServer; + this.callerInfoListener = callerInfoListener; } @Override - public void handle(String target, Request baseRequest, + public synchronized void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + if (callerInfoListener != null) { + callerInfoListener.setCaller(request.getRemoteAddr()); + } rpcServer.handle(request, response); } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java new file mode 100644 index 0000000..e3ccfea --- /dev/null +++ b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java @@ -0,0 +1,11 @@ +package com.orbekk.same; + +/** + * An interface to get notified of the current caller. + * + * This interface is needed because jsonrpc4j does not pass the + * HttpServletRequest to the service implementation. + */ +public interface CallerInfoListener { + void setCaller(String callerIp); +} diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index dccc1e9..d18cb9d 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -1,6 +1,23 @@ package com.orbekk.same; +import java.util.List; + public interface SameService { + /** + * A notification that 'networkName' exists. + * + * This is called by any participant of a network after a broadcast + * has been performed. + */ void notifyNetwork(String networkName); - String participateNetwork(String networkName); + + /** + * A request from the callee to participate in 'networkName'. + */ + void participateNetwork(String networkName, int remotePort); + + /** + * Notification of participation in network. + */ + void notifyParticipation(String networkName, List participants); } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index 7e1de00..0052de3 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -6,13 +6,18 @@ import java.util.LinkedList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SameServiceImpl implements SameService { +public class SameServiceImpl implements SameService, CallerInfoListener { private Logger logger = LoggerFactory.getLogger(getClass()); - private List participants = new LinkedList(); - private String networkName; + private SameState sameState; + private String currentCallerIp; - public SameServiceImpl(String networkName) { - this.networkName = networkName; + public SameServiceImpl(SameState sameState) { + this.sameState = sameState; + } + + @Override + public void setCaller(String callerIp) { + currentCallerIp = callerIp; } @Override @@ -21,12 +26,25 @@ public class SameServiceImpl implements SameService { } @Override - public String participateNetwork(String networkName) { - logger.info("Got participation request."); - if (!networkName.equals(this.networkName)) { + public void participateNetwork(String networkName, int remotePort) { + if (!networkName.equals(sameState.getNetworkName())) { logger.warn("Client tried to join {}, but network name is {}.", - networkName, this.networkName); + networkName, sameState.getNetworkName()); + } + String url = "http://" + currentCallerIp + ":" + remotePort; + sameState.addParticipant(url); + } + + @Override + public void notifyParticipation(String networkName, + List participants) { + logger.info("Joining network {}.", networkName); + int i = 1; + for (String participant : participants) { + logger.info(" {} participant {}: {}", + new Object[]{networkName, i, participant}); + i++; } - return ""; + logger.warn("Joining not implemented."); } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java new file mode 100644 index 0000000..b2518c2 --- /dev/null +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -0,0 +1,78 @@ +package com.orbekk.same; + +import java.util.List; +import java.util.LinkedList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The implementation of a 'Same' state. + * + * This class manages the current state of the Same protocol. + */ +public class SameState extends Thread { + private Logger logger = LoggerFactory.getLogger(getClass()); + private List participants = new LinkedList(); + private String currentState = ""; + private String networkName; + private boolean stopped = false; + + /** + * Queue for pending participants. + */ + private List pendingParticipants = new LinkedList(); + + public SameState(String networkName) { + this.networkName = networkName; + } + + public synchronized List getParticipants() { + return participants; + } + + public String getNetworkName() { + return networkName; + } + + public String getCurrentState() { + return currentState; + } + + public synchronized void addParticipant(String url) { + synchronized(this) { + logger.info("Add pending participant: {}", url); + pendingParticipants.add(url); + notifyAll(); + } + } + + private synchronized void handleNewParticipants() { + for (String url : pendingParticipants) { + logger.info("New participant: {}", url); + participants.add(url); + } + pendingParticipants.clear(); + } + + public synchronized void run() { + while (!stopped) { + handleNewParticipants(); + try { + wait(1000); + } catch (InterruptedException e) { + // Ignore interrupt in wait loop. + } + } + } + + public synchronized void stopSame() { + try { + stopped = true; + notifyAll(); + this.join(); + } catch (InterruptedException e) { + logger.warn("Got InterruptedException while waiting for SameState " + + "to finish. Ignoring."); + } + } +} -- cgit v1.2.3