summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java21
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java2
2 files changed, 19 insertions, 4 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 69d0e33..4f42c4e 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -18,7 +18,7 @@ import com.orbekk.util.DelayedOperation;
import com.orbekk.util.WorkQueue;
public class Client implements DiscoveryListener {
- public static final long MASTER_TAKEOVER_TIMEOUT = 4000l;
+ public static long MASTER_TAKEOVER_TIMEOUT = 4000l;
private Logger logger = LoggerFactory.getLogger(getClass());
/** TODO: Not really useful yet. Remove? */
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
@@ -29,6 +29,7 @@ public class Client implements DiscoveryListener {
private int masterId = -1;
private MasterController masterController = null;
private Broadcaster broadcaster;
+ private Future<Integer> currentMasterProposal = null;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -124,6 +125,9 @@ public class Client implements DiscoveryListener {
@Override
public void masterTakeover(String masterUrl, String networkName,
int masterId) throws Exception {
+ logger.info("MasterTakeover({}, {}, {})",
+ new Object[]{masterUrl, networkName, masterId});
+ abortMasterElection();
Client.this.masterUrl = masterUrl;
connectionState = ConnectionState.STABLE;
}
@@ -267,19 +271,28 @@ public class Client implements DiscoveryListener {
}
}
};
- Future<Integer> proposal = proposer.startProposalTask(1, sleeperTask);
+ synchronized (this) {
+ currentMasterProposal = proposer.startProposalTask(1, sleeperTask);
+ }
Integer result = null;
try {
- result = proposal.get();
+ result = currentMasterProposal.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (CancellationException e) {
}
- if (!proposal.isCancelled() && result != null) {
+ if (!currentMasterProposal.isCancelled() && result != null) {
masterController.enableMaster(state);
}
}
+ private synchronized void abortMasterElection() {
+ if (currentMasterProposal != null && !currentMasterProposal.isDone()) {
+ boolean status = currentMasterProposal.cancel(true);
+ logger.info("Abort status: {}", status);
+ }
+ }
+
public void startMasterElection() {
List<String> participants = state.getList(".participants");
broadcaster.broadcast(participants, new ServiceOperation() {
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index 9380409..fb8bd10 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -120,6 +120,8 @@ public class FunctionalTest {
}
};
client1.setMasterController(controller);
+ client2.setMasterController(controller);
+ client3.setMasterController(controller);
client1.startMasterElection();
newMaster.performWork();
assertThat(client1.masterUrl, is(newMasterUrl));