summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-02 12:54:04 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-02 12:54:04 +0200
commit445a738b9ebfd93f65fecd68a5fbbe61b7951bf8 (patch)
tree33b0a4b949235a1d4b093a8725c9c322ab4c96f8
parent0b34606c3868db0333d2c5efea497c10fb41ff7d (diff)
Fix VariableUpdaterTask.
Use CyclicCountDownLatch to improve VariableUpdaterTask. This implementation is much better, but unfortunately really hard to test. I'm not sure how to test this.
-rw-r--r--same/src/main/java/com/orbekk/same/VariableUpdaterTask.java84
-rw-r--r--same/src/main/java/com/orbekk/util/DelayedOperation.java12
-rw-r--r--same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java76
3 files changed, 53 insertions, 119 deletions
diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
index f0bcdd0..25be4f8 100644
--- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
+++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java
@@ -15,72 +15,70 @@
*/
package com.orbekk.same;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import com.orbekk.util.CyclicCountDownLatch;
+import com.orbekk.util.DelayedOperation;
+
/** Updates a variable on-demand.
*/
public class VariableUpdaterTask<T> extends Thread
implements Variable.OnChangeListener<T> {
+ private static Logger logger = Logger.getLogger(VariableUpdaterTask.class.getName());
private Variable<T> variable;
- private volatile T newValue;
- private AtomicBoolean hasNewValue = new AtomicBoolean(false);
- private AtomicBoolean isReady = new AtomicBoolean(true);
+ private AtomicReference<T> newValue = new AtomicReference<T>();
+ private CyclicCountDownLatch hasNewValue = new CyclicCountDownLatch(1);
+ private CyclicCountDownLatch isReady = new CyclicCountDownLatch(1);
public VariableUpdaterTask(Variable<T> variable) {
super("VariableUpdater");
this.variable = variable;
}
- public synchronized void set(T newValue) {
- this.newValue = newValue;
- hasNewValue.set(true);
- notifyAll();
- }
-
- /** Update the variable once. */
- public void performWork() {
- boolean shouldDoWork = false;
- synchronized(this) {
- shouldDoWork = hasNewValue.get() && isReady.get();
- hasNewValue.set(false);
- isReady.set(false);
- }
- if (shouldDoWork) {
- variable.set(newValue);
- }
+ public void set(T newValue) {
+ this.newValue.set(newValue);
+ hasNewValue.countDown();
}
- private synchronized void waitFor(AtomicBoolean v) {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- while(!v.get()) {
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ void setValue(T newValue) {
+ while (!Thread.currentThread().isInterrupted()) {
+ DelayedOperation op = variable.set(newValue);
+ if (op.getStatus().isOk()) {
return;
+ } else if (op.getStatus().isConflict()) {
+ // We have a new value, but we need to wait for an update
+ // in order to overwrite it.
+ hasNewValue.countDown();
+ return;
+ } else if (op.getStatus().isError()) {
+ // Error during update. Just retry.
+ logger.info("Error updating value. Status: " + op.getStatus());
+ } else {
+ throw new AssertionError("Unknown state.");
}
}
}
+
+ @Override
+ public void valueChanged(Variable<T> variable) {
+ isReady.countDown();
+ }
@Override
public void run() {
+ isReady.countDown(); // Initially ready.
variable.addOnChangeListener(this);
- while (true) {
- waitFor(isReady);
- waitFor(hasNewValue);
- if (Thread.currentThread().isInterrupted()) {
- break;
+ while (!Thread.interrupted()) {
+ try {
+ hasNewValue.await();
+ isReady.await();
+ setValue(newValue.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- performWork();
}
- variable.removeOnChangeListener(this);
- }
-
- @Override
- public synchronized void valueChanged(Variable<T> unused) {
- isReady.set(true);
- notifyAll();
+ logger.info("VariableUpdaterTask finished.");
}
} \ No newline at end of file
diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java
index 48e7c05..caffa4f 100644
--- a/same/src/main/java/com/orbekk/util/DelayedOperation.java
+++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java
@@ -47,6 +47,18 @@ public class DelayedOperation {
return status == OK;
}
+ public boolean isError() {
+ return status == ERROR;
+ }
+
+ public boolean isConflict() {
+ return status == CONFLICT;
+ }
+
+ public boolean canRetry() {
+ return isError();
+ }
+
public int getStatusCode() {
return status;
}
diff --git a/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java b/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java
deleted file mode 100644
index 026545a..0000000
--- a/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Copyright 2012 Kjetil Ørbekk <kjetil.orbekk@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.orbekk.same;
-
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Mockito.*;
-
-@SuppressWarnings("unchecked")
-public class VariableUpdaterTaskTest {
- Variable<String> v;
- VariableUpdaterTask<String> updater;
-
- @Before public void setUp() {
- v = mock(Variable.class);
- updater = new VariableUpdaterTask<String>(v);
- }
-
- @Test
- public void updatesValue() {
- updater.set("FirstValue");
- updater.performWork();
- verify(v).set("FirstValue");
- }
-
- @Test
- public void noUpdateIfNotSet() {
- updater.set("FirstValue");
- updater.performWork();
- reset(v);
- updater.performWork();
- verify(v, never()).set(anyString());
- }
-
- @Test
- public void noUpdateIfNotReady() {
- updater.set("FirstValue");
- updater.performWork();
- reset(v);
- updater.set("SecondValue");
- updater.performWork();
- verify(v, never()).set(anyString());
- }
-
- @Test
- public void updatesWhenReady() {
- updater.set("Value1");
- updater.performWork();
- reset(v);
- updater.valueChanged(null);
- updater.set("Value2");
- updater.performWork();
- verify(v).set("Value2");
- }
-
- @Test
- public void choosesLastUpdate() {
- updater.set("FirstValue");
- updater.set("SecondValue");
- updater.performWork();
- verify(v).set("SecondValue");
- }
-}