diff options
Diffstat (limited to 'jsonrpc')
7 files changed, 152 insertions, 18 deletions
| diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/App.java b/jsonrpc/src/main/java/com/orbekk/rpc/App.java index ffd316d..2162a59 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/App.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/App.java @@ -1,6 +1,7 @@  package com.orbekk.rpc;  import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.SameState;  import com.orbekk.same.SameService;  import com.orbekk.same.SameServiceImpl;  import org.eclipse.jetty.server.Server; @@ -14,12 +15,13 @@ public class App {          int port = Integer.parseInt(args[0]);          String networkName = args[1]; -        SameService service = new SameServiceImpl(networkName); +        SameState sameState = new SameState(networkName); +        SameServiceImpl service = new SameServiceImpl(sameState);          JsonRpcServer jsonServer = new JsonRpcServer(service,                  SameService.class);             Server server = new Server(port); -        RpcHandler rpcHandler = new RpcHandler(jsonServer); +        RpcHandler rpcHandler = new RpcHandler(jsonServer, service);          server.setHandler(rpcHandler);          try { diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java index ba77df6..99abf93 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/Client.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/Client.java @@ -10,11 +10,12 @@ import com.googlecode.jsonrpc4j.ProxyUtil;  public class Client {      public static void main(String[] args) { -        if (args.length < 1) { -            System.err.println("Arguments: networkAddress"); +        if (args.length < 2) { +            System.err.println("Arguments: networkAddress port");              System.exit(1);          }          String networkAddress = args[0]; +        int port = Integer.parseInt(args[1]);          JsonRpcHttpClient client = null;          try {              client = new JsonRpcHttpClient(new URL(networkAddress)); @@ -26,6 +27,6 @@ public class Client {                  SameService.class,                  client);          service.notifyNetwork("NoNetwork"); -        System.out.println(service.participateNetwork("FirstNetwork")); +        service.participateNetwork("FirstNetwork", port);      }  } diff --git a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java index bc76e4e..39676fe 100644 --- a/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java +++ b/jsonrpc/src/main/java/com/orbekk/rpc/RpcHandler.java @@ -10,18 +10,25 @@ import org.eclipse.jetty.server.Request;  import org.eclipse.jetty.server.handler.AbstractHandler;  import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.orbekk.same.CallerInfoListener;  public class RpcHandler extends AbstractHandler {      private JsonRpcServer rpcServer; +    private CallerInfoListener callerInfoListener; -    public RpcHandler(JsonRpcServer rpcServer) { +    public RpcHandler(JsonRpcServer rpcServer, +            CallerInfoListener callerInfoListener) {          this.rpcServer = rpcServer; +        this.callerInfoListener = callerInfoListener;      }      @Override -    public void handle(String target, Request baseRequest, +    public synchronized void handle(String target, Request baseRequest,              HttpServletRequest request, HttpServletResponse response)              throws IOException, ServletException { +        if (callerInfoListener != null) { +            callerInfoListener.setCaller(request.getRemoteAddr()); +        }          rpcServer.handle(request, response);      }  } diff --git a/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java new file mode 100644 index 0000000..e3ccfea --- /dev/null +++ b/jsonrpc/src/main/java/com/orbekk/same/CallerInfoListener.java @@ -0,0 +1,11 @@ +package com.orbekk.same; + +/** + * An interface to get notified of the current caller. + * + * This interface is needed because jsonrpc4j does not pass the + * HttpServletRequest to the service implementation. + */ +public interface CallerInfoListener { +    void setCaller(String callerIp); +} diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameService.java b/jsonrpc/src/main/java/com/orbekk/same/SameService.java index dccc1e9..d18cb9d 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameService.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameService.java @@ -1,6 +1,23 @@  package com.orbekk.same; +import java.util.List; +  public interface SameService { +    /** +     * A notification that 'networkName' exists. +     * +     * This is called by any participant of a network after a broadcast +     * has been performed. +     */      void notifyNetwork(String networkName); -    String participateNetwork(String networkName); + +    /** +     * A request from the callee to participate in 'networkName'. +     */ +    void participateNetwork(String networkName, int remotePort); + +    /** +     * Notification of participation in network. +     */ +    void notifyParticipation(String networkName, List<String> participants);  } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java index 7e1de00..0052de3 100644 --- a/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java +++ b/jsonrpc/src/main/java/com/orbekk/same/SameServiceImpl.java @@ -6,13 +6,18 @@ import java.util.LinkedList;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; -public class SameServiceImpl implements SameService { +public class SameServiceImpl implements SameService, CallerInfoListener {      private Logger logger = LoggerFactory.getLogger(getClass()); -    private List<String> participants = new LinkedList<String>(); -    private String networkName; +    private SameState sameState; +    private String currentCallerIp; -    public SameServiceImpl(String networkName) { -        this.networkName = networkName; +    public SameServiceImpl(SameState sameState) { +        this.sameState = sameState; +    } + +    @Override +    public void setCaller(String callerIp) { +        currentCallerIp = callerIp;      }      @Override @@ -21,12 +26,25 @@ public class SameServiceImpl implements SameService {      }      @Override -    public String participateNetwork(String networkName) { -        logger.info("Got participation request."); -        if (!networkName.equals(this.networkName)) { +    public void participateNetwork(String networkName, int remotePort) { +        if (!networkName.equals(sameState.getNetworkName())) {              logger.warn("Client tried to join {}, but network name is {}.", -                    networkName, this.networkName); +                    networkName, sameState.getNetworkName()); +        } +        String url = "http://" + currentCallerIp + ":" + remotePort; +        sameState.addParticipant(url); +    } + +    @Override +    public void notifyParticipation(String networkName, +            List<String> participants) { +        logger.info("Joining network {}.", networkName); +        int i = 1; +        for (String participant : participants) { +            logger.info("  {} participant {}: {}", +                    new Object[]{networkName, i, participant}); +            i++;          } -        return "<Not implemented>"; +        logger.warn("Joining not implemented.");      }  } diff --git a/jsonrpc/src/main/java/com/orbekk/same/SameState.java b/jsonrpc/src/main/java/com/orbekk/same/SameState.java new file mode 100644 index 0000000..b2518c2 --- /dev/null +++ b/jsonrpc/src/main/java/com/orbekk/same/SameState.java @@ -0,0 +1,78 @@ +package com.orbekk.same; + +import java.util.List; +import java.util.LinkedList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The implementation of a 'Same' state. + * + * This class manages the current state of the Same protocol. + */ +public class SameState extends Thread { +    private Logger logger = LoggerFactory.getLogger(getClass()); +    private List<String> participants = new LinkedList<String>(); +    private String currentState = ""; +    private String networkName; +    private boolean stopped = false; + +    /** +     * Queue for pending participants. +     */ +    private List<String> pendingParticipants = new LinkedList<String>(); + +    public SameState(String networkName) { +        this.networkName = networkName; +    } + +    public synchronized List<String> getParticipants() { +        return participants; +    } + +    public String getNetworkName() { +        return networkName; +    } + +    public String getCurrentState() { +        return currentState; +    } + +    public synchronized void addParticipant(String url) { +        synchronized(this) { +            logger.info("Add pending participant: {}", url); +            pendingParticipants.add(url); +            notifyAll(); +        } +    } + +    private synchronized void handleNewParticipants() { +        for (String url : pendingParticipants) { +            logger.info("New participant: {}", url); +            participants.add(url); +        } +        pendingParticipants.clear(); +    } + +    public synchronized void run() { +        while (!stopped) { +            handleNewParticipants(); +            try { +                wait(1000); +            } catch (InterruptedException e) { +                // Ignore interrupt in wait loop. +            } +        } +    } + +    public synchronized void stopSame() { +        try { +            stopped = true; +            notifyAll(); +            this.join(); +        } catch (InterruptedException e) { +            logger.warn("Got InterruptedException while waiting for SameState " + +                    "to finish. Ignoring."); +        } +    } +} | 
