summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-31 10:51:12 +0100
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-01-31 10:51:12 +0100
commit211fd778ac9011ec79d745767b7d23c8de738003 (patch)
tree9e6f1a804f8218903611325cbff84bb51f5f2975
parent00f5afe8de3544bf99f4d84c7904ed784f50074c (diff)
WorkList => WorkQueue – A worker thread helper class.
-rw-r--r--same/src/main/java/com/orbekk/util/WorkList.java21
-rw-r--r--same/src/main/java/com/orbekk/util/WorkQueue.java114
-rw-r--r--same/src/test/java/com/orbekk/util/WorkQueueTest.java37
3 files changed, 126 insertions, 46 deletions
diff --git a/same/src/main/java/com/orbekk/util/WorkList.java b/same/src/main/java/com/orbekk/util/WorkList.java
deleted file mode 100644
index a4e38d6..0000000
--- a/same/src/main/java/com/orbekk/util/WorkList.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.orbekk.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-public class WorkList<E> extends ArrayList<E> {
- public WorkList() {
- super();
- }
-
- public WorkList(Collection<? extends E> collection) {
- super(collection);
- }
-
- public synchronized List<E> copyAndClear() {
- List<E> copy = new WorkList<E>(this);
- clear();
- return copy;
- }
-}
diff --git a/same/src/main/java/com/orbekk/util/WorkQueue.java b/same/src/main/java/com/orbekk/util/WorkQueue.java
index 252beaf..4eb54f5 100644
--- a/same/src/main/java/com/orbekk/util/WorkQueue.java
+++ b/same/src/main/java/com/orbekk/util/WorkQueue.java
@@ -6,11 +6,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A WorkList is a list for pending units of work.
*/
-public class WorkQueue<E> implements List<E> {
- List<E> list = null;
+abstract public class WorkQueue<E> extends Thread implements List<E> {
+ private Logger logger = LoggerFactory.getLogger(getClass());
+ private volatile List<E> list = null;
+ private volatile boolean done = false;
public WorkQueue() {
list = new ArrayList<E>();
@@ -20,118 +25,177 @@ public class WorkQueue<E> implements List<E> {
list = new ArrayList<E>(collection);
}
+ public synchronized List<E> getAndClear() {
+ List<E> copy = new ArrayList<E>(list);
+ list.clear();
+ return copy;
+ }
+
+ /**
+ * OnChange event.
+ *
+ * May be run even if the WorkQueue has not been changed.
+ * Called until the queue is empty.
+ */
+ protected abstract void onChange();
+
+ @Override
+ public void run() {
+ while (!done) {
+ if (!isEmpty()) {
+ onChange();
+ }
+ synchronized (this) {
+ try {
+ if (isEmpty()) {
+ wait();
+ }
+ } catch (InterruptedException e) {
+ done = true;
+ }
+ if (Thread.interrupted()) {
+ done = true;
+ }
+ }
+ }
+ }
+
+
@Override
- public boolean add(E e) {
+ public synchronized boolean add(E e) {
+ notifyAll();
return list.add(e);
}
@Override
- public void add(int index, E element) {
+ public synchronized void add(int index, E element) {
+ notifyAll();
list.add(index, element);
}
@Override
- public boolean addAll(Collection<? extends E> c) {
+ public synchronized boolean addAll(Collection<? extends E> c) {
+ notifyAll();
return list.addAll(c);
}
@Override
- public boolean addAll(int index, Collection<? extends E> c) {
+ public synchronized boolean addAll(int index, Collection<? extends E> c) {
+ notifyAll();
return list.addAll(index, c);
}
@Override
- public void clear() {
+ public synchronized void clear() {
+ notifyAll();
list.clear();
}
@Override
- public boolean contains(Object o) {
+ public synchronized boolean contains(Object o) {
+ notifyAll();
return list.contains(o);
}
@Override
- public boolean containsAll(Collection<?> c) {
+ public synchronized boolean containsAll(Collection<?> c) {
+ notifyAll();
return containsAll(c);
}
@Override
- public E get(int index) {
+ public synchronized E get(int index) {
+ notifyAll();
return list.get(index);
}
@Override
- public int indexOf(Object o) {
+ public synchronized int indexOf(Object o) {
+ notifyAll();
return list.indexOf(o);
}
@Override
- public boolean isEmpty() {
+ public synchronized boolean isEmpty() {
+ notifyAll();
return list.isEmpty();
}
@Override
- public Iterator<E> iterator() {
+ public synchronized Iterator<E> iterator() {
+ notifyAll();
return list.iterator();
}
@Override
- public int lastIndexOf(Object o) {
+ public synchronized int lastIndexOf(Object o) {
+ notifyAll();
return list.lastIndexOf(o);
}
@Override
- public ListIterator<E> listIterator() {
+ public synchronized ListIterator<E> listIterator() {
+ notifyAll();
return list.listIterator();
}
@Override
- public ListIterator<E> listIterator(int index) {
+ public synchronized ListIterator<E> listIterator(int index) {
+ notifyAll();
return list.listIterator(index);
}
@Override
- public boolean remove(Object o) {
+ public synchronized boolean remove(Object o) {
+ notifyAll();
return list.remove(o);
}
@Override
- public E remove(int index) {
+ public synchronized E remove(int index) {
+ notifyAll();
return list.remove(index);
}
@Override
- public boolean removeAll(Collection<?> c) {
+ public synchronized boolean removeAll(Collection<?> c) {
+ notifyAll();
return list.removeAll(c);
}
@Override
- public boolean retainAll(Collection<?> c) {
+ public synchronized boolean retainAll(Collection<?> c) {
+ notifyAll();
return list.retainAll(c);
}
@Override
- public E set(int index, E element) {
+ public synchronized E set(int index, E element) {
+ notifyAll();
return list.set(index, element);
}
@Override
- public int size() {
+ public synchronized int size() {
+ notifyAll();
return list.size();
}
@Override
- public List<E> subList(int fromIndex, int toIndex) {
+ public synchronized List<E> subList(int fromIndex, int toIndex) {
+ notifyAll();
return list.subList(fromIndex, toIndex);
}
@Override
- public Object[] toArray() {
+ public synchronized Object[] toArray() {
+ notifyAll();
return list.toArray();
}
@Override
- public <T> T[] toArray(T[] a) {
+ public synchronized <T> T[] toArray(T[] a) {
+ notifyAll();
return list.toArray(a);
}
diff --git a/same/src/test/java/com/orbekk/util/WorkQueueTest.java b/same/src/test/java/com/orbekk/util/WorkQueueTest.java
new file mode 100644
index 0000000..ab6e138
--- /dev/null
+++ b/same/src/test/java/com/orbekk/util/WorkQueueTest.java
@@ -0,0 +1,37 @@
+package com.orbekk.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class WorkQueueTest {
+ @Test
+ public void testPerformsWork() throws Exception {
+ final ArrayList<Integer> doubled = new ArrayList<Integer>();
+ WorkQueue<Integer> worker = new WorkQueue<Integer>() {
+ @Override protected void onChange() {
+ List<Integer> list = getAndClear();
+ for (int x : list) {
+ doubled.add(x * 2);
+ }
+ synchronized (doubled) {
+ doubled.notifyAll();
+ }
+ }
+ };
+
+ synchronized (doubled) {
+ worker.start();
+ worker.add(1);
+ doubled.wait();
+ worker.interrupt();
+ }
+
+ worker.join();
+ assertEquals(2, (int)doubled.get(0));
+ }
+
+}