From 445a738b9ebfd93f65fecd68a5fbbe61b7951bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 2 May 2012 12:54:04 +0200 Subject: Fix VariableUpdaterTask. Use CyclicCountDownLatch to improve VariableUpdaterTask. This implementation is much better, but unfortunately really hard to test. I'm not sure how to test this. --- .../java/com/orbekk/same/VariableUpdaterTask.java | 84 +++++++++++----------- .../java/com/orbekk/util/DelayedOperation.java | 12 ++++ .../com/orbekk/same/VariableUpdaterTaskTest.java | 76 -------------------- 3 files changed, 53 insertions(+), 119 deletions(-) delete mode 100644 same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java index f0bcdd0..25be4f8 100644 --- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java +++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java @@ -15,72 +15,70 @@ */ package com.orbekk.same; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +import com.orbekk.util.CyclicCountDownLatch; +import com.orbekk.util.DelayedOperation; + /** Updates a variable on-demand. */ public class VariableUpdaterTask extends Thread implements Variable.OnChangeListener { + private static Logger logger = Logger.getLogger(VariableUpdaterTask.class.getName()); private Variable variable; - private volatile T newValue; - private AtomicBoolean hasNewValue = new AtomicBoolean(false); - private AtomicBoolean isReady = new AtomicBoolean(true); + private AtomicReference newValue = new AtomicReference(); + private CyclicCountDownLatch hasNewValue = new CyclicCountDownLatch(1); + private CyclicCountDownLatch isReady = new CyclicCountDownLatch(1); public VariableUpdaterTask(Variable variable) { super("VariableUpdater"); this.variable = variable; } - public synchronized void set(T newValue) { - this.newValue = newValue; - hasNewValue.set(true); - notifyAll(); - } - - /** Update the variable once. */ - public void performWork() { - boolean shouldDoWork = false; - synchronized(this) { - shouldDoWork = hasNewValue.get() && isReady.get(); - hasNewValue.set(false); - isReady.set(false); - } - if (shouldDoWork) { - variable.set(newValue); - } + public void set(T newValue) { + this.newValue.set(newValue); + hasNewValue.countDown(); } - private synchronized void waitFor(AtomicBoolean v) { - if (Thread.currentThread().isInterrupted()) { - return; - } - while(!v.get()) { - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + void setValue(T newValue) { + while (!Thread.currentThread().isInterrupted()) { + DelayedOperation op = variable.set(newValue); + if (op.getStatus().isOk()) { return; + } else if (op.getStatus().isConflict()) { + // We have a new value, but we need to wait for an update + // in order to overwrite it. + hasNewValue.countDown(); + return; + } else if (op.getStatus().isError()) { + // Error during update. Just retry. + logger.info("Error updating value. Status: " + op.getStatus()); + } else { + throw new AssertionError("Unknown state."); } } } + + @Override + public void valueChanged(Variable variable) { + isReady.countDown(); + } @Override public void run() { + isReady.countDown(); // Initially ready. variable.addOnChangeListener(this); - while (true) { - waitFor(isReady); - waitFor(hasNewValue); - if (Thread.currentThread().isInterrupted()) { - break; + while (!Thread.interrupted()) { + try { + hasNewValue.await(); + isReady.await(); + setValue(newValue.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - performWork(); } - variable.removeOnChangeListener(this); - } - - @Override - public synchronized void valueChanged(Variable unused) { - isReady.set(true); - notifyAll(); + logger.info("VariableUpdaterTask finished."); } } \ No newline at end of file diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java index 48e7c05..caffa4f 100644 --- a/same/src/main/java/com/orbekk/util/DelayedOperation.java +++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java @@ -47,6 +47,18 @@ public class DelayedOperation { return status == OK; } + public boolean isError() { + return status == ERROR; + } + + public boolean isConflict() { + return status == CONFLICT; + } + + public boolean canRetry() { + return isError(); + } + public int getStatusCode() { return status; } diff --git a/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java b/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java deleted file mode 100644 index 026545a..0000000 --- a/same/src/test/java/com/orbekk/same/VariableUpdaterTaskTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Copyright 2012 Kjetil Ørbekk - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.orbekk.same; - -import org.junit.Before; -import org.junit.Test; -import static org.mockito.Mockito.*; - -@SuppressWarnings("unchecked") -public class VariableUpdaterTaskTest { - Variable v; - VariableUpdaterTask updater; - - @Before public void setUp() { - v = mock(Variable.class); - updater = new VariableUpdaterTask(v); - } - - @Test - public void updatesValue() { - updater.set("FirstValue"); - updater.performWork(); - verify(v).set("FirstValue"); - } - - @Test - public void noUpdateIfNotSet() { - updater.set("FirstValue"); - updater.performWork(); - reset(v); - updater.performWork(); - verify(v, never()).set(anyString()); - } - - @Test - public void noUpdateIfNotReady() { - updater.set("FirstValue"); - updater.performWork(); - reset(v); - updater.set("SecondValue"); - updater.performWork(); - verify(v, never()).set(anyString()); - } - - @Test - public void updatesWhenReady() { - updater.set("Value1"); - updater.performWork(); - reset(v); - updater.valueChanged(null); - updater.set("Value2"); - updater.performWork(); - verify(v).set("Value2"); - } - - @Test - public void choosesLastUpdate() { - updater.set("FirstValue"); - updater.set("SecondValue"); - updater.performWork(); - verify(v).set("SecondValue"); - } -} -- cgit v1.2.3