summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 14:27:33 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 14:27:33 +0200
commit1cde431ec7e0f5f29ec329c1949c7f5c76366ce5 (patch)
treec1ae7f8804b396c502213cc8d2bb6a19fef8e3a9
parentdc9e014d453e469442cb3555f6c62d496e04dee7 (diff)
Fix thread starvation bug in Client.
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java98
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java6
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java27
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java8
4 files changed, 91 insertions, 48 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 5a7918c..bef096b 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
@@ -31,6 +32,7 @@ public class Client {
private volatile Future<Integer> currentMasterProposal = null;
private volatile MasterState masterInfo;
private final RpcFactory rpcf;
+ private final ExecutorService executor;
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
@@ -155,18 +157,71 @@ public class Client {
return;
}
connectionState = ConnectionState.UNSTABLE;
+ executor.execute(new MasterStarter(request));
done.run(Empty.getDefaultInstance());
- tryBecomeMaster(request);
}
};
+ private class MasterStarter implements Runnable {
+ private final MasterState failedMaster;
+
+ public MasterStarter(MasterState failedMaster) {
+ this.failedMaster = failedMaster;
+ }
+
+ @Override public void run() {
+ logger.info("Trying to become master. Failed master: {}.",
+ failedMaster);
+ List<String> paxosUrls = state.getList(State.PARTICIPANTS);
+ paxosUrls.remove(failedMaster.getMasterLocation());
+ MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
+ connections, rpcf);
+ if (masterController == null) {
+ logger.warn("Could not become master: No master controller.");
+ return;
+ }
+ Runnable sleeperTask = new Runnable() {
+ @Override public synchronized void run() {
+ try {
+ wait(MASTER_TAKEOVER_TIMEOUT);
+ } catch (InterruptedException e) {
+ }
+ }
+ };
+ synchronized (this) {
+ if (failedMaster.getMasterId() < masterInfo.getMasterId()) {
+ logger.info("Master election aborted. Master already chosen.");
+ return;
+ }
+ currentMasterProposal = proposer.startProposalTask(
+ masterInfo.getMasterId() + 1, sleeperTask);
+ }
+ Integer result = null;
+ try {
+ result = currentMasterProposal.get();
+ } catch (InterruptedException e) {
+ } catch (ExecutionException e) {
+ logger.error("Error electing master: ", e);
+ } catch (CancellationException e) {
+ }
+ if (!currentMasterProposal.isCancelled() && result != null &&
+ masterInfo.getMasterId() <= failedMaster.getMasterId()) {
+ masterController.enableMaster(new State(state), result);
+ } else {
+ logger.info("Master election aborted. Master already chosen.");
+ }
+ }
+ }
+
public Client(State state, ConnectionManager connections,
- String myUrl, String myLocation, RpcFactory rpcf) {
+ String myUrl, String myLocation, RpcFactory rpcf,
+ ExecutorService executor) {
this.state = state;
this.connections = connections;
this.myUrl = myUrl;
this.myLocation = myLocation;
this.rpcf = rpcf;
+ this.executor = executor;
}
public void start() {
@@ -174,6 +229,7 @@ public class Client {
public void interrupt() {
connectionState = ConnectionState.DISCONNECTED;
+ executor.shutdown();
}
void performWork() {
@@ -242,44 +298,6 @@ public class Client {
return newServiceImpl;
}
- private void tryBecomeMaster(MasterState failedMaster) {
- List<String> paxosUrls = state.getList(State.PARTICIPANTS);
- paxosUrls.remove(failedMaster.getMasterLocation());
- MasterProposer proposer = new MasterProposer(getClientState(), paxosUrls,
- connections, rpcf);
- if (masterController == null) {
- logger.warn("Could not become master: No master controller.");
- return;
- }
- Runnable sleeperTask = new Runnable() {
- @Override public synchronized void run() {
- try {
- wait(MASTER_TAKEOVER_TIMEOUT);
- } catch (InterruptedException e) {
- }
- }
- };
- synchronized (this) {
- if (failedMaster.getMasterId() < masterInfo.getMasterId()) {
- logger.info("Master election aborted. Master already chosen.");
- return;
- }
- currentMasterProposal = proposer.startProposalTask(
- masterInfo.getMasterId() + 1, sleeperTask);
- }
- Integer result = null;
- try {
- result = currentMasterProposal.get();
- } catch (InterruptedException e) {
- } catch (ExecutionException e) {
- logger.error("Error electing master: ", e);
- } catch (CancellationException e) {
- }
- if (!currentMasterProposal.isCancelled() && result != null) {
- masterController.enableMaster(new State(state), result);
- }
- }
-
private synchronized void abortMasterElection() {
if (currentMasterProposal != null && !currentMasterProposal.isDone()) {
boolean status = currentMasterProposal.cancel(true);
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 9438c7b..651303c 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -1,5 +1,8 @@
package com.orbekk.same;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +63,9 @@ public class SameController {
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
+ ExecutorService clientExecutor = Executors.newCachedThreadPool();
Client client = new Client(clientState, connections,
- clientUrl, myLocation, rpcf);
+ clientUrl, myLocation, rpcf, clientExecutor);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index a69bdca..2a73656 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -6,6 +6,9 @@ import static org.hamcrest.Matchers.is;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.junit.Before;
import org.junit.Test;
@@ -16,6 +19,7 @@ import com.orbekk.util.DelayedOperation;
/** A functional test that runs with a master and several clients. */
public class FunctionalTest {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
Master master;
String masterUrl = "http://master/MasterService.json";
String masterLocation = "master:1";
@@ -35,6 +39,18 @@ public class FunctionalTest {
};
};
+ /** Works with a single thread executor. */
+ public void awaitExecution() throws InterruptedException {
+ final CountDownLatch finished = new CountDownLatch(1);
+ Runnable sendFinished = new Runnable() {
+ @Override public void run() {
+ finished.countDown();
+ }
+ };
+ executor.execute(sendFinished);
+ finished.await();
+ }
+
@Before public void setUp() {
master = Master.create(connections,
masterUrl, "TestMaster", masterLocation, rpcf);
@@ -52,7 +68,7 @@ public class FunctionalTest {
Client newClient(String clientName, String clientUrl, String location) {
Client client = new Client(new State(clientName), connections,
- clientUrl, location, rpcf);
+ clientUrl, location, rpcf, executor);
connections.clientMap0.put(location, client.getNewService());
clients.add(client);
String paxosUrl = clientUrl.replace("ClientService", "PaxosService");
@@ -113,7 +129,7 @@ public class FunctionalTest {
assertThat(x2.get(), is("TestValue1"));
}
- @Test public void clientBecomesMaster() {
+ @Test public void clientBecomesMaster() throws Exception {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:1";
final Master newMaster = Master.create(connections,
@@ -132,12 +148,13 @@ public class FunctionalTest {
client2.setMasterController(controller);
client3.setMasterController(controller);
client1.startMasterElection();
+ awaitExecution();
newMaster.performWork();
assertThat(client1.getMaster().getMasterLocation(), is(newMasterLocation));
assertThat(client2.getMaster().getMasterLocation(), is(newMasterLocation));
}
- @Test public void onlyOneNewMaster() {
+ @Test public void onlyOneNewMaster() throws Exception {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:1";
final Master newMaster = Master.create(connections,
@@ -160,12 +177,13 @@ public class FunctionalTest {
client2.setMasterController(controller);
client3.setMasterController(controller);
client1.startMasterElection();
+ awaitExecution();
newMaster.performWork();
assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl));
assertThat(client2.getMaster().getMasterUrl(), is(newMasterUrl));
}
- @Test public void masterFails() {
+ @Test public void masterFails() throws Exception {
String newMasterUrl = "http://newMaster/MasterService.json";
String newMasterLocation = "newMaster:2";
final Master newMaster = Master.create(connections,
@@ -188,6 +206,7 @@ public class FunctionalTest {
connections.masterMap0.put(masterLocation, null);
assertThat(x1.set("Woop, woop").getStatus().getStatusCode(),
is(DelayedOperation.Status.ERROR));
+ awaitExecution();
performWork();
newMaster.performWork();
assertThat(client1.getMaster().getMasterUrl(), is(newMasterUrl));
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index f22e9e1..6aa3b6e 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -1,16 +1,17 @@
package com.orbekk.same;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class MasterTest {
+ private ExecutorService executor = Executors.newCachedThreadPool();
private State state = new State("TestNetwork");
private TestConnectionManager connections = new TestConnectionManager();
private Master master;
@@ -30,7 +31,8 @@ public class MasterTest {
public void clientJoin() throws Exception {
Client client = new Client(
new State("ClientNetwork"), connections,
- "http://client/ClientService.json", "clientLocation", rpcf);
+ "http://client/ClientService.json", "clientLocation", rpcf,
+ executor);
connections.clientMap0.put("clientLocation", client.getNewService());
client.joinNetwork(master.getMasterInfo());
master.performWork();