From 445a738b9ebfd93f65fecd68a5fbbe61b7951bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 2 May 2012 12:54:04 +0200 Subject: Fix VariableUpdaterTask. Use CyclicCountDownLatch to improve VariableUpdaterTask. This implementation is much better, but unfortunately really hard to test. I'm not sure how to test this. --- .../java/com/orbekk/same/VariableUpdaterTask.java | 84 +++++++++++----------- .../java/com/orbekk/util/DelayedOperation.java | 12 ++++ 2 files changed, 53 insertions(+), 43 deletions(-) (limited to 'same/src/main') diff --git a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java index f0bcdd0..25be4f8 100644 --- a/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java +++ b/same/src/main/java/com/orbekk/same/VariableUpdaterTask.java @@ -15,72 +15,70 @@ */ package com.orbekk.same; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +import com.orbekk.util.CyclicCountDownLatch; +import com.orbekk.util.DelayedOperation; + /** Updates a variable on-demand. */ public class VariableUpdaterTask extends Thread implements Variable.OnChangeListener { + private static Logger logger = Logger.getLogger(VariableUpdaterTask.class.getName()); private Variable variable; - private volatile T newValue; - private AtomicBoolean hasNewValue = new AtomicBoolean(false); - private AtomicBoolean isReady = new AtomicBoolean(true); + private AtomicReference newValue = new AtomicReference(); + private CyclicCountDownLatch hasNewValue = new CyclicCountDownLatch(1); + private CyclicCountDownLatch isReady = new CyclicCountDownLatch(1); public VariableUpdaterTask(Variable variable) { super("VariableUpdater"); this.variable = variable; } - public synchronized void set(T newValue) { - this.newValue = newValue; - hasNewValue.set(true); - notifyAll(); - } - - /** Update the variable once. */ - public void performWork() { - boolean shouldDoWork = false; - synchronized(this) { - shouldDoWork = hasNewValue.get() && isReady.get(); - hasNewValue.set(false); - isReady.set(false); - } - if (shouldDoWork) { - variable.set(newValue); - } + public void set(T newValue) { + this.newValue.set(newValue); + hasNewValue.countDown(); } - private synchronized void waitFor(AtomicBoolean v) { - if (Thread.currentThread().isInterrupted()) { - return; - } - while(!v.get()) { - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + void setValue(T newValue) { + while (!Thread.currentThread().isInterrupted()) { + DelayedOperation op = variable.set(newValue); + if (op.getStatus().isOk()) { return; + } else if (op.getStatus().isConflict()) { + // We have a new value, but we need to wait for an update + // in order to overwrite it. + hasNewValue.countDown(); + return; + } else if (op.getStatus().isError()) { + // Error during update. Just retry. + logger.info("Error updating value. Status: " + op.getStatus()); + } else { + throw new AssertionError("Unknown state."); } } } + + @Override + public void valueChanged(Variable variable) { + isReady.countDown(); + } @Override public void run() { + isReady.countDown(); // Initially ready. variable.addOnChangeListener(this); - while (true) { - waitFor(isReady); - waitFor(hasNewValue); - if (Thread.currentThread().isInterrupted()) { - break; + while (!Thread.interrupted()) { + try { + hasNewValue.await(); + isReady.await(); + setValue(newValue.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - performWork(); } - variable.removeOnChangeListener(this); - } - - @Override - public synchronized void valueChanged(Variable unused) { - isReady.set(true); - notifyAll(); + logger.info("VariableUpdaterTask finished."); } } \ No newline at end of file diff --git a/same/src/main/java/com/orbekk/util/DelayedOperation.java b/same/src/main/java/com/orbekk/util/DelayedOperation.java index 48e7c05..caffa4f 100644 --- a/same/src/main/java/com/orbekk/util/DelayedOperation.java +++ b/same/src/main/java/com/orbekk/util/DelayedOperation.java @@ -47,6 +47,18 @@ public class DelayedOperation { return status == OK; } + public boolean isError() { + return status == ERROR; + } + + public boolean isConflict() { + return status == CONFLICT; + } + + public boolean canRetry() { + return isError(); + } + public int getStatusCode() { return status; } -- cgit v1.2.3