diff options
Diffstat (limited to 'same/src')
10 files changed, 7 insertions, 164 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index e789f2e..6d3a7d5 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -13,11 +13,10 @@ import org.slf4j.LoggerFactory;  import com.orbekk.paxos.MasterProposer;  import com.orbekk.same.State.Component; -import com.orbekk.same.discovery.DiscoveryListener;  import com.orbekk.util.DelayedOperation;  import com.orbekk.util.WorkQueue; -public class Client implements DiscoveryListener { +public class Client {      public static long MASTER_TAKEOVER_TIMEOUT = 4000l;      private Logger logger = LoggerFactory.getLogger(getClass());      /** TODO: Not really useful yet. Remove? */ @@ -119,11 +118,6 @@ public class Client implements DiscoveryListener {          }          @Override -        public void discoveryRequest(String remoteUrl) { -            discoveryThread.add(remoteUrl); -        } - -        @Override          public synchronized void masterTakeover(String masterUrl, String networkName,                   int masterId) throws Exception {              logger.info("MasterTakeover({}, {}, {})", @@ -153,15 +147,6 @@ public class Client implements DiscoveryListener {          }      }; -    private WorkQueue<String> discoveryThread = new WorkQueue<String>() { -        @Override protected void onChange() { -            List<String> pending = getAndClear(); -            for (String url : pending) { -                discover(url); -            } -        } -    }; -      public Client(State state, ConnectionManager connections,              String myUrl, Broadcaster broadcaster) {          this.state = state; @@ -171,16 +156,13 @@ public class Client implements DiscoveryListener {      }      public void start() { -        discoveryThread.start();      }      public void interrupt() {          connectionState = ConnectionState.DISCONNECTED; -        discoveryThread.interrupt();      }      void performWork() { -        discoveryThread.performWork();      }      public String getUrl() { @@ -228,39 +210,6 @@ public class Client implements DiscoveryListener {          this.networkListener = listener;      } -    public void sendDiscoveryRequest(String url) { -        try { -            connections.getClient(url) -            .discoveryRequest(myUrl); -        } catch (Exception e) { -            logger.warn("Failed to send discovery request: {}", -                    throwableToString(e)); -        } -    } - -    @Override -    public void discover(String url) { -        String networkName = state.getDataOf(".networkName"); -        if (networkName.equals(".InvalidClientNetwork")) { -            logger.warn("Client not joined to a network. Ignoring discovery"); -            return; -        } else if (networkName.equals(".Private")) { -            logger.info("Ignoring broadcast to .Private network."); -            return; -        } - -        if (!url.equals(myUrl)) { -            try { -                connections.getClient(url) -                .notifyNetwork(state.getDataOf(".networkName"), -                        state.getDataOf(".masterUrl")); -            } catch (Exception e) { -                logger.warn("Failed to contact new client {}: {}", url, -                        throwableToString(e)); -            } -        } -    } -      public ClientService getService() {          return serviceImpl;      } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index f1247a5..8d460db 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -5,9 +5,6 @@ public interface ClientService {      void setState(String component, String data, long revision) throws Exception; -    // Manual discovery request by client. -    void discoveryRequest(String remoteUrl) throws Exception; -          /** A new master takes over.       *        * @param masterUrl The new master URL. diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index f3f4edb..ade469e 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -12,7 +12,6 @@ import com.googlecode.jsonrpc4j.ProxyUtil;  import com.orbekk.net.MyJsonRpcHttpClient;  import com.orbekk.paxos.PaxosService;  import com.orbekk.same.discovery.DirectoryService; -import com.orbekk.same.discovery.DiscoveryService;  public class ConnectionManagerImpl implements ConnectionManager {      private int connectionTimeout; diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index e9a7916..c1c7901 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -11,7 +11,6 @@ import com.orbekk.paxos.PaxosService;  import com.orbekk.paxos.PaxosServiceImpl;  import com.orbekk.same.config.Configuration;  import com.orbekk.same.discovery.DirectoryService; -import com.orbekk.same.discovery.DiscoveryService;  import com.orbekk.same.http.ServerContainer;  import com.orbekk.same.http.StateServlet;  import com.orbekk.same.http.JettyServerBuilder; @@ -24,7 +23,6 @@ public class SameController {      private Master master;      private Client client;      private PaxosServiceImpl paxos; -    private DiscoveryService discoveryService;      private BroadcasterFactory broadcasterFactory;      private Configuration configuration;      private ConnectionManager connections; @@ -81,13 +79,6 @@ public class SameController {                  clientUrl, BroadcasterImpl.getDefaultBroadcastRunner());          PaxosServiceImpl paxos = new PaxosServiceImpl(""); -        DiscoveryService discoveryService = null; -        if ("true".equals(configuration.get("enableDiscovery"))) { -            BroadcastListener broadcastListener = new BroadcastListener( -                    configuration.getInt("discoveryPort")); -            discoveryService = new DiscoveryService(client, broadcastListener); -        } -          StateServlet stateServlet = new StateServlet(client.getInterface(),                  new VariableFactory(client.getInterface())); @@ -100,7 +91,7 @@ public class SameController {          SameController controller = new SameController(                  configuration, connections, server, master, client, -                paxos, discoveryService, broadcaster, broadcasterFactory); +                paxos, broadcaster, broadcasterFactory);          return controller;      } @@ -115,7 +106,6 @@ public class SameController {              MasterServiceProxy master,              Client client,              PaxosServiceImpl paxos, -            DiscoveryService discoveryService,              Broadcaster serviceBroadcaster,              BroadcasterFactory broadcasterFactory) {          this.configuration = configuration; @@ -124,7 +114,6 @@ public class SameController {          this.masterService = master;          this.client = client;          this.paxos = paxos; -        this.discoveryService = discoveryService;          this.serviceBroadcaster = serviceBroadcaster;          this.broadcasterFactory = broadcasterFactory;      } @@ -133,9 +122,6 @@ public class SameController {          server.start();          client.setMasterController(masterController);          client.start(); -        if (discoveryService != null) { -            discoveryService.start(); -        }      }      public void stop() { @@ -145,30 +131,16 @@ public class SameController {                  master.interrupt();              }              server.stop(); -            if (discoveryService != null) { -                discoveryService.interrupt(); -            }          } catch (Exception e) {              logger.error("Failed to stop webserver", e);          }      }      public void join() { -        try { -            server.join(); -            client.interrupt(); -            if (master != null) { -                master.interrupt(); -            } -            if (discoveryService != null) { -                discoveryService.join(); -            } -        } catch (InterruptedException e) { -            try { -                server.stop(); -            } catch (Exception e1) { -                logger.error("Failed to stop server", e); -            } +        server.join(); +        client.interrupt(); +        if (master != null) { +            master.interrupt();          }      } diff --git a/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java b/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java deleted file mode 100644 index e006c77..0000000 --- a/same/src/main/java/com/orbekk/same/discovery/DiscoveryListener.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.orbekk.same.discovery; - -public interface DiscoveryListener { -    void discover(String url); -} diff --git a/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java b/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java deleted file mode 100644 index a1147cc..0000000 --- a/same/src/main/java/com/orbekk/same/discovery/DiscoveryService.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.orbekk.same.discovery; - -import java.net.DatagramPacket; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.orbekk.net.BroadcastListener; - -public class DiscoveryService extends Thread { -    private Logger logger = LoggerFactory.getLogger(getClass()); -    BroadcastListener broadcastListener; -    DiscoveryListener listener; - -    public DiscoveryService(DiscoveryListener listener, -            BroadcastListener broadcastListener) { -        this.listener = listener; -        this.broadcastListener = broadcastListener; -    } - -    @Override -    public void run() { -        logger.info("DiscoveryService starting."); -        while (!Thread.interrupted()) { -            DatagramPacket packet = broadcastListener.listen(); -            if (packet == null) { -                // An error or interrupt occurred. -                continue; -            } -            String content = new String(packet.getData(), 0, packet.getLength()); -            String[] words = content.split(" "); - -            if (!content.startsWith("Discover") || words.length < 2) { -                logger.warn("Invalid discovery message: {}", content); -                continue; -            } - -            String url = words[1]; -            logger.info("Received discovery from {}", url); -            if (listener != null) { -                listener.discover(url); -            } -        } -        logger.info("DiscoveryService stopped."); -    } - -    @Override public void interrupt() { -        logger.info("Interrupt()"); -        super.interrupt(); -        broadcastListener.interrupt(); -    } -} diff --git a/same/src/main/resources/client.properties.example b/same/src/main/resources/client.properties.example index 5c13a0a..cb5acdf 100644 --- a/same/src/main/resources/client.properties.example +++ b/same/src/main/resources/client.properties.example @@ -2,5 +2,3 @@ port=10011  localIp=10.0.0.6  baseUrl=http://10.0.0.6:10011/  masterUrl=http://10.0.0.6:10010/MasterService.json -enableDiscovery=true -discoveryPort=15066 diff --git a/same/src/main/resources/master.properties.example b/same/src/main/resources/master.properties.example index edcf323..cf197fb 100644 --- a/same/src/main/resources/master.properties.example +++ b/same/src/main/resources/master.properties.example @@ -2,6 +2,5 @@ port=10010  localIp=10.0.0.6  baseUrl=http://10.0.0.6:10010/  masterUrl=http://10.0.0.6:10010/MasterService.json -enableDiscovery=true -discoveryPort=15066  networkName=KlatreplanteNetwork +isMaster=true diff --git a/same/src/test/java/com/orbekk/same/ClientTest.java b/same/src/test/java/com/orbekk/same/ClientTest.java index a94c039..0e2a278 100644 --- a/same/src/test/java/com/orbekk/same/ClientTest.java +++ b/same/src/test/java/com/orbekk/same/ClientTest.java @@ -57,15 +57,6 @@ public class ClientTest {          verify(listener).notifyNetwork("MyNetwork", "MasterUrl");      } -    @Test public void discover() throws Exception { -        clientS.setState(".masterUrl", "master", 1); -        ClientService mockClient = mock(ClientService.class); -        connections.clientMap.put("mockClient/ClientService.json", -                mockClient); -        client.discover("mockClient/ClientService.json"); -        verify(mockClient).notifyNetwork("ClientNetwork", "master"); -    } -          @Test public void stateListenerReceivesUpdate() throws Exception {          StateChangedListener listener = mock(StateChangedListener.class);          client.getInterface().addStateListener(listener); diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java index e2cbadd..f369d06 100644 --- a/same/src/test/java/com/orbekk/same/MasterTest.java +++ b/same/src/test/java/com/orbekk/same/MasterTest.java @@ -30,11 +30,6 @@ public class MasterTest {          }          @Override -        public void discoveryRequest(String remoteUrl) throws Exception { -            throw new Exception("Unreachable client"); -        } - -        @Override          public void masterTakeover(String masterUrl, String networkName, int masterId)                  throws Exception {              throw new Exception("Unreachable client");  | 
