JDK源码分析-BlockingQueue

概述

BlockingQueue 意为“阻塞队列”,它在 JDK 中是一个接口。

所谓阻塞,简单来说就是当某些条件不满足时,让线程处于等待状态。 例如经典的“生产者-消费者”模型,当存放产品的容器满的时候,生产者处于等待状态; 而当容器为空的时候,消费者处于等待状态。阻塞队列的概念与该场景类似。

BlockingQueue 的 继承关系如下:

可以看到 BlockingQueue 继承自 Queue 接口, 它的常用实现类有 ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue 等。 同前面一样,本文先分析 Bl ockingQueue 接口的方法定义,后文再分析其实现类的代码。

PS: 从这个继承体系也可以看出来,直接实现接口的是抽象类,而实现类则通常继承自抽象类。为什么要这样设计呢?因为有些接口的实现类会有多个,而这些类之间有一部分逻辑是相似或者相同的,因此就把这部分逻辑提取到抽象类中,避免代码冗余。

代码分析

BlockingQueue 的方法定义如下:

其方法简 单分析如下:


 

// 将指定元素插入到队列,若成功返回 true,否则抛出异常

boolean add(E e);


// 将指定元素插入到队列,若成功返回 true,否则返回 false

boolean offer(E e);


// 将指定元素插入到队列,若队列已满则等待

void put(E e) throws InterruptedException;


// 将指定元素插入到队列,若成功返回 true,否则返回 false,有超时等待

boolean offer(E e, long timeout, TimeUnit unit)

throws InterruptedException


// 获取并移除队列的头部,若为空则等待

E take() throws InterruptedException;


// 获取并移除队列的头部,有超时等待(若超时返回 null)

E poll(long timeout, TimeUnit unit)

throws InterruptedException;


// 返回队列可以接收的容量,若无限制则返回 Integer.MAX_VALUE

int remainingCapacity();


// 删除指定的元素(如果存在),返回是否删除成功

boolean remove(Object o);


// 是否包含指定元素

public boolean contains(Object o);


// 从此队列中删除所有可用元素,并将它们添加到给定集合中

int drainTo(Collection<? super E> c);


// 从此队列中删除所有可用元素,并将它们添加到给定集合中(指定大小)

int drainTo(Collection<? super E> c, int maxElements);

主要方法小结如下:

Throws exceptions

Special value

Blocks

Time out

Insert

add(e)

offer(e)

put(e)

offer(e, time, unit)

Remove

remove()

poll()

take()

poll(time, unit)

Examine

element()

peek()

-

-

Queue 接口前文「 JDK源码-Queue, Deque 已进行分析 ,这里不再赘述。

典型用法

生产者:


 

class Producer implements Runnable {

private final BlockingQueue queue;

Producer(BlockingQueue q) { queue = q; }

public void run() {

try {

// 将产品放入队列

while (true) { queue.put(produce()); }

} catch (InterruptedException ex) { ... handle ...}

}

Object produce() { ... }

}

消费者:


 

class Consumer implements Runnable {

private final BlockingQueue queue;

Consumer(BlockingQueue q) { queue = q; }

public void run() {

try {

// 从队列中消费产品

while (true) { consume(queue.take()); }

} catch (InterruptedException ex) { ... handle ...}

}

void consume(Object x) { ... }

}

测试类:


 

class Setup {

void main() {

BlockingQueue q = new SomeQueueImplementation();

// 创建并启动一个生产者和两个消费者

Producer p = new Producer(q);

Consumer c1 = new Consumer(q);

Consumer c2 = new Consumer(q);

new Thread(p).start();

new Thread(c1).start();

new Thread(c2).start();

}

}

PS: 上述代码是 BlockingQueu e 的文档提供的,仅供参考。

小结

BlockingQueue 是一个接口,它主要定义了阻塞队列的一些方法。阻塞队列在并发编程中使用较多,比如线程池。

相关阅读:

JDK源码-Queue, Deque

Stay hungry, stay foolish.

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章