summaryrefslogtreecommitdiff
path: root/same/src/main/java/com
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 14:27:33 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 14:27:33 +0200
commit1cde431ec7e0f5f29ec329c1949c7f5c76366ce5 (patch)
treec1ae7f8804b396c502213cc8d2bb6a19fef8e3a9 /same/src/main/java/com
parentdc9e014d453e469442cb3555f6c62d496e04dee7 (diff)
Fix thread starvation bug in Client.
Diffstat (limited to 'same/src/main/java/com')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java98
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java6
2 files changed, 63 insertions, 41 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 5a7918c..bef096b 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
@@ -31,6 +32,7 @@ public class Client {
private volatile Future<Integer> currentMasterProposal = null;
private volatile MasterState masterInfo;
private final RpcFactory rpcf;
+ private final ExecutorService executor;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -155,18 +157,71 @@ public class Client {
return;
}
connectionState = ConnectionState.UNSTABLE;
+ executor.execute(new MasterStarter(request));
done.run(Empty.getDefaultInstance());
- tryBecomeMaster(request);
}
};
+ private class MasterStarter implements Runnable {
+ private final MasterState failedMaster;
+
+ public MasterStarter(MasterState failedMaster) {
+ this.failedMaster = failedMaster;
+ }
+
+ @Override public void run() {
+ logger.info("Trying to become master. Failed master: {}.",
+ failedMaster);
+ List<String> paxosUrls = state.getList(State.PARTICIPANTS);
+ paxosUrls.remove(failedMaster.getMasterLocation());
+ MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
+ connections, rpcf);
+ if (masterController == null) {
+ logger.warn("Could not become master: No master controller.");
+ return;
+ }
+ Runnable sleeperTask = new Runnable() {
+ @Override public synchronized void run() {
+ try {
+ wait(MASTER_TAKEOVER_TIMEOUT);
+ } catch (InterruptedException e) {
+ }
+ }
+ };
+ synchronized (this) {
+ if (failedMaster.getMasterId() < masterInfo.getMasterId()) {
+ logger.info("Master election aborted. Master already chosen.");
+ return;
+ }
+ currentMasterProposal = proposer.startProposalTask(
+ masterInfo.getMasterId() + 1, sleeperTask);
+ }
+ Integer result = null;
+ try {
+ result = currentMasterProposal.get();
+ } catch (InterruptedException e) {
+ } catch (ExecutionException e) {
+ logger.error("Error electing master: ", e);
+ } catch (CancellationException e) {
+ }
+ if (!currentMasterProposal.isCancelled() && result != null &&
+ masterInfo.getMasterId() <= failedMaster.getMasterId()) {
+ masterController.enableMaster(new State(state), result);
+ } else {
+ logger.info("Master election aborted. Master already chosen.");
+ }
+ }
+ }
+
public Client(State state, ConnectionManager connections,
- String myUrl, String myLocation, RpcFactory rpcf) {
+ String myUrl, String myLocation, RpcFactory rpcf,
+ ExecutorService executor) {
this.state = state;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
this.rpcf = rpcf;
+ this.executor = executor;
}
public void start() {
@@ -174,6 +229,7 @@ public class Client {
public void interrupt() {
connectionState = ConnectionState.DISCONNECTED;
+ executor.shutdown();
}
void performWork() {
@@ -242,44 +298,6 @@ public class Client {
return newServiceImpl;
}
- private void tryBecomeMaster(MasterState failedMaster) {
- List<String> paxosUrls = state.getList(State.PARTICIPANTS);
- paxosUrls.remove(failedMaster.getMasterLocation());
- MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
- connections, rpcf);
- if (masterController == null) {
- logger.warn("Could not become master: No master controller.");
- return;
- }
- Runnable sleeperTask = new Runnable() {
- @Override public synchronized void run() {
- try {
- wait(MASTER_TAKEOVER_TIMEOUT);
- } catch (InterruptedException e) {
- }
- }
- };
- synchronized (this) {
- if (failedMaster.getMasterId() < masterInfo.getMasterId()) {
- logger.info("Master election aborted. Master already chosen.");
- return;
- }
- currentMasterProposal = proposer.startProposalTask(
- masterInfo.getMasterId() + 1, sleeperTask);
- }
- Integer result = null;
- try {
- result = currentMasterProposal.get();
- } catch (InterruptedException e) {
- } catch (ExecutionException e) {
- logger.error("Error electing master: ", e);
- } catch (CancellationException e) {
- }
- if (!currentMasterProposal.isCancelled() && result != null) {
- masterController.enableMaster(new State(state), result);
- }
- }
-
private synchronized void abortMasterElection() {
if (currentMasterProposal != null && !currentMasterProposal.isDone()) {
boolean status = currentMasterProposal.cancel(true);
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 9438c7b..651303c 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -1,5 +1,8 @@
package com.orbekk.same;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +63,9 @@ public class SameController {
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
+ ExecutorService clientExecutor = Executors.newCachedThreadPool();
Client client = new Client(clientState, connections,
- clientUrl, myLocation, rpcf);
+ clientUrl, myLocation, rpcf, clientExecutor);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);