summaryrefslogtreecommitdiff
path: root/same
diff options
context:
space:
mode:
Diffstat (limited to 'same')
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java77
-rw-r--r--same/src/main/java/com/orbekk/same/ClientInterface.java2
2 files changed, 66 insertions, 13 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index f632241..c0bf708 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -18,6 +18,8 @@ package com.orbekk.same;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -54,9 +56,12 @@ public class Client {
private final ClientInterface clientInterface = new ClientInterfaceImpl();
private final AtomicLong revision = new AtomicLong(0);
- private List<StateChangedListener> stateListeners =
- new ArrayList<StateChangedListener>();
+ private List<StateChangedListener> updateListeners =
+ new CopyOnWriteArrayList<StateChangedListener>();
+ private List<ConnectionStateListener> connectionStateListeners =
+ new CopyOnWriteArrayList<ConnectionStateListener>();
+
public class ClientInterfaceImpl implements ClientInterface {
private ClientInterfaceImpl() {
}
@@ -72,9 +77,11 @@ public class Client {
final MasterState currentMasterInfo = masterInfo;
final DelayedOperation op = new DelayedOperation();
if (connectionState != ConnectionState.STABLE) {
- op.complete(DelayedOperation.Status.createError(
- "Not connected to master: " + connectionState));
- return op;
+ logger.warn("Connection is {}. Delaying update.", connectionState);
+ try {
+ awaitConnectionState(ConnectionState.STABLE);
+ } catch (InterruptedException e) {
+ }
}
Services.Master master = connections.getMaster0(
@@ -117,18 +124,28 @@ public class Client {
@Override
public void addStateListener(StateChangedListener listener) {
- stateListeners.add(listener);
+ updateListeners.add(listener);
}
@Override
public void removeStateListener(StateChangedListener listener) {
- stateListeners.remove(listener);
+ updateListeners.remove(listener);
}
@Override
public ConnectionState getConnectionState() {
return Client.this.getConnectionState();
}
+
+ @Override public void addConnectionStateListener(
+ ConnectionStateListener listener) {
+ connectionStateListeners.add(listener);
+ }
+
+ @Override public void removeConnectionStateListener(
+ ConnectionStateListener listener) {
+ connectionStateListeners.remove(listener);
+ }
}
private Services.Client newServiceImpl = new Services.Client() {
@@ -137,7 +154,7 @@ public class Client {
boolean status = state.update(request.getId(), request.getData(),
request.getRevision());
if (status) {
- for (StateChangedListener listener : stateListeners) {
+ for (StateChangedListener listener : updateListeners) {
listener.stateChanged(state.getComponent(request.getId()));
}
updateRevision(request.getRevision());
@@ -165,7 +182,7 @@ public class Client {
highestRevision = revision.get();
}
masterInfo = request;
- connectionState = ConnectionState.STABLE;
+ setConnectionState(ConnectionState.STABLE);
done.run(MasterTakeoverResponse.newBuilder()
.setHighestKnownRevision(highestRevision)
.build());
@@ -179,7 +196,7 @@ public class Client {
request.getMasterId(), masterInfo.getMasterId());
return;
}
- connectionState = ConnectionState.UNSTABLE;
+ setConnectionState(ConnectionState.UNSTABLE);
executor.execute(new MasterStarter(request));
done.run(Empty.getDefaultInstance());
}
@@ -257,12 +274,12 @@ public class Client {
this.rpcf = rpcf;
this.executor = executor;
}
-
+
public void start() {
}
public void interrupt() {
- connectionState = ConnectionState.DISCONNECTED;
+ setConnectionState(ConnectionState.DISCONNECTED);
executor.shutdown();
}
@@ -300,7 +317,7 @@ public class Client {
public Rpc joinNetwork(Services.MasterState masterInfo) {
logger.info("joinNetwork({})", masterInfo);
- connectionState = ConnectionState.UNSTABLE;
+ setConnectionState(ConnectionState.UNSTABLE);
reset();
Services.Master master =
@@ -367,4 +384,38 @@ public class Client {
updated = revision.compareAndSet(expected, update);
}
}
+
+ private void setConnectionState(ConnectionState newState) {
+ connectionState = newState;
+ for (ConnectionStateListener listener : connectionStateListeners) {
+ listener.connectionStatusChanged(newState);
+ }
+ }
+
+ private void awaitConnectionState(ConnectionState expected) throws InterruptedException {
+ class Listener implements ConnectionStateListener {
+ CountDownLatch done = new CountDownLatch(1);
+ ConnectionState expected;
+
+ public Listener(ConnectionState expected) {
+ this.expected = expected;
+ }
+
+ @Override public void connectionStatusChanged(ConnectionState state) {
+ if (state.equals(expected)) {
+ done.countDown();
+ }
+ }
+
+ public void await() throws InterruptedException {
+ if (Client.this.connectionState.equals(expected)) {
+ done.countDown();
+ }
+ done.await();
+ }
+ }
+ Listener listener = new Listener(expected);
+ getInterface().addConnectionStateListener(listener);
+ listener.await();
+ }
}
diff --git a/same/src/main/java/com/orbekk/same/ClientInterface.java b/same/src/main/java/com/orbekk/same/ClientInterface.java
index 3cd83c5..473073d 100644
--- a/same/src/main/java/com/orbekk/same/ClientInterface.java
+++ b/same/src/main/java/com/orbekk/same/ClientInterface.java
@@ -22,5 +22,7 @@ public interface ClientInterface {
DelayedOperation set(State.Component component);
void addStateListener(StateChangedListener listener);
void removeStateListener(StateChangedListener listener);
+ void addConnectionStateListener(ConnectionStateListener listener);
+ void removeConnectionStateListener(ConnectionStateListener listener);
ConnectionState getConnectionState();
} \ No newline at end of file