From c58c649531e4884a92f07d9b12f0ad77fbcf5d6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 27 Feb 2012 16:08:54 +0100 Subject: Clean up source code. Clean up source code with Eclipse. In particular remove tabs. --- .../java/com/orbekk/net/BroadcastListener.java | 10 +- same/src/main/java/com/orbekk/net/Broadcaster.java | 7 +- .../java/com/orbekk/net/MyJsonRpcHttpClient.java | 16 +- .../main/java/com/orbekk/paxos/MasterProposer.java | 12 +- .../main/java/com/orbekk/paxos/PaxosService.java | 2 +- .../java/com/orbekk/paxos/PaxosServiceImpl.java | 4 +- same/src/main/java/com/orbekk/same/App.java | 2 +- .../main/java/com/orbekk/same/BroadcasterImpl.java | 3 +- same/src/main/java/com/orbekk/same/Client.java | 54 +++--- .../main/java/com/orbekk/same/ClientService.java | 4 +- .../com/orbekk/same/ConnectionManagerImpl.java | 8 +- .../java/com/orbekk/same/DiscoveryService.java | 11 +- same/src/main/java/com/orbekk/same/Master.java | 20 +-- .../main/java/com/orbekk/same/SameController.java | 46 ++--- .../main/java/com/orbekk/same/SameInterface.java | 8 +- same/src/main/java/com/orbekk/same/State.java | 54 +++--- .../main/java/com/orbekk/same/TestBroadcaster.java | 1 + .../com/orbekk/same/TestConnectionManager.java | 7 +- same/src/main/java/com/orbekk/same/Variable.java | 2 +- .../main/java/com/orbekk/same/VariableFactory.java | 14 +- .../java/com/orbekk/same/config/Configuration.java | 16 +- .../main/java/com/orbekk/same/http/RpcServlet.java | 4 +- .../java/com/orbekk/same/http/ServerBuilder.java | 8 +- .../java/com/orbekk/same/http/ServerContainer.java | 16 +- .../java/com/orbekk/same/http/StateServlet.java | 16 +- .../java/com/orbekk/util/DelayedOperation.java | 190 ++++++++++----------- same/src/main/java/com/orbekk/util/WorkQueue.java | 14 +- 27 files changed, 278 insertions(+), 271 deletions(-) diff --git a/same/src/main/java/com/orbekk/net/BroadcastListener.java b/same/src/main/java/com/orbekk/net/BroadcastListener.java index 50ad5af..df8c02e 100644 --- a/same/src/main/java/com/orbekk/net/BroadcastListener.java +++ b/same/src/main/java/com/orbekk/net/BroadcastListener.java @@ -13,11 +13,11 @@ public class BroadcastListener { private int port; private Logger logger = LoggerFactory.getLogger(getClass()); DatagramSocket socket; - + public BroadcastListener(int port) { this.port = port; } - + public synchronized DatagramPacket listen() { logger.debug("Waiting for broadcast on port " + port); try { @@ -43,17 +43,17 @@ public class BroadcastListener { logger.warn("Exception when listening for broadcast: {}", e); return null; } - + String address = packet.getAddress().getHostAddress(); logger.debug("Received broadcast from " + address + ": " + new String(packet.getData(), 0, packet.getLength())); return packet; } - + public void interrupt() { socket.close(); } - + public static void main(String[] args) { int port = Integer.parseInt(args[0]); BroadcastListener listener = new BroadcastListener(port); diff --git a/same/src/main/java/com/orbekk/net/Broadcaster.java b/same/src/main/java/com/orbekk/net/Broadcaster.java index b3e4860..cfec0ae 100644 --- a/same/src/main/java/com/orbekk/net/Broadcaster.java +++ b/same/src/main/java/com/orbekk/net/Broadcaster.java @@ -17,10 +17,10 @@ import org.slf4j.LoggerFactory; public class Broadcaster implements BroadcasterInterface { private Logger logger = LoggerFactory.getLogger(getClass()); - + public List getBroadcastAddresses() { List broadcastAddresses = new LinkedList(); - + Enumeration interfaces; try { interfaces = NetworkInterface.getNetworkInterfaces(); @@ -48,6 +48,7 @@ public class Broadcaster implements BroadcasterInterface { return broadcastAddresses; } + @Override public boolean sendBroadcast(int port, byte[] data) { boolean successful = false; for (InetAddress broadcastAddress : getBroadcastAddresses()) { @@ -67,7 +68,7 @@ public class Broadcaster implements BroadcasterInterface { } return successful; } - + public static void main(String[] args) { int port = Integer.parseInt(args[0]); Broadcaster broadcaster = new Broadcaster(); diff --git a/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java b/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java index f4973f2..f83bb41 100644 --- a/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java +++ b/same/src/main/java/com/orbekk/net/MyJsonRpcHttpClient.java @@ -31,9 +31,9 @@ public class MyJsonRpcHttpClient extends JsonRpcHttpClient { private URL serviceUrl; private JsonRpcClient rpcClient; private HttpClient httpClient; - + public MyJsonRpcHttpClient(URL serviceUrl, int connectionTimeout, - int readTimeout) { + int readTimeout) { super(null); httpClient = new DefaultHttpClient(); HttpParams params = httpClient.getParams(); @@ -41,8 +41,8 @@ public class MyJsonRpcHttpClient extends JsonRpcHttpClient { HttpConnectionParams.setSoTimeout(params, readTimeout); rpcClient = new JsonRpcClient(); this.serviceUrl = serviceUrl; - } - + } + @Override public synchronized Object invoke( final String methodName, final Object[] arguments, Type returnType, @@ -59,15 +59,15 @@ public class MyJsonRpcHttpClient extends JsonRpcHttpClient { } }); entity.setContentType("application/json-rpc"); - + HttpPost post = new HttpPost(serviceUrl.toString()); - + for (Map.Entry entry : extraHeaders.entrySet()) { post.addHeader(entry.getKey(), entry.getValue()); } - + post.setEntity(entity); - + HttpResponse response = httpClient.execute(post); HttpEntity responseEntity = response.getEntity(); diff --git a/same/src/main/java/com/orbekk/paxos/MasterProposer.java b/same/src/main/java/com/orbekk/paxos/MasterProposer.java index effe6df..5eb7ebb 100644 --- a/same/src/main/java/com/orbekk/paxos/MasterProposer.java +++ b/same/src/main/java/com/orbekk/paxos/MasterProposer.java @@ -12,14 +12,14 @@ public class MasterProposer { private String myUrl; private List paxosUrls = new ArrayList(); private ConnectionManager connections; - + public MasterProposer(String clientUrl, List paxosUrls, ConnectionManager connections) { this.myUrl = clientUrl; this.paxosUrls = paxosUrls; this.connections = connections; } - + private int internalPropose(int proposalNumber) { int bestPromise = -proposalNumber; int promises = 0; @@ -43,7 +43,7 @@ public class MasterProposer { return bestPromise; } } - + private int internalAcceptRequest(int proposalNumber) { int bestAccepted = -proposalNumber; int accepts = 0; @@ -79,11 +79,11 @@ public class MasterProposer { return false; } } - + public boolean proposeRetry(int proposalNumber) { int nextProposal = proposalNumber; int result = 0; - + while (result != nextProposal) { result = internalPropose(nextProposal); if (result == nextProposal) { @@ -94,7 +94,7 @@ public class MasterProposer { nextProposal = -result + 1; } } - + return true; } } diff --git a/same/src/main/java/com/orbekk/paxos/PaxosService.java b/same/src/main/java/com/orbekk/paxos/PaxosService.java index 8de02da..a6f6b08 100644 --- a/same/src/main/java/com/orbekk/paxos/PaxosService.java +++ b/same/src/main/java/com/orbekk/paxos/PaxosService.java @@ -1,7 +1,7 @@ package com.orbekk.paxos; public interface PaxosService { - + /** * @return N == proposalNumber if a promise is made. * -M if another promise already was made, where M is the promise diff --git a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java index 3ecf523..58426bf 100644 --- a/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java +++ b/same/src/main/java/com/orbekk/paxos/PaxosServiceImpl.java @@ -28,7 +28,7 @@ public class PaxosServiceImpl implements PaxosService { logger.info(tag + "propose({}, {}) = rejected " + "(promised: {})", new Object[]{clientUrl, proposalNumber, - highestPromise}); + highestPromise}); return -highestPromise; } } @@ -45,7 +45,7 @@ public class PaxosServiceImpl implements PaxosService { logger.info(tag + "acceptRequest({}, {}) = rejected " + "(promise={})", new Object[]{clientUrl, proposalNumber, - highestPromise}); + highestPromise}); return -highestPromise; } } diff --git a/same/src/main/java/com/orbekk/same/App.java b/same/src/main/java/com/orbekk/same/App.java index 35e408c..36429cf 100644 --- a/same/src/main/java/com/orbekk/same/App.java +++ b/same/src/main/java/com/orbekk/same/App.java @@ -20,7 +20,7 @@ public class App { logger.error("Error in App.", e); } } - + public static void main(String[] args) { new App().run(args); } diff --git a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java index 5efdcf3..27b8539 100644 --- a/same/src/main/java/com/orbekk/same/BroadcasterImpl.java +++ b/same/src/main/java/com/orbekk/same/BroadcasterImpl.java @@ -13,11 +13,12 @@ public class BroadcasterImpl implements Broadcaster { public static BroadcasterImpl getDefaultBroadcastRunner() { return new BroadcasterImpl(Executors.newFixedThreadPool(20)); } - + public BroadcasterImpl(Executor executor) { this.executor = executor; } + @Override public synchronized void broadcast(final List targets, final ServiceOperation operation) { for (final String t : targets) { diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index d29147b..5b361ec 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -20,18 +20,18 @@ public class Client implements DiscoveryListener { private List stateListeners = new ArrayList(); private NetworkNotificationListener networkListener; - + public class ClientInterfaceImpl implements ClientInterface { private ClientInterfaceImpl() { } - + /** Get a copy of all the client state. */ @Override public State getState() { return new State(state); } - + public void set(String name, String data, long revision) throws UpdateConflict { String masterUrl = state.getDataOf(".masterUrl"); @@ -48,26 +48,26 @@ public class Client implements DiscoveryListener { throw new UpdateConflict("Unable to contact master. Update fails."); } } - + @Override public void set(Component component) throws UpdateConflict { set(component.getName(), component.getData(), component.getRevision()); } - + @Override public void addStateListener(StateChangedListener listener) { stateListeners.add(listener); } - + @Override public void removeStateListener(StateChangedListener listener) { stateListeners.remove(listener); } } - + private ClientInterface clientInterface = new ClientInterfaceImpl(); - + private ClientService serviceImpl = new ClientService() { @Override public void setState(String component, String data, long revision) throws Exception { @@ -81,7 +81,7 @@ public class Client implements DiscoveryListener { new State.Component(component, revision, data)); } } - + @Override public void notifyNetwork(String networkName, String masterUrl) throws Exception { logger.info("NotifyNetwork(networkName={}, masterUrl={})", @@ -90,13 +90,13 @@ public class Client implements DiscoveryListener { networkListener.notifyNetwork(networkName, masterUrl); } } - + @Override public void discoveryRequest(String remoteUrl) { discoveryThread.add(remoteUrl); } }; - + private WorkQueue discoveryThread = new WorkQueue() { @Override protected void onChange() { List pending = getAndClear(); @@ -105,7 +105,7 @@ public class Client implements DiscoveryListener { } } }; - + public Client(State state, ConnectionManager connections, String myUrl) { this.state = state; @@ -116,7 +116,7 @@ public class Client implements DiscoveryListener { public void start() { discoveryThread.start(); } - + public void interrupt() { discoveryThread.interrupt(); } @@ -124,7 +124,7 @@ public class Client implements DiscoveryListener { public String getUrl() { return myUrl; } - + public void joinNetwork(String masterUrl) { logger.info("joinNetwork({})", masterUrl); MasterService master = connections.getMaster(masterUrl); @@ -135,19 +135,19 @@ public class Client implements DiscoveryListener { logger.error("Unable to connect to master.", e); } } - + ClientInterface getInterface() { return clientInterface; } - + String lib_get(String name) { return state.getDataOf(name); } - + T lib_get(String name, TypeReference type) { return state.getParsedData(name, type); } - + void lib_set(String name, String data) throws UpdateConflict { String masterUrl = state.getDataOf(".masterUrl"); long revision = state.getRevision(name) + 1; @@ -164,11 +164,11 @@ public class Client implements DiscoveryListener { throw new UpdateConflict("Unable to contact master. Update fails."); } } - + public State.Component getState(String name) { return state.getComponent(name); } - + State testGetState() { return state; } @@ -176,17 +176,17 @@ public class Client implements DiscoveryListener { public void setNetworkListener(NetworkNotificationListener listener) { this.networkListener = listener; } - + public void sendDiscoveryRequest(String url) { try { connections.getClient(url) - .discoveryRequest(myUrl); + .discoveryRequest(myUrl); } catch (Exception e) { logger.warn("Failed to send discovery request: {}", throwableToString(e)); } } - + @Override public void discover(String url) { String networkName = state.getDataOf(".networkName"); @@ -197,19 +197,19 @@ public class Client implements DiscoveryListener { logger.info("Ignoring broadcast to .Private network."); return; } - + if (!url.equals(myUrl)) { try { connections.getClient(url) - .notifyNetwork(state.getDataOf(".networkName"), - state.getDataOf(".masterUrl")); + .notifyNetwork(state.getDataOf(".networkName"), + state.getDataOf(".masterUrl")); } catch (Exception e) { logger.warn("Failed to contact new client {}: {}", url, throwableToString(e)); } } } - + public ClientService getService() { return serviceImpl; } diff --git a/same/src/main/java/com/orbekk/same/ClientService.java b/same/src/main/java/com/orbekk/same/ClientService.java index 3992e4c..04f423d 100644 --- a/same/src/main/java/com/orbekk/same/ClientService.java +++ b/same/src/main/java/com/orbekk/same/ClientService.java @@ -2,9 +2,9 @@ package com.orbekk.same; public interface ClientService { void notifyNetwork(String networkName, String masterUrl) throws Exception; - + void setState(String component, String data, long revision) throws Exception; - + // Manual discovery request by client. void discoveryRequest(String remoteUrl) throws Exception; } diff --git a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java index 91c83e5..619ac27 100644 --- a/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java +++ b/same/src/main/java/com/orbekk/same/ConnectionManagerImpl.java @@ -17,7 +17,7 @@ public class ConnectionManagerImpl implements ConnectionManager { private int readTimeout; private Map connectionCache = new HashMap(); - + private Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -33,11 +33,11 @@ public class ConnectionManagerImpl implements ConnectionManager { throws MalformedURLException { if (!connectionCache.containsKey(url)) { connectionCache.put(url, new MyJsonRpcHttpClient(new URL(url), - connectionTimeout, readTimeout)); + connectionTimeout, readTimeout)); } return connectionCache.get(url); } - + private T getClassProxy(String url, Class clazz) { T service = null; try { @@ -61,7 +61,7 @@ public class ConnectionManagerImpl implements ConnectionManager { public MasterService getMaster(String url) { return getClassProxy(url, MasterService.class); } - + @Override public PaxosService getPaxos(String url) { return getClassProxy(url, PaxosService.class); diff --git a/same/src/main/java/com/orbekk/same/DiscoveryService.java b/same/src/main/java/com/orbekk/same/DiscoveryService.java index d9af9bb..1987c69 100644 --- a/same/src/main/java/com/orbekk/same/DiscoveryService.java +++ b/same/src/main/java/com/orbekk/same/DiscoveryService.java @@ -11,13 +11,14 @@ public class DiscoveryService extends Thread { private Logger logger = LoggerFactory.getLogger(getClass()); BroadcastListener broadcastListener; DiscoveryListener listener; - + public DiscoveryService(DiscoveryListener listener, BroadcastListener broadcastListener) { this.listener = listener; this.broadcastListener = broadcastListener; } - + + @Override public void run() { logger.info("DiscoveryService starting."); while (!Thread.interrupted()) { @@ -28,12 +29,12 @@ public class DiscoveryService extends Thread { } String content = new String(packet.getData(), 0, packet.getLength()); String[] words = content.split(" "); - + if (!content.startsWith("Discover") || words.length < 2) { logger.warn("Invalid discovery message: {}", content); continue; } - + String url = words[1]; logger.info("Received discovery from {}", url); if (listener != null) { @@ -42,7 +43,7 @@ public class DiscoveryService extends Thread { } logger.info("DiscoveryService stopped."); } - + @Override public void interrupt() { logger.info("Interrupt()"); super.interrupt(); diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 77f7496..b491313 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -115,16 +115,16 @@ public class Master { final List components) { broadcaster.broadcast(destinations, new ServiceOperation() { @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - for (Component c : components) { - client.setState(c.getName(), c.getData(), - c.getRevision()); - } - } catch (Exception e) { - logger.info("Client {} failed to receive state update.", url); - removeParticipant(url); - } + ClientService client = connections.getClient(url); + try { + for (Component c : components) { + client.setState(c.getName(), c.getData(), + c.getRevision()); + } + } catch (Exception e) { + logger.info("Client {} failed to receive state update.", url); + removeParticipant(url); + } } }); } diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index 8141e5f..95db592 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -23,12 +23,12 @@ public class SameController { private DiscoveryService discoveryService; private BroadcasterFactory broadcasterFactory; private Configuration configuration; - + /** * Timeout for remote operations in milliseconds. */ private static final int timeout = 10000; - + public static SameController create(BroadcasterFactory broadcasterFactory, Configuration configuration) { int port = configuration.getInt("port"); @@ -36,47 +36,47 @@ public class SameController { timeout, timeout); State clientState = new State(".InvalidClientNetwork"); Broadcaster broadcaster = BroadcasterImpl.getDefaultBroadcastRunner(); - + String baseUrl = String.format("http://%s:%s/", configuration.get("localIp"), configuration.getInt("port")); - + String masterUrl = baseUrl + "MasterService.json"; String clientUrl = baseUrl + "ClientService.json"; - + Master master = Master.create(connections, broadcaster, masterUrl, configuration.get("networkName")); - + Client client = new Client(clientState, connections, clientUrl); PaxosServiceImpl paxos = new PaxosServiceImpl(""); - + DiscoveryService discoveryService = null; if ("true".equals(configuration.get("enableDiscovery"))) { BroadcastListener broadcastListener = new BroadcastListener( configuration.getInt("discoveryPort")); discoveryService = new DiscoveryService(client, broadcastListener); } - + StateServlet stateServlet = new StateServlet(client.getInterface(), new VariableFactory(client.getInterface())); - + ServerContainer server = new ServerBuilder(port) - .withServlet(stateServlet, "/_/state") - .withService(client.getService(), ClientService.class) - .withService(master.getService(), MasterService.class) - .withService(paxos, PaxosService.class) - .build(); + .withServlet(stateServlet, "/_/state") + .withService(client.getService(), ClientService.class) + .withService(master.getService(), MasterService.class) + .withService(paxos, PaxosService.class) + .build(); SameController controller = new SameController( configuration, server, master, client, paxos, discoveryService, broadcasterFactory); return controller; } - + public static SameController create(Configuration configuration) { return create(new DefaultBroadcasterFactory(), configuration); } - + public SameController( Configuration configuration, ServerContainer server, @@ -102,7 +102,7 @@ public class SameController { discoveryService.start(); } } - + public void stop() { try { client.interrupt(); @@ -115,7 +115,7 @@ public class SameController { logger.error("Failed to stop webserver", e); } } - + public void join() { try { server.join(); @@ -132,26 +132,26 @@ public class SameController { } } } - + public void searchNetworks() { BroadcasterInterface broadcaster = broadcasterFactory.create(); String message = "Discover " + client.getUrl(); broadcaster.sendBroadcast(configuration.getInt("discoveryPort"), message.getBytes()); } - + public void joinNetwork(String url) { client.joinNetwork(url); } - + public Client getClient() { return client; } - + public Master getMaster() { return master; } - + public VariableFactory createVariableFactory() { return new VariableFactory(client.getInterface()); } diff --git a/same/src/main/java/com/orbekk/same/SameInterface.java b/same/src/main/java/com/orbekk/same/SameInterface.java index a0a7894..20d47d0 100644 --- a/same/src/main/java/com/orbekk/same/SameInterface.java +++ b/same/src/main/java/com/orbekk/same/SameInterface.java @@ -10,18 +10,18 @@ public interface SameInterface { * Get the state with identifier 'id'. */ String get(String id); - + /** * Get the state with identifier 'id', converted to a Java * object of type T using Jackson. */ T get(String id, TypeReference type); - + /** * Get the state with identifier 'id' as a list. */ List getList(String id); - + /** * Set the state. * @@ -33,7 +33,7 @@ public interface SameInterface { * Set from an object: Pass it, e.g., a List. */ void setObject(String id, Object data); - + void addStateChangedListener(StateChangedListener listener); void removeStateChangedListener(StateChangedListener listener); } diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index 2b48a00..f0fee80 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -24,22 +24,22 @@ public class State { private Map state = new HashMap(); private ObjectMapper mapper = new ObjectMapper(); private Set updatedComponents = new TreeSet(); - + public State(String networkName) { update(".networkName", networkName, 1); updateFromObject(".participants", new ArrayList(), 1); } - + public State(State other) { state.putAll(other.state); } - + public synchronized void clear() { logger.info("Clearing state."); updatedComponents.clear(); state.clear(); } - + public synchronized void forceUpdate(String componentName, String data, long revision) { Component oldComponent = state.get(componentName); @@ -48,7 +48,7 @@ public class State { state.put(componentName, newComponent); updatedComponents.add(componentName); } - + public synchronized boolean update(String componentName, String data, long revision) { Component component = null; @@ -57,7 +57,7 @@ public class State { } else { component = state.get(componentName); } - + if (revision > component.getRevision()) { Component oldComponent = new Component(component); component.setName(componentName); @@ -71,7 +71,7 @@ public class State { return false; } } - + /** * Get a copy of a component. */ @@ -83,7 +83,7 @@ public class State { return null; } } - + public String getDataOf(String componentName) { Component component = state.get(componentName); if (component != null) { @@ -92,7 +92,7 @@ public class State { return null; } } - + public long getRevision(String componentName) { Component component = state.get(componentName); if (component != null) { @@ -103,7 +103,7 @@ public class State { return 0; } } - + /** * Parses a JSON value using Jackson ObjectMapper. */ @@ -126,12 +126,12 @@ public class State { } return null; } - + public List getList(String componentName) { return getParsedData(componentName, new TypeReference>(){}); } - + public boolean updateFromObject(String componentName, Object data, long revision) { String dataS; try { @@ -151,14 +151,14 @@ public class State { return false; } } - + /** * Pretty print a component. */ public String show(String componentName) { return componentName + ": " + state.get(componentName); } - + /** * Returns a list of all the components in this State. * @@ -171,7 +171,7 @@ public class State { } return list; } - + public synchronized List getAndClearUpdatedComponents() { List components = new ArrayList(); for (String name : updatedComponents) { @@ -185,7 +185,7 @@ public class State { private String name; private long revision; private String data; - + /** * Copy constructor. */ @@ -194,21 +194,21 @@ public class State { this.revision = other.revision; this.data = other.data; } - + public Component(String name, long revision, String data) { this.name = name; this.revision = revision; this.data = data; } - + public long getRevision() { return revision; } - + public void setRevision(long revision) { this.revision = revision; } - + public String getData() { return data; } @@ -216,29 +216,29 @@ public class State { public void setData(String data) { this.data = data; } - + public String getName() { return name; } - + public void setName(String name) { this.name = name; } - + @Override public String toString() { return "[" + this.name + ": " + this.data + "@" + revision + "]"; } - + @Override public boolean equals(Object other) { if (!(other instanceof Component)) { return false; } Component o = (Component)other; return name.equals(o.name) && data.equals(o.data) && - revision == o.revision; + revision == o.revision; } } - + @Override public String toString() { StringBuilder output = new StringBuilder(); output.append("State(\n"); @@ -248,7 +248,7 @@ public class State { output.append(")"); return output.toString(); } - + @Override public boolean equals(Object other) { if (!(other instanceof State)) { return false; diff --git a/same/src/main/java/com/orbekk/same/TestBroadcaster.java b/same/src/main/java/com/orbekk/same/TestBroadcaster.java index b2f9d8c..bac4742 100644 --- a/same/src/main/java/com/orbekk/same/TestBroadcaster.java +++ b/same/src/main/java/com/orbekk/same/TestBroadcaster.java @@ -10,6 +10,7 @@ public class TestBroadcaster implements Broadcaster { public TestBroadcaster() { } + @Override public void broadcast(final List targets, final ServiceOperation operation) { for (String t : targets) { diff --git a/same/src/main/java/com/orbekk/same/TestConnectionManager.java b/same/src/main/java/com/orbekk/same/TestConnectionManager.java index 3439a78..25a3ee6 100644 --- a/same/src/main/java/com/orbekk/same/TestConnectionManager.java +++ b/same/src/main/java/com/orbekk/same/TestConnectionManager.java @@ -10,7 +10,7 @@ import com.orbekk.paxos.PaxosService; */ public class TestConnectionManager implements ConnectionManager { public Map clientMap = - new HashMap(); + new HashMap(); public Map masterMap = new HashMap(); public Map paxosMap = @@ -19,14 +19,17 @@ public class TestConnectionManager implements ConnectionManager { public TestConnectionManager() { } + @Override public ClientService getClient(String url) { return clientMap.get(url); } + @Override public MasterService getMaster(String url) { return masterMap.get(url); } - + + @Override public PaxosService getPaxos(String url) { return paxosMap.get(url); } diff --git a/same/src/main/java/com/orbekk/same/Variable.java b/same/src/main/java/com/orbekk/same/Variable.java index b6d3b7f..bea8664 100644 --- a/same/src/main/java/com/orbekk/same/Variable.java +++ b/same/src/main/java/com/orbekk/same/Variable.java @@ -9,7 +9,7 @@ public interface Variable { */ void valueChanged(Variable variable); } - + T get(); void set(T value) throws UpdateConflict; void update(); diff --git a/same/src/main/java/com/orbekk/same/VariableFactory.java b/same/src/main/java/com/orbekk/same/VariableFactory.java index 3f3b19c..f1796d6 100644 --- a/same/src/main/java/com/orbekk/same/VariableFactory.java +++ b/same/src/main/java/com/orbekk/same/VariableFactory.java @@ -18,14 +18,14 @@ public class VariableFactory { private Logger logger = LoggerFactory.getLogger(getClass()); private ClientInterface client; private ObjectMapper mapper = new ObjectMapper(); - + private class VariableImpl implements Variable, StateChangedListener { String identifier; TypeReference type; T value; long revision = 0; OnChangeListener listener = null; - + public VariableImpl(String identifier, TypeReference type) { this.identifier = identifier; this.type = type; @@ -41,7 +41,7 @@ public class VariableFactory { try { String serializedValue = mapper.writeValueAsString(value); State.Component update = new State.Component(identifier, - revision, serializedValue); + revision, serializedValue); client.set(update); } catch (JsonGenerationException e) { logger.warn("Failed to convert to JSON: {}", value); @@ -78,22 +78,22 @@ public class VariableFactory { } } } - + public static VariableFactory create(ClientInterface client) { return new VariableFactory(client); } - + VariableFactory(ClientInterface client) { this.client = client; } - + public Variable create(String identifier, TypeReference type) { VariableImpl variable = new VariableImpl(identifier, type); variable.update(); client.addStateListener(variable); return variable; } - + public Variable createString(String identifier) { return create(identifier, new TypeReference() {}); } diff --git a/same/src/main/java/com/orbekk/same/config/Configuration.java b/same/src/main/java/com/orbekk/same/config/Configuration.java index 75e6ada..82148c9 100644 --- a/same/src/main/java/com/orbekk/same/config/Configuration.java +++ b/same/src/main/java/com/orbekk/same/config/Configuration.java @@ -11,18 +11,18 @@ import org.slf4j.LoggerFactory; public class Configuration { public final static String configurationProperty = "com.orbekk.same.config.file"; - + static final Logger logger = LoggerFactory.getLogger(Configuration.class); Properties configuration = new Properties(); - + public Configuration(Properties properties) { this.configuration = properties; } - + Configuration() { // Use factory methods. } - + public static Configuration loadOrDie() { Configuration configuration = new Configuration(); boolean status = configuration.loadDefault(); @@ -32,13 +32,13 @@ public class Configuration { } return configuration; } - + public static Configuration load() { Configuration configuration = new Configuration(); configuration.loadDefault(); return configuration; } - + public boolean loadDefault() { String filename = System.getProperty(configurationProperty); if (filename != null) { @@ -60,7 +60,7 @@ public class Configuration { } return false; } - + public String get(String name) { String value = configuration.getProperty(name); if (value == null) { @@ -68,7 +68,7 @@ public class Configuration { } return value; } - + public Integer getInt(String name) { if (get(name) == null) { return null; diff --git a/same/src/main/java/com/orbekk/same/http/RpcServlet.java b/same/src/main/java/com/orbekk/same/http/RpcServlet.java index 9b4f82e..9450d67 100644 --- a/same/src/main/java/com/orbekk/same/http/RpcServlet.java +++ b/same/src/main/java/com/orbekk/same/http/RpcServlet.java @@ -10,12 +10,12 @@ import com.googlecode.jsonrpc4j.JsonRpcServer; public class RpcServlet extends HttpServlet { JsonRpcServer rpcServer; - + public RpcServlet(JsonRpcServer rpcServer) { super(); this.rpcServer = rpcServer; } - + @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException { diff --git a/same/src/main/java/com/orbekk/same/http/ServerBuilder.java b/same/src/main/java/com/orbekk/same/http/ServerBuilder.java index 65ee670..543366e 100644 --- a/same/src/main/java/com/orbekk/same/http/ServerBuilder.java +++ b/same/src/main/java/com/orbekk/same/http/ServerBuilder.java @@ -13,24 +13,24 @@ public class ServerBuilder { Logger logger = LoggerFactory.getLogger(getClass()); int port; ServletContextHandler context = null; - + public ServerBuilder(int port) { this.port = port; } - + public ServerBuilder withServlet(HttpServlet servlet, String pathSpec) { logger.info("Servlet binding: {} → {}", pathSpec, servlet); getServletContextHandler().addServlet(new ServletHolder(servlet), pathSpec); return this; } - + public ServerBuilder withService(T service, Class clazz) { JsonRpcServer server = new JsonRpcServer(service, clazz); String pathSpec = "/" + clazz.getSimpleName() + ".json"; return withServlet(new RpcServlet(server), pathSpec); } - + public ServerContainer build() { ServerContainer server = ServerContainer.create(port); server.setReuseAddress(true); diff --git a/same/src/main/java/com/orbekk/same/http/ServerContainer.java b/same/src/main/java/com/orbekk/same/http/ServerContainer.java index f91cce1..af577a0 100644 --- a/same/src/main/java/com/orbekk/same/http/ServerContainer.java +++ b/same/src/main/java/com/orbekk/same/http/ServerContainer.java @@ -12,23 +12,23 @@ public class ServerContainer { Server server; int port; ServletContextHandler context = null; - + public ServerContainer(Server server, int port, ServletContextHandler context) { this.server = server; this.port = port; this.context = context; } - + public static ServerContainer create(int port) { Server server = new Server(port); return new ServerContainer(server, port, null); } - + public void setContext(ServletContextHandler context) { server.setHandler(context); this.context = context; } - + public void setReuseAddress(boolean on) { Connector connector = server.getConnectors()[0]; if (connector instanceof AbstractConnector) { @@ -36,7 +36,7 @@ public class ServerContainer { connector_.setReuseAddress(on); } } - + public int getPort() { if (port == 0) { return server.getConnectors()[0].getLocalPort(); @@ -44,17 +44,17 @@ public class ServerContainer { return port; } } - + public void start() throws Exception { server.start(); logger.info("Started server on port {}", getPort()); } - + public void stop() throws Exception { server.stop(); logger.info("Server stopped."); } - + public void join() throws InterruptedException { server.join(); } diff --git a/same/src/main/java/com/orbekk/same/http/StateServlet.java b/same/src/main/java/com/orbekk/same/http/StateServlet.java index 8b4d8c5..578bdfc 100644 --- a/same/src/main/java/com/orbekk/same/http/StateServlet.java +++ b/same/src/main/java/com/orbekk/same/http/StateServlet.java @@ -22,13 +22,13 @@ public class StateServlet extends HttpServlet { private ClientInterface client; private VariableFactory variableFactory; private final static String TITLE = "State viewer"; - + public StateServlet(ClientInterface client, VariableFactory variableFactory) { this.client = client; this.variableFactory = variableFactory; } - + private void handleSetState(HttpServletRequest request, HttpServletResponse response) throws IOException { if (request.getParameter("key") == null || @@ -36,13 +36,13 @@ public class StateServlet extends HttpServlet { response.getWriter().println( "Usage: action=set&key=DesiredKey&value=DesiredValue"); } - + try { String key = request.getParameter("key"); String value = request.getParameter("value"); Variable variable = variableFactory.createString(key); variable.set(value); - + response.getWriter().println("Updated component: " + key + "=" + value); } catch (UpdateConflict e) { @@ -50,7 +50,7 @@ public class StateServlet extends HttpServlet { throwableToString(e)); } } - + @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { @@ -78,7 +78,7 @@ public class StateServlet extends HttpServlet { w.println(client.getState()); w.println(""); } - + private void writeSetStateForm(HttpServletResponse response) throws IOException { PrintWriter w = response.getWriter(); @@ -90,7 +90,7 @@ public class StateServlet extends HttpServlet { w.println("

"); w.println(""); } - + private void writeHeader(HttpServletResponse response) throws IOException { PrintWriter w = response.getWriter(); w.println(""); @@ -99,7 +99,7 @@ public class StateServlet extends HttpServlet { w.println(""); w.println(""); } - + private void writeFooter(HttpServletResponse response) throws IOException { PrintWriter w = response.getWriter(); w.println(""); diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java index 2c37d02..cf6ca30 100644 --- a/same/src/main/java/com/orbekk/util/DelayedOperation.java +++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java @@ -1,99 +1,99 @@ package com.orbekk.util; public class DelayedOperation { - public static class Status { - public final static int OK = 1; - public final static int CONFLICT = 2; - public final static int ERROR = 3; - - private int status; - private String message; - - public static Status createOk() { - return new Status(OK, ""); - } - - public static Status createConflict(String message) { - return new Status(CONFLICT, message); - } - - public static Status createError(String message) { - return new Status(ERROR, message); - } - - public Status(int status, String message) { - this.status = status; - this.message = message; - } - - @Override public String toString() { - switch(status) { - case OK: - return "OK"; - case CONFLICT: - return "Conflicting update: " + message; - case ERROR: - return "Error: " + message; - } - throw new AssertionError("Unhandled case."); - } - - @Override public boolean equals(Object other) { - if (!(other instanceof Status)) { - return false; - } - Status o = (Status)other; - if (o.status != this.status) { - return false; - } - if (message == null) { - return o.message == null; - } - return message.equals(o.message); - } - } - - private T argument; - private Status status; - private boolean isDone; - private int identifier; - - public DelayedOperation(T argument) { - this.argument = argument; - } - - public Status getStatus() { - waitFor(); - return status; - } - - public synchronized void waitFor() { - while (!isDone) { - try { - wait(); - } catch (InterruptedException e) { - complete(new Status(Status.ERROR, "Thread interrupted.")); - } - } - } - - public synchronized boolean isDone() { - return isDone; - } - - public synchronized void complete(Status status) { - if (!isDone) { - isDone = true; - this.status = status; - notifyAll(); - } - } - - public synchronized int getIdentifier() { - return identifier; - } - - public synchronized void setIdentifier(int identifier) { - this.identifier = identifier; - } + public static class Status { + public final static int OK = 1; + public final static int CONFLICT = 2; + public final static int ERROR = 3; + + private int status; + private String message; + + public static Status createOk() { + return new Status(OK, ""); + } + + public static Status createConflict(String message) { + return new Status(CONFLICT, message); + } + + public static Status createError(String message) { + return new Status(ERROR, message); + } + + public Status(int status, String message) { + this.status = status; + this.message = message; + } + + @Override public String toString() { + switch(status) { + case OK: + return "OK"; + case CONFLICT: + return "Conflicting update: " + message; + case ERROR: + return "Error: " + message; + } + throw new AssertionError("Unhandled case."); + } + + @Override public boolean equals(Object other) { + if (!(other instanceof Status)) { + return false; + } + Status o = (Status)other; + if (o.status != this.status) { + return false; + } + if (message == null) { + return o.message == null; + } + return message.equals(o.message); + } + } + + private T argument; + private Status status; + private boolean isDone; + private int identifier; + + public DelayedOperation(T argument) { + this.argument = argument; + } + + public Status getStatus() { + waitFor(); + return status; + } + + public synchronized void waitFor() { + while (!isDone) { + try { + wait(); + } catch (InterruptedException e) { + complete(new Status(Status.ERROR, "Thread interrupted.")); + } + } + } + + public synchronized boolean isDone() { + return isDone; + } + + public synchronized void complete(Status status) { + if (!isDone) { + isDone = true; + this.status = status; + notifyAll(); + } + } + + public synchronized int getIdentifier() { + return identifier; + } + + public synchronized void setIdentifier(int identifier) { + this.identifier = identifier; + } } diff --git a/same/src/main/java/com/orbekk/util/WorkQueue.java b/same/src/main/java/com/orbekk/util/WorkQueue.java index 2fb2c88..397c4b8 100644 --- a/same/src/main/java/com/orbekk/util/WorkQueue.java +++ b/same/src/main/java/com/orbekk/util/WorkQueue.java @@ -16,21 +16,21 @@ abstract public class WorkQueue extends Thread implements List { private Logger logger = LoggerFactory.getLogger(getClass()); private volatile List list = null; private volatile boolean done = false; - + public WorkQueue() { list = new ArrayList(); } - + public WorkQueue(Collection collection) { list = new ArrayList(collection); } - + public synchronized List getAndClear() { List copy = new ArrayList(list); list.clear(); return copy; } - + /** * OnChange event. * @@ -50,7 +50,7 @@ abstract public class WorkQueue extends Thread implements List { onChange(); } } - + @Override public void run() { while (!done) { @@ -71,8 +71,8 @@ abstract public class WorkQueue extends Thread implements List { } } } - - + + @Override public synchronized boolean add(E e) { notifyAll(); -- cgit v1.2.3