|
| 1 | +--- |
| 2 | +title: Java 容器之 Queue |
| 3 | +date: 2018/06/29 |
| 4 | +categories: |
| 5 | +- javase |
| 6 | +tags: |
| 7 | +- java |
| 8 | +- javase |
| 9 | +- container |
| 10 | +--- |
| 11 | + |
| 12 | +# Java 容器之 Queue |
| 13 | + |
| 14 | +<!-- TOC depthFrom:2 depthTo:2 --> |
| 15 | + |
| 16 | +- [Queue 架构](#queue-架构) |
| 17 | +- [Queue 接口](#queue-接口) |
| 18 | +- [BlockingQueue 接口](#blockingqueue-接口) |
| 19 | +- [AbstractQueue 抽象类](#abstractqueue-抽象类) |
| 20 | +- [PriorityQueue 类](#priorityqueue-类) |
| 21 | +- [PriorityBlockingQueue 类](#priorityblockingqueue-类) |
| 22 | +- [LinkedBlockingQueue 类](#linkedblockingqueue-类) |
| 23 | +- [ArrayBlockingQueue 类](#arrayblockingqueue-类) |
| 24 | +- [SynchronousQueue](#synchronousqueue) |
| 25 | +- [资料](#资料) |
| 26 | + |
| 27 | +<!-- /TOC --> |
| 28 | + |
| 29 | +## Queue 架构 |
| 30 | + |
| 31 | +<div align="center"> |
| 32 | +<img src="https://raw.githubusercontent.com/dunwu/javase-notes/master/images/container/Queue-diagrams.png" /> |
| 33 | +</div> |
| 34 | + |
| 35 | +## Queue 接口 |
| 36 | + |
| 37 | +Queue 接口定义如下: |
| 38 | + |
| 39 | +```java |
| 40 | +public interface Queue<E> extends Collection<E> {} |
| 41 | +``` |
| 42 | + |
| 43 | +## BlockingQueue 接口 |
| 44 | + |
| 45 | +BlockingQueue 接口定义如下: |
| 46 | + |
| 47 | +```java |
| 48 | +public interface BlockingQueue<E> extends Queue<E> {} |
| 49 | +``` |
| 50 | + |
| 51 | +BlockingQueue 顾名思义,是一个阻塞队列。 |
| 52 | + |
| 53 | +在 BlockingQueue 中,如果获取队列元素但是队列为空时,会阻塞,等待队列中有元素再返回;如果添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。 |
| 54 | + |
| 55 | +BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用: |
| 56 | + |
| 57 | +1. 抛出异常; |
| 58 | +2. 返回特殊值(null 或 true/false,取决于具体的操作); |
| 59 | +3. 阻塞等待此操作,直到这个操作成功; |
| 60 | +4. 阻塞等待此操作,直到成功或者超时指定时间。 |
| 61 | + |
| 62 | +总结如下: |
| 63 | + |
| 64 | +| | _Throws exception_ | _Special value_ | _Blocks_ | _Times out_ | |
| 65 | +| ------- | ------------------ | --------------- | ---------------- | -------------------- | |
| 66 | +| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) | |
| 67 | +| Remove | remove() | poll() | take() | poll(time, unit) | |
| 68 | +| Examine | element() | peek() | _not applicable_ | _not applicable_ | |
| 69 | + |
| 70 | +BlockingQueue 的各个实现类都遵循了这些规则。 |
| 71 | + |
| 72 | +BlockingQueue 不接受 null 值元素。 |
| 73 | + |
| 74 | +## AbstractQueue 抽象类 |
| 75 | + |
| 76 | +AbstractQueue 抽象类定义如下: |
| 77 | + |
| 78 | +```java |
| 79 | +public abstract class AbstractQueue<E> |
| 80 | + extends AbstractCollection<E> |
| 81 | + implements Queue<E> {} |
| 82 | +``` |
| 83 | + |
| 84 | +AbstractQueue 类提供 Queue 接口的骨干实现,以最大限度地减少实现 Queue 接口所需的工作。 |
| 85 | + |
| 86 | +## PriorityQueue 类 |
| 87 | + |
| 88 | +PriorityQueue 类定义如下: |
| 89 | + |
| 90 | +```java |
| 91 | +public class PriorityQueue<E> extends AbstractQueue<E> |
| 92 | + implements java.io.Serializable {} |
| 93 | +``` |
| 94 | + |
| 95 | +### PriorityQueue 要点 |
| 96 | + |
| 97 | +1. PriorityQueue 实现了 Serializable,支持序列化。 |
| 98 | +2. PriorityQueue 类是基于优先级堆实现的无界优先级队列。 |
| 99 | +3. PriorityQueue 中的元素根据自然顺序或 Comparator 提供的顺序排序。 |
| 100 | +4. PriorityQueue 不接受 null 值元素。 |
| 101 | +5. PriorityQueue 不是线程安全的。 |
| 102 | + |
| 103 | +## PriorityBlockingQueue 类 |
| 104 | + |
| 105 | +PriorityBlockingQueue 类定义如下: |
| 106 | + |
| 107 | +```java |
| 108 | +public class PriorityBlockingQueue<E> extends AbstractQueue<E> |
| 109 | + implements BlockingQueue<E>, java.io.Serializable {} |
| 110 | +``` |
| 111 | + |
| 112 | +### PriorityBlockingQueue 要点 |
| 113 | + |
| 114 | +1. PriorityBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。 |
| 115 | +2. PriorityBlockingQueue 实现了 Serializable,支持序列化。 |
| 116 | +3. PriorityBlockingQueue 可以视为 PriorityQueue 的线程安全版本。 |
| 117 | +4. PriorityBlockingQueue 不接受 null 值元素。 |
| 118 | +5. PriorityBlockingQueue 的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。 |
| 119 | + |
| 120 | +### PriorityBlockingQueue 原理 |
| 121 | + |
| 122 | +PriorityBlockingQueue 有两个重要成员: |
| 123 | + |
| 124 | +```java |
| 125 | +private transient Object[] queue; |
| 126 | +private final ReentrantLock lock; |
| 127 | +``` |
| 128 | + |
| 129 | +- queue 是一个 Object 数组,用于保存 PriorityBlockingQueue 的元素。 |
| 130 | +- 而可重入锁 lock 则用于在执行插入、删除操作时,保证这个方法在当前线程释放锁之前,其他线程不能访问。 |
| 131 | + |
| 132 | +PriorityBlockingQueue 的容量虽然有初始化大小,但是不限制大小,如果当前容量已满,插入新元素时会自动扩容。 |
| 133 | + |
| 134 | +## LinkedBlockingQueue 类 |
| 135 | + |
| 136 | +LinkedBlockingQueue 类定义如下: |
| 137 | + |
| 138 | +```java |
| 139 | +public class LinkedBlockingQueue<E> extends AbstractQueue<E> |
| 140 | + implements BlockingQueue<E>, java.io.Serializable {} |
| 141 | +``` |
| 142 | + |
| 143 | +### LinkedBlockingQueue 要点 |
| 144 | + |
| 145 | +1. LinkedBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。 |
| 146 | +2. LinkedBlockingQueue 实现了 Serializable,支持序列化。 |
| 147 | +3. LinkedBlockingQueue 是基于单链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。 |
| 148 | +4. LinkedBlockingQueue 中元素按照插入顺序保存(FIFO)。 |
| 149 | + |
| 150 | +### LinkedBlockingQueue 原理 |
| 151 | + |
| 152 | +```java |
| 153 | +// 队列容量 |
| 154 | +private final int capacity; |
| 155 | + |
| 156 | +// 队列中的元素数量 |
| 157 | +private final AtomicInteger count = new AtomicInteger(0); |
| 158 | + |
| 159 | +// 队头 |
| 160 | +private transient Node<E> head; |
| 161 | + |
| 162 | +// 队尾 |
| 163 | +private transient Node<E> last; |
| 164 | + |
| 165 | +// take, poll, peek 等读操作的方法需要获取到这个锁 |
| 166 | +private final ReentrantLock takeLock = new ReentrantLock(); |
| 167 | + |
| 168 | +// 如果读操作的时候队列是空的,那么等待 notEmpty 条件 |
| 169 | +private final Condition notEmpty = takeLock.newCondition(); |
| 170 | + |
| 171 | +// put, offer 等写操作的方法需要获取到这个锁 |
| 172 | +private final ReentrantLock putLock = new ReentrantLock(); |
| 173 | + |
| 174 | +// 如果写操作的时候队列是满的,那么等待 notFull 条件 |
| 175 | +private final Condition notFull = putLock.newCondition(); |
| 176 | +``` |
| 177 | + |
| 178 | +这里用了两个锁,两个 Condition,简单介绍如下: |
| 179 | + |
| 180 | +- takeLock 和 notEmpty 搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。 |
| 181 | +- putLock 需要和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。 |
| 182 | + |
| 183 | +## ArrayBlockingQueue 类 |
| 184 | + |
| 185 | +ArrayBlockingQueue 类定义如下: |
| 186 | + |
| 187 | +```java |
| 188 | +public class ArrayBlockingQueue<E> extends AbstractQueue<E> |
| 189 | + implements BlockingQueue<E>, java.io.Serializable {} |
| 190 | +``` |
| 191 | + |
| 192 | +### ArrayBlockingQueue 要点 |
| 193 | + |
| 194 | +1. ArrayBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。 |
| 195 | +2. ArrayBlockingQueue 实现了 Serializable,支持序列化。 |
| 196 | +3. ArrayBlockingQueue 是基于数组实现的无界阻塞队列。 |
| 197 | + |
| 198 | +### ArrayBlockingQueue 原理 |
| 199 | + |
| 200 | +ArrayBlockingQueue 的重要成员如下: |
| 201 | + |
| 202 | +```java |
| 203 | +// 用于存放元素的数组 |
| 204 | +final Object[] items; |
| 205 | +// 下一次读取操作的位置 |
| 206 | +int takeIndex; |
| 207 | +// 下一次写入操作的位置 |
| 208 | +int putIndex; |
| 209 | +// 队列中的元素数量 |
| 210 | +int count; |
| 211 | + |
| 212 | +// 以下几个就是控制并发用的同步器 |
| 213 | +final ReentrantLock lock; |
| 214 | +private final Condition notEmpty; |
| 215 | +private final Condition notFull; |
| 216 | +``` |
| 217 | + |
| 218 | +ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。 |
| 219 | + |
| 220 | +- 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。 |
| 221 | +- 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除,然后唤醒写线程队列的第一个等待线程。 |
| 222 | + |
| 223 | +对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数: |
| 224 | + |
| 225 | +1. 队列容量,其限制了队列中最多允许的元素个数; |
| 226 | +2. 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁; |
| 227 | +3. 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。 |
| 228 | + |
| 229 | +## SynchronousQueue |
| 230 | + |
| 231 | +SynchronousQueue 定义如下: |
| 232 | + |
| 233 | +```java |
| 234 | +public class SynchronousQueue<E> extends AbstractQueue<E> |
| 235 | + implements BlockingQueue<E>, java.io.Serializable {} |
| 236 | +``` |
| 237 | + |
| 238 | +1. SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。 |
| 239 | +2. SynchronousQueue 的队列其实是虚的,其不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。 |
| 240 | +3. SynchronousQueue 中不能使用 peek 方法(在这里这个方法直接返回 null),peek 方法的语义是只读取不移除,显然,这个方法的语义是不符合 SynchronousQueue 的特征的。 |
| 241 | +4. SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代的。 |
| 242 | +5. 虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的。 |
| 243 | +6. 当然,SynchronousQueue 也不允许传递 null 值的(并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)。 |
| 244 | + |
| 245 | +## 资料 |
| 246 | + |
| 247 | +[解读 Java 并发队列 BlockingQueue](http://www.importnew.com/28053.html) |
0 commit comments