线程延时队列DelayQueue

  • 1、DelayQueue的作用
  • 2、DelayQueue使用规则
  • 3、DelayQueue的Demo
  • 4、DelayQueue原理 *
  • 5、DelayQueue的实现 *

1、DelayQueue的作用

作用便是线程调度机制通过粒度特别细的时间控制相应的元素在合适的时间出队,使用场景也是非常多的比如:缓存机制当中使用这个队列来控制缓存队列当中的缓存是否过期,如果过期则执行缓存过期以后的相应逻辑。

2、DelayQueue使用规则

DelayQueue从名字,我们知道是一个队列,并且继承自BlockingQueue,是一个阻塞队列,那么就有着阻塞队列的特点,当队列为空的时候,take方法将阻塞,这里需要注意的便是DelayQueue是一个无届队列,也就意味着,没有当队列为满的时候,所以就没有队满。put/add 阻塞的情况,其次DelayQueue最显著的特点便是,这个队列当中根据时间为优先级对队中的元素进行排序,如果我们take一个元素的时候,这个元素的时间值没有到,便会自动阻塞,当时间到以后take到元素。在DelayQueue这个队列当中所描述的任何一种元素必须实现Delay接口,而这个接口当中便定义了两个方法,我们通过这两个方法来进行队当中的排序,以及take的时候,相应的时间粒度的控制。

时间最长的在对头还是时间最短的在对头,这个主要看如何实现元素的 compareTo 方法。

  • public int compareTo(T o);
  • long getDelay(TimeUnit unit);

这里需要注意的是在getDelay当中获取时间的单位是NANOSECONDS,所以在写getDelay这个方法的时候注意使用的时间单位是NANOSECONDS。

3、DelayQueue的Demo

下面我们通过一个例子来看一下这个DelayQueue到底应该如何使用。本例子当中我们先根据时间短的先执行,打印了一遍执行顺序,然后通过一个List将原本初始化的时候的Runnable进行了保存,然后在最后进行打印,打印的顺序,便是初始化的顺序,也就是入队的时候的顺序。还有在方括号当中便是延时的时间,可以通过运行看一下,是否是等了相应的时间打印出来的这句话。目的就是让大家可以很直观的看到,我们通过DelayQueue是按照一个什么顺序进行排序的,最后通过一个任务将整个系统结束掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/***
* 探究无届队列DelayQueue
* @author pengchengliu
*
*/
public class TestDelayQueue {
public static void main(String[] args) {
Random random = new Random(47);
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> delayQueue = new DelayQueue<DelayedTask>();

// 初始化 delayQueue ,
for (int i = 0; i < 20; i++) {
delayQueue.put(new DelayedTask(random.nextInt(5000)));
}
delayQueue.add(new DelayedTask.EndSentinel(5000, newCachedThreadPool));

// 启动任务开关
newCachedThreadPool.execute(new DelayedTaskConsumer(delayQueue));
}
}
class DelayedTask implements Runnable, Delayed {
private static int counter = 0 ;
private final int id = counter ++ ;
private final int delta ;
private final long trigger ;
protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();

public DelayedTask(int delayInMilliseconds) {
this.delta = delayInMilliseconds ;
// 从较细粒度到较粗粒度的舍位转换,这样会失去精确性。例如,将 999 毫秒转换为秒的结果为 0。
// 使用参数从较粗粒度到较细粒度转换,如果参数为负,则在数字上溢出至 Long.MIN_VALUE,
// 如果为正,则为 Long.MAX_VALUE。 将1毫秒转换为毫微秒
trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(this.delta, TimeUnit.MILLISECONDS);
sequence.add(this) ;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed delayed) {
DelayedTask task = (DelayedTask) delayed ;
if (this.delta < task.delta) return -1 ;
if (this.delta > task.delta) return 1 ;
return 0;
}

@Override
public void run() {
System.out.println(this + " ");
}

@Override
public String toString() {
return String.format("[%1$-4d]", delta) + " Task:" + id;
}

public String summary () {
return "(" + id + ":" + delta + ")";
}


public static class EndSentinel extends DelayedTask {

private ExecutorService exec ;

public EndSentinel(int delayInMilliseconds, ExecutorService exec) {
super(delayInMilliseconds);
this.exec = exec ;
}

@Override
public void run() {
for (DelayedTask task : sequence) {
System.out.print(task.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow() ;
}

}
}
class DelayedTaskConsumer implements Runnable {

private DelayQueue<DelayedTask> q ;

public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q ;
}

@Override
public void run() {
try {
while (!Thread.interrupted()){
//注意:手动调用run方法
q.take().run();
}
} catch (InterruptedException e) {}
System.out.println("Finish DelayedTaskConsumer");
}
}
[128 ] Task:11
[200 ] Task:7
[429 ] Task:5
[520 ] Task:18
[555 ] Task:1
[961 ] Task:4
[998 ] Task:16
[1207] Task:9
[1693] Task:2
[1809] Task:14
[1861] Task:3
[2278] Task:15
[3288] Task:10
[3551] Task:12
[4258] Task:0
[4258] Task:19
[4522] Task:8
[4589] Task:13
[4861] Task:17
[4868] Task:6
(0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task:20 Calling shutdownNow()
Finish DelayedTaskConsumer