summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 12:10:57 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-04-25 12:10:57 +0200
commit768938fa1e2bfaa5a246e28badefc7d8fe7cadd3 (patch)
tree29e17a0603922c087db4c24804ba22db290e8eaa
parenta8b921e07bf4ad1936e01628049a9c6fd7349312 (diff)
Add debug messages in Paxos service.
– Remove old PaxosService.
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java8
-rw-r--r--same/src/main/java/com/orbekk/paxos/PaxosService.java12
-rw-r--r--same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java12
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java2
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManager.java2
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java1
-rw-r--r--same/src/main/java/com/orbekk/same/TestConnectionManager.java1
-rw-r--r--same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java71
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java16
9 files changed, 25 insertions, 100 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index e2723a3..eaca410 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -81,6 +81,8 @@ public class MasterProposer extends Thread {
public int getResult() throws InterruptedException {
done.await();
+ logger.info("ResponseHandler: {} / {} successes.",
+ numPromises.get(), numRequests);
return result.get();
}
}
@@ -117,6 +119,8 @@ public class MasterProposer extends Thread {
.setProposalNumber(proposalNumber)
.build();
paxos.acceptRequest(rpc, request, handler);
+ rpc.await();
+ logger.info("Rpc result from paxos.acceptRequest: " + rpc.errorText());
}
return handler.getResult();
}
@@ -139,6 +143,7 @@ public class MasterProposer extends Thread {
Integer proposeRetry(int proposalNumber, Runnable retryAction)
throws InterruptedException {
+ logger.info("Paxos services: {}.", paxosLocations);
assert proposalNumber > 0;
int nextProposal = proposalNumber;
int result = nextProposal - 1;
@@ -148,7 +153,8 @@ public class MasterProposer extends Thread {
if (result == nextProposal) {
result = internalAcceptRequest(nextProposal);
}
- logger.info("Proposed value {}, result {}", nextProposal, result);
+ logger.info("Proposed value {}, result {}.",
+ nextProposal, result);
if (result < 0) {
nextProposal = -result + 1;
if (retryAction != null) {
diff --git a/same/src/main/java/com/orbekk/paxos/PaxosService.java b/same/src/main/java/com/orbekk/paxos/PaxosService.java
deleted file mode 100644
index a6f6b08..0000000
--- a/same/src/main/java/com/orbekk/paxos/PaxosService.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.orbekk.paxos;
-
-public interface PaxosService {
-
- /**
- * @return N == proposalNumber if a promise is made.
- * -M if another promise already was made, where M is the promise
- * highest proposal number.
- */
- int propose(String clientUrl, int proposalNumber) throws Exception;
- int acceptRequest(String clientUrl, int proposalNumber) throws Exception;
-}
diff --git a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java
index 6b9dd14..40e6320 100644
--- a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java
+++ b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java
@@ -12,7 +12,7 @@ import com.orbekk.same.Services.PaxosResponse;
/**
* This class better be thread-safe.
*/
-public class PaxosServiceImpl implements PaxosService {
+public class PaxosServiceImpl {
private Logger logger = LoggerFactory.getLogger(getClass());
private int highestPromise = 0;
private int highestAcceptedValue = 0;
@@ -23,6 +23,8 @@ public class PaxosServiceImpl implements PaxosService {
@Override
public void propose(RpcController controller, PaxosRequest request,
RpcCallback<PaxosResponse> done) {
+ logger.info("propose({}). Highest promise: {}, Highest accepted: {}",
+ new Object[]{request, highestPromise, highestAcceptedValue});
String clientUrl = request.getClient().getLocation();
int proposalNumber = request.getProposalNumber();
int response =
@@ -36,6 +38,8 @@ public class PaxosServiceImpl implements PaxosService {
@Override
public void acceptRequest(RpcController controller,
PaxosRequest request, RpcCallback<PaxosResponse> done) {
+ logger.info("acceptRequest({}). Highest promise: {}, Highest accepted: {}",
+ new Object[]{request, highestPromise, highestAcceptedValue});
String clientUrl = request.getClient().getLocation();
int proposalNumber = request.getProposalNumber();
int response =
@@ -56,8 +60,7 @@ public class PaxosServiceImpl implements PaxosService {
return service;
}
- @Override
- public synchronized int propose(String clientUrl,
+ private synchronized int propose(String clientUrl,
int proposalNumber) {
if (proposalNumber > highestPromise) {
logger.info(tag + "propose({}, {}) = accepted",
@@ -73,8 +76,7 @@ public class PaxosServiceImpl implements PaxosService {
}
}
- @Override
- public synchronized int acceptRequest(String clientUrl,
+ private synchronized int acceptRequest(String clientUrl,
int proposalNumber) {
if (proposalNumber == highestPromise) {
logger.info(tag + "acceptRequest({}, {}) = accepted",
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 27ce787..5a7918c 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -70,7 +70,7 @@ public class Client {
public void run(Services.UpdateComponentResponse response) {
if (!rpc.isOk()) {
logger.warn("Master failed to respond to update " +
- "request: {}", rpc);
+ "request: {}", rpc.errorText());
op.complete(DelayedOperation.Status.createError(
"Error contacting master. Try again later."));
startMasterElection();
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java
index 6fe8669..0002016 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java
@@ -1,7 +1,5 @@
package com.orbekk.same;
-import com.orbekk.paxos.PaxosService;
-
/**
* An interface that returns a connection for a participant.
*
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index 0e199d6..280df9b 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -16,7 +16,6 @@ import java.util.concurrent.FutureTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.orbekk.paxos.PaxosService;
import com.orbekk.protobuf.RpcChannel;
public class ConnectionManagerImpl implements ConnectionManager {
diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
index 5d97903..6438c8f 100644
--- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
@@ -3,7 +3,6 @@ package com.orbekk.same;
import java.util.Map;
import java.util.HashMap;
-import com.orbekk.paxos.PaxosService;
import com.orbekk.same.Services.Directory;
import com.orbekk.same.Services.Master;
import com.orbekk.same.Services.Paxos;
diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java
deleted file mode 100644
index 3160d2d..0000000
--- a/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.orbekk.paxos;
-
-import static org.junit.Assert.*;
-
-import com.orbekk.same.TestConnectionManager;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class PaxosServiceTest {
- TestConnectionManager connections = new TestConnectionManager();
- private PaxosServiceImpl p1 = new PaxosServiceImpl("P1: ");
- private PaxosServiceImpl p2 = new PaxosServiceImpl("P2: ");
- private PaxosServiceImpl p3 = new PaxosServiceImpl("P3: ");
- private PaxosServiceImpl p4 = new PaxosServiceImpl("P4: ");
- private PaxosServiceImpl p5 = new PaxosServiceImpl("P5: ");
- private String client = "client";
- private String client1 = "client1";
- private String client2 = "client2";
- private String client3 = "client3";
- private String client4 = "client4";
- private String client5 = "client5";
- private List<PaxosServiceImpl> servers = new ArrayList<PaxosServiceImpl>();
-
- @Before
- public void setUp() {
- Collections.addAll(servers, p1, p2, p3, p4, p5);
- connections.paxosMap0.put("p1", p1.getService());
- connections.paxosMap0.put("p2", p2.getService());
- connections.paxosMap0.put("p3", p3.getService());
- connections.paxosMap0.put("p4", p4.getService());
- connections.paxosMap0.put("p5", p5.getService());
- }
-
- @Test
- public void simpleCase() {
- assertEquals(1, p1.propose(client, 1));
- assertEquals(1, p1.acceptRequest(client, 1));
- }
-
- @Test
- public void lowerProposalFails() {
- assertEquals(10, p1.propose(client1, 10));
- assertEquals(-10, p1.propose(client2, 9));
- assertEquals(100, p1.propose(client2, 100));
- }
-
- @Test
- public void testAccept() {
- assertEquals(3, p1.propose(client1, 3));
- assertEquals(4, p1.propose(client2, 4));
- assertEquals(-4, p1.acceptRequest(client1, 3));
- assertEquals(4, p1.acceptRequest(client2, 4));
- }
-
- public List<String> paxosUrls() {
- return new ArrayList<String>(connections.paxosMap0.keySet());
- }
-
- @Test
- @Ignore
- public void integrationTest() {
-// MasterProposer proposer = new MasterProposer("client1", paxosUrls(),
-// connections);
-// assertTrue(proposer.propose(1));
- }
-}
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index 275eaac..a69bdca 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -1,10 +1,8 @@
package com.orbekk.same;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
import java.util.ArrayList;
import java.util.List;
@@ -12,8 +10,8 @@ import java.util.List;
import org.junit.Before;
import org.junit.Test;
-import com.orbekk.paxos.PaxosService;
import com.orbekk.paxos.PaxosServiceImpl;
+import com.orbekk.protobuf.Rpc;
import com.orbekk.util.DelayedOperation;
/** A functional test that runs with a master and several clients. */
@@ -29,7 +27,13 @@ public class FunctionalTest {
VariableFactory vf3;
List<Client> clients = new ArrayList<Client>();
TestConnectionManager connections = new TestConnectionManager();
- RpcFactory rpcf = new RpcFactory(5000);
+ RpcFactory rpcf = new RpcFactory(5000) {
+ @Override public Rpc create() {
+ Rpc rpc = super.create();
+ rpc.complete();
+ return rpc;
+ };
+ };
@Before public void setUp() {
master = Master.create(connections,