From 03f1fbd1040ab0a47a70dd383413dc287d4dd1cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 8 May 2012 11:08:56 +0200 Subject: Block operations on unstable connection. --- same/src/main/java/com/orbekk/same/Client.java | 77 ++++++++++++++++++---- .../main/java/com/orbekk/same/ClientInterface.java | 2 + 2 files changed, 66 insertions(+), 13 deletions(-) (limited to 'same/src/main/java') diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index f632241..c0bf708 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -18,6 +18,8 @@ package com.orbekk.same; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -54,9 +56,12 @@ public class Client { private final ClientInterface clientInterface = new ClientInterfaceImpl(); private final AtomicLong revision = new AtomicLong(0); - private List stateListeners = - new ArrayList(); + private List updateListeners = + new CopyOnWriteArrayList(); + private List connectionStateListeners = + new CopyOnWriteArrayList(); + public class ClientInterfaceImpl implements ClientInterface { private ClientInterfaceImpl() { } @@ -72,9 +77,11 @@ public class Client { final MasterState currentMasterInfo = masterInfo; final DelayedOperation op = new DelayedOperation(); if (connectionState != ConnectionState.STABLE) { - op.complete(DelayedOperation.Status.createError( - "Not connected to master: " + connectionState)); - return op; + logger.warn("Connection is {}. Delaying update.", connectionState); + try { + awaitConnectionState(ConnectionState.STABLE); + } catch (InterruptedException e) { + } } Services.Master master = connections.getMaster0( @@ -117,18 +124,28 @@ public class Client { @Override public void addStateListener(StateChangedListener listener) { - stateListeners.add(listener); + updateListeners.add(listener); } @Override public void removeStateListener(StateChangedListener listener) { - stateListeners.remove(listener); + updateListeners.remove(listener); } @Override public ConnectionState getConnectionState() { return Client.this.getConnectionState(); } + + @Override public void addConnectionStateListener( + ConnectionStateListener listener) { + connectionStateListeners.add(listener); + } + + @Override public void removeConnectionStateListener( + ConnectionStateListener listener) { + connectionStateListeners.remove(listener); + } } private Services.Client newServiceImpl = new Services.Client() { @@ -137,7 +154,7 @@ public class Client { boolean status = state.update(request.getId(), request.getData(), request.getRevision()); if (status) { - for (StateChangedListener listener : stateListeners) { + for (StateChangedListener listener : updateListeners) { listener.stateChanged(state.getComponent(request.getId())); } updateRevision(request.getRevision()); @@ -165,7 +182,7 @@ public class Client { highestRevision = revision.get(); } masterInfo = request; - connectionState = ConnectionState.STABLE; + setConnectionState(ConnectionState.STABLE); done.run(MasterTakeoverResponse.newBuilder() .setHighestKnownRevision(highestRevision) .build()); @@ -179,7 +196,7 @@ public class Client { request.getMasterId(), masterInfo.getMasterId()); return; } - connectionState = ConnectionState.UNSTABLE; + setConnectionState(ConnectionState.UNSTABLE); executor.execute(new MasterStarter(request)); done.run(Empty.getDefaultInstance()); } @@ -257,12 +274,12 @@ public class Client { this.rpcf = rpcf; this.executor = executor; } - + public void start() { } public void interrupt() { - connectionState = ConnectionState.DISCONNECTED; + setConnectionState(ConnectionState.DISCONNECTED); executor.shutdown(); } @@ -300,7 +317,7 @@ public class Client { public Rpc joinNetwork(Services.MasterState masterInfo) { logger.info("joinNetwork({})", masterInfo); - connectionState = ConnectionState.UNSTABLE; + setConnectionState(ConnectionState.UNSTABLE); reset(); Services.Master master = @@ -367,4 +384,38 @@ public class Client { updated = revision.compareAndSet(expected, update); } } + + private void setConnectionState(ConnectionState newState) { + connectionState = newState; + for (ConnectionStateListener listener : connectionStateListeners) { + listener.connectionStatusChanged(newState); + } + } + + private void awaitConnectionState(ConnectionState expected) throws InterruptedException { + class Listener implements ConnectionStateListener { + CountDownLatch done = new CountDownLatch(1); + ConnectionState expected; + + public Listener(ConnectionState expected) { + this.expected = expected; + } + + @Override public void connectionStatusChanged(ConnectionState state) { + if (state.equals(expected)) { + done.countDown(); + } + } + + public void await() throws InterruptedException { + if (Client.this.connectionState.equals(expected)) { + done.countDown(); + } + done.await(); + } + } + Listener listener = new Listener(expected); + getInterface().addConnectionStateListener(listener); + listener.await(); + } } diff --git a/same/src/main/java/com/orbekk/same/ClientInterface.java b/same/src/main/java/com/orbekk/same/ClientInterface.java index 3cd83c5..473073d 100644 --- a/same/src/main/java/com/orbekk/same/ClientInterface.java +++ b/same/src/main/java/com/orbekk/same/ClientInterface.java @@ -22,5 +22,7 @@ public interface ClientInterface { DelayedOperation set(State.Component component); void addStateListener(StateChangedListener listener); void removeStateListener(StateChangedListener listener); + void addConnectionStateListener(ConnectionStateListener listener); + void removeConnectionStateListener(ConnectionStateListener listener); ConnectionState getConnectionState(); } \ No newline at end of file -- cgit v1.2.3