summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--same/pom.xml43
-rw-r--r--same/src/main/java/com/orbekk/net/BroadcastListener.java62
-rw-r--r--same/src/main/java/com/orbekk/net/HttpUtil.java25
-rw-r--r--same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java76
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java43
-rw-r--r--same/src/main/java/com/orbekk/same/ClientService.java17
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManager.java4
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java41
-rw-r--r--same/src/main/java/com/orbekk/same/MasterService.java7
-rw-r--r--same/src/main/java/com/orbekk/same/MasterServiceProxy.java45
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java23
-rw-r--r--same/src/main/java/com/orbekk/same/TestConnectionManager.java21
-rw-r--r--same/src/main/java/com/orbekk/same/TjwsApp.java19
-rw-r--r--same/src/main/java/com/orbekk/same/benchmark/HttpClientBenchmark.java82
-rw-r--r--same/src/main/java/com/orbekk/same/benchmark/HttpExampleServer.java29
-rw-r--r--same/src/main/java/com/orbekk/same/benchmark/HttpExampleService.java5
-rw-r--r--same/src/main/java/com/orbekk/same/http/HelloServlet.java15
-rw-r--r--same/src/main/java/com/orbekk/same/http/JettyServerBuilder.java48
-rw-r--r--same/src/main/java/com/orbekk/same/http/JettyServerContainer.java65
-rw-r--r--same/src/main/java/com/orbekk/same/http/RpcServlet.java24
-rw-r--r--same/src/main/java/com/orbekk/same/http/ServerContainer.java8
-rw-r--r--same/src/main/java/com/orbekk/same/http/StateServlet.java104
-rw-r--r--same/src/main/java/com/orbekk/same/http/TjwsServerBuilder.java41
-rw-r--r--same/src/main/java/com/orbekk/same/http/TjwsServerContainer.java73
-rw-r--r--same/src/test/java/com/orbekk/net/BroadcastListenerTest.java32
-rw-r--r--same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java12
-rw-r--r--same/src/test/java/com/orbekk/same/ClientTest.java63
-rw-r--r--same/src/test/java/com/orbekk/same/FunctionalTest.java2
-rw-r--r--same/src/test/java/com/orbekk/same/MasterTest.java20
29 files changed, 7 insertions, 1042 deletions
diff --git a/same/pom.xml b/same/pom.xml
index 5462491..24fce46 100644
--- a/same/pom.xml
+++ b/same/pom.xml
@@ -10,21 +10,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
- <repositories>
- <repository>
- <id>jsonrpc3j-webdav-maven-repo</id>
- <name>jsonrpc4j maven repository</name>
- <url>http://jsonrpc4j.googlecode.com/svn/maven/repo/</url>
- <layout>default</layout>
- </repository>
- </repositories>
<dependencies>
<dependency>
- <groupId>com.googlecode</groupId>
- <artifactId>jsonrpc4j</artifactId>
- <version>0.18</version>
- </dependency>
- <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.7.5</version>
@@ -52,36 +39,6 @@
<version>1.6.4</version>
</dependency>
<dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- <version>2.5</version>
- </dependency>
- <dependency>
- <groupId>javax.portlet</groupId>
- <artifactId>portlet-api</artifactId>
- <version>2.0</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <version>8.0.0.M3</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- <version>8.0.0.M3</version>
- </dependency>
- <dependency>
- <groupId>org.jboss.resteasy</groupId>
- <artifactId>tjws</artifactId>
- <version>2.3.1.GA</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.1.2</version>
- </dependency>
- <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.1</version>
diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java
deleted file mode 100644
index df8c02e..0000000
--- a/same/src/main/java/com/orbekk/net/BroadcastListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.orbekk.net;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BroadcastListener {
- private int port;
- private Logger logger = LoggerFactory.getLogger(getClass());
- DatagramSocket socket;
-
- public BroadcastListener(int port) {
- this.port = port;
- }
-
- public synchronized DatagramPacket listen() {
- logger.debug("Waiting for broadcast on port " + port);
- try {
- if (socket == null) {
- socket = new DatagramSocket(null);
- socket.setReuseAddress(true);
- socket.bind(new InetSocketAddress(port));
- }
- } catch (SocketException e) {
- logger.warn("Failed to create socket.", e.fillInStackTrace());
- return null;
- }
- try {
- socket.setBroadcast(true);
- } catch (SocketException e) {
- logger.warn("Exception: {}", e);
- }
- byte[] buffer = new byte[2048];
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- try {
- socket.receive(packet);
- } catch (IOException e) {
- logger.warn("Exception when listening for broadcast: {}", e);
- return null;
- }
-
- String address = packet.getAddress().getHostAddress();
- logger.debug("Received broadcast from " + address +
- ": " + new String(packet.getData(), 0, packet.getLength()));
- return packet;
- }
-
- public void interrupt() {
- socket.close();
- }
-
- public static void main(String[] args) {
- int port = Integer.parseInt(args[0]);
- BroadcastListener listener = new BroadcastListener(port);
- System.out.println("Received broadcast: " + listener.listen());
- }
-}
diff --git a/same/src/main/java/com/orbekk/net/HttpUtil.java b/same/src/main/java/com/orbekk/net/HttpUtil.java
deleted file mode 100644
index 946c377..0000000
--- a/same/src/main/java/com/orbekk/net/HttpUtil.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.orbekk.net;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HttpUtil {
- private static final Logger logger =
- LoggerFactory.getLogger(HttpUtil.class);
-
- public static void sendHttpRequest(String url) {
- try {
- URL pingUrl = new URL(url);
- pingUrl.openStream();
- // URLConnection connection = pingUrl.openConnection();
- // connection.connect();
- } catch (MalformedURLException e) {
- logger.warn("Unable to send ping to {}: {}.", url, e);
- } catch (IOException e) {
- logger.warn("Error when sending ping: {}", e);
- }
- }
-}
diff --git a/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java b/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java
deleted file mode 100644
index d5e30d7..0000000
--- a/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.orbekk.net;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Type;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentProducer;
-import org.apache.http.entity.EntityTemplate;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.googlecode.jsonrpc4j.JsonRpcClient;
-import com.googlecode.jsonrpc4j.JsonRpcHttpClient;
-
-/**
- * This class is horrible. :S
- *
- * We extend JsonRpcHttpClient but try to override everything it does.
- */
-public class MyJsonRpcHttpClient extends JsonRpcHttpClient {
- Logger logger = LoggerFactory.getLogger(getClass());
- private URL serviceUrl;
- private JsonRpcClient rpcClient;
- private HttpClient httpClient;
-
- public MyJsonRpcHttpClient(URL serviceUrl, int connectionTimeout,
- int readTimeout) {
- super(null);
- httpClient = new DefaultHttpClient();
- HttpParams params = httpClient.getParams();
- HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
- HttpConnectionParams.setSoTimeout(params, readTimeout);
- rpcClient = new JsonRpcClient();
- this.serviceUrl = serviceUrl;
- }
-
- @Override
- public synchronized Object invoke(
- final String methodName, final Object[] arguments, Type returnType,
- Map<String, String> extraHeaders)
- throws Exception {
- EntityTemplate entity = new EntityTemplate(new ContentProducer() {
- @Override
- public void writeTo(OutputStream out) throws IOException {
- try {
- rpcClient.invoke(methodName, arguments, out);
- } catch (Exception e) {
- throw new IOException("RPC Failed: " + e.getMessage());
- }
- }
- });
- entity.setContentType("application/json-rpc");
-
- HttpPost post = new HttpPost(serviceUrl.toString());
-
- for (Map.Entry<String, String> entry : extraHeaders.entrySet()) {
- post.addHeader(entry.getKey(), entry.getValue());
- }
-
- post.setEntity(entity);
-
- HttpResponse response = httpClient.execute(post);
- HttpEntity responseEntity = response.getEntity();
-
- return super.readResponse(returnType, responseEntity.getContent());
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 1b99ce4..6582e4a 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -159,45 +159,6 @@ public class Client {
}
};
- private ClientService serviceImpl = new ClientService() {
- RpcCallback<Empty> noOp = new RpcCallback<Empty>() {
- @Override public void run(Empty unused) {
- }
- };
-
- @Override
- public void setState(String component, String data, long revision) throws Exception {
- logger.info("SetState: {}, {}, {}",
- new Object[]{component, data, revision});
- Services.Component request = Services.Component.newBuilder()
- .setId(component)
- .setData(data)
- .setRevision(revision)
- .build();
- newServiceImpl.setState(null, request, noOp);
- }
-
- @Override
- public synchronized void masterTakeover(String masterUrl, String networkName,
- int masterId, String masterLocation) throws Exception {
- Services.MasterState request = Services.MasterState.newBuilder()
- .setMasterUrl(masterUrl)
- .setNetworkName(networkName)
- .setMasterId(masterId)
- .setMasterLocation(masterLocation)
- .build();
- newServiceImpl.masterTakeover(null, request, noOp);
- }
-
- @Override
- public void masterDown(int masterId) throws Exception {
- Services.MasterState request = masterInfo.toBuilder()
- .setMasterId(masterId)
- .build();
- newServiceImpl.masterDown(null, request, noOp);
- }
- };
-
public Client(State state, ConnectionManager connections,
String myUrl, String myLocation) {
this.state = state;
@@ -275,10 +236,6 @@ public class Client {
return state;
}
- public ClientService getService() {
- return serviceImpl;
- }
-
public Services.Client getNewService() {
return newServiceImpl;
}
diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java
deleted file mode 100644
index d7122c7..0000000
--- a/same/src/main/java/com/orbekk/same/ClientService.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.orbekk.same;
-
-public interface ClientService {
- void setState(String component, String data, long revision) throws Exception;
-
- /** A new master takes over.
- *
- * @param masterUrl The new master URL.
- * @param masterId The ID of the new master. Only accept if this is higher
- * than the current master.
- */
- void masterTakeover(String masterUrl, String networkName,
- int masterId, String masterLocation) throws Exception;
-
- /** The master is down, so start a new master election. */
- void masterDown(int masterId) throws Exception;
-}
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManager.java b/same/src/main/java/com/orbekk/same/ConnectionManager.java
index e4c7832..6fe8669 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManager.java
@@ -8,10 +8,6 @@ import com.orbekk.paxos.PaxosService;
* When testing, this interface can be mocked to use local participants only.
*/
public interface ConnectionManager {
- ClientService getClient(String url);
- MasterService getMaster(String url);
- PaxosService getPaxos(String url);
-
Services.Master getMaster0(String location);
Services.Client getClient0(String location);
Services.Directory getDirectory(String location);
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index 1bf697f..ac0abe0 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -17,15 +17,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.googlecode.jsonrpc4j.ProxyUtil;
-import com.orbekk.net.MyJsonRpcHttpClient;
import com.orbekk.paxos.PaxosService;
import com.orbekk.protobuf.RpcChannel;
public class ConnectionManagerImpl implements ConnectionManager {
private int connectionTimeout;
private int readTimeout;
- private Map<String, MyJsonRpcHttpClient> connectionCache =
- new HashMap<String, MyJsonRpcHttpClient>();
private ConcurrentMap<String, Future<RpcChannel>> channels =
new ConcurrentHashMap<String, Future<RpcChannel>>();
@@ -40,29 +37,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
this.readTimeout = readTimeout;
}
- private MyJsonRpcHttpClient getConnection(String url)
- throws MalformedURLException {
- if (!connectionCache.containsKey(url)) {
- connectionCache.put(url, new MyJsonRpcHttpClient(new URL(url),
- connectionTimeout, readTimeout));
- }
- return connectionCache.get(url);
- }
-
- private <T>T getClassProxy(String url, Class<T> clazz) {
- T service = null;
- try {
- MyJsonRpcHttpClient client = getConnection(url);
- service = ProxyUtil.createProxy(
- this.getClass().getClassLoader(),
- clazz,
- client);
- } catch (MalformedURLException e) {
- logger.warn("Unable to create client for {}, {}", url, e);
- }
- return service;
- }
-
private RpcChannel getChannel(String location) {
Future<RpcChannel> channel = channels.get(location);
if (channel == null) {
@@ -102,21 +76,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
}
@Override
- public ClientService getClient(String url) {
- return getClassProxy(url, ClientService.class);
- }
-
- @Override
- public MasterService getMaster(String url) {
- return getClassProxy(url, MasterService.class);
- }
-
- @Override
- public PaxosService getPaxos(String url) {
- return getClassProxy(url, PaxosService.class);
- }
-
- @Override
public Services.Master getMaster0(String location) {
RpcChannel channel = getChannel(location);
if (channel != null) {
diff --git a/same/src/main/java/com/orbekk/same/MasterService.java b/same/src/main/java/com/orbekk/same/MasterService.java
deleted file mode 100644
index 6076ed4..0000000
--- a/same/src/main/java/com/orbekk/same/MasterService.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.orbekk.same;
-
-public interface MasterService {
- void joinNetworkRequest(String clientUrl) throws Exception;
- boolean updateStateRequest(String component, String newData, long revision)
- throws Exception;
-}
diff --git a/same/src/main/java/com/orbekk/same/MasterServiceProxy.java b/same/src/main/java/com/orbekk/same/MasterServiceProxy.java
deleted file mode 100644
index 0532d86..0000000
--- a/same/src/main/java/com/orbekk/same/MasterServiceProxy.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.orbekk.same;
-
-public class MasterServiceProxy implements MasterService {
- public static class MasterDeactivatedException extends Exception {
- public MasterDeactivatedException() {
- }
- }
-
- private MasterService masterService = null;
-
- public MasterServiceProxy() {
- }
-
- public MasterServiceProxy(MasterService masterService) {
- this.masterService = masterService;
- }
-
- public MasterService getService() {
- return masterService;
- }
-
- public void setService(MasterService masterService) {
- this.masterService = masterService;
- }
-
- @Override
- public void joinNetworkRequest(String clientUrl) throws Exception {
- if (masterService != null) {
- masterService.joinNetworkRequest(clientUrl);
- } else {
- throw new MasterDeactivatedException();
- }
- }
-
- @Override
- public boolean updateStateRequest(String component, String newData, long revision)
- throws Exception {
- if (masterService != null) {
- return masterService.updateStateRequest(component, newData, revision);
- } else {
- throw new MasterDeactivatedException();
- }
- }
-
-}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index a16f57a..e28db59 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -9,15 +9,10 @@ import com.orbekk.paxos.PaxosServiceImpl;
import com.orbekk.protobuf.Rpc;
import com.orbekk.protobuf.SimpleProtobufServer;
import com.orbekk.same.config.Configuration;
-import com.orbekk.same.http.JettyServerBuilder;
-import com.orbekk.same.http.ServerContainer;
-import com.orbekk.same.http.StateServlet;
public class SameController {
private Logger logger = LoggerFactory.getLogger(getClass());
- private ServerContainer server;
private SimpleProtobufServer pServer;
- private MasterServiceProxy masterService;
private Master master;
private Client client;
private PaxosServiceImpl paxos;
@@ -65,24 +60,15 @@ public class SameController {
configuration.get("localIp"), configuration.getInt("port"));
String clientUrl = baseUrl + "ClientService.json";
- MasterServiceProxy master = new MasterServiceProxy();
Client client = new Client(clientState, connections,
clientUrl, myLocation);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
- StateServlet stateServlet = new StateServlet(client.getInterface(),
- new VariableFactory(client.getInterface()));
- ServerContainer server = new JettyServerBuilder(port)
- .withServlet(stateServlet, "/_/state")
- .withService(client.getService(), ClientService.class)
- .withService(master, MasterService.class)
- .withService(paxos, PaxosService.class)
- .build();
SimpleProtobufServer pServer = SimpleProtobufServer.create(pport);
pServer.registerService(client.getNewService());
SameController controller = new SameController(
- configuration, connections, server, master, client,
+ configuration, connections, client,
paxos, broadcaster, pServer);
return controller;
}
@@ -90,16 +76,12 @@ public class SameController {
public SameController(
Configuration configuration,
ConnectionManager connections,
- ServerContainer server,
- MasterServiceProxy master,
Client client,
PaxosServiceImpl paxos,
Broadcaster serviceBroadcaster,
SimpleProtobufServer pServer) {
this.configuration = configuration;
this.connections = connections;
- this.server = server;
- this.masterService = master;
this.client = client;
this.paxos = paxos;
this.serviceBroadcaster = serviceBroadcaster;
@@ -107,7 +89,6 @@ public class SameController {
}
public void start() throws Exception {
- server.start();
pServer.start();
client.setMasterController(masterController);
client.start();
@@ -119,7 +100,6 @@ public class SameController {
if (master != null) {
master.interrupt();
}
- server.stop();
pServer.interrupt();
} catch (Exception e) {
logger.error("Failed to stop webserver", e);
@@ -127,7 +107,6 @@ public class SameController {
}
public void join() {
- server.join();
client.interrupt();
if (master != null) {
master.interrupt();
diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
index b93e49b..5d97903 100644
--- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
@@ -12,12 +12,6 @@ import com.orbekk.same.Services.Paxos;
* This class is used in test.
*/
public class TestConnectionManager implements ConnectionManager {
- public Map<String, ClientService> clientMap =
- new HashMap<String, ClientService>();
- public Map<String, MasterService> masterMap =
- new HashMap<String, MasterService>();
- public Map<String, PaxosService> paxosMap =
- new HashMap<String, PaxosService>();
public Map<String, Services.Directory> directoryMap =
new HashMap<String, Services.Directory>();
public Map<String, Services.Master> masterMap0 =
@@ -31,21 +25,6 @@ public class TestConnectionManager implements ConnectionManager {
}
@Override
- public ClientService getClient(String url) {
- return clientMap.get(url);
- }
-
- @Override
- public MasterService getMaster(String url) {
- return masterMap.get(url);
- }
-
- @Override
- public PaxosService getPaxos(String url) {
- return paxosMap.get(url);
- }
-
- @Override
public Directory getDirectory(String location) {
return directoryMap.get(location);
}
diff --git a/same/src/main/java/com/orbekk/same/TjwsApp.java b/same/src/main/java/com/orbekk/same/TjwsApp.java
deleted file mode 100644
index c3f1a78..0000000
--- a/same/src/main/java/com/orbekk/same/TjwsApp.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.orbekk.same;
-
-import com.orbekk.same.http.HelloServlet;
-import com.orbekk.same.http.ServerContainer;
-import com.orbekk.same.http.TjwsServerBuilder;
-
-public class TjwsApp {
- public static void main(String[] args) {
- ServerContainer server = new TjwsServerBuilder(8080)
- .withServlet(new HelloServlet(), "/hello")
- .build();
- try {
- server.start();
- server.join();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/benchmark/HttpClientBenchmark.java b/same/src/main/java/com/orbekk/same/benchmark/HttpClientBenchmark.java
deleted file mode 100644
index fc315c4..0000000
--- a/same/src/main/java/com/orbekk/same/benchmark/HttpClientBenchmark.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.orbekk.same.benchmark;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.concurrent.CountDownLatch;
-
-import com.google.protobuf.RpcCallback;
-import com.googlecode.jsonrpc4j.ProxyUtil;
-import com.orbekk.net.MyJsonRpcHttpClient;
-import com.orbekk.protobuf.Rpc;
-import com.orbekk.protobuf.RpcChannel;
-import com.orbekk.same.benchmark.Example.Data;
-
-public class HttpClientBenchmark {
- private final HttpExampleService service;
- private final int warmupIterations;
- private final int iterations;
-
- public static void benchmark(String url, int warmupIterations,
- int iterations) throws InterruptedException {
- MyJsonRpcHttpClient client;
- try {
- client = new MyJsonRpcHttpClient(
- new URL(url), 2000, 2000);
- } catch (MalformedURLException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- HttpExampleService service = ProxyUtil.createProxy(
- HttpClientBenchmark.class.getClassLoader(),
- HttpExampleService.class,
- client);
- HttpClientBenchmark benchmark = new HttpClientBenchmark(
- service, warmupIterations, iterations);
- benchmark.benchmark();
- }
-
- public HttpClientBenchmark(HttpExampleService service,
- int warmupIterations, int iterations) {
- this.service = service;
- this.warmupIterations = warmupIterations;
- this.iterations = iterations;
- }
-
- private void runBenchmark(int iterations) throws InterruptedException {
- final CountDownLatch finished =
- new CountDownLatch(iterations);
-
- for (int i = 0; i < iterations; i++) {
- service.methodA("", iterations, 0);
- finished.countDown();
- }
-
- finished.await();
- }
-
- public void benchmark() throws InterruptedException {
- long warmupStart = System.currentTimeMillis();
- runBenchmark(warmupIterations);
- long warmupFinished = System.currentTimeMillis();
- System.out.println("Warmup: " + warmupIterations + " in " +
- (warmupFinished - warmupStart) + "ms. ");
- long start = System.currentTimeMillis();
- runBenchmark(iterations);
- long finished = System.currentTimeMillis();
- System.out.println("Benchmark: " + iterations+ " in " +
- (finished - start) + "ms. ");
- }
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.err.println("Usage: ClientBenchmark <url>");
- System.exit(1);
- }
- String url = args[0];
- try {
- benchmark(url, 1000, 10000);
- } catch (InterruptedException e) {
- System.out.println("Benchmark failed.");
- }
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/benchmark/HttpExampleServer.java b/same/src/main/java/com/orbekk/same/benchmark/HttpExampleServer.java
deleted file mode 100644
index f694680..0000000
--- a/same/src/main/java/com/orbekk/same/benchmark/HttpExampleServer.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.orbekk.same.benchmark;
-
-import java.util.logging.Logger;
-
-import com.orbekk.same.http.JettyServerBuilder;
-import com.orbekk.same.http.JettyServerContainer;
-
-public class HttpExampleServer {
- private final static Logger logger =
- Logger.getLogger(HttpExampleServer.class.getName());
- private volatile JettyServerContainer server;
-
- class ServiceImpl implements HttpExampleService {
- @Override public String methodA(String message, int arg1, int arg2) {
- return message + arg1 + arg2;
- }
- }
-
- public void runServer(int port) throws Exception {
- server = new JettyServerBuilder(port)
- .withService(new ServiceImpl(), HttpExampleService.class)
- .build();
- server.start();
- }
-
- public void stopServer() throws Exception {
- server.stop();
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/benchmark/HttpExampleService.java b/same/src/main/java/com/orbekk/same/benchmark/HttpExampleService.java
deleted file mode 100644
index ee8cbf5..0000000
--- a/same/src/main/java/com/orbekk/same/benchmark/HttpExampleService.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.orbekk.same.benchmark;
-
-public interface HttpExampleService {
- String methodA(String message, int arg1, int arg2);
-}
diff --git a/same/src/main/java/com/orbekk/same/http/HelloServlet.java b/same/src/main/java/com/orbekk/same/http/HelloServlet.java
deleted file mode 100644
index 78ce6b1..0000000
--- a/same/src/main/java/com/orbekk/same/http/HelloServlet.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.orbekk.same.http;
-
-import java.io.IOException;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-public class HelloServlet extends HttpServlet {
- public void doGet(HttpServletRequest request, HttpServletResponse response)
- throws IOException {
- response.setContentType("text/plain; charset=utf8");
- response.getWriter().println("Hello, World");
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/http/JettyServerBuilder.java b/same/src/main/java/com/orbekk/same/http/JettyServerBuilder.java
deleted file mode 100644
index 91d2725..0000000
--- a/same/src/main/java/com/orbekk/same/http/JettyServerBuilder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.orbekk.same.http;
-
-import javax.servlet.http.HttpServlet;
-
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.googlecode.jsonrpc4j.JsonRpcServer;
-
-public class JettyServerBuilder {
- Logger logger = LoggerFactory.getLogger(getClass());
- int port;
- ServletContextHandler context = null;
-
- public JettyServerBuilder(int port) {
- this.port = port;
- }
-
- public JettyServerBuilder withServlet(HttpServlet servlet, String pathSpec) {
- logger.info("Servlet binding: {} → {}", pathSpec, servlet);
- getServletContextHandler().addServlet(new ServletHolder(servlet),
- pathSpec);
- return this;
- }
-
- public <T> JettyServerBuilder withService(T service, Class<T> clazz) {
- JsonRpcServer server = new JsonRpcServer(service, clazz);
- String pathSpec = "/" + clazz.getSimpleName() + ".json";
- return withServlet(new RpcServlet(server), pathSpec);
- }
-
- public JettyServerContainer build() {
- JettyServerContainer server = JettyServerContainer.create(port);
- server.setReuseAddress(true);
- server.setContext(getServletContextHandler());
- return server;
- }
-
- private ServletContextHandler getServletContextHandler() {
- if (context == null) {
- context = new ServletContextHandler();
- context.setContextPath("/");
- }
- return context;
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/http/JettyServerContainer.java b/same/src/main/java/com/orbekk/same/http/JettyServerContainer.java
deleted file mode 100644
index 16d7033..0000000
--- a/same/src/main/java/com/orbekk/same/http/JettyServerContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.orbekk.same.http;
-
-import org.eclipse.jetty.server.AbstractConnector;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JettyServerContainer implements ServerContainer {
- Logger logger = LoggerFactory.getLogger(getClass());
- Server server;
- int port;
- ServletContextHandler context = null;
-
- public JettyServerContainer(Server server, int port, ServletContextHandler context) {
- this.server = server;
- this.port = port;
- this.context = context;
- }
-
- public static JettyServerContainer create(int port) {
- Server server = new Server(port);
- return new JettyServerContainer(server, port, null);
- }
-
- public void setContext(ServletContextHandler context) {
- server.setHandler(context);
- this.context = context;
- }
-
- public void setReuseAddress(boolean on) {
- Connector connector = server.getConnectors()[0];
- if (connector instanceof AbstractConnector) {
- AbstractConnector connector_ = (AbstractConnector)connector;
- connector_.setReuseAddress(on);
- }
- }
-
- public int getPort() {
- if (port == 0) {
- return server.getConnectors()[0].getLocalPort();
- } else {
- return port;
- }
- }
-
- public void start() throws Exception {
- server.start();
- logger.info("Started server on port {}", getPort());
- }
-
- public void stop() throws Exception {
- server.stop();
- logger.info("Server stopped.");
- }
-
- public void join() {
- try {
- server.join();
- } catch (InterruptedException e) {
- return;
- }
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/http/RpcServlet.java b/same/src/main/java/com/orbekk/same/http/RpcServlet.java
deleted file mode 100644
index 9450d67..0000000
--- a/same/src/main/java/com/orbekk/same/http/RpcServlet.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.orbekk.same.http;
-
-import java.io.IOException;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import com.googlecode.jsonrpc4j.JsonRpcServer;
-
-public class RpcServlet extends HttpServlet {
- JsonRpcServer rpcServer;
-
- public RpcServlet(JsonRpcServer rpcServer) {
- super();
- this.rpcServer = rpcServer;
- }
-
- @Override
- protected void doPost(HttpServletRequest request,
- HttpServletResponse response) throws IOException {
- rpcServer.handle(request, response);
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/http/ServerContainer.java b/same/src/main/java/com/orbekk/same/http/ServerContainer.java
deleted file mode 100644
index b69adbf..0000000
--- a/same/src/main/java/com/orbekk/same/http/ServerContainer.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.orbekk.same.http;
-
-public interface ServerContainer {
- public abstract int getPort();
- public abstract void start() throws Exception;
- public abstract void stop() throws Exception;
- public abstract void join();
-} \ No newline at end of file
diff --git a/same/src/main/java/com/orbekk/same/http/StateServlet.java b/same/src/main/java/com/orbekk/same/http/StateServlet.java
deleted file mode 100644
index efc4237..0000000
--- a/same/src/main/java/com/orbekk/same/http/StateServlet.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.orbekk.same.http;
-
-import static com.orbekk.same.StackTraceUtil.throwableToString;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.orbekk.same.ClientInterface;
-import com.orbekk.same.UpdateConflict;
-import com.orbekk.same.Variable;
-import com.orbekk.same.VariableFactory;
-
-public class StateServlet extends HttpServlet {
- private Logger logger = LoggerFactory.getLogger(getClass());
- private ClientInterface client;
- private VariableFactory variableFactory;
- private final static String TITLE = "State viewer";
-
- public StateServlet(ClientInterface client,
- VariableFactory variableFactory) {
- this.client = client;
- this.variableFactory = variableFactory;
- }
-
- private void handleSetState(HttpServletRequest request,
- HttpServletResponse response) throws IOException {
- if (request.getParameter("key") == null ||
- request.getParameter("value") == null) {
- response.getWriter().println(
- "Usage: action=set&key=DesiredKey&value=DesiredValue");
- }
-
- String key = request.getParameter("key");
- String value = request.getParameter("value");
- Variable<String> variable = variableFactory.createString(key);
-
- variable.set(value).waitFor();
-
- response.getWriter().println("Updated component: " +
- key + "=" + value);
- }
-
- @Override
- protected void doGet(HttpServletRequest request,
- HttpServletResponse response) throws IOException {
- if ("set".equals(request.getParameter("action"))) {
- handleSetState(request, response);
- response.sendRedirect(request.getServletPath() + "?message=OK");
- } else {
- response.setContentType("text/html; charset=utf8");
- writeHeader(response);
- if (request.getParameter("message") != null) {
- response.getWriter().println("<p>");
- response.getWriter().println(request.getParameter("message"));
- }
- writeState(response);
- writeSetStateForm(response);
- writeFooter(response);
- response.setStatus(HttpServletResponse.SC_OK);
- }
- }
-
- private void writeState(HttpServletResponse response) throws IOException {
- PrintWriter w = response.getWriter();
- w.println("<h2>State</h2>");
- w.println("<pre>");
- w.println(client.getState());
- w.println("</pre>");
- }
-
- private void writeSetStateForm(HttpServletResponse response)
- throws IOException {
- PrintWriter w = response.getWriter();
- w.println("<h3>Change state</h3>");
- w.println("<form name=\"stateInput\" action=\"\">");
- w.println("<p>Key: <input type=\"text\" name=\"key\" />");
- w.println("<p>Value: <input type=\"text\" name=\"value\" />");
- w.println("<input type=\"hidden\" name=\"action\" value=\"set\" />");
- w.println("<p><input type=\"submit\" value=\"Sumbit\" />");
- w.println("</form>");
- }
-
- private void writeHeader(HttpServletResponse response) throws IOException {
- PrintWriter w = response.getWriter();
- w.println("<html>");
- w.println("<head>");
- w.println("<title>" + TITLE + "</title>");
- w.println("</head>");
- w.println("<body>");
- }
-
- private void writeFooter(HttpServletResponse response) throws IOException {
- PrintWriter w = response.getWriter();
- w.println("</body>");
- w.println("</html>");
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/http/TjwsServerBuilder.java b/same/src/main/java/com/orbekk/same/http/TjwsServerBuilder.java
deleted file mode 100644
index 7a07dbd..0000000
--- a/same/src/main/java/com/orbekk/same/http/TjwsServerBuilder.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.orbekk.same.http;
-
-import java.util.ArrayList;
-
-import javax.servlet.http.HttpServlet;
-
-import com.googlecode.jsonrpc4j.JsonRpcServer;
-
-public class TjwsServerBuilder {
- private int port;
- private ArrayList<String> servletPaths = new ArrayList<String>();
- private ArrayList<HttpServlet> servlets = new ArrayList<HttpServlet>();
-
- public TjwsServerBuilder(int port) {
- this.port = port;
- }
-
- /** Note: Does not preserve order. */
- public TjwsServerBuilder withServlet(HttpServlet servlet,
- String pathSpec) {
- servletPaths.add(pathSpec);
- servlets.add(servlet);
- return this;
- }
-
- public <T> TjwsServerBuilder withService(T service, Class<T> clazz) {
- JsonRpcServer server = new JsonRpcServer(service, clazz);
- String pathSpec = "/" + clazz.getSimpleName() + ".json";
- return withServlet(new RpcServlet(server), pathSpec);
- }
-
- public ServerContainer build() {
- TjwsServerContainer server = TjwsServerContainer.create(port);
- for (int i = 0; i < servletPaths.size(); i++) {
- String path = servletPaths.get(i);
- HttpServlet servlet = servlets.get(i);
- server.addServlet(path, servlet);
- }
- return server;
- }
-}
diff --git a/same/src/main/java/com/orbekk/same/http/TjwsServerContainer.java b/same/src/main/java/com/orbekk/same/http/TjwsServerContainer.java
deleted file mode 100644
index 2aa7efd..0000000
--- a/same/src/main/java/com/orbekk/same/http/TjwsServerContainer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package com.orbekk.same.http;
-
-import java.util.Properties;
-
-import javax.servlet.http.HttpServlet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import Acme.Serve.Serve;
-
-public class TjwsServerContainer implements ServerContainer {
- private static class MyServer extends Serve {
- public void join() {
- try {
- backgroundThread.join();
- } catch (InterruptedException e) {
- return;
- };
- }
- }
-
- private Logger logger = LoggerFactory.getLogger(getClass());
- private MyServer server;
-
- public static TjwsServerContainer create(int port) {
- Properties properties = new Properties();
- properties.put(Serve.ARG_PORT, port);
- MyServer server = new MyServer();
- server.arguments = properties;
- return new TjwsServerContainer(server);
- }
-
- public TjwsServerContainer(MyServer server) {
- this.server = server;
- }
-
- /* (non-Javadoc)
- * @see com.orbekk.same.http.ServerContainer#getPort()
- */
- @Override
- public int getPort() {
- return (Integer)this.server.getAttribute(Serve.ARG_PORT);
- }
-
- /* (non-Javadoc)
- * @see com.orbekk.same.http.ServerContainer#start()
- */
- @Override
- public void start() {
- server.runInBackground();
- }
-
- /* (non-Javadoc)
- * @see com.orbekk.same.http.ServerContainer#stop()
- */
- @Override
- public void stop() {
- server.stopBackground();
- }
-
- /* (non-Javadoc)
- * @see com.orbekk.same.http.ServerContainer#join()
- */
- @Override
- public void join() {
- server.join();
- }
-
- public void addServlet(String pathSpec, HttpServlet servlet) {
- server.addServlet(pathSpec, servlet);
- }
-}
diff --git a/same/src/test/java/com/orbekk/net/BroadcastListenerTest.java b/same/src/test/java/com/orbekk/net/BroadcastListenerTest.java
deleted file mode 100644
index 67d4ece..0000000
--- a/same/src/test/java/com/orbekk/net/BroadcastListenerTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.orbekk.net;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BroadcastListenerTest {
- private Logger logger = LoggerFactory.getLogger(getClass());
-
- @Test
- public void interruptWorks() throws Exception {
- final BroadcastListener listener = new BroadcastListener(0);
-
- Thread t = new Thread() {
- @Override public void run() {
- listener.listen();
- }
- };
- t.start();
-
- while (listener.socket == null) {
- logger.info("Waiting for listener to start.");
- Thread.sleep(100);
- }
-
- listener.interrupt();
- t.join();
- }
-
-}
diff --git a/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java b/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java
index c07ca87..3160d2d 100644
--- a/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java
+++ b/same/src/test/java/com/orbekk/paxos/PaxosServiceTest.java
@@ -29,11 +29,11 @@ public class PaxosServiceTest {
@Before
public void setUp() {
Collections.addAll(servers, p1, p2, p3, p4, p5);
- connections.paxosMap.put("p1", p1);
- connections.paxosMap.put("p2", p2);
- connections.paxosMap.put("p3", p3);
- connections.paxosMap.put("p4", p4);
- connections.paxosMap.put("p5", p5);
+ connections.paxosMap0.put("p1", p1.getService());
+ connections.paxosMap0.put("p2", p2.getService());
+ connections.paxosMap0.put("p3", p3.getService());
+ connections.paxosMap0.put("p4", p4.getService());
+ connections.paxosMap0.put("p5", p5.getService());
}
@Test
@@ -58,7 +58,7 @@ public class PaxosServiceTest {
}
public List<String> paxosUrls() {
- return new ArrayList<String>(connections.paxosMap.keySet());
+ return new ArrayList<String>(connections.paxosMap0.keySet());
}
@Test
diff --git a/same/src/test/java/com/orbekk/same/ClientTest.java b/same/src/test/java/com/orbekk/same/ClientTest.java
deleted file mode 100644
index 4dbc408..0000000
--- a/same/src/test/java/com/orbekk/same/ClientTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package com.orbekk.same;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.orbekk.util.DelayedOperation;
-
-public class ClientTest {
- private State state = new State("ClientNetwork");
- private TestConnectionManager connections = new TestConnectionManager();
- private Client client = new Client(state, connections,
- "http://client/ClientService.json", "clientLocation");
- private ClientService clientS = client.getService();
- private MasterService mockMaster = mock(MasterService.class);
-
- @Before public void setUp() {
- connections.masterMap.put("master", mockMaster);
- }
-
- @Test public void disconnectedFailsUpdate() throws Exception {
- ClientInterface clientI = client.getInterface();
- DelayedOperation op = clientI.set(null);
- assertTrue(op.isDone());
- assertFalse(op.getStatus().isOk());
- }
-
- // TODO: Fix this test with protobuf rpc.
- @Ignore
- @Test public void connectedUpdateWorks() throws Exception {
- clientS.masterTakeover("master", "MyNetwork", 1, "master");
- ClientInterface clientI = client.getInterface();
- State.Component component = new State.Component(
- "TestVariable", 1, "meow");
- when(mockMaster.updateStateRequest("TestVariable", "meow", 1))
- .thenReturn(true);
- DelayedOperation op = clientI.set(component);
- assertTrue(op.getStatus().isOk());
- }
-
- @Test public void testSetState() throws Exception {
- clientS.setState("TestState", "Test data", 100);
- assertEquals(100, state.getRevision("TestState"));
- assertEquals("Test data", state.getDataOf("TestState"));
- }
-
- @Test public void stateListenerReceivesUpdate() throws Exception {
- StateChangedListener listener = mock(StateChangedListener.class);
- client.getInterface().addStateListener(listener);
- clientS.setState("StateListenerVariable", "100", 1);
- State.Component component = state.getComponent("StateListenerVariable");
- assertEquals("100", component.getData());
- verify(listener).stateChanged(eq(component));
- }
-}
diff --git a/same/src/test/java/com/orbekk/same/FunctionalTest.java b/same/src/test/java/com/orbekk/same/FunctionalTest.java
index da1faca..839b107 100644
--- a/same/src/test/java/com/orbekk/same/FunctionalTest.java
+++ b/same/src/test/java/com/orbekk/same/FunctionalTest.java
@@ -49,12 +49,10 @@ public class FunctionalTest {
Client newClient(String clientName, String clientUrl, String location) {
Client client = new Client(new State(clientName), connections,
clientUrl, location);
- connections.clientMap.put(clientUrl, client.getService());
connections.clientMap0.put(location, client.getNewService());
clients.add(client);
String paxosUrl = clientUrl.replace("ClientService", "PaxosService");
PaxosServiceImpl paxos = new PaxosServiceImpl(paxosUrl);
- connections.paxosMap.put(paxosUrl, paxos);
connections.paxosMap0.put(location, paxos.getService());
return client;
}
diff --git a/same/src/test/java/com/orbekk/same/MasterTest.java b/same/src/test/java/com/orbekk/same/MasterTest.java
index b3cfb94..4fa0a5f 100644
--- a/same/src/test/java/com/orbekk/same/MasterTest.java
+++ b/same/src/test/java/com/orbekk/same/MasterTest.java
@@ -15,26 +15,6 @@ public class MasterTest {
private TestConnectionManager connections = new TestConnectionManager();
private TestBroadcaster broadcaster = new TestBroadcaster();
private Master master;
-
- public static class UnreachableClient implements ClientService {
- @Override
- public void setState(String component, String data, long revision)
- throws Exception {
- throw new Exception("Unreachable client");
- }
-
- @Override
- public void masterTakeover(String masterUrl, String networkName,
- int masterId, String masterLocation)
- throws Exception {
- throw new Exception("Unreachable client");
- }
-
- @Override
- public void masterDown(int masterId) throws Exception {
- throw new Exception("Unreachable client");
- }
- }
@Before
public void setUp() {