diff options
Diffstat (limited to 'same/src/main/java')
-rw-r--r-- | same/src/main/java/com/orbekk/same/VariableUpdaterTask.java | 84 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/util/DelayedOperation.java | 12 |
2 files changed, 53 insertions, 43 deletions
diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java index f0bcdd0..25be4f8 100644 --- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java +++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java @@ -15,72 +15,70 @@ */ package com.orbekk.same; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +import com.orbekk.util.CyclicCountDownLatch; +import com.orbekk.util.DelayedOperation; + /** Updates a variable on-demand. */ public class VariableUpdaterTask<T> extends Thread implements Variable.OnChangeListener<T> { + private static Logger logger = Logger.getLogger(VariableUpdaterTask.class.getName()); private Variable<T> variable; - private volatile T newValue; - private AtomicBoolean hasNewValue = new AtomicBoolean(false); - private AtomicBoolean isReady = new AtomicBoolean(true); + private AtomicReference<T> newValue = new AtomicReference<T>(); + private CyclicCountDownLatch hasNewValue = new CyclicCountDownLatch(1); + private CyclicCountDownLatch isReady = new CyclicCountDownLatch(1); public VariableUpdaterTask(Variable<T> variable) { super("VariableUpdater"); this.variable = variable; } - public synchronized void set(T newValue) { - this.newValue = newValue; - hasNewValue.set(true); - notifyAll(); - } - - /** Update the variable once. */ - public void performWork() { - boolean shouldDoWork = false; - synchronized(this) { - shouldDoWork = hasNewValue.get() && isReady.get(); - hasNewValue.set(false); - isReady.set(false); - } - if (shouldDoWork) { - variable.set(newValue); - } + public void set(T newValue) { + this.newValue.set(newValue); + hasNewValue.countDown(); } - private synchronized void waitFor(AtomicBoolean v) { - if (Thread.currentThread().isInterrupted()) { - return; - } - while(!v.get()) { - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + void setValue(T newValue) { + while (!Thread.currentThread().isInterrupted()) { + DelayedOperation op = variable.set(newValue); + if (op.getStatus().isOk()) { return; + } else if (op.getStatus().isConflict()) { + // We have a new value, but we need to wait for an update + // in order to overwrite it. + hasNewValue.countDown(); + return; + } else if (op.getStatus().isError()) { + // Error during update. Just retry. + logger.info("Error updating value. Status: " + op.getStatus()); + } else { + throw new AssertionError("Unknown state."); } } } + + @Override + public void valueChanged(Variable<T> variable) { + isReady.countDown(); + } @Override public void run() { + isReady.countDown(); // Initially ready. variable.addOnChangeListener(this); - while (true) { - waitFor(isReady); - waitFor(hasNewValue); - if (Thread.currentThread().isInterrupted()) { - break; + while (!Thread.interrupted()) { + try { + hasNewValue.await(); + isReady.await(); + setValue(newValue.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - performWork(); } - variable.removeOnChangeListener(this); - } - - @Override - public synchronized void valueChanged(Variable<T> unused) { - isReady.set(true); - notifyAll(); + logger.info("VariableUpdaterTask finished."); } }
\ No newline at end of file diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java index 48e7c05..caffa4f 100644 --- a/same/src/main/java/com/orbekk/util/DelayedOperation.java +++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java @@ -47,6 +47,18 @@ public class DelayedOperation { return status == OK; } + public boolean isError() { + return status == ERROR; + } + + public boolean isConflict() { + return status == CONFLICT; + } + + public boolean canRetry() { + return isError(); + } + public int getStatusCode() { return status; } |