BlockingQueue---PriorityBlockingQueue
总结
一个无界的并发队列。
按照元素的优先级顺序来处理元素。这种队列非常适合需要按照优先级处理任务的场景。
特性
- 无界:默认情况下是无界的,可以存储任意数量的元素。
- 基于优先级:队列中的元素根据它们的自然顺序或者由构造时提供的
Comparator来排序。 - 线程安全:支持多线程同时进行插入和移除操作。
- 非阻塞:插入操作不会被阻塞,即使队列中已经有很多元素;但是取元素的操作可能会被阻塞,如果队列为空。
构造函数
PriorityBlockingQueue(): 创建一个默认初始容量为11的PriorityBlockingQueue。PriorityBlockingQueue(int initialCapacity): 创建一个具有指定初始容量的PriorityBlockingQueue。PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator): 创建一个具有指定初始容量并使用指定比较器的PriorityBlockingQueue。PriorityBlockingQueue(Collection<? extends E> c): 用给定集合中的元素初始化队列。PriorityBlockingQueue(PriorityBlockingQueue<? extends E> c): 用另一个PriorityBlockingQueue初始化当前队列。PriorityBlockingQueue(SortedSet<? extends E> c): 用给定有序集合中的元素初始化队列。
方法
-
插入操作:
put(E e): 将指定的元素插入此优先级队列中。这个方法实际上等同于offer(E e),因为PriorityBlockingQueue是无界的。offer(E e): 将指定的元素插入此优先级队列中。由于队列是无界的,所以总是会成功添加元素。
-
移除操作:
take(): 检索并移除此队列的头(最高优先级),如果队列为空,则等待有元素可用。poll(): 检索并移除此队列的头(最高优先级),如果队列为空,则立即返回null。poll(long timeout, TimeUnit unit): 检索并移除此队列的头(最高优先级),如果队列为空,则最多等待指定的时间,如果超时队列仍然为空则返回null。
-
检查操作:
peek(): 检索但不移除此队列的头(最高优先级);如果队列为空,则返回null。
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
private static final int DEFAULT_INITIAL_CAPACITY = 11;
private transient Object[] queue;
private transient int size;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
// 插入指定元素到优先级队列
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null) //
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
}