From 52088a08a9edb6d4b60e1e8923e1b3199db9f391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 11 Jan 2012 10:47:27 +0100 Subject: Continue SameService implementation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement ConnectionManager. - SameService now calls notifyParticipation() when a client tries to join. - Add client ids – this may or may not be a good idea. - Refactor App and Client code. --- jsonrpc/src/main/java/com/orbekk/rpc/App.java | 11 ++-- jsonrpc/src/main/java/com/orbekk/rpc/Client.java | 60 ++++++++++++++++------ .../com/orbekk/same/ConnectionManagerImpl.java | 23 +++++++-- .../src/main/java/com/orbekk/same/SameService.java | 8 ++- .../main/java/com/orbekk/same/SameServiceImpl.java | 9 ++-- .../src/main/java/com/orbekk/same/SameState.java | 24 +++++++-- 6 files changed, 103 insertions(+), 32 deletions(-) (limited to 'jsonrpc') diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/App.java b/jsonrpc/src/main/java/com/orbekk/rpc/App.java index 2f956fe..d92003e 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/App.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/App.java @@ -1,6 +1,7 @@ package com.orbekk.rpc; import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.ConnectionManagerImpl; import com.orbekk.same.SameState; import com.orbekk.same.SameService; import com.orbekk.same.SameServiceImpl; @@ -8,14 +9,18 @@ import org.eclipse.jetty.server.Server; public class App { public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Arguments: port networkName"); + if (args.length < 3) { + System.err.println("Arguments: port networkName clientId"); System.exit(1); } int port = Integer.parseInt(args[0]); String networkName = args[1]; + String clientId = args[2]; - SameState sameState = new SameState(networkName); + ConnectionManagerImpl connections = new ConnectionManagerImpl(); + + SameState sameState = new SameState(networkName, clientId, + connections); sameState.start(); SameServiceImpl service = new SameServiceImpl(sameState); diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java index 99abf93..ea63fd7 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java @@ -1,32 +1,58 @@ package com.orbekk.rpc; +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.ConnectionManagerImpl; +import com.orbekk.same.SameState; +import com.orbekk.same.SameService; +import com.orbekk.same.SameServiceImpl; import java.net.MalformedURLException; import java.net.URL; - -import com.orbekk.same.SameService; -import com.googlecode.jsonrpc4j.JsonRpcHttpClient; -import com.googlecode.jsonrpc4j.ProxyUtil; +import org.eclipse.jetty.server.Server; public class Client { public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Arguments: networkAddress port"); + if (args.length < 4) { + System.err.println("Arguments: port clientId thisNetworkName " + + "remoteNetworkAddr"); System.exit(1); } - String networkAddress = args[0]; - int port = Integer.parseInt(args[1]); - JsonRpcHttpClient client = null; + int port = Integer.parseInt(args[0]); + String clientId = args[1]; + String networkName = args[2]; + String remoteAddr = args[3]; + + ConnectionManagerImpl connections = new ConnectionManagerImpl(); + + SameState sameState = new SameState(networkName, clientId, + connections); + sameState.start(); + + SameServiceImpl service = new SameServiceImpl(sameState); + JsonRpcServer jsonServer = new JsonRpcServer(service, + SameService.class); + + Server server = new Server(port); + RpcHandler rpcHandler = new RpcHandler(jsonServer, service); + server.setHandler(rpcHandler); + try { - client = new JsonRpcHttpClient(new URL(networkAddress)); - } catch (MalformedURLException e) { + server.start(); + } catch (Exception e) { + System.out.println("Could not start jetty server."); e.printStackTrace(); } - SameService service = ProxyUtil.createProxy( - client.getClass().getClassLoader(), - SameService.class, - client); - service.notifyNetwork("NoNetwork"); - service.participateNetwork("FirstNetwork", port); + + SameService remoteService = connections.getConnection(remoteAddr); + remoteService.notifyNetwork("NoNetwork"); + remoteService.participateNetwork("FirstNetwork", + sameState.getClientId(), "", port); + + try { + server.join(); + } catch (InterruptedException e) { + System.out.println("Interrupt"); + } + } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index e3a6928..841d5fa 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -1,16 +1,31 @@ package com.orbekk.same; +import com.googlecode.jsonrpc4j.JsonRpcHttpClient; +import com.googlecode.jsonrpc4j.ProxyUtil; +import java.net.MalformedURLException; +import java.net.URL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConnectionManagerImpl { +public class ConnectionManagerImpl implements ConnectionManager { + private Logger logger = LoggerFactory.getLogger(getClass()); public ConnectionManagerImpl() { } - SameService getConnection(String url) { - // TODO: Implement this class. - return null; + @Override + public SameService getConnection(String url) { + SameService service = null; + try { + JsonRpcHttpClient client = new JsonRpcHttpClient(new URL(url)); + service = ProxyUtil.createProxy( + this.getClass().getClassLoader(), + SameService.class, + client); + } catch (MalformedURLException e) { + logger.warn("Unable to create client for {}, {}", url, e); + } + return service; } } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index d18cb9d..0a261a8 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -13,8 +13,14 @@ public interface SameService { /** * A request from the callee to participate in 'networkName'. + * + * A client may not know its URL. If the url parameter is empty, + * use info from the HttpServletRequest. + * + * TODO: Always pass a valid URL and get rid of the port parameter. */ - void participateNetwork(String networkName, int remotePort); + void participateNetwork(String networkName, String clientId, + String url, int remotePort); /** * Notification of participation in network. diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index 0052de3..e7484c1 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -26,13 +26,16 @@ public class SameServiceImpl implements SameService, CallerInfoListener { } @Override - public void participateNetwork(String networkName, int remotePort) { + public void participateNetwork(String networkName, String clientId, + String url, int remotePort) { if (!networkName.equals(sameState.getNetworkName())) { logger.warn("Client tried to join {}, but network name is {}.", networkName, sameState.getNetworkName()); } - String url = "http://" + currentCallerIp + ":" + remotePort; - sameState.addParticipant(url); + if (url.equals("")) { + url = "http://" + currentCallerIp + ":" + remotePort; + } + sameState.addParticipant(clientId, url); } @Override diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java index b2518c2..d7dd3dc 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameState.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -12,9 +12,12 @@ import org.slf4j.LoggerFactory; */ public class SameState extends Thread { private Logger logger = LoggerFactory.getLogger(getClass()); + private ConnectionManager connections; private List participants = new LinkedList(); private String currentState = ""; private String networkName; + // The client id of this participant. + private String clientId; private boolean stopped = false; /** @@ -22,14 +25,21 @@ public class SameState extends Thread { */ private List pendingParticipants = new LinkedList(); - public SameState(String networkName) { + public SameState(String networkName, String clientId, + ConnectionManager connections) { this.networkName = networkName; + this.clientId = clientId; + this.connections = connections; } public synchronized List getParticipants() { return participants; } + public String getClientId() { + return clientId; + } + public String getNetworkName() { return networkName; } @@ -38,18 +48,24 @@ public class SameState extends Thread { return currentState; } - public synchronized void addParticipant(String url) { + public synchronized void addParticipant(String clientId, String url) { synchronized(this) { - logger.info("Add pending participant: {}", url); + logger.info("Add pending participant: {} ({})", clientId, url); pendingParticipants.add(url); notifyAll(); } } private synchronized void handleNewParticipants() { + // Adding all pending participants ensures that each of the new + // participants is informed of all participants. + // + // TODO: Does not inform old participants. + participants.addAll(pendingParticipants); for (String url : pendingParticipants) { logger.info("New participant: {}", url); - participants.add(url); + SameService remoteService = connections.getConnection(url); + remoteService.notifyParticipation(networkName, participants); } pendingParticipants.clear(); } -- cgit v1.2.3