DelayQueue简介 DelayQueue(延迟队列)是java.util.concurrent包下的适用于一些非即时执行场景下的并发集合。
数据以PriorityQueue的结构存储,借助ReentrantLock保证线程安全,使用Condition完成对线程
的精确控制。
类定义如下:
1 2 public class DelayQueue <E extends Delayed > extends AbstractQueue <E > implements BlockingQueue <E >
从类定义可以看到,队列中的元素对象都需要实现Delayed接口,通过实现Delayed的compareTo()和getDelay()方法实现元素的排序和取出消费的判断。 而DelayQueue本身是BlockingQueue的一个实现,未到执行时间的元素对象不会被取出,而是阻塞当前线程让其等待至任务的执行时间。
属性信息 1 2 3 4 5 6 7 8 9 10 11 private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null ;private final Condition available = lock.newCondition();
这里定义的线程leader,参考的多线程的Leader/Follower模式设计。 其思想是当有多个消费者线程去获取队列的元素对象时,同一个时刻只有一个线程成为leader等待队首对象,当取得队首对象时就通知其他的线程取代他成为leader等待下一个队首。
Condition这里用来精确的控制线程,当等待的队首对象还未到执行时间时,会使用Condition的await()方法让当前线程等待。
核心方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public boolean offer (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null ; available.signal(); } return true ; } finally { lock.unlock(); } } public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0 ) return null ; else return q.poll(); } finally { lock.unlock(); } } public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } }
DelayQueue最重要的方法便是take(),消费者线程通过调用take()去依次取出队首任务进行处理。 只要理解了Leader/Follower模式就不难理解take()的逻辑。
示例 这里用一个单线程生产者/消费者的示例展示下DelayQueue的基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class DelayTask implements Delayed { private final Long delay; private final Long exprie; private final Long create; private final String msg; public DelayTask (Long delay, String msg) { this .delay = delay; this .create = System.currentTimeMillis(); this .exprie = create + delay; this .msg = msg; } @Override public long getDelay (TimeUnit unit) { long now = System.currentTimeMillis(); return unit.convert(exprie - now, TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { long delay1 = getDelay(TimeUnit.SECONDS); long delay2 = o.getDelay(TimeUnit.SECONDS); return Long.compare(delay1, delay2); } @Override public String toString () { StringBuilder sb = new StringBuilder(); sb.append("delay:" ).append(getDelay(TimeUnit.SECONDS)).append("msg:" ).append(msg); return sb.toString(); } } public interface Delayed extends Comparable <Delayed > { long getDelay (TimeUnit unit) ; }
DelayQueue中的元素类需要实现Delayed,实现getDelay()计算任务的剩余执行时间。 PriorityQueue中的元素都需要继承Comparable,否则无法排序,这里是通过让Delayed继承来实现,然后在子类中重写compareTo()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public static void producer (DelayQueue<DelayTask> queue) { new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { String msg = "aaaaa" + i; try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } DelayTask task = new DelayTask(100000L , msg); queue.offer(task); logger.info("生产任务:{}" , task.toString()); } }).start(); } public static void consumer (DelayQueue<DelayTask> queue) { new Thread(() -> { while (true ) { DelayTask task = null ; try { task = queue.take(); } catch (InterruptedException e) { logger.info("消费任务异常:{}" , e.getLocalizedMessage()); } if (null != task) { logger.info("消费任务:{}" , task.toString()); } else { logger.info("没有待消费的任务" ); } } }).start(); }
小结 DelayQueue可应用于一些执行时间较为灵活的场景,比如开课前30分钟发送通知,但是课程的时间并不
固定,就可以动态获取课程上课时间后定义一个延迟任务等待执行。
示例中为了方便采用的单线程,但实际开发中,应该用多个线程作为消费者去处理队列中的任务。特别
是当任务的逻辑较为复杂时,单线程处理会导致后续任务超时,至于线程数可以根据实际测试去设置。
在写示例的过程中,有想到一个问题,就是如果有较多的任务需要在同一个时间节点执行,这时一个
DelayQueue就无法处理。肯定会有大量的任务超时。我的想法是,如果不考虑其他方案,可能需要采用多
个DelayQueue,同一个队列中避免执行时间相同的任务。在实际开发中,我们也要注意是否会有大量任务
的执行时间点一样。