// package com.orbekk.same; // // import java.util.List; // import java.util.LinkedList; // import java.util.Map; // import java.util.HashMap; // import org.slf4j.Logger; // import org.slf4j.LoggerFactory; // // /** // * The implementation of a 'Same' state. // * // * This class manages the current state of the Same protocol. // */ // public class SameState extends Thread implements UrlReceiver { // private Logger logger = LoggerFactory.getLogger(getClass()); // private ConnectionManager connections; // // // TODO: Change the name of State. // private com.orbekk.same.State state = // new com.orbekk.same.State(); // // /** // * The client id of this participant. // */ // private String clientId; // // /** // * Stopping condition for this thread. // */ // private boolean stopped = false; // // private String _setState = null; // private Map _setParticipants = null; // // private Map pendingParticipants = // new HashMap(); // // public SameState(String networkName, String clientId, // ConnectionManager connections) { // state.setNetworkName(networkName); // this.clientId = clientId; // this.connections = connections; // state.setMasterId(clientId); // state.getParticipants().put(clientId, null); // } // // public String getMasterId() { // return state.getMasterId(); // } // // public synchronized Map getParticipants() { // return state.getParticipants(); // } // // /** // * Reset this SameService to an initial state. // * // * TODO: Implement fully. // */ // private synchronized void resetState() { // state = new com.orbekk.same.State(); // pendingParticipants.clear(); // } // // public synchronized void joinNetwork(String networkName, String masterId) { // resetState(); // state.setNetworkName(networkName); // state.setMasterId(masterId); // logger.info("Joined network {}.", networkName); // } // // public String getClientId() { // return clientId; // } // // public String getNetworkName() { // return state.getNetworkName(); // } // // public String getCurrentState() { // return state.getData(); // } // // /** // * TODO: Move to a separate library. // */ // public void librarySetNewState(String newState) { // connections.getConnection( // state.getParticipants().get(state.getMasterId())) // .setState(newState); // } // // public String getUrl() { // return state.getParticipants().get(clientId); // } // // @Override // public void setUrl(String url) { // logger.info("My URL is {}", url); // state.getParticipants().put(clientId, url); // } // // public synchronized void addParticipant(String clientId, String url) { // logger.info("PendingParticipant.add: {} ({})", clientId, url); // pendingParticipants.put(clientId, url); // notifyAll(); // } // // public synchronized void setParticipants(Map participants) { // logger.info("Pending operation: _setParticipants"); // _setParticipants = participants; // notifyAll(); // } // // public synchronized void setState(String newState) { // logger.info("Pending operation: _setState"); // _setState = newState; // notifyAll(); // } // // private synchronized void handleSetParticipants() { // if (_setParticipants != null) { // if (isMaster()) { // logger.error("{}: Master received setParticipants.", clientId); // } else { // logger.info("{}: New participants committed.", clientId); // state.getParticipants().clear(); // state.getParticipants().putAll(_setParticipants); // } // } // _setParticipants = null; // } // // public synchronized void handleSetState() { // if (_setState != null) { // if (isMaster()) { // broadcast(new ServiceOperation() { // @Override void run(SameService service) { // service.setState(_setState); // } // }); // } // state.setData(_setState); // _setState = null; // } // } // // private boolean isMaster() { // return state.getMasterId().equals(clientId); // } // // private synchronized void handleNewParticipants() { // if (!isMaster()) { // for (Map.Entry e : pendingParticipants.entrySet()) { // SameService master = connections.getConnection( // state.getParticipants().get(state.getMasterId())); // logger.info("Redirecting participant request to {}", // state.getMasterId()); // String clientId = e.getKey(); // String url = e.getValue(); // master.participateNetwork(state.getNetworkName(), clientId, // url); // } // } else { // state.getParticipants().putAll(pendingParticipants); // for (Map.Entry e : // pendingParticipants.entrySet()) { // String clientId = e.getKey(); // String url = e.getValue(); // logger.info("New participant: {} URL({})", clientId, url); // SameService remoteService = connections.getConnection(url); // remoteService.notifyParticipation(state.getNetworkName(), // state.getMasterId()); // broadcast(new ServiceOperation(){ // @Override void run(SameService service) { // service.setParticipants(state.getParticipants()); // } // }); // } // } // pendingParticipants.clear(); // } // // /** // * This method runs the pending commands to SameState. // * // * It should be called by the worker thread, but can be called directly // * for testing purposes to avoid threading in unit tests. // */ // synchronized void internalRun() { // handleNewParticipants(); // handleSetState(); // handleSetParticipants(); // } // // public synchronized void run() { // while (!stopped) { // internalRun(); // try { // wait(1000); // } catch (InterruptedException e) { // // Ignore interrupt in wait loop. // } // } // } // // public synchronized void stopSame() { // try { // stopped = true; // notifyAll(); // this.join(); // } catch (InterruptedException e) { // logger.warn("Got InterruptedException while waiting for SameState " + // "to finish. Ignoring."); // } // } // // public abstract static class ServiceOperation { // abstract void run(SameService service); // } // // public synchronized void broadcast(ServiceOperation operation) { // for (Map.Entry e : // state.getParticipants().entrySet()) { // String clientId = e.getKey(); // String url = e.getValue(); // if (!clientId.equals(this.clientId)) { // operation.run(connections.getConnection(url)); // } // } // } // }