summaryrefslogtreecommitdiff
path: root/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'same/src/main/java/com/orbekk/same/VariableUpdaterTask.java')
-rw-r--r--same/src/main/java/com/orbekk/same/VariableUpdaterTask.java84
1 files changed, 41 insertions, 43 deletions
diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
index f0bcdd0..25be4f8 100644
--- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
+++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
@@ -15,72 +15,70 @@
*/
package com.orbekk.same;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import com.orbekk.util.CyclicCountDownLatch;
+import com.orbekk.util.DelayedOperation;
+
/** Updates a variable on-demand.
*/
public class VariableUpdaterTask<T> extends Thread
implements Variable.OnChangeListener<T> {
+ private static Logger logger = Logger.getLogger(VariableUpdaterTask.class.getName());
private Variable<T> variable;
- private volatile T newValue;
- private AtomicBoolean hasNewValue = new AtomicBoolean(false);
- private AtomicBoolean isReady = new AtomicBoolean(true);
+ private AtomicReference<T> newValue = new AtomicReference<T>();
+ private CyclicCountDownLatch hasNewValue = new CyclicCountDownLatch(1);
+ private CyclicCountDownLatch isReady = new CyclicCountDownLatch(1);
public VariableUpdaterTask(Variable<T> variable) {
super("VariableUpdater");
this.variable = variable;
}
- public synchronized void set(T newValue) {
- this.newValue = newValue;
- hasNewValue.set(true);
- notifyAll();
- }
-
- /** Update the variable once. */
- public void performWork() {
- boolean shouldDoWork = false;
- synchronized(this) {
- shouldDoWork = hasNewValue.get() && isReady.get();
- hasNewValue.set(false);
- isReady.set(false);
- }
- if (shouldDoWork) {
- variable.set(newValue);
- }
+ public void set(T newValue) {
+ this.newValue.set(newValue);
+ hasNewValue.countDown();
}
- private synchronized void waitFor(AtomicBoolean v) {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- while(!v.get()) {
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ void setValue(T newValue) {
+ while (!Thread.currentThread().isInterrupted()) {
+ DelayedOperation op = variable.set(newValue);
+ if (op.getStatus().isOk()) {
return;
+ } else if (op.getStatus().isConflict()) {
+ // We have a new value, but we need to wait for an update
+ // in order to overwrite it.
+ hasNewValue.countDown();
+ return;
+ } else if (op.getStatus().isError()) {
+ // Error during update. Just retry.
+ logger.info("Error updating value. Status: " + op.getStatus());
+ } else {
+ throw new AssertionError("Unknown state.");
}
}
}
+
+ @Override
+ public void valueChanged(Variable<T> variable) {
+ isReady.countDown();
+ }
@Override
public void run() {
+ isReady.countDown(); // Initially ready.
variable.addOnChangeListener(this);
- while (true) {
- waitFor(isReady);
- waitFor(hasNewValue);
- if (Thread.currentThread().isInterrupted()) {
- break;
+ while (!Thread.interrupted()) {
+ try {
+ hasNewValue.await();
+ isReady.await();
+ setValue(newValue.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- performWork();
}
- variable.removeOnChangeListener(this);
- }
-
- @Override
- public synchronized void valueChanged(Variable<T> unused) {
- isReady.set(true);
- notifyAll();
+ logger.info("VariableUpdaterTask finished.");
}
} \ No newline at end of file