BlockingQueue---PriorityBlockingQueue

行者 / 2024-09-20 / 原文

总结

  一个无界的并发队列。

  按照元素的优先级顺序来处理元素。这种队列非常适合需要按照优先级处理任务的场景。

特性

  • 无界:默认情况下是无界的,可以存储任意数量的元素。
  • 基于优先级:队列中的元素根据它们的自然顺序或者由构造时提供的 Comparator 来排序
  • 线程安全:支持多线程同时进行插入和移除操作。
  • 非阻塞插入操作不会被阻塞,即使队列中已经有很多元素;但是取元素的操作可能会被阻塞,如果队列为空。

构造函数

  • PriorityBlockingQueue(): 创建一个默认初始容量为11的 PriorityBlockingQueue
  • PriorityBlockingQueue(int initialCapacity): 创建一个具有指定初始容量的 PriorityBlockingQueue
  • PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator): 创建一个具有指定初始容量并使用指定比较器的 PriorityBlockingQueue
  • PriorityBlockingQueue(Collection<? extends E> c): 用给定集合中的元素初始化队列。
  • PriorityBlockingQueue(PriorityBlockingQueue<? extends E> c): 用另一个 PriorityBlockingQueue 初始化当前队列。
  • PriorityBlockingQueue(SortedSet<? extends E> c): 用给定有序集合中的元素初始化队列。

方法

  • 插入操作:

    • put(E e): 将指定的元素插入此优先级队列中。这个方法实际上等同于 offer(E e),因为 PriorityBlockingQueue 是无界的。
    • offer(E e): 将指定的元素插入此优先级队列中。由于队列是无界的,所以总是会成功添加元素。
  • 移除操作:

    • take(): 检索并移除此队列的头最高优先级),如果队列为空,则等待有元素可用。
    • poll(): 检索并移除此队列的头最高优先级),如果队列为空,则立即返回 null
    • poll(long timeout, TimeUnit unit): 检索并移除此队列的头(最高优先级),如果队列为空,则最多等待指定的时间,如果超时队列仍然为空则返回 null
  • 检查操作:

    • peek(): 检索但不移除此队列的头(最高优先级);如果队列为空,则返回 null

 

public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
        private static final int DEFAULT_INITIAL_CAPACITY = 11;
        private transient Object[] queue;
        private transient int size;
        private transient Comparator<? super E> comparator;
        private final ReentrantLock lock;

        public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }

        public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }

        public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }

        // 插入指定元素到优先级队列
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)                                        //
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

        private E dequeue() {
            int n = size - 1;
            if (n < 0)
                return null;
            else {
                Object[] array = queue;
                E result = (E) array[0];
                E x = (E) array[n];
                array[n] = null;
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftDownComparable(0, x, array, n);
                else
                    siftDownUsingComparator(0, x, array, n, cmp);
                size = n;
                return result;
            }
        }
    }