线程池队列
线程池队列是指当线程池中核心线程满了之后,用于存放任务的工作队列,线程池中的队列有如下7种:
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
当查看这些队列的源码会发现除了LinkedBlockingDeque
,它们都实现了BlockQueue
接口,那么就先从BlockQueue
接口开始学习。
BlockQueue
接口
BlockQueue
是一个接口,它继承于Queue
,相对于Queue
不同的是,BlockQueue
支持下面两个操作:在检索元素时会等待队列变为非空;在存储元素时会等待队列的空间变为可用。
BlockQueue
方法有4种形式,用不同的方式去处理操作,可以把它这四种形式总结为下表:
Throws exception | Return Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e,time,unit) |
Remove | remove() | poll() | take() | poll(time,unit) |
Examine | element() | peek() | not applicable | not applicable |
BlockQueue
不接受null
元素。当有null
元素企图添加时会抛出NullPointerException
,通常null
用作标记值,表示轮询操作失败。
一个BlockQueue
可能有容量限制,在任何给定的时间都会有一个剩余容量,超出这个容量之后就不能在堵塞的情况下添加新元素。没有任何容量约束的堵塞队列总是会报道剩余容量为Integer.MAX_VALUE
。
BlockQueue
是线程安全的,所有的排队方法都是使用内部锁或其他形式的的并发控制来自动的达到它们的效果。但是,除非在实现种另外指定,否则批量收集操作addAll
、retainAll
、removeAll
以及containAll
不一定是自动执行的。举个例子:addAll(e)在只添加了e中的一些元素之后可能会失败或者抛出异常。
BlockQueue
实现主要用于生产者-消费者队列,但还支持集合接口。因此,可以使用remove(x)从队列中删除任意元素。然而,这样的操作通常执行得不是很有效,并且只打算偶尔使用,例如当队列消息被取消时。
基于一个典型的生产者-消费者的场景,其实用方法如下:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
内存一致性效应
:与其他并发集合一样,在将对象放入BlockingQueue之前的线程中的操作发生在从另一个线程中的BlockingQueue中访问或删除该元素之后的操作之前。
ArrayBlockingQueue
ArrayBlockingQueue
是BlockingQueue
的实现类,其类图如下:
ArrayBlockingQueue
是一个由数组支持的堵塞队列。这个队列对元素以先进先出(FIFO)规则进行排序。队列头部是进入队列时间最久的元素,队列尾部是进入队列时间最短的元素。新的元素会被插入到队尾,队列的检索元素操作会获取队列头部的元素。
这是一个典型的“有界缓冲区”,由大小固定的数组来保存生产者插入以及消费者需要提取的元素。一旦这个队列被创建,那么它的容量就是不可变的。如果往一个已满的队列中进行一个put
元素的操作那么该操作会被堵塞,如果对一个空队列进行take
操作那么该操作同样也会被堵塞。
核心变量以及常量
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
核心方法
既然是堵塞队列,那么我们就讲讲它的两个堵塞操作方法:
- put(E e)
这个方法是插入元素到队列的尾部,如果队列已满那么就会处于等待状态,其实现方式如下:
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//排队队列是否已满,如果满了就堵塞
while (count == items.length)
notFull.await();
//入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
这个方法逻辑并不复杂,需要注意的是,当队列满了之后,就会堵塞,直到元素被消费。我们来看看它入队的实现方式:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//判断队列是否已满
if (++putIndex == items.length)
//如果已满,那么就将下一个put的位置重置到初始位置
putIndex = 0;
count++;
//唤醒其他堵塞的出队操作
notEmpty.signal();
}
它在入队的时候,会判断队列是否满了,如果满了,那么就将下一个put的位置重置到初始位置,然后将队列的元素加一,并唤醒其它堵塞的出队操作。
- take
这个方法是获取队列头部的元素,如果队列为空,那么获取操作会被堵塞,直到新的元素被添加进来。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
其实take
操作和上面讲的put(E e)
没什么区别,唯一不同的就是堵塞的条件不一样而已,一个是当队列满了之后后续put
操作堵塞,一个是当队列为空时后续take
操作堵塞。我们主要来看看dequeue()
方法:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
假设队列的初始状态为满队列状态,我们通过上述的代码可以演示出队列过程:
先获取takeIndex
下表的值,并将该下标的值清空,然后判断下一个take位置是否为队列的最后一个位置,如果是,那么就将该takeIndex
重置为0,形成一个唤醒队列,最后将队列中的元素减一,唤醒其他堵塞的入队操作,并返回获取的值,然后重复上述过程。
下面的代码时使用ArrayBlockingQueue
的一个例子:
public class LearnArrayBlockingQueue {
private static final AtomicInteger count = new AtomicInteger();
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(10,false);
Producer producer = new Producer(blockingQueue);
Consumer consumer1 = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer1).start();
}
public static class Producer implements Runnable{
private final BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for(int i=0;i<10000000;i++) {
this.queue.put(producer());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private Object producer() {
return "Hello"+count.getAndIncrement();
}
}
public static class Consumer implements Runnable{
private final BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true){
consumer(this.queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void consumer(Object object){
if(object instanceof String){
System.out.println((String)object);
}
}
}
}
LinkedBlockingQueue
LinkedBlockingQueue
是BlockingQueue
的实现类,其类图如下:
LinkedBlockingQueue
是一个基于链表的可选边界堵塞队列。这个队列对元素以先进先出(FIFO)的规则进行排序。队列头部是进入队列时间最久的元素,队列尾部是进入队列时间最短的元素。新的元素会被插入到队尾,队列的检索元素操作会获取队列头部的元素。列表队列通常比基于数组的队列有更高的吞吐量,但在大多数并发程序中可预测性较差。
可选的容量绑定构造函数参数用作防止队列过度扩展的一种方法。容量(如果未指定)等于Integer.MAX_VALUE。在每次插入时动态创建链接节点,除非这会使队列超出容量。
它跟LinkedBlockingQueue
一样,当队列满了之后后续的put
操作会被堵塞,当队列为空时,后续的take
操作同样也会被堵塞。
核心变量以及常量
/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
核心方法
同样我们来看看这个队列的核心方法put
以及take
方法:
- put(E e)
将方法插入到队列的尾部,如果队列满了,那么就等待直到队列中有合适的空间插入。其实现代码如下:
public void put(E e) throws InterruptedException {
//对象为空时抛出空指针异常
if (e == null) throw new NullPointerException();
int c = -1;
//创建新节点
Node<E> node = new Node<E>(e);
//获得锁
final ReentrantLock putLock = this.putLock;
//获得当前队列中的元素个数
final AtomicInteger count = this.count;
//显示锁,该锁类型为可中断锁
putLock.lockInterruptibly();
try {
//判断队列是否已满,如果满了就堵塞
while (count.get() == capacity) {
notFull.await();
}
//将元素入队
enqueue(node);
//队列元素个数+1
c = count.getAndIncrement();
//判断下一个入队的元素是否会超过边界,如果没有超过就唤醒等待的入队线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//唤醒其他堵塞的出队线程
if (c == 0)
signalNotEmpty();
}
其中入队的实现方式如下:
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
- offer(E e)
该方法也是插入元素的一种,跟put
方法不一样的是,它在每次插入之后都会返回一个特定的值,用来反馈队列是否已满。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//用来判断队列是否已满,如果满了就返回false,代表插入失败
if (count.get() == capacity)
return false;
int c = -1;
//创建新节点
Node<E> node = new Node<E>(e);
//获得锁
final ReentrantLock putLock = this.putLock;
//加锁
putLock.lock();
try {
//判断当前队列的容量是否小于边界容量,如果小于那么就做入队操作
if (count.get() < capacity) {
enqueue(node);
//队列元素数量加一
c = count.getAndIncrement();
//如果下一个要添加的元素是小于边界容量,那么就唤醒堵塞的入队线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
//唤醒其他的出队线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
- take
获取队列头部的元素。当队列为空时,后续的take
操作将会被堵塞,直到新的元素进入到队列中。其实现方式如下:
public E take() throws InterruptedException {
E x;
int c = -1;
//获取当前队列中的元素
final AtomicInteger count = this.count;
//获得锁
final ReentrantLock takeLock = this.takeLock;
//显示锁,这个锁可以被打断
takeLock.lockInterruptibly();
try {
//当队列为空时,那么就堵塞
while (count.get() == 0) {
notEmpty.await();
}
//出队
x = dequeue();
//队列元素自减一
c = count.getAndDecrement();
//如果元素个数大于1,那么就唤醒其堵塞的出队线程
if (c > 1)
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
//唤醒其他堵塞的入队线程
if (c == capacity)
signalNotFull();
return x;
}
这个队列的使用方法跟上一小节的ArrayBlockingQueue
队列的使用方式并无差异,只不过说底层的实现有很大的区别,其区别如下:
ArrayBlockingQueue | LinkedBlockingQueue | |
---|---|---|
数据结构 | 数组 | 链表 |
堵塞实现方法 | 通知模式实现 | 通知模式实现 |
生产者消费者锁的情况 | 共用一把锁 | 生产者和消费者各自拥有一把锁 |
吞吐量 | 低 | 很高 |
SynchronousQueue
SynchronousQueue
也是BlockQueue
的实现类,其类图如下:
SynchronousQueue
是一个针对每一个插入操作都必须等待其他线程移除元素操作的响应的堵塞队列。
该队列相对于其他队列而言有两个特性:
- 队列中的容量始终为0,即该队列不会保存任何元素。
- 该队列提供了两种实现方式,一种是基于栈的实现,另一种是基于队列的实现。
构造方法
它的构造函数就两种,如下:
//无参构造函数
public SynchronousQueue() {
this(false);
}
//有参构造函数
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
之前提到该队列有两种实现方式,我们从上述代码中就可以看出,当fair
的值为true时,是基于队列的实现,此时等待的线程满足先进先出的访问顺序,反之则是基于栈的实现,此时访问顺序是先进后出。
核心方法
- 堵塞的
put(E e)
方法public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
- 堵塞的
take()
方法 ```java public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
其实这两个方法并没什么需要特别将的地方,我们真正需要讲的是`abstract E transfer(E e, boolean timed, long nanos)`这个方法的实现方式。
在这里我们就以默认的实现方式-基于栈的实现来进行学习:
我们先看下其定义的节点的三种模式常量:
```java
/** 表示这是个请求节点,从队列中取数据的标识(方法有 take,poll) */
static final int REQUEST = 0;
/** 表示这个是数据节点,插入数据到队列中的标识(方法有 offer,put) */
static final int DATA = 1;
/** 这个表示配对成功,只有一消费者和生产者进行配对成功后,才会更改为该状态 */
static final int FULFILLING = 2;
这三种模式常量记录了当前节点的三种状态,消费节点、生产节点以及匹配完成节点。
E transfer(E e, boolean timed, long nanos) {
/*
1.当栈为空或者已经包含了相同模式的节点的时候,会尝试将节点压到栈中然后等待匹配并返回它,
如果被取消则返回的值为空。
2.当栈包含有互相匹配的节点的时候,那么会尝试将完成节点压入栈中然后匹配等待的节点,并将这两个节点都从栈中弹出并返回它们。
3.如果栈顶已经拥有另一个完成节点,那么通过执行匹配和/或pop操作来帮助它,然后继续。
帮助的代码与实现的代码本质上是相同的,只是它不返回它们。
*/
SNode s = null; // constructed/reused as needed
//根据元素是否为Null来判断请求节点还是数据节点
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
//获取头指针
SNode h = head;
//判断头指针是否为空以及头指针的模式是否一致
if (h == null || h.mode == mode) { // empty or same-mode
//判断是否有设置超时时间
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
//将当前节点压入栈
} else if (casHead(h, s = snode(s, e, h, mode))) {
//随即进行线程堵塞
SNode m = awaitFulfill(s, timed, nanos);
//如果等待被取消
if (m == s) { // wait was cancelled
//清楚该节点,并返回Null
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//判断该节点是否为完成匹配的节点,那么就尝试完成匹配
} else if (!isFulfilling(h.mode)) { // try to fulfill
//判断线程是否被取消
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
//将当前节点压入栈
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
//拿到原来的栈头指针指向的节点
SNode m = s.next; // m is s's match
//如果该节点为空,那么就说明已经被其他线程匹配了
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
//那就匹配该节点指向的下一个节点
SNode mn = m.next;
//如果匹配到那么就弹出并返回匹配的s节点和m节点
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
//由其他线程进行配对
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
线程堵塞的实现代码如下:
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
/*
* When a node/thread is about to block, it sets its waiter
* field and then rechecks state at least one more time
* before actually parking, thus covering race vs
* fulfiller noticing that waiter is non-null so should be
* woken.
*
* When invoked by nodes that appear at the point of call
* to be at the head of the stack, calls to park are
* preceded by spins to avoid blocking when producers and
* consumers are arriving very close in time. This can
* happen enough to bother only on multiprocessors.
*
* The order of checks for returning out of main loop
* reflects fact that interrupts have precedence over
* normal returns, which have precedence over
* timeouts. (So, on timeout, one last check for match is
* done before giving up.) Except that calls from untimed
* SynchronousQueue.{poll/offer} don't check interrupts
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//计算自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
//判断是否失败,如果失败那么就尝试取消
if (w.isInterrupted())
s.tryCancel();
//拿到匹配的节点
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//判断自旋次数是否大于0
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
//如果没有匹配的节点,就保存当前堵塞的线程
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
//堵塞当前的线程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
可能上述代码的注释很难看懂出入队的方式,我们现在来举个例子:
假设有一个队列其初始状态为:头部指针指向null
,随后来了一条线程A,其数据也为A,那么由于头指针为null
且插入的方式为堵塞式的put
插入,那么现在将将该数据压入到栈中,并将指针指向栈顶,然后堵塞线程。接下来有来了一条线程B,其数据也是B,那么还是会将数据压入栈中,并将该节点的next节点指向最先入栈的A节点,并肩头指针指向栈顶,然后堵塞线程。如果再来了一条线程C,不同的是这是一条出队线程。那么这个时候会将该节点也压入栈中,头指针指向该节点,该节点的next节点指向B节点。那么后面就要开始完成匹配了。这个时候就要从栈中去取出节点来匹配,因为是基于栈的实现方式,所以其取出顺序为后进先出,也就是首先取出的是节点B为匹配对象。匹配完成后,线程C和B都直接返回,然后会将头指针指向最先入队的A节点。
从上面的例子我们可以看出,非公平策略和公平策略的出队顺序是不一样的。所以我们在程序设计中需要通过构造函数的fairl
参数来确定是使用公平策略还是非公平策略。
PrioityBlockingQueue
PrioityBlockingQueue
队列同样也是BlockingQueue
的实现类,其类图如下:
PriorityBlockingQueue
使用与PriorityQueue类相同的排序规则并提供阻塞检索操作的无界阻塞队列。虽然该队列在逻辑上是无界的,但是由于资源耗尽(导致OutOfMemoryError错误),尝试添加的操作可能失败。此类不允许空元素。依赖于自然排序的优先级队列也不允许插入不可比较的对象(这样做会导致ClassCastException)。
此类上的操作不能保证具有同等优先级的元素的排序。如果需要强制执行排序,可以定义自定义类或比较器,这些类或比较器使用辅助键断开主优先级值中的连接。例如,这里有一个类,它将先入先出的连接打破应用于可比元素。要使用它,您需要插入一个新的FIFOEntry(一个条目),而不是一个普通的条目对象:
class FIFOEntry<E extends Comparable<? super E>>
implements Comparable<FIFOEntry<E>> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum;
final E entry;
public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
}
public E getEntry() { return entry; }
public int compareTo(FIFOEntry<E> other) {
int res = entry.compareTo(other.entry);
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
}
}
这个线程有如下特性:
- 跟
ArrayBlockingQueue
一样都是基于数组实现的。 - 是真正的无界队列,跟
LinkedBlockingQueue
不一样,它是真正意义上的无界,因为它会无限制的扩容,而LinkedBlockingQueue
的极限是Integer.MAX_VALUE
,只是因为这个界限太多,所以看似无界。 - 该队列属于权重队列,可以理解为它可以进行排序,是基于数组的堆结构排序。
- 出队顺序跟其他队列不一样,是根据权重来进行出队。
- 入队操作永远不会堵塞
核心方法
PriorityBlockQueue
队列的入队操作永远不会堵塞,因为它的队列是无界的,可以无限增长,它所有的入队操作都会调用offer(E e)
方法:
offer(E e)
插入指定的元素到这个队列的尾部,因为该队列是无界的,所以这个方法永远不会堵塞。
/**
put操作
我们可以看到该操作最终还是调用了offer(E e)方法
*/
public void put(E e) {
offer(e); // never need to block
}
//offer操作
public boolean offer(E e) {
//判断元素是否为null值,如果是,那么就抛出空指针异常
if (e == null)
throw new NullPointerException();
//获得锁,进行显示的上锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//判断数组是否满了,如果满了就尝试扩容数组
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
//判断是否用了自定义比较器
Comparator<? super E> cmp = comparator;
//对堆进行排序,上浮操作
if (cmp == null)
siftUpComparable(n, e, array);
else
//使用自定义的比较器进行堆排序,上浮操作
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//唤醒其他堵塞的出队线程
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
return true;
}
下面来看看,它是怎么实现排序的:
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 找出父节点所在的位置
int parent = (k - 1) >>> 1;
Object e = array[parent];
//比较该元素的大小,如果该节点大于父节点就不交换
if (key.compareTo((T) e) >= 0)
break;
//交换元素的位置
array[k] = e;
k = parent;
}
array[k] = key;
}
这是一个上浮调整排序,即父节点永远是所有节点中最小的值。可以理解为是一个最小堆排序。
我们可以通过如下代码来测试是否是根据权重进行出队:
public class LearnPriorityBlockingQueue {
public static void main(String[] args) {
PriorityBlockingQueue<Integer> priorityBlockingQueue = new PriorityBlockingQueue<>();
priorityBlockingQueue.offer(10);
priorityBlockingQueue.offer(20);
priorityBlockingQueue.offer(21);
priorityBlockingQueue.offer(5);
priorityBlockingQueue.offer(41);
try {
while(priorityBlockingQueue.remainingCapacity()>0){
System.out.println(priorityBlockingQueue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码是一个很简单的针对PriorityBlockingQueue
队列的插入弹出操作,其运行结果如下:
5
10
20
21
41
从结果看出确实是根据权重进行出队,且默认是依据从小到大的权重出队。
take()
PriorityBlockingQueue
的出队操作:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//当取出的元素为空时或者队列为空时那么就堵塞
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
上面的代码不是特别难以理解,需要注意的是当队列为空时,会造成堵塞。具体的出队操作实现:
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。其下浮调整的具体实现如下:
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
在位置为k这里插入元素x,通过反复将x降级到数的最下面,直到它小月或等于它的子元素或者叶子节点来保证堆不变。