BlockingQueue---DelayQueue
总结
一个无界阻塞队列;
FIFO;
只包含实现了 Delayed 接口的元素,每个元素都有一个延迟时间,在该延迟时间结束之前,该元素不会从队列中可用。一旦元素的延迟到期,它就可以被取出了,并且取出的顺序是按照延迟到期的时间先后进行的。
通常用于实现定时任务调度、缓存过期等场景。例如,可以用来管理超时连接或自动删除过期的数据。
特性
- 无界:可以容纳任意数量的元素。
- 基于延迟:元素只有在其延迟时间到期后才可被取出。
- 线程安全:支持多线程同时进行插入和移除操作。
- 阻塞:取元素的操作(如
take())如果在没有延迟到期的元素时会被阻塞,直到有元素的延迟到期。
构造函数
DelayQueue(): 创建一个新的 DelayQueue 实例。
方法
-
插入操作:
put(E e): 将指定的延迟元素插入此队列。由于DelayQueue只接受实现了Delayed接口的对象,所以这里的E必须是Delayed类型。offer(E e): 尝试将指定的延迟元素插入此队列。与put方法类似,但不会抛出异常。
-
移除操作:
take(): 检索并移除此队列的头(其延迟已过的元素),如果队列为空,则等待有元素可用。poll(): 检索并移除此队列的头(其延迟已过的元素),如果队列为空,则立即返回null。poll(long timeout, TimeUnit unit): 检索并移除此队列的头(其延迟已过的元素),如果队列为空,则最多等待指定的时间,如果超时队列仍然为空则返回null。
-
检查操作:
peek(): 检索但不移除此队列的头(其延迟已过的元素);如果队列为空,则返回null。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
public DelayQueue() {}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
}
public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable {
private static final int DEFAULT_INITIAL_CAPACITY = 11;
transient Object[] queue;
private int size = 0;
private final Comparator<? super E> comparator;
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);
return true;
}
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
}
}
示例
static class DelayedElement implements Delayed {
private final long delayTime; // 延迟时间
private final long expire; // 到期时间
private final String name; // 元素名称
public DelayedElement(long delay, String name) {
this.delayTime = delay;
this.name = name;
this.expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
if (this == other) {
return 0;
}
if (other instanceof DelayedElement) {
DelayedElement otherDE = (DelayedElement) other;
return Long.compare(this.expire, otherDE.expire);
}
long diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
@Override
public String toString() {
return "DelayedElement{" +
"name='" + name + '\'' +
", delay=" + delayTime + "ms" +
'}';
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedElement> queue = new DelayQueue<>();
// 添加一些延迟元素
queue.put(new DelayedElement(1000, "One"));
queue.put(new DelayedElement(500, "Two"));
queue.put(new DelayedElement(2000, "Three"));
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
DelayedElement element = queue.take();
System.out.println("Consumed: " + element);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
consumer.start();
// 等待一段时间以观察输出
Thread.sleep(3000);
consumer.interrupt(); // 停止消费者
// 结果:
// Consumed: DelayedElement{name='Two', delay=500ms}
//Consumed: DelayedElement{name='One', delay=1000ms}
//Consumed: DelayedElement{name='Three', delay=2000ms}
}