summaryrefslogtreecommitdiff
path: root/same
diff options
context:
space:
mode:
Diffstat (limited to 'same')
-rw-r--r--same/src/main/java/com/orbekk/same/App.java7
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java18
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java18
3 files changed, 31 insertions, 12 deletions
diff --git a/same/src/main/java/com/orbekk/same/App.java b/same/src/main/java/com/orbekk/same/App.java
index b084db2..464668d 100644
--- a/same/src/main/java/com/orbekk/same/App.java
+++ b/same/src/main/java/com/orbekk/same/App.java
@@ -14,8 +14,11 @@ public class App {
try {
controller.start();
controller.searchNetworks();
- controller.createNetwork(configuration.get("networkName"));
- controller.joinNetwork(configuration.get("masterUrl"));
+ if ("true".equals(configuration.get("isMaster"))) {
+ controller.createNetwork(configuration.get("networkName"));
+ } else {
+ controller.joinNetwork(configuration.get("masterUrl"));
+ }
controller.join();
} catch (Exception e) {
logger.error("Error in App.", e);
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 1927731..e789f2e 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -126,14 +126,14 @@ public class Client implements DiscoveryListener {
@Override
public synchronized void masterTakeover(String masterUrl, String networkName,
int masterId) throws Exception {
+ logger.info("MasterTakeover({}, {}, {})",
+ new Object[]{masterUrl, networkName, masterId});
if (masterId <= Client.this.masterId) {
logger.warn("{}:{} tried to take over, but current master is " +
"{}:{}. Ignoring", new Object[]{masterUrl, masterId,
state.getDataOf(".masterUrl"),
Client.this.masterId});
}
- logger.info("MasterTakeover({}, {}, {})",
- new Object[]{masterUrl, networkName, masterId});
abortMasterElection();
Client.this.masterUrl = masterUrl;
Client.this.masterId = masterId;
@@ -195,7 +195,7 @@ public class Client implements DiscoveryListener {
this.masterController = masterController;
}
- private void reset() {
+ private synchronized void reset() {
state.clear();
masterId = 0;
}
@@ -265,16 +265,22 @@ public class Client implements DiscoveryListener {
return serviceImpl;
}
- private List<String> getPaxosUrls() {
+ private List<String> getPaxosUrlsNoMaster() {
List<String> paxosUrls = new ArrayList<String>();
for (String participant : state.getList(".participants")) {
- paxosUrls.add(participant.replace("ClientService", "PaxosService"));
+ String masterPaxos = state.getDataOf(".masterUrl")
+ .replace("MasterService", "PaxosService");
+ String paxos = participant.replace("ClientService", "PaxosService");
+ if (!paxos.equals(masterPaxos)) {
+ paxosUrls.add(participant.replace("ClientService", "PaxosService"));
+ }
}
+ logger.info("Paxos urls: {}", paxosUrls);
return paxosUrls;
}
private void tryBecomeMaster(int failedMasterId) {
- List<String> paxosUrls = getPaxosUrls();
+ List<String> paxosUrls = getPaxosUrlsNoMaster();
MasterProposer proposer = new MasterProposer(getUrl(), paxosUrls,
connections);
if (masterController == null) {
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 3bb6260..ade0b1e 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -80,6 +80,20 @@ public class Master {
List<String> pending = getAndClear();
logger.info("Sending full state to {}", pending);
final List<Component> components = state.getComponents();
+ broadcaster.broadcast(pending, new ServiceOperation() {
+ @Override public void run(String url) {
+ ClientService client = connections.getClient(url);
+ try {
+ client.masterTakeover(
+ state.getDataOf(".masterUrl"),
+ state.getDataOf(".networkName"),
+ masterId);
+ } catch (Exception e) {
+ logger.info("Client {} failed to acknowledge master. Remove.");
+ removeParticipant(url);
+ }
+ }
+ });
broadcastNewComponents(pending, components);
}
};
@@ -120,10 +134,6 @@ public class Master {
@Override public void run(String url) {
ClientService client = connections.getClient(url);
try {
- client.masterTakeover(
- state.getDataOf(".masterUrl"),
- state.getDataOf(".networkName"),
- masterId);
for (Component c : components) {
client.setState(c.getName(), c.getData(),
c.getRevision());