summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-27 16:08:54 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-02-27 16:08:54 +0100
commitc58c649531e4884a92f07d9b12f0ad77fbcf5d6a (patch)
tree38d2997ec6cdaed9120dba012b61afbb8b2e53ea
parentee51a98e9b61336610d667de57a8cff70d0ac1d9 (diff)
Clean up source code.
Clean up source code with Eclipse. In particular remove tabs.
-rw-r--r--same/src/main/java/com/orbekk/net/BroadcastListener.java10
-rw-r--r--same/src/main/java/com/orbekk/net/Broadcaster.java7
-rw-r--r--same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java16
-rw-r--r--same/src/main/java/com/orbekk/paxos/MasterProposer.java12
-rw-r--r--same/src/main/java/com/orbekk/paxos/PaxosService.java2
-rw-r--r--same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java4
-rw-r--r--same/src/main/java/com/orbekk/same/App.java2
-rw-r--r--same/src/main/java/com/orbekk/same/BroadcasterImpl.java3
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java54
-rw-r--r--same/src/main/java/com/orbekk/same/ClientService.java4
-rw-r--r--same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java8
-rw-r--r--same/src/main/java/com/orbekk/same/DiscoveryService.java11
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java20
-rw-r--r--same/src/main/java/com/orbekk/same/SameController.java46
-rw-r--r--same/src/main/java/com/orbekk/same/SameInterface.java8
-rw-r--r--same/src/main/java/com/orbekk/same/State.java54
-rw-r--r--same/src/main/java/com/orbekk/same/TestBroadcaster.java1
-rw-r--r--same/src/main/java/com/orbekk/same/TestConnectionManager.java7
-rw-r--r--same/src/main/java/com/orbekk/same/Variable.java2
-rw-r--r--same/src/main/java/com/orbekk/same/VariableFactory.java14
-rw-r--r--same/src/main/java/com/orbekk/same/config/Configuration.java16
-rw-r--r--same/src/main/java/com/orbekk/same/http/RpcServlet.java4
-rw-r--r--same/src/main/java/com/orbekk/same/http/ServerBuilder.java8
-rw-r--r--same/src/main/java/com/orbekk/same/http/ServerContainer.java16
-rw-r--r--same/src/main/java/com/orbekk/same/http/StateServlet.java16
-rw-r--r--same/src/main/java/com/orbekk/util/DelayedOperation.java190
-rw-r--r--same/src/main/java/com/orbekk/util/WorkQueue.java14
27 files changed, 278 insertions, 271 deletions
diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java
index 50ad5af..df8c02e 100644
--- a/same/src/main/java/com/orbekk/net/BroadcastListener.java
+++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java
@@ -13,11 +13,11 @@ public class BroadcastListener {
private int port;
private Logger logger = LoggerFactory.getLogger(getClass());
DatagramSocket socket;
-
+
public BroadcastListener(int port) {
this.port = port;
}
-
+
public synchronized DatagramPacket listen() {
logger.debug("Waiting for broadcast on port " + port);
try {
@@ -43,17 +43,17 @@ public class BroadcastListener {
logger.warn("Exception when listening for broadcast: {}", e);
return null;
}
-
+
String address = packet.getAddress().getHostAddress();
logger.debug("Received broadcast from " + address +
": " + new String(packet.getData(), 0, packet.getLength()));
return packet;
}
-
+
public void interrupt() {
socket.close();
}
-
+
public static void main(String[] args) {
int port = Integer.parseInt(args[0]);
BroadcastListener listener = new BroadcastListener(port);
diff --git a/same/src/main/java/com/orbekk/net/Broadcaster.java b/same/src/main/java/com/orbekk/net/Broadcaster.java
index b3e4860..cfec0ae 100644
--- a/same/src/main/java/com/orbekk/net/Broadcaster.java
+++ b/same/src/main/java/com/orbekk/net/Broadcaster.java
@@ -17,10 +17,10 @@ import org.slf4j.LoggerFactory;
public class Broadcaster implements BroadcasterInterface {
private Logger logger = LoggerFactory.getLogger(getClass());
-
+
public List<InetAddress> getBroadcastAddresses() {
List<InetAddress> broadcastAddresses = new LinkedList<InetAddress>();
-
+
Enumeration<NetworkInterface> interfaces;
try {
interfaces = NetworkInterface.getNetworkInterfaces();
@@ -48,6 +48,7 @@ public class Broadcaster implements BroadcasterInterface {
return broadcastAddresses;
}
+ @Override
public boolean sendBroadcast(int port, byte[] data) {
boolean successful = false;
for (InetAddress broadcastAddress : getBroadcastAddresses()) {
@@ -67,7 +68,7 @@ public class Broadcaster implements BroadcasterInterface {
}
return successful;
}
-
+
public static void main(String[] args) {
int port = Integer.parseInt(args[0]);
Broadcaster broadcaster = new Broadcaster();
diff --git a/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java b/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java
index f4973f2..f83bb41 100644
--- a/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java
+++ b/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java
@@ -31,9 +31,9 @@ public class MyJsonRpcHttpClient extends JsonRpcHttpClient {
private URL serviceUrl;
private JsonRpcClient rpcClient;
private HttpClient httpClient;
-
+
public MyJsonRpcHttpClient(URL serviceUrl, int connectionTimeout,
- int readTimeout) {
+ int readTimeout) {
super(null);
httpClient = new DefaultHttpClient();
HttpParams params = httpClient.getParams();
@@ -41,8 +41,8 @@ public class MyJsonRpcHttpClient extends JsonRpcHttpClient {
HttpConnectionParams.setSoTimeout(params, readTimeout);
rpcClient = new JsonRpcClient();
this.serviceUrl = serviceUrl;
- }
-
+ }
+
@Override
public synchronized Object invoke(
final String methodName, final Object[] arguments, Type returnType,
@@ -59,15 +59,15 @@ public class MyJsonRpcHttpClient extends JsonRpcHttpClient {
}
});
entity.setContentType("application/json-rpc");
-
+
HttpPost post = new HttpPost(serviceUrl.toString());
-
+
for (Map.Entry<String, String> entry : extraHeaders.entrySet()) {
post.addHeader(entry.getKey(), entry.getValue());
}
-
+
post.setEntity(entity);
-
+
HttpResponse response = httpClient.execute(post);
HttpEntity responseEntity = response.getEntity();
diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
index effe6df..5eb7ebb 100644
--- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java
+++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java
@@ -12,14 +12,14 @@ public class MasterProposer {
private String myUrl;
private List<String> paxosUrls = new ArrayList<String>();
private ConnectionManager connections;
-
+
public MasterProposer(String clientUrl, List<String> paxosUrls,
ConnectionManager connections) {
this.myUrl = clientUrl;
this.paxosUrls = paxosUrls;
this.connections = connections;
}
-
+
private int internalPropose(int proposalNumber) {
int bestPromise = -proposalNumber;
int promises = 0;
@@ -43,7 +43,7 @@ public class MasterProposer {
return bestPromise;
}
}
-
+
private int internalAcceptRequest(int proposalNumber) {
int bestAccepted = -proposalNumber;
int accepts = 0;
@@ -79,11 +79,11 @@ public class MasterProposer {
return false;
}
}
-
+
public boolean proposeRetry(int proposalNumber) {
int nextProposal = proposalNumber;
int result = 0;
-
+
while (result != nextProposal) {
result = internalPropose(nextProposal);
if (result == nextProposal) {
@@ -94,7 +94,7 @@ public class MasterProposer {
nextProposal = -result + 1;
}
}
-
+
return true;
}
}
diff --git a/same/src/main/java/com/orbekk/paxos/PaxosService.java b/same/src/main/java/com/orbekk/paxos/PaxosService.java
index 8de02da..a6f6b08 100644
--- a/same/src/main/java/com/orbekk/paxos/PaxosService.java
+++ b/same/src/main/java/com/orbekk/paxos/PaxosService.java
@@ -1,7 +1,7 @@
package com.orbekk.paxos;
public interface PaxosService {
-
+
/**
* @return N == proposalNumber if a promise is made.
* -M if another promise already was made, where M is the promise
diff --git a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java
index 3ecf523..58426bf 100644
--- a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java
+++ b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java
@@ -28,7 +28,7 @@ public class PaxosServiceImpl implements PaxosService {
logger.info(tag + "propose({}, {}) = rejected " +
"(promised: {})",
new Object[]{clientUrl, proposalNumber,
- highestPromise});
+ highestPromise});
return -highestPromise;
}
}
@@ -45,7 +45,7 @@ public class PaxosServiceImpl implements PaxosService {
logger.info(tag + "acceptRequest({}, {}) = rejected " +
"(promise={})",
new Object[]{clientUrl, proposalNumber,
- highestPromise});
+ highestPromise});
return -highestPromise;
}
}
diff --git a/same/src/main/java/com/orbekk/same/App.java b/same/src/main/java/com/orbekk/same/App.java
index 35e408c..36429cf 100644
--- a/same/src/main/java/com/orbekk/same/App.java
+++ b/same/src/main/java/com/orbekk/same/App.java
@@ -20,7 +20,7 @@ public class App {
logger.error("Error in App.", e);
}
}
-
+
public static void main(String[] args) {
new App().run(args);
}
diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
index 5efdcf3..27b8539 100644
--- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
+++ b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java
@@ -13,11 +13,12 @@ public class BroadcasterImpl implements Broadcaster {
public static BroadcasterImpl getDefaultBroadcastRunner() {
return new BroadcasterImpl(Executors.newFixedThreadPool(20));
}
-
+
public BroadcasterImpl(Executor executor) {
this.executor = executor;
}
+ @Override
public synchronized void broadcast(final List<String> targets,
final ServiceOperation operation) {
for (final String t : targets) {
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index d29147b..5b361ec 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -20,18 +20,18 @@ public class Client implements DiscoveryListener {
private List<StateChangedListener> stateListeners =
new ArrayList<StateChangedListener>();
private NetworkNotificationListener networkListener;
-
+
public class ClientInterfaceImpl implements ClientInterface {
private ClientInterfaceImpl() {
}
-
+
/** Get a copy of all the client state.
*/
@Override
public State getState() {
return new State(state);
}
-
+
public void set(String name, String data, long revision)
throws UpdateConflict {
String masterUrl = state.getDataOf(".masterUrl");
@@ -48,26 +48,26 @@ public class Client implements DiscoveryListener {
throw new UpdateConflict("Unable to contact master. Update fails.");
}
}
-
+
@Override
public void set(Component component) throws UpdateConflict {
set(component.getName(), component.getData(),
component.getRevision());
}
-
+
@Override
public void addStateListener(StateChangedListener listener) {
stateListeners.add(listener);
}
-
+
@Override
public void removeStateListener(StateChangedListener listener) {
stateListeners.remove(listener);
}
}
-
+
private ClientInterface clientInterface = new ClientInterfaceImpl();
-
+
private ClientService serviceImpl = new ClientService() {
@Override
public void setState(String component, String data, long revision) throws Exception {
@@ -81,7 +81,7 @@ public class Client implements DiscoveryListener {
new State.Component(component, revision, data));
}
}
-
+
@Override
public void notifyNetwork(String networkName, String masterUrl) throws Exception {
logger.info("NotifyNetwork(networkName={}, masterUrl={})",
@@ -90,13 +90,13 @@ public class Client implements DiscoveryListener {
networkListener.notifyNetwork(networkName, masterUrl);
}
}
-
+
@Override
public void discoveryRequest(String remoteUrl) {
discoveryThread.add(remoteUrl);
}
};
-
+
private WorkQueue<String> discoveryThread = new WorkQueue<String>() {
@Override protected void onChange() {
List<String> pending = getAndClear();
@@ -105,7 +105,7 @@ public class Client implements DiscoveryListener {
}
}
};
-
+
public Client(State state, ConnectionManager connections,
String myUrl) {
this.state = state;
@@ -116,7 +116,7 @@ public class Client implements DiscoveryListener {
public void start() {
discoveryThread.start();
}
-
+
public void interrupt() {
discoveryThread.interrupt();
}
@@ -124,7 +124,7 @@ public class Client implements DiscoveryListener {
public String getUrl() {
return myUrl;
}
-
+
public void joinNetwork(String masterUrl) {
logger.info("joinNetwork({})", masterUrl);
MasterService master = connections.getMaster(masterUrl);
@@ -135,19 +135,19 @@ public class Client implements DiscoveryListener {
logger.error("Unable to connect to master.", e);
}
}
-
+
ClientInterface getInterface() {
return clientInterface;
}
-
+
String lib_get(String name) {
return state.getDataOf(name);
}
-
+
<T> T lib_get(String name, TypeReference<T> type) {
return state.getParsedData(name, type);
}
-
+
void lib_set(String name, String data) throws UpdateConflict {
String masterUrl = state.getDataOf(".masterUrl");
long revision = state.getRevision(name) + 1;
@@ -164,11 +164,11 @@ public class Client implements DiscoveryListener {
throw new UpdateConflict("Unable to contact master. Update fails.");
}
}
-
+
public State.Component getState(String name) {
return state.getComponent(name);
}
-
+
State testGetState() {
return state;
}
@@ -176,17 +176,17 @@ public class Client implements DiscoveryListener {
public void setNetworkListener(NetworkNotificationListener listener) {
this.networkListener = listener;
}
-
+
public void sendDiscoveryRequest(String url) {
try {
connections.getClient(url)
- .discoveryRequest(myUrl);
+ .discoveryRequest(myUrl);
} catch (Exception e) {
logger.warn("Failed to send discovery request: {}",
throwableToString(e));
}
}
-
+
@Override
public void discover(String url) {
String networkName = state.getDataOf(".networkName");
@@ -197,19 +197,19 @@ public class Client implements DiscoveryListener {
logger.info("Ignoring broadcast to .Private network.");
return;
}
-
+
if (!url.equals(myUrl)) {
try {
connections.getClient(url)
- .notifyNetwork(state.getDataOf(".networkName"),
- state.getDataOf(".masterUrl"));
+ .notifyNetwork(state.getDataOf(".networkName"),
+ state.getDataOf(".masterUrl"));
} catch (Exception e) {
logger.warn("Failed to contact new client {}: {}", url,
throwableToString(e));
}
}
}
-
+
public ClientService getService() {
return serviceImpl;
}
diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java
index 3992e4c..04f423d 100644
--- a/same/src/main/java/com/orbekk/same/ClientService.java
+++ b/same/src/main/java/com/orbekk/same/ClientService.java
@@ -2,9 +2,9 @@ package com.orbekk.same;
public interface ClientService {
void notifyNetwork(String networkName, String masterUrl) throws Exception;
-
+
void setState(String component, String data, long revision) throws Exception;
-
+
// Manual discovery request by client.
void discoveryRequest(String remoteUrl) throws Exception;
}
diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
index 91c83e5..619ac27 100644
--- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
+++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java
@@ -17,7 +17,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
private int readTimeout;
private Map<String, MyJsonRpcHttpClient> connectionCache =
new HashMap<String, MyJsonRpcHttpClient>();
-
+
private Logger logger = LoggerFactory.getLogger(getClass());
/**
@@ -33,11 +33,11 @@ public class ConnectionManagerImpl implements ConnectionManager {
throws MalformedURLException {
if (!connectionCache.containsKey(url)) {
connectionCache.put(url, new MyJsonRpcHttpClient(new URL(url),
- connectionTimeout, readTimeout));
+ connectionTimeout, readTimeout));
}
return connectionCache.get(url);
}
-
+
private <T>T getClassProxy(String url, Class<T> clazz) {
T service = null;
try {
@@ -61,7 +61,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
public MasterService getMaster(String url) {
return getClassProxy(url, MasterService.class);
}
-
+
@Override
public PaxosService getPaxos(String url) {
return getClassProxy(url, PaxosService.class);
diff --git a/same/src/main/java/com/orbekk/same/DiscoveryService.java b/same/src/main/java/com/orbekk/same/DiscoveryService.java
index d9af9bb..1987c69 100644
--- a/same/src/main/java/com/orbekk/same/DiscoveryService.java
+++ b/same/src/main/java/com/orbekk/same/DiscoveryService.java
@@ -11,13 +11,14 @@ public class DiscoveryService extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
BroadcastListener broadcastListener;
DiscoveryListener listener;
-
+
public DiscoveryService(DiscoveryListener listener,
BroadcastListener broadcastListener) {
this.listener = listener;
this.broadcastListener = broadcastListener;
}
-
+
+ @Override
public void run() {
logger.info("DiscoveryService starting.");
while (!Thread.interrupted()) {
@@ -28,12 +29,12 @@ public class DiscoveryService extends Thread {
}
String content = new String(packet.getData(), 0, packet.getLength());
String[] words = content.split(" ");
-
+
if (!content.startsWith("Discover") || words.length < 2) {
logger.warn("Invalid discovery message: {}", content);
continue;
}
-
+
String url = words[1];
logger.info("Received discovery from {}", url);
if (listener != null) {
@@ -42,7 +43,7 @@ public class DiscoveryService extends Thread {
}
logger.info("DiscoveryService stopped.");
}
-
+
@Override public void interrupt() {
logger.info("Interrupt()");
super.interrupt();
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index 77f7496..b491313 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -115,16 +115,16 @@ public class Master {
final List<State.Component> components) {
broadcaster.broadcast(destinations, new ServiceOperation() {
@Override public void run(String url) {
- ClientService client = connections.getClient(url);
- try {
- for (Component c : components) {
- client.setState(c.getName(), c.getData(),
- c.getRevision());
- }
- } catch (Exception e) {
- logger.info("Client {} failed to receive state update.", url);
- removeParticipant(url);
- }
+ ClientService client = connections.getClient(url);
+ try {
+ for (Component c : components) {
+ client.setState(c.getName(), c.getData(),
+ c.getRevision());
+ }
+ } catch (Exception e) {
+ logger.info("Client {} failed to receive state update.", url);
+ removeParticipant(url);
+ }
}
});
}
diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java
index 8141e5f..95db592 100644
--- a/same/src/main/java/com/orbekk/same/SameController.java
+++ b/same/src/main/java/com/orbekk/same/SameController.java
@@ -23,12 +23,12 @@ public class SameController {
private DiscoveryService discoveryService;
private BroadcasterFactory broadcasterFactory;
private Configuration configuration;
-
+
/**
* Timeout for remote operations in milliseconds.
*/
private static final int timeout = 10000;
-
+
public static SameController create(BroadcasterFactory broadcasterFactory,
Configuration configuration) {
int port = configuration.getInt("port");
@@ -36,47 +36,47 @@ public class SameController {
timeout, timeout);
State clientState = new State(".InvalidClientNetwork");
Broadcaster broadcaster = BroadcasterImpl.getDefaultBroadcastRunner();
-
+
String baseUrl = String.format("http://%s:%s/",
configuration.get("localIp"), configuration.getInt("port"));
-
+
String masterUrl = baseUrl + "MasterService.json";
String clientUrl = baseUrl + "ClientService.json";
-
+
Master master = Master.create(connections, broadcaster,
masterUrl, configuration.get("networkName"));
-
+
Client client = new Client(clientState, connections,
clientUrl);
PaxosServiceImpl paxos = new PaxosServiceImpl("");
-
+
DiscoveryService discoveryService = null;
if ("true".equals(configuration.get("enableDiscovery"))) {
BroadcastListener broadcastListener = new BroadcastListener(
configuration.getInt("discoveryPort"));
discoveryService = new DiscoveryService(client, broadcastListener);
}
-
+
StateServlet stateServlet = new StateServlet(client.getInterface(),
new VariableFactory(client.getInterface()));
-
+
ServerContainer server = new ServerBuilder(port)
- .withServlet(stateServlet, "/_/state")
- .withService(client.getService(), ClientService.class)
- .withService(master.getService(), MasterService.class)
- .withService(paxos, PaxosService.class)
- .build();
+ .withServlet(stateServlet, "/_/state")
+ .withService(client.getService(), ClientService.class)
+ .withService(master.getService(), MasterService.class)
+ .withService(paxos, PaxosService.class)
+ .build();
SameController controller = new SameController(
configuration, server, master, client,
paxos, discoveryService, broadcasterFactory);
return controller;
}
-
+
public static SameController create(Configuration configuration) {
return create(new DefaultBroadcasterFactory(), configuration);
}
-
+
public SameController(
Configuration configuration,
ServerContainer server,
@@ -102,7 +102,7 @@ public class SameController {
discoveryService.start();
}
}
-
+
public void stop() {
try {
client.interrupt();
@@ -115,7 +115,7 @@ public class SameController {
logger.error("Failed to stop webserver", e);
}
}
-
+
public void join() {
try {
server.join();
@@ -132,26 +132,26 @@ public class SameController {
}
}
}
-
+
public void searchNetworks() {
BroadcasterInterface broadcaster = broadcasterFactory.create();
String message = "Discover " + client.getUrl();
broadcaster.sendBroadcast(configuration.getInt("discoveryPort"),
message.getBytes());
}
-
+
public void joinNetwork(String url) {
client.joinNetwork(url);
}
-
+
public Client getClient() {
return client;
}
-
+
public Master getMaster() {
return master;
}
-
+
public VariableFactory createVariableFactory() {
return new VariableFactory(client.getInterface());
}
diff --git a/same/src/main/java/com/orbekk/same/SameInterface.java b/same/src/main/java/com/orbekk/same/SameInterface.java
index a0a7894..20d47d0 100644
--- a/same/src/main/java/com/orbekk/same/SameInterface.java
+++ b/same/src/main/java/com/orbekk/same/SameInterface.java
@@ -10,18 +10,18 @@ public interface SameInterface {
* Get the state with identifier 'id'.
*/
String get(String id);
-
+
/**
* Get the state with identifier 'id', converted to a Java
* object of type T using Jackson.
*/
<T> T get(String id, TypeReference<T> type);
-
+
/**
* Get the state with identifier 'id' as a list.
*/
List<String> getList(String id);
-
+
/**
* Set the state.
*
@@ -33,7 +33,7 @@ public interface SameInterface {
* Set from an object: Pass it, e.g., a List<String>.
*/
void setObject(String id, Object data);
-
+
void addStateChangedListener(StateChangedListener listener);
void removeStateChangedListener(StateChangedListener listener);
}
diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java
index 2b48a00..f0fee80 100644
--- a/same/src/main/java/com/orbekk/same/State.java
+++ b/same/src/main/java/com/orbekk/same/State.java
@@ -24,22 +24,22 @@ public class State {
private Map<String, Component> state = new HashMap<String, Component>();
private ObjectMapper mapper = new ObjectMapper();
private Set<String> updatedComponents = new TreeSet<String>();
-
+
public State(String networkName) {
update(".networkName", networkName, 1);
updateFromObject(".participants", new ArrayList<String>(), 1);
}
-
+
public State(State other) {
state.putAll(other.state);
}
-
+
public synchronized void clear() {
logger.info("Clearing state.");
updatedComponents.clear();
state.clear();
}
-
+
public synchronized void forceUpdate(String componentName,
String data, long revision) {
Component oldComponent = state.get(componentName);
@@ -48,7 +48,7 @@ public class State {
state.put(componentName, newComponent);
updatedComponents.add(componentName);
}
-
+
public synchronized boolean update(String componentName, String data,
long revision) {
Component component = null;
@@ -57,7 +57,7 @@ public class State {
} else {
component = state.get(componentName);
}
-
+
if (revision > component.getRevision()) {
Component oldComponent = new Component(component);
component.setName(componentName);
@@ -71,7 +71,7 @@ public class State {
return false;
}
}
-
+
/**
* Get a copy of a component.
*/
@@ -83,7 +83,7 @@ public class State {
return null;
}
}
-
+
public String getDataOf(String componentName) {
Component component = state.get(componentName);
if (component != null) {
@@ -92,7 +92,7 @@ public class State {
return null;
}
}
-
+
public long getRevision(String componentName) {
Component component = state.get(componentName);
if (component != null) {
@@ -103,7 +103,7 @@ public class State {
return 0;
}
}
-
+
/**
* Parses a JSON value using Jackson ObjectMapper.
*/
@@ -126,12 +126,12 @@ public class State {
}
return null;
}
-
+
public List<String> getList(String componentName) {
return getParsedData(componentName,
new TypeReference<List<String>>(){});
}
-
+
public boolean updateFromObject(String componentName, Object data, long revision) {
String dataS;
try {
@@ -151,14 +151,14 @@ public class State {
return false;
}
}
-
+
/**
* Pretty print a component.
*/
public String show(String componentName) {
return componentName + ": " + state.get(componentName);
}
-
+
/**
* Returns a list of all the components in this State.
*
@@ -171,7 +171,7 @@ public class State {
}
return list;
}
-
+
public synchronized List<Component> getAndClearUpdatedComponents() {
List<Component> components = new ArrayList<Component>();
for (String name : updatedComponents) {
@@ -185,7 +185,7 @@ public class State {
private String name;
private long revision;
private String data;
-
+
/**
* Copy constructor.
*/
@@ -194,21 +194,21 @@ public class State {
this.revision = other.revision;
this.data = other.data;
}
-
+
public Component(String name, long revision, String data) {
this.name = name;
this.revision = revision;
this.data = data;
}
-
+
public long getRevision() {
return revision;
}
-
+
public void setRevision(long revision) {
this.revision = revision;
}
-
+
public String getData() {
return data;
}
@@ -216,29 +216,29 @@ public class State {
public void setData(String data) {
this.data = data;
}
-
+
public String getName() {
return name;
}
-
+
public void setName(String name) {
this.name = name;
}
-
+
@Override public String toString() {
return "[" + this.name + ": " + this.data + "@" + revision + "]";
}
-
+
@Override public boolean equals(Object other) {
if (!(other instanceof Component)) {
return false;
}
Component o = (Component)other;
return name.equals(o.name) && data.equals(o.data) &&
- revision == o.revision;
+ revision == o.revision;
}
}
-
+
@Override public String toString() {
StringBuilder output = new StringBuilder();
output.append("State(\n");
@@ -248,7 +248,7 @@ public class State {
output.append(")");
return output.toString();
}
-
+
@Override public boolean equals(Object other) {
if (!(other instanceof State)) {
return false;
diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java
index b2f9d8c..bac4742 100644
--- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java
+++ b/same/src/main/java/com/orbekk/same/TestBroadcaster.java
@@ -10,6 +10,7 @@ public class TestBroadcaster implements Broadcaster {
public TestBroadcaster() {
}
+ @Override
public void broadcast(final List<String> targets,
final ServiceOperation operation) {
for (String t : targets) {
diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
index 3439a78..25a3ee6 100644
--- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java
+++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java
@@ -10,7 +10,7 @@ import com.orbekk.paxos.PaxosService;
*/
public class TestConnectionManager implements ConnectionManager {
public Map<String, ClientService> clientMap =
- new HashMap<String, ClientService>();
+ new HashMap<String, ClientService>();
public Map<String, MasterService> masterMap =
new HashMap<String, MasterService>();
public Map<String, PaxosService> paxosMap =
@@ -19,14 +19,17 @@ public class TestConnectionManager implements ConnectionManager {
public TestConnectionManager() {
}
+ @Override
public ClientService getClient(String url) {
return clientMap.get(url);
}
+ @Override
public MasterService getMaster(String url) {
return masterMap.get(url);
}
-
+
+ @Override
public PaxosService getPaxos(String url) {
return paxosMap.get(url);
}
diff --git a/same/src/main/java/com/orbekk/same/Variable.java b/same/src/main/java/com/orbekk/same/Variable.java
index b6d3b7f..bea8664 100644
--- a/same/src/main/java/com/orbekk/same/Variable.java
+++ b/same/src/main/java/com/orbekk/same/Variable.java
@@ -9,7 +9,7 @@ public interface Variable<T> {
*/
void valueChanged(Variable<T> variable);
}
-
+
T get();
void set(T value) throws UpdateConflict;
void update();
diff --git a/same/src/main/java/com/orbekk/same/VariableFactory.java b/same/src/main/java/com/orbekk/same/VariableFactory.java
index 3f3b19c..f1796d6 100644
--- a/same/src/main/java/com/orbekk/same/VariableFactory.java
+++ b/same/src/main/java/com/orbekk/same/VariableFactory.java
@@ -18,14 +18,14 @@ public class VariableFactory {
private Logger logger = LoggerFactory.getLogger(getClass());
private ClientInterface client;
private ObjectMapper mapper = new ObjectMapper();
-
+
private class VariableImpl<T> implements Variable<T>, StateChangedListener {
String identifier;
TypeReference<T> type;
T value;
long revision = 0;
OnChangeListener<T> listener = null;
-
+
public VariableImpl(String identifier, TypeReference<T> type) {
this.identifier = identifier;
this.type = type;
@@ -41,7 +41,7 @@ public class VariableFactory {
try {
String serializedValue = mapper.writeValueAsString(value);
State.Component update = new State.Component(identifier,
- revision, serializedValue);
+ revision, serializedValue);
client.set(update);
} catch (JsonGenerationException e) {
logger.warn("Failed to convert to JSON: {}", value);
@@ -78,22 +78,22 @@ public class VariableFactory {
}
}
}
-
+
public static VariableFactory create(ClientInterface client) {
return new VariableFactory(client);
}
-
+
VariableFactory(ClientInterface client) {
this.client = client;
}
-
+
public <T> Variable<T> create(String identifier, TypeReference<T> type) {
VariableImpl<T> variable = new VariableImpl<T>(identifier, type);
variable.update();
client.addStateListener(variable);
return variable;
}
-
+
public Variable<String> createString(String identifier) {
return create(identifier, new TypeReference<String>() {});
}
diff --git a/same/src/main/java/com/orbekk/same/config/Configuration.java b/same/src/main/java/com/orbekk/same/config/Configuration.java
index 75e6ada..82148c9 100644
--- a/same/src/main/java/com/orbekk/same/config/Configuration.java
+++ b/same/src/main/java/com/orbekk/same/config/Configuration.java
@@ -11,18 +11,18 @@ import org.slf4j.LoggerFactory;
public class Configuration {
public final static String configurationProperty =
"com.orbekk.same.config.file";
-
+
static final Logger logger = LoggerFactory.getLogger(Configuration.class);
Properties configuration = new Properties();
-
+
public Configuration(Properties properties) {
this.configuration = properties;
}
-
+
Configuration() {
// Use factory methods.
}
-
+
public static Configuration loadOrDie() {
Configuration configuration = new Configuration();
boolean status = configuration.loadDefault();
@@ -32,13 +32,13 @@ public class Configuration {
}
return configuration;
}
-
+
public static Configuration load() {
Configuration configuration = new Configuration();
configuration.loadDefault();
return configuration;
}
-
+
public boolean loadDefault() {
String filename = System.getProperty(configurationProperty);
if (filename != null) {
@@ -60,7 +60,7 @@ public class Configuration {
}
return false;
}
-
+
public String get(String name) {
String value = configuration.getProperty(name);
if (value == null) {
@@ -68,7 +68,7 @@ public class Configuration {
}
return value;
}
-
+
public Integer getInt(String name) {
if (get(name) == null) {
return null;
diff --git a/same/src/main/java/com/orbekk/same/http/RpcServlet.java b/same/src/main/java/com/orbekk/same/http/RpcServlet.java
index 9b4f82e..9450d67 100644
--- a/same/src/main/java/com/orbekk/same/http/RpcServlet.java
+++ b/same/src/main/java/com/orbekk/same/http/RpcServlet.java
@@ -10,12 +10,12 @@ import com.googlecode.jsonrpc4j.JsonRpcServer;
public class RpcServlet extends HttpServlet {
JsonRpcServer rpcServer;
-
+
public RpcServlet(JsonRpcServer rpcServer) {
super();
this.rpcServer = rpcServer;
}
-
+
@Override
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws IOException {
diff --git a/same/src/main/java/com/orbekk/same/http/ServerBuilder.java b/same/src/main/java/com/orbekk/same/http/ServerBuilder.java
index 65ee670..543366e 100644
--- a/same/src/main/java/com/orbekk/same/http/ServerBuilder.java
+++ b/same/src/main/java/com/orbekk/same/http/ServerBuilder.java
@@ -13,24 +13,24 @@ public class ServerBuilder {
Logger logger = LoggerFactory.getLogger(getClass());
int port;
ServletContextHandler context = null;
-
+
public ServerBuilder(int port) {
this.port = port;
}
-
+
public ServerBuilder withServlet(HttpServlet servlet, String pathSpec) {
logger.info("Servlet binding: {} → {}", pathSpec, servlet);
getServletContextHandler().addServlet(new ServletHolder(servlet),
pathSpec);
return this;
}
-
+
public <T> ServerBuilder withService(T service, Class<T> clazz) {
JsonRpcServer server = new JsonRpcServer(service, clazz);
String pathSpec = "/" + clazz.getSimpleName() + ".json";
return withServlet(new RpcServlet(server), pathSpec);
}
-
+
public ServerContainer build() {
ServerContainer server = ServerContainer.create(port);
server.setReuseAddress(true);
diff --git a/same/src/main/java/com/orbekk/same/http/ServerContainer.java b/same/src/main/java/com/orbekk/same/http/ServerContainer.java
index f91cce1..af577a0 100644
--- a/same/src/main/java/com/orbekk/same/http/ServerContainer.java
+++ b/same/src/main/java/com/orbekk/same/http/ServerContainer.java
@@ -12,23 +12,23 @@ public class ServerContainer {
Server server;
int port;
ServletContextHandler context = null;
-
+
public ServerContainer(Server server, int port, ServletContextHandler context) {
this.server = server;
this.port = port;
this.context = context;
}
-
+
public static ServerContainer create(int port) {
Server server = new Server(port);
return new ServerContainer(server, port, null);
}
-
+
public void setContext(ServletContextHandler context) {
server.setHandler(context);
this.context = context;
}
-
+
public void setReuseAddress(boolean on) {
Connector connector = server.getConnectors()[0];
if (connector instanceof AbstractConnector) {
@@ -36,7 +36,7 @@ public class ServerContainer {
connector_.setReuseAddress(on);
}
}
-
+
public int getPort() {
if (port == 0) {
return server.getConnectors()[0].getLocalPort();
@@ -44,17 +44,17 @@ public class ServerContainer {
return port;
}
}
-
+
public void start() throws Exception {
server.start();
logger.info("Started server on port {}", getPort());
}
-
+
public void stop() throws Exception {
server.stop();
logger.info("Server stopped.");
}
-
+
public void join() throws InterruptedException {
server.join();
}
diff --git a/same/src/main/java/com/orbekk/same/http/StateServlet.java b/same/src/main/java/com/orbekk/same/http/StateServlet.java
index 8b4d8c5..578bdfc 100644
--- a/same/src/main/java/com/orbekk/same/http/StateServlet.java
+++ b/same/src/main/java/com/orbekk/same/http/StateServlet.java
@@ -22,13 +22,13 @@ public class StateServlet extends HttpServlet {
private ClientInterface client;
private VariableFactory variableFactory;
private final static String TITLE = "State viewer";
-
+
public StateServlet(ClientInterface client,
VariableFactory variableFactory) {
this.client = client;
this.variableFactory = variableFactory;
}
-
+
private void handleSetState(HttpServletRequest request,
HttpServletResponse response) throws IOException {
if (request.getParameter("key") == null ||
@@ -36,13 +36,13 @@ public class StateServlet extends HttpServlet {
response.getWriter().println(
"Usage: action=set&key=DesiredKey&value=DesiredValue");
}
-
+
try {
String key = request.getParameter("key");
String value = request.getParameter("value");
Variable<String> variable = variableFactory.createString(key);
variable.set(value);
-
+
response.getWriter().println("Updated component: " +
key + "=" + value);
} catch (UpdateConflict e) {
@@ -50,7 +50,7 @@ public class StateServlet extends HttpServlet {
throwableToString(e));
}
}
-
+
@Override
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws IOException {
@@ -78,7 +78,7 @@ public class StateServlet extends HttpServlet {
w.println(client.getState());
w.println("</pre>");
}
-
+
private void writeSetStateForm(HttpServletResponse response)
throws IOException {
PrintWriter w = response.getWriter();
@@ -90,7 +90,7 @@ public class StateServlet extends HttpServlet {
w.println("<p><input type=\"submit\" value=\"Sumbit\" />");
w.println("</form>");
}
-
+
private void writeHeader(HttpServletResponse response) throws IOException {
PrintWriter w = response.getWriter();
w.println("<html>");
@@ -99,7 +99,7 @@ public class StateServlet extends HttpServlet {
w.println("</head>");
w.println("<body>");
}
-
+
private void writeFooter(HttpServletResponse response) throws IOException {
PrintWriter w = response.getWriter();
w.println("</body>");
diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java
index 2c37d02..cf6ca30 100644
--- a/same/src/main/java/com/orbekk/util/DelayedOperation.java
+++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java
@@ -1,99 +1,99 @@
package com.orbekk.util;
public class DelayedOperation<T> {
- public static class Status {
- public final static int OK = 1;
- public final static int CONFLICT = 2;
- public final static int ERROR = 3;
-
- private int status;
- private String message;
-
- public static Status createOk() {
- return new Status(OK, "");
- }
-
- public static Status createConflict(String message) {
- return new Status(CONFLICT, message);
- }
-
- public static Status createError(String message) {
- return new Status(ERROR, message);
- }
-
- public Status(int status, String message) {
- this.status = status;
- this.message = message;
- }
-
- @Override public String toString() {
- switch(status) {
- case OK:
- return "OK";
- case CONFLICT:
- return "Conflicting update: " + message;
- case ERROR:
- return "Error: " + message;
- }
- throw new AssertionError("Unhandled case.");
- }
-
- @Override public boolean equals(Object other) {
- if (!(other instanceof Status)) {
- return false;
- }
- Status o = (Status)other;
- if (o.status != this.status) {
- return false;
- }
- if (message == null) {
- return o.message == null;
- }
- return message.equals(o.message);
- }
- }
-
- private T argument;
- private Status status;
- private boolean isDone;
- private int identifier;
-
- public DelayedOperation(T argument) {
- this.argument = argument;
- }
-
- public Status getStatus() {
- waitFor();
- return status;
- }
-
- public synchronized void waitFor() {
- while (!isDone) {
- try {
- wait();
- } catch (InterruptedException e) {
- complete(new Status(Status.ERROR, "Thread interrupted."));
- }
- }
- }
-
- public synchronized boolean isDone() {
- return isDone;
- }
-
- public synchronized void complete(Status status) {
- if (!isDone) {
- isDone = true;
- this.status = status;
- notifyAll();
- }
- }
-
- public synchronized int getIdentifier() {
- return identifier;
- }
-
- public synchronized void setIdentifier(int identifier) {
- this.identifier = identifier;
- }
+ public static class Status {
+ public final static int OK = 1;
+ public final static int CONFLICT = 2;
+ public final static int ERROR = 3;
+
+ private int status;
+ private String message;
+
+ public static Status createOk() {
+ return new Status(OK, "");
+ }
+
+ public static Status createConflict(String message) {
+ return new Status(CONFLICT, message);
+ }
+
+ public static Status createError(String message) {
+ return new Status(ERROR, message);
+ }
+
+ public Status(int status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ @Override public String toString() {
+ switch(status) {
+ case OK:
+ return "OK";
+ case CONFLICT:
+ return "Conflicting update: " + message;
+ case ERROR:
+ return "Error: " + message;
+ }
+ throw new AssertionError("Unhandled case.");
+ }
+
+ @Override public boolean equals(Object other) {
+ if (!(other instanceof Status)) {
+ return false;
+ }
+ Status o = (Status)other;
+ if (o.status != this.status) {
+ return false;
+ }
+ if (message == null) {
+ return o.message == null;
+ }
+ return message.equals(o.message);
+ }
+ }
+
+ private T argument;
+ private Status status;
+ private boolean isDone;
+ private int identifier;
+
+ public DelayedOperation(T argument) {
+ this.argument = argument;
+ }
+
+ public Status getStatus() {
+ waitFor();
+ return status;
+ }
+
+ public synchronized void waitFor() {
+ while (!isDone) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ complete(new Status(Status.ERROR, "Thread interrupted."));
+ }
+ }
+ }
+
+ public synchronized boolean isDone() {
+ return isDone;
+ }
+
+ public synchronized void complete(Status status) {
+ if (!isDone) {
+ isDone = true;
+ this.status = status;
+ notifyAll();
+ }
+ }
+
+ public synchronized int getIdentifier() {
+ return identifier;
+ }
+
+ public synchronized void setIdentifier(int identifier) {
+ this.identifier = identifier;
+ }
}
diff --git a/same/src/main/java/com/orbekk/util/WorkQueue.java b/same/src/main/java/com/orbekk/util/WorkQueue.java
index 2fb2c88..397c4b8 100644
--- a/same/src/main/java/com/orbekk/util/WorkQueue.java
+++ b/same/src/main/java/com/orbekk/util/WorkQueue.java
@@ -16,21 +16,21 @@ abstract public class WorkQueue<E> extends Thread implements List<E> {
private Logger logger = LoggerFactory.getLogger(getClass());
private volatile List<E> list = null;
private volatile boolean done = false;
-
+
public WorkQueue() {
list = new ArrayList<E>();
}
-
+
public WorkQueue(Collection<? extends E> collection) {
list = new ArrayList<E>(collection);
}
-
+
public synchronized List<E> getAndClear() {
List<E> copy = new ArrayList<E>(list);
list.clear();
return copy;
}
-
+
/**
* OnChange event.
*
@@ -50,7 +50,7 @@ abstract public class WorkQueue<E> extends Thread implements List<E> {
onChange();
}
}
-
+
@Override
public void run() {
while (!done) {
@@ -71,8 +71,8 @@ abstract public class WorkQueue<E> extends Thread implements List<E> {
}
}
}
-
-
+
+
@Override
public synchronized boolean add(E e) {
notifyAll();