Method from EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue Detail: |
/**
 * Signals the take side: issues a single {@code notify()} on
 * {@code takeGuard_} while holding that object's monitor, so that at most
 * one thread currently waiting on it is woken.
 */
protected final void allowTake() {
    synchronized (takeGuard_) {
        // notify() must be called by the owner of the monitor.
        takeGuard_.notify();
    }
}
Notify a waiting take if needed * |
/**
 * Reads the capacity field while holding this queue's monitor.
 *
 * @return the value of {@code capacity_} at the time of the call
 */
public synchronized int capacity() {
    final int current = capacity_;
    return current;
}
Return the current capacity of this queue * |
/**
 * Removes the oldest element, if there is one. Shared by take and poll.
 *
 * Runs under this queue's monitor and, nested inside it, the monitor of
 * the current head node. On success the node after the head becomes the
 * new head (its value slot is cleared), one permit is credited to
 * {@code takeSidePutPermits_}, and one thread waiting on this queue's
 * monitor is notified.
 *
 * @return the extracted value, or {@code null} if no node follows the head
 */
protected synchronized Object extract() {
    synchronized (head_) {
        final LinkedNode oldest = head_.next;
        if (oldest == null) {
            // Nothing queued behind the head node.
            return null;
        }
        final Object item = oldest.value;
        oldest.value = null;      // drop the reference held by the new head
        head_ = oldest;
        takeSidePutPermits_++;    // a slot was freed; credit it to the take side
        notify();                 // wake one thread waiting on this queue's monitor
        return item;
    }
}
Main mechanics for take/poll * |
/**
 * Consumes one put-side permit and appends a new node holding {@code x}
 * after the current last node.
 *
 * The permit counter is modified without any locking here; only the link
 * update is guarded, by the monitor of the current {@code last_} node.
 *
 * @param x the value to store in the new node
 */
protected void insert(Object x) {
    putSidePutPermits_ -= 1;
    final LinkedNode node = new LinkedNode(x);
    synchronized (last_) {
        last_.next = node;
        last_ = node;
    }
}
Create and insert a node.
Call only under synch on putGuard_
* |
/**
 * Reports whether any node follows the head node, checked while holding
 * the head node's monitor.
 *
 * @return {@code true} if {@code head_.next} is {@code null}
 */
public boolean isEmpty() {
    synchronized (head_) {
        final LinkedNode first = head_.next;
        return first == null;
    }
}
|
public boolean offer(Object x,
long msecs) throws InterruptedException {
if (x == null) throw new IllegalArgumentException();
if (Thread.interrupted()) throw new InterruptedException();
synchronized(putGuard_) {
if (putSidePutPermits_ < = 0) {
synchronized(this) {
if (reconcilePutPermits() < = 0) {
if (msecs < = 0)
return false;
else {
try {
long waitTime = msecs;
long start = System.currentTimeMillis();
for(;;) {
wait(waitTime);
if (reconcilePutPermits() > 0) {
break;
}
else {
waitTime = msecs - (System.currentTimeMillis() - start);
if (waitTime < = 0) {
return false;
}
}
}
}
catch (InterruptedException ex) {
notify();
throw ex;
}
}
}
}
}
insert(x);
}
allowTake();
return true;
}
|
/**
 * Returns the value of the node following the head without removing it,
 * read while holding the head node's monitor.
 *
 * @return the first node's value, or {@code null} if no node follows the head
 */
public Object peek() {
    synchronized (head_) {
        final LinkedNode front = head_.next;
        return (front == null) ? null : front.value;
    }
}
|
public Object poll(long msecs) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Object x = extract();
if (x != null)
return x;
else {
synchronized(takeGuard_) {
try {
long waitTime = msecs;
long start = (msecs < = 0)? 0: System.currentTimeMillis();
for (;;) {
x = extract();
if (x != null || waitTime < = 0) {
return x;
}
else {
takeGuard_.wait(waitTime);
waitTime = msecs - (System.currentTimeMillis() - start);
}
}
}
catch(InterruptedException ex) {
takeGuard_.notify();
throw ex;
}
}
}
}
|
public void put(Object x) throws InterruptedException {
if (x == null) throw new IllegalArgumentException();
if (Thread.interrupted()) throw new InterruptedException();
synchronized(putGuard_) {
if (putSidePutPermits_ < = 0) { // wait for permit.
synchronized(this) {
if (reconcilePutPermits() < = 0) {
try {
for(;;) {
wait();
if (reconcilePutPermits() > 0) {
break;
}
}
}
catch (InterruptedException ex) {
notify();
throw ex;
}
}
}
}
insert(x);
}
// call outside of lock to loosen put/take coupling
allowTake();
}
|
/**
 * Transfers every permit accumulated in {@code takeSidePutPermits_} to
 * {@code putSidePutPermits_} and resets the take-side counter to zero.
 * Performs no locking itself.
 *
 * @return the value of {@code putSidePutPermits_} after the transfer
 */
protected final int reconcilePutPermits() {
    putSidePutPermits_ += takeSidePutPermits_;
    takeSidePutPermits_ = 0;
    final int available = putSidePutPermits_;
    return available;
}
Move put permits from take side to put side;
return the number of put side permits that are available.
Call only under synch on putGuard_ AND this. |
public void setCapacity(int newCapacity) {
if (newCapacity < = 0) throw new IllegalArgumentException();
synchronized (putGuard_) {
synchronized(this) {
takeSidePutPermits_ += (newCapacity - capacity_);
capacity_ = newCapacity;
// Force immediate reconcilation.
reconcilePutPermits();
notifyAll();
}
}
}
Reset the capacity of this queue.
If the new capacity is less than the old capacity,
existing elements are NOT removed, but
incoming puts will not proceed until the number of elements
is less than the new capacity. |
/**
 * Computes an element-count estimate as the capacity minus all permits
 * outstanding on either side.
 *
 * @return {@code capacity_ - (takeSidePutPermits_ + putSidePutPermits_)}
 */
public synchronized int size() {
    /*
      Only this queue's monitor is held. Also locking putGuard_ would be
      the ideal, but then this call could block behind an in-progress put
      that might itself be stuck. So putSidePutPermits_ is read as-is,
      whatever value it happens to have.
    */
    final int outstandingPermits = takeSidePutPermits_ + putSidePutPermits_;
    return capacity_ - outstandingPermits;
}
Return the number of elements in the queue.
This is only a snapshot value, that may be in the midst
of changing. The returned value will be unreliable in the presence of
active puts and takes, and should only be used as a heuristic
estimate, for example for resource monitoring purposes. |
/**
 * Removes and returns the oldest element, waiting without a time limit
 * until one is available.
 *
 * @return the extracted element (never {@code null})
 * @throws InterruptedException if the thread's interrupt flag was set on
 *         entry or the thread is interrupted while waiting
 */
public Object take() throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }

    // First attempt without taking takeGuard_.
    Object item = extract();
    if (item != null) {
        return item;
    }

    synchronized (takeGuard_) {
        try {
            // Retry under the guard; wait on it whenever nothing is there.
            while ((item = extract()) == null) {
                takeGuard_.wait();
            }
            return item;
        }
        catch (InterruptedException ex) {
            // Hand on a notification this thread may have consumed
            // to another waiting taker, then propagate.
            takeGuard_.notify();
            throw ex;
        }
    }
}
|