diff options
Diffstat (limited to 'same')
9 files changed, 25 insertions, 100 deletions
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index e2723a3..eaca410 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -81,6 +81,8 @@ public class MasterProposer extends Thread { public int getResult() throws InterruptedException { done.await(); + logger.info("ResponseHandler: {} / {} successes.", + numPromises.get(), numRequests); return result.get(); } } @@ -117,6 +119,8 @@ public class MasterProposer extends Thread { .setProposalNumber(proposalNumber) .build(); paxos.acceptRequest(rpc, request, handler); + rpc.await(); + logger.info("Rpc result from paxos.acceptRequest: " + rpc.errorText()); } return handler.getResult(); } @@ -139,6 +143,7 @@ public class MasterProposer extends Thread { Integer proposeRetry(int proposalNumber, Runnable retryAction) throws InterruptedException { + logger.info("Paxos services: {}.", paxosLocations); assert proposalNumber > 0; int nextProposal = proposalNumber; int result = nextProposal - 1; @@ -148,7 +153,8 @@ public class MasterProposer extends Thread { if (result == nextProposal) { result = internalAcceptRequest(nextProposal); } - logger.info("Proposed value {}, result {}", nextProposal, result); + logger.info("Proposed value {}, result {}.", + nextProposal, result); if (result < 0) { nextProposal = -result + 1; if (retryAction != null) { diff --git a/same/src/main/java/com/orbekk/paxos/PaxosService.java b/same/src/main/java/com/orbekk/paxos/PaxosService.java deleted file mode 100644 index a6f6b08..0000000 --- a/same/src/main/java/com/orbekk/paxos/PaxosService.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.orbekk.paxos; - -public interface PaxosService { - - /** - * @return N == proposalNumber if a promise is made. - * -M if another promise already was made, where M is the promise - * highest proposal number. - */ - int propose(String clientUrl, int proposalNumber) throws Exception; - int acceptRequest(String clientUrl, int proposalNumber) throws Exception; -} diff --git a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java index 6b9dd14..40e6320 100644 --- a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java +++ b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java @@ -12,7 +12,7 @@ import com.orbekk.same.Services.PaxosResponse; /** * This class better be thread-safe. */ -public class PaxosServiceImpl implements PaxosService { +public class PaxosServiceImpl { private Logger logger = LoggerFactory.getLogger(getClass()); private int highestPromise = 0; private int highestAcceptedValue = 0; @@ -23,6 +23,8 @@ public class PaxosServiceImpl implements PaxosService { @Override public void propose(RpcController controller, PaxosRequest request, RpcCallback<PaxosResponse> done) { + logger.info("propose({}). Highest promise: {}, Highest accepted: {}", + new Object[]{request, highestPromise, highestAcceptedValue}); String clientUrl = request.getClient().getLocation(); int proposalNumber = request.getProposalNumber(); int response = @@ -36,6 +38,8 @@ public class PaxosServiceImpl implements PaxosService { @Override public void acceptRequest(RpcController controller, PaxosRequest request, RpcCallback<PaxosResponse> done) { + logger.info("acceptRequest({}). Highest promise: {}, Highest accepted: {}", + new Object[]{request, highestPromise, highestAcceptedValue}); String clientUrl = request.getClient().getLocation(); int proposalNumber = request.getProposalNumber(); int response = @@ -56,8 +60,7 @@ public class PaxosServiceImpl implements PaxosService { return service; } - @Override - public synchronized int propose(String clientUrl, + private synchronized int propose(String clientUrl, int proposalNumber) { if (proposalNumber > highestPromise) { logger.info(tag + "propose({}, {}) = accepted", @@ -73,8 +76,7 @@ public class PaxosServiceImpl implements PaxosService { } } - @Override - public synchronized int acceptRequest(String clientUrl, + private synchronized int acceptRequest(String clientUrl, int proposalNumber) { if (proposalNumber == highestPromise) { logger.info(tag + "acceptRequest({}, {}) = accepted", diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 27ce787..5a7918c 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -70,7 +70,7 @@ public class Client { public void run(Services.UpdateComponentResponse response) { if (!rpc.isOk()) { logger.warn("Master failed to respond to update " + - "request: {}", rpc); + "request: {}", rpc.errorText()); op.complete(DelayedOperation.Status.createError( "Error contacting master. Try again later.")); startMasterElection(); diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java index 6fe8669..0002016 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java @@ -1,7 +1,5 @@ package com.orbekk.same; -import com.orbekk.paxos.PaxosService; - /** * An interface that returns a connection for a participant. * diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index 0e199d6..280df9b 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -16,7 +16,6 @@ import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.orbekk.paxos.PaxosService; import com.orbekk.protobuf.RpcChannel; public class ConnectionManagerImpl implements ConnectionManager { diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 5d97903..6438c8f 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -3,7 +3,6 @@ package com.orbekk.same; import java.util.Map; import java.util.HashMap; -import com.orbekk.paxos.PaxosService; import com.orbekk.same.Services.Directory; import com.orbekk.same.Services.Master; import com.orbekk.same.Services.Paxos; diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java deleted file mode 100644 index 3160d2d..0000000 --- a/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.orbekk.paxos; - -import static org.junit.Assert.*; - -import com.orbekk.same.TestConnectionManager; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -public class PaxosServiceTest { - TestConnectionManager connections = new TestConnectionManager(); - private PaxosServiceImpl p1 = new PaxosServiceImpl("P1: "); - private PaxosServiceImpl p2 = new PaxosServiceImpl("P2: "); - private PaxosServiceImpl p3 = new PaxosServiceImpl("P3: "); - private PaxosServiceImpl p4 = new PaxosServiceImpl("P4: "); - private PaxosServiceImpl p5 = new PaxosServiceImpl("P5: "); - private String client = "client"; - private String client1 = "client1"; - private String client2 = "client2"; - private String client3 = "client3"; - private String client4 = "client4"; - private String client5 = "client5"; - private List<PaxosServiceImpl> servers = new ArrayList<PaxosServiceImpl>(); - - @Before - public void setUp() { - Collections.addAll(servers, p1, p2, p3, p4, p5); - connections.paxosMap0.put("p1", p1.getService()); - connections.paxosMap0.put("p2", p2.getService()); - connections.paxosMap0.put("p3", p3.getService()); - connections.paxosMap0.put("p4", p4.getService()); - connections.paxosMap0.put("p5", p5.getService()); - } - - @Test - public void simpleCase() { - assertEquals(1, p1.propose(client, 1)); - assertEquals(1, p1.acceptRequest(client, 1)); - } - - @Test - public void lowerProposalFails() { - assertEquals(10, p1.propose(client1, 10)); - assertEquals(-10, p1.propose(client2, 9)); - assertEquals(100, p1.propose(client2, 100)); - } - - @Test - public void testAccept() { - assertEquals(3, p1.propose(client1, 3)); - assertEquals(4, p1.propose(client2, 4)); - assertEquals(-4, p1.acceptRequest(client1, 3)); - assertEquals(4, p1.acceptRequest(client2, 4)); - } - - public List<String> paxosUrls() { - return new ArrayList<String>(connections.paxosMap0.keySet()); - } - - @Test - @Ignore - public void integrationTest() { -// MasterProposer proposer = new MasterProposer("client1", paxosUrls(), -// connections); -// assertTrue(proposer.propose(1)); - } -} diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java index 275eaac..a69bdca 100644 --- a/same/src/test/java/com/orbekk/same/FunctionalTest.java +++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java @@ -1,10 +1,8 @@ package com.orbekk.same; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.is; import java.util.ArrayList; import java.util.List; @@ -12,8 +10,8 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import com.orbekk.paxos.PaxosService; import com.orbekk.paxos.PaxosServiceImpl; +import com.orbekk.protobuf.Rpc; import com.orbekk.util.DelayedOperation; /** A functional test that runs with a master and several clients. */ @@ -29,7 +27,13 @@ public class FunctionalTest { VariableFactory vf3; List<Client> clients = new ArrayList<Client>(); TestConnectionManager connections = new TestConnectionManager(); - RpcFactory rpcf = new RpcFactory(5000); + RpcFactory rpcf = new RpcFactory(5000) { + @Override public Rpc create() { + Rpc rpc = super.create(); + rpc.complete(); + return rpc; + }; + }; @Before public void setUp() { master = Master.create(connections, |