summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--same/src/main/java/com/orbekk/same/VariableUpdaterTask.java84
-rw-r--r--same/src/main/java/com/orbekk/util/DelayedOperation.java12
-rw-r--r--same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java76
3 files changed, 53 insertions, 119 deletions
diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
index f0bcdd0..25be4f8 100644
--- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
+++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
@@ -15,72 +15,70 @@
*/
package com.orbekk.same;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import com.orbekk.util.CyclicCountDownLatch;
+import com.orbekk.util.DelayedOperation;
+
/** Updates a variable on-demand.
*/
public class VariableUpdaterTask<T> extends Thread
implements Variable.OnChangeListener<T> {
+ private static Logger logger = Logger.getLogger(VariableUpdaterTask.class.getName());
private Variable<T> variable;
- private volatile T newValue;
- private AtomicBoolean hasNewValue = new AtomicBoolean(false);
- private AtomicBoolean isReady = new AtomicBoolean(true);
+ private AtomicReference<T> newValue = new AtomicReference<T>();
+ private CyclicCountDownLatch hasNewValue = new CyclicCountDownLatch(1);
+ private CyclicCountDownLatch isReady = new CyclicCountDownLatch(1);
public VariableUpdaterTask(Variable<T> variable) {
super("VariableUpdater");
this.variable = variable;
}
- public synchronized void set(T newValue) {
- this.newValue = newValue;
- hasNewValue.set(true);
- notifyAll();
- }
-
- /** Update the variable once. */
- public void performWork() {
- boolean shouldDoWork = false;
- synchronized(this) {
- shouldDoWork = hasNewValue.get() && isReady.get();
- hasNewValue.set(false);
- isReady.set(false);
- }
- if (shouldDoWork) {
- variable.set(newValue);
- }
+ public void set(T newValue) {
+ this.newValue.set(newValue);
+ hasNewValue.countDown();
}
- private synchronized void waitFor(AtomicBoolean v) {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- while(!v.get()) {
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ void setValue(T newValue) {
+ while (!Thread.currentThread().isInterrupted()) {
+ DelayedOperation op = variable.set(newValue);
+ if (op.getStatus().isOk()) {
return;
+ } else if (op.getStatus().isConflict()) {
+ // We have a new value, but we need to wait for an update
+ // in order to overwrite it.
+ hasNewValue.countDown();
+ return;
+ } else if (op.getStatus().isError()) {
+ // Error during update. Just retry.
+ logger.info("Error updating value. Status: " + op.getStatus());
+ } else {
+ throw new AssertionError("Unknown state.");
}
}
}
+
+ @Override
+ public void valueChanged(Variable<T> variable) {
+ isReady.countDown();
+ }
@Override
public void run() {
+ isReady.countDown(); // Initially ready.
variable.addOnChangeListener(this);
- while (true) {
- waitFor(isReady);
- waitFor(hasNewValue);
- if (Thread.currentThread().isInterrupted()) {
- break;
+ while (!Thread.interrupted()) {
+ try {
+ hasNewValue.await();
+ isReady.await();
+ setValue(newValue.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- performWork();
}
- variable.removeOnChangeListener(this);
- }
-
- @Override
- public synchronized void valueChanged(Variable<T> unused) {
- isReady.set(true);
- notifyAll();
+ logger.info("VariableUpdaterTask finished.");
}
} \ No newline at end of file
diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java
index 48e7c05..caffa4f 100644
--- a/same/src/main/java/com/orbekk/util/DelayedOperation.java
+++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java
@@ -47,6 +47,18 @@ public class DelayedOperation {
return status == OK;
}
+ public boolean isError() {
+ return status == ERROR;
+ }
+
+ public boolean isConflict() {
+ return status == CONFLICT;
+ }
+
+ public boolean canRetry() {
+ return isError();
+ }
+
public int getStatusCode() {
return status;
}
diff --git a/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java b/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java
deleted file mode 100644
index 026545a..0000000
--- a/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Copyright 2012 Kjetil Ørbekk <kjetil.orbekk@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.orbekk.same;
-
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Mockito.*;
-
-@SuppressWarnings("unchecked")
-public class VariableUpdaterTaskTest {
- Variable<String> v;
- VariableUpdaterTask<String> updater;
-
- @Before public void setUp() {
- v = mock(Variable.class);
- updater = new VariableUpdaterTask<String>(v);
- }
-
- @Test
- public void updatesValue() {
- updater.set("FirstValue");
- updater.performWork();
- verify(v).set("FirstValue");
- }
-
- @Test
- public void noUpdateIfNotSet() {
- updater.set("FirstValue");
- updater.performWork();
- reset(v);
- updater.performWork();
- verify(v, never()).set(anyString());
- }
-
- @Test
- public void noUpdateIfNotReady() {
- updater.set("FirstValue");
- updater.performWork();
- reset(v);
- updater.set("SecondValue");
- updater.performWork();
- verify(v, never()).set(anyString());
- }
-
- @Test
- public void updatesWhenReady() {
- updater.set("Value1");
- updater.performWork();
- reset(v);
- updater.valueChanged(null);
- updater.set("Value2");
- updater.performWork();
- verify(v).set("Value2");
- }
-
- @Test
- public void choosesLastUpdate() {
- updater.set("FirstValue");
- updater.set("SecondValue");
- updater.performWork();
- verify(v).set("SecondValue");
- }
-}