diff options
| -rw-r--r-- | same/src/main/java/com/orbekk/same/App.java | 7 | ||||
| -rw-r--r-- | same/src/main/java/com/orbekk/same/Client.java | 18 | ||||
| -rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 18 | 
3 files changed, 31 insertions, 12 deletions
| diff --git a/same/src/main/java/com/orbekk/same/App.java b/same/src/main/java/com/orbekk/same/App.java index b084db2..464668d 100644 --- a/same/src/main/java/com/orbekk/same/App.java +++ b/same/src/main/java/com/orbekk/same/App.java @@ -14,8 +14,11 @@ public class App {          try {              controller.start();              controller.searchNetworks(); -            controller.createNetwork(configuration.get("networkName")); -            controller.joinNetwork(configuration.get("masterUrl")); +            if ("true".equals(configuration.get("isMaster"))) { +                controller.createNetwork(configuration.get("networkName")); +            } else { +                controller.joinNetwork(configuration.get("masterUrl")); +            }              controller.join();          } catch (Exception e) {              logger.error("Error in App.", e); diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 1927731..e789f2e 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -126,14 +126,14 @@ public class Client implements DiscoveryListener {          @Override          public synchronized void masterTakeover(String masterUrl, String networkName,                   int masterId) throws Exception { +            logger.info("MasterTakeover({}, {}, {})", +                    new Object[]{masterUrl, networkName, masterId});              if (masterId <= Client.this.masterId) {                  logger.warn("{}:{} tried to take over, but current master is " +                  		"{}:{}. Ignoring", new Object[]{masterUrl, masterId,                                  state.getDataOf(".masterUrl"),                                  Client.this.masterId});               } -            logger.info("MasterTakeover({}, {}, {})", -                    new Object[]{masterUrl, networkName, masterId});              abortMasterElection();              Client.this.masterUrl = masterUrl;              Client.this.masterId = masterId; @@ -195,7 +195,7 @@ public class Client implements DiscoveryListener {          this.masterController = masterController;      } -    private void reset() { +    private synchronized void reset() {          state.clear();          masterId = 0;      } @@ -265,16 +265,22 @@ public class Client implements DiscoveryListener {          return serviceImpl;      } -    private List<String> getPaxosUrls() { +    private List<String> getPaxosUrlsNoMaster() {          List<String> paxosUrls = new ArrayList<String>();          for (String participant : state.getList(".participants")) { -            paxosUrls.add(participant.replace("ClientService", "PaxosService")); +            String masterPaxos = state.getDataOf(".masterUrl") +                    .replace("MasterService", "PaxosService"); +            String paxos = participant.replace("ClientService", "PaxosService"); +            if (!paxos.equals(masterPaxos)) { +                paxosUrls.add(participant.replace("ClientService", "PaxosService")); +            }          } +        logger.info("Paxos urls: {}", paxosUrls);          return paxosUrls;      }      private void tryBecomeMaster(int failedMasterId) { -        List<String> paxosUrls = getPaxosUrls(); +        List<String> paxosUrls = getPaxosUrlsNoMaster();          MasterProposer proposer = new MasterProposer(getUrl(), paxosUrls,                  connections);          if (masterController == null) { diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 3bb6260..ade0b1e 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -80,6 +80,20 @@ public class Master {              List<String> pending = getAndClear();              logger.info("Sending full state to {}", pending);              final List<Component> components = state.getComponents(); +            broadcaster.broadcast(pending, new ServiceOperation() { +                @Override public void run(String url) { +                    ClientService client = connections.getClient(url); +                    try { +                        client.masterTakeover( +                                state.getDataOf(".masterUrl"), +                                state.getDataOf(".networkName"), +                                masterId); +                    } catch (Exception e) { +                        logger.info("Client {} failed to acknowledge master. Remove."); +                        removeParticipant(url); +                    } +                } +            });              broadcastNewComponents(pending, components);          }      }; @@ -120,10 +134,6 @@ public class Master {              @Override public void run(String url) {                  ClientService client = connections.getClient(url);                  try { -                    client.masterTakeover( -                            state.getDataOf(".masterUrl"), -                            state.getDataOf(".networkName"), -                            masterId);                      for (Component c : components) {                          client.setState(c.getName(), c.getData(),                                  c.getRevision()); | 
