diff options
Diffstat (limited to 'same/src')
-rw-r--r-- | same/src/main/java/com/orbekk/util/WorkList.java | 21 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/util/WorkQueue.java | 114 | ||||
-rw-r--r-- | same/src/test/java/com/orbekk/util/WorkQueueTest.java | 37 |
3 files changed, 126 insertions, 46 deletions
diff --git a/same/src/main/java/com/orbekk/util/WorkList.java b/same/src/main/java/com/orbekk/util/WorkList.java deleted file mode 100644 index a4e38d6..0000000 --- a/same/src/main/java/com/orbekk/util/WorkList.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.orbekk.util; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -public class WorkList<E> extends ArrayList<E> { - public WorkList() { - super(); - } - - public WorkList(Collection<? extends E> collection) { - super(collection); - } - - public synchronized List<E> copyAndClear() { - List<E> copy = new WorkList<E>(this); - clear(); - return copy; - } -} diff --git a/same/src/main/java/com/orbekk/util/WorkQueue.java b/same/src/main/java/com/orbekk/util/WorkQueue.java index 252beaf..4eb54f5 100644 --- a/same/src/main/java/com/orbekk/util/WorkQueue.java +++ b/same/src/main/java/com/orbekk/util/WorkQueue.java @@ -6,11 +6,16 @@ import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A WorkList is a list for pending units of work. */ -public class WorkQueue<E> implements List<E> { - List<E> list = null; +abstract public class WorkQueue<E> extends Thread implements List<E> { + private Logger logger = LoggerFactory.getLogger(getClass()); + private volatile List<E> list = null; + private volatile boolean done = false; public WorkQueue() { list = new ArrayList<E>(); @@ -20,118 +25,177 @@ public class WorkQueue<E> implements List<E> { list = new ArrayList<E>(collection); } + public synchronized List<E> getAndClear() { + List<E> copy = new ArrayList<E>(list); + list.clear(); + return copy; + } + + /** + * OnChange event. + * + * May be run even if the WorkQueue has not been changed. + * Called until the queue is empty. + */ + protected abstract void onChange(); + + @Override + public void run() { + while (!done) { + if (!isEmpty()) { + onChange(); + } + synchronized (this) { + try { + if (isEmpty()) { + wait(); + } + } catch (InterruptedException e) { + done = true; + } + if (Thread.interrupted()) { + done = true; + } + } + } + } + + @Override - public boolean add(E e) { + public synchronized boolean add(E e) { + notifyAll(); return list.add(e); } @Override - public void add(int index, E element) { + public synchronized void add(int index, E element) { + notifyAll(); list.add(index, element); } @Override - public boolean addAll(Collection<? extends E> c) { + public synchronized boolean addAll(Collection<? extends E> c) { + notifyAll(); return list.addAll(c); } @Override - public boolean addAll(int index, Collection<? extends E> c) { + public synchronized boolean addAll(int index, Collection<? extends E> c) { + notifyAll(); return list.addAll(index, c); } @Override - public void clear() { + public synchronized void clear() { + notifyAll(); list.clear(); } @Override - public boolean contains(Object o) { + public synchronized boolean contains(Object o) { + notifyAll(); return list.contains(o); } @Override - public boolean containsAll(Collection<?> c) { + public synchronized boolean containsAll(Collection<?> c) { + notifyAll(); return containsAll(c); } @Override - public E get(int index) { + public synchronized E get(int index) { + notifyAll(); return list.get(index); } @Override - public int indexOf(Object o) { + public synchronized int indexOf(Object o) { + notifyAll(); return list.indexOf(o); } @Override - public boolean isEmpty() { + public synchronized boolean isEmpty() { + notifyAll(); return list.isEmpty(); } @Override - public Iterator<E> iterator() { + public synchronized Iterator<E> iterator() { + notifyAll(); return list.iterator(); } @Override - public int lastIndexOf(Object o) { + public synchronized int lastIndexOf(Object o) { + notifyAll(); return list.lastIndexOf(o); } @Override - public ListIterator<E> listIterator() { + public synchronized ListIterator<E> listIterator() { + notifyAll(); return list.listIterator(); } @Override - public ListIterator<E> listIterator(int index) { + public synchronized ListIterator<E> listIterator(int index) { + notifyAll(); return list.listIterator(index); } @Override - public boolean remove(Object o) { + public synchronized boolean remove(Object o) { + notifyAll(); return list.remove(o); } @Override - public E remove(int index) { + public synchronized E remove(int index) { + notifyAll(); return list.remove(index); } @Override - public boolean removeAll(Collection<?> c) { + public synchronized boolean removeAll(Collection<?> c) { + notifyAll(); return list.removeAll(c); } @Override - public boolean retainAll(Collection<?> c) { + public synchronized boolean retainAll(Collection<?> c) { + notifyAll(); return list.retainAll(c); } @Override - public E set(int index, E element) { + public synchronized E set(int index, E element) { + notifyAll(); return list.set(index, element); } @Override - public int size() { + public synchronized int size() { + notifyAll(); return list.size(); } @Override - public List<E> subList(int fromIndex, int toIndex) { + public synchronized List<E> subList(int fromIndex, int toIndex) { + notifyAll(); return list.subList(fromIndex, toIndex); } @Override - public Object[] toArray() { + public synchronized Object[] toArray() { + notifyAll(); return list.toArray(); } @Override - public <T> T[] toArray(T[] a) { + public synchronized <T> T[] toArray(T[] a) { + notifyAll(); return list.toArray(a); } diff --git a/same/src/test/java/com/orbekk/util/WorkQueueTest.java b/same/src/test/java/com/orbekk/util/WorkQueueTest.java new file mode 100644 index 0000000..ab6e138 --- /dev/null +++ b/same/src/test/java/com/orbekk/util/WorkQueueTest.java @@ -0,0 +1,37 @@ +package com.orbekk.util; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +public class WorkQueueTest { + @Test + public void testPerformsWork() throws Exception { + final ArrayList<Integer> doubled = new ArrayList<Integer>(); + WorkQueue<Integer> worker = new WorkQueue<Integer>() { + @Override protected void onChange() { + List<Integer> list = getAndClear(); + for (int x : list) { + doubled.add(x * 2); + } + synchronized (doubled) { + doubled.notifyAll(); + } + } + }; + + synchronized (doubled) { + worker.start(); + worker.add(1); + doubled.wait(); + worker.interrupt(); + } + + worker.join(); + assertEquals(2, (int)doubled.get(0)); + } + +} |