EDU.oswego.cs.dl.util.concurrent
public class: WaitFreeQueue
java.lang.Object
EDU.oswego.cs.dl.util.concurrent.WaitFreeQueue
All Implemented Interfaces:
Channel
A wait-free linked list based queue implementation.
While this class conforms to the full Channel interface, only the
put and poll methods are useful in most applications. Because the
queue does not support blocking operations, take relies on
spin-loops, which can be extremely wasteful.
This class is adapted from the algorithm described in Simple,
Fast, and Practical Non-Blocking and Blocking Concurrent Queue
Algorithms by Maged M. Michael and Michael L. Scott. This
implementation is not strictly wait-free since it relies on locking
for basic atomicity and visibility requirements. Locks can impose
unbounded waits, although this should not be a major practical
concern here since each lock is held for the duration of only a few
statements. (However, the overhead of using so many locks can make
it less attractive than other Channel implementations on JVMs where
locking operations are very slow.)
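For comparison, the following is a minimal sketch of the same enqueue and dequeue loops written against real compare-and-set operations instead of lock-simulated ones. It assumes a JVM that provides java.util.concurrent.atomic.AtomicReference; the class name AtomicLinkedQueueSketch and its nested Node type are hypothetical and are not part of this package. Like the class documented here, it keeps a dummy node at the head and lets any thread help advance a lagging tail. It is an illustration under those assumptions, not a replacement for this class.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: not part of EDU.oswego.cs.dl.util.concurrent.
final class AtomicLinkedQueueSketch<E> {

    private static final class Node<E> {
        final E value;
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E value) { this.value = value; }
    }

    // Head always points at a dummy node; the first element is head.next.
    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    AtomicLinkedQueueSketch() {
        Node<E> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void put(E x) {
        if (x == null) throw new IllegalArgumentException();
        Node<E> n = new Node<>(x);
        for (;;) {
            Node<E> t = tail.get();
            // Try to link the new node after the node we believe is last.
            if (t.next.compareAndSet(null, n)) {
                // Swing tail forward; if this fails, another thread helped.
                tail.compareAndSet(t, n);
                return;
            }
            // Tail was lagging behind: help advance it, then retry.
            tail.compareAndSet(t, t.next.get());
        }
    }

    // Returns the oldest element, or null if the queue is empty.
    E poll() {
        for (;;) {
            Node<E> h = head.get();
            Node<E> first = h.next.get();
            if (first == null) return null;
            E result = first.value;
            // On success, 'first' becomes the new dummy node.
            if (head.compareAndSet(h, first)) return result;
        }
    }
}
```

With hardware-backed compare-and-set, a failed attempt means some other thread made progress, so the loops are lock-free. They are still not wait-free in the strict sense, because an individual thread can in principle retry indefinitely.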
Field Summary |
---|
protected volatile Node | head | Head of list is always a dummy node. |
protected volatile Node | tail | Pointer to last node on list. |
protected final Object | tailLock | Lock for simulating CAS for tail field. |
Methods from java.lang.Object: |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Method from EDU.oswego.cs.dl.util.concurrent.WaitFreeQueue Detail: |
protected synchronized boolean CASHead(Node oldHead,
                                       Node newHead) {
    if (head == oldHead) {
        head = newHead;
        return true;
    }
    else
        return false;
}
Simulate CAS for head field, using 'this' lock. |
protected boolean CASTail(Node oldTail,
                          Node newTail) {
    synchronized(tailLock) {
        if (tail == oldTail) {
            tail = newTail;
            return true;
        }
        else
            return false;
    }
}
Simulate CAS for tail field. |
protected Object extract() throws InterruptedException {
    for (;;) {
        Node h = head;
        Node first = h.next;
        if (first == null)
            return null;
        Object result = first.value;
        if (CASHead(h, first))
            return result;
    }
}
Main dequeue algorithm, called by poll, take. |
public boolean offer(Object x,
                     long msecs) throws InterruptedException {
    put(x);
    return true;
}
|
public Object peek() {
    Node first = head.next;
    if (first == null)
        return null;
    // Note: This synch unnecessary after JSR-133.
    // It exists only to guarantee visibility of returned object,
    // No other synch is needed, but "old" memory model requires one.
    synchronized(this) {
        return first.value;
    }
}
|
public Object poll(long msecs) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    if (msecs <= 0)
        return extract();
    long startTime = System.currentTimeMillis();
    for(;;) {
        Object x = extract();
        if (x != null)
            return x;
        else if (System.currentTimeMillis() - startTime >= msecs)
            return null;
        else
            Thread.sleep(0);
    }
}
Spin until poll returns a non-null value or time elapses.
If msecs is positive, a Thread.sleep(0) is performed on each iteration
as a heuristic to reduce contention. |
public void put(Object x) throws InterruptedException {
    if (x == null) throw new IllegalArgumentException();
    if (Thread.interrupted()) throw new InterruptedException();
    Node n = new Node(x);
    for(;;) {
        Node t = tail;
        // Try to link new node to end of list.
        if (t.CASNext(null, n)) {
            // Must now change tail field.
            // This CAS might fail, but if so, it will be fixed by others.
            CASTail(t, n);
            return;
        }
        // If cannot link, help out a previous failed attempt to move tail
        CASTail(t, t.next);
    }
}
|
public Object take() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    for(;;) {
        Object x = extract();
        if (x != null)
            return x;
        else
            Thread.sleep(0);
    }
}
Spin until poll returns a non-null value.
You probably don't want to call this method.
A Thread.sleep(0) is performed on each iteration
as a heuristic to reduce contention. If you would
rather use, for example, an exponential backoff,
you could manually set this up using poll. |
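One way the exponential backoff mentioned above might be set up is sketched below. The helper class BackoffTake, its nested Pollable interface, and the delay parameters are all hypothetical names introduced for illustration; Pollable is only a stand-in for the poll(long msecs) method of Channel so that the example compiles on its own. The sketch relies on poll returning immediately when msecs is zero, as the poll source above does.

```java
// Hypothetical helper: not part of EDU.oswego.cs.dl.util.concurrent.
final class BackoffTake {

    // Stand-in (assumption) for the poll(long) method of Channel.
    interface Pollable {
        Object poll(long msecs) throws InterruptedException;
    }

    // Polls without waiting; after each miss, sleeps and doubles the
    // delay until it reaches maxDelayMs.
    static Object takeWithBackoff(Pollable q,
                                  long initialDelayMs,
                                  long maxDelayMs)
            throws InterruptedException {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs)
            throw new IllegalArgumentException();
        long delay = initialDelayMs;
        for (;;) {
            Object x = q.poll(0);   // msecs <= 0: a single non-blocking attempt
            if (x != null)
                return x;
            Thread.sleep(delay);
            // Double the delay, capped at maxDelayMs and safe from overflow.
            delay = (delay >= maxDelayMs / 2) ? maxDelayMs : delay * 2;
        }
    }
}
```

Because Pollable has a single method, a call site could pass a method reference to an existing queue, for example BackoffTake.takeWithBackoff(queue::poll, 1, 64) on a JVM that supports method references. The starting delay and the cap are tuning choices that depend on how long the queue is expected to stay empty.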