技术标签: 阻塞队列 Java多线程 多线程 线程安全 队列 并发容器
并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信
【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列
ArrayBlockingQueue:由数组
实现的有界阻塞队列
,在初始化时必须指定容器大小,按照FIFO
的方式存储元素。内部使用ReentrantLock和Condition实现,支持公平锁和非公平锁。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//内部数组
final Object[] items;
// 下一个待删除元素的索引: take, poll, peek, remove方法使用
int takeIndex;
//下一个待插入元素的索引: put, offer, add方法使用
int putIndex;
//当前队列中元素的个数
int count;
//唯一全局可重入独占“”:掌管所有读写操作的锁
final ReentrantLock lock;
//两个等待队列
/** 取元素条件队列:队列为空时,用于阻塞读线程,唤醒写线程 */
private final Condition notEmpty;
/** 写元素条件队列:队列已满时,用于阻塞写线程,唤醒读线程 */
private final Condition notFull
// Itrs表示队列和迭代器之间的共享数据,其实用来存储多个迭代器实例的
transient Itrs itrs = null;
可重入锁 ReentrantLock
实现的访问公平性,通过2个 Condition
保证了写入和获取元素的等待通知
从上面的入队/出队操作,可以看出,ArrayBlockingQueue的内部数组其实是一种环形结构
。
6
,我们来看下整个入队过程:初始时
插入元素“9”
插入元素“2”、“10”、“25”、“93”
插入元素 “90”
注意,此时再插入一个元素“90”,则putIndex变成6
,等于队列容量6
,由于是循环队列,所以会将takeIndex重置为0
:
这时队列已经满了(count==6)
,如果再有线程尝试插入元素
,并不会覆盖原有值,而是被阻塞
。
我们再来看下出队过程:
“9”
出队元素“2”、“10”、“25”、“93”
出队元素“90”
注意,此时再出队一个元素“90”,则takeIndex变成6
,等于队列容量6
,由于是循环队列,所以会将takeIndex重置为0
:
这时队列已经空了(count==0)
,如果再有线程尝试出队元素
,则会被阻塞
。
// 必须指定初始容量, 默认采用非公平策略
public ArrayBlockingQueue(int capacity) {
this(capacity, false);//默认构造非公平锁的阻塞队列
}
// 指定队列初始容量和公平/非公平策略的构造器.
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//初始化ReentrantLock重入锁,出队入队拥有这同一个锁
lock = new ReentrantLock(fair);
//初始化非空等待队列
notEmpty = lock.newCondition();
//初始化非满等待队列
notFull = lock.newCondition();
}
/**
* 根据已有集合构造队列
*/
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
//遍历添加指定集合的元素
checkNotNull(e);// 不能有null元素
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
//如果传入集合的个数超过了容量,抛出异常被catch,最多放capacity个
throw new IllegalArgumentException();
}
//循环结束,i刚好是写入元素的个数
count = i;
//修改 putIndex 为 c 的容量 +1,如果队列已满,则重置puIndex索引为0
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
可以看到,有3种构造函数:
核心思想:
- 将元素x置入数组中。
- 计算下一个元素应该存放的下标位置。
- 元素个数器递增,这里count前加了锁,值都是从主内存中获取,不会存在内存不可见问题,并且更新也会直接刷新回主内存中。
- 最后唤醒在条件队列notEmpty因取出元素(take)而被阻塞的一个线程。
//入队操作
private void enqueue(E x) {
final Object[] items = this.items;
//通过putIndex索引直接将元素添加到数组items中
items[putIndex] = x;
//下一个元素应该存放的下标位置:当putIndex索引大小等于数组长度时,将putIndex重置为0
//当队列索引(从0开始)与数组长度相等时,下次就需要从数组头部重写开始写入
if (++putIndex == items.length) putIndex = 0;
count++; // 元素个数+1
notEmpty.signal();// 唤醒一个notEmpty上的等待线程(可以来队列取元素了)
}
可以看到,enqueue(E) 方法会将元插入到数组尾部。
重置 putIndex 为 0
,添加后调用notEmpty.signal()
通知唤醒阻塞在取出元素(take)的线程。队尾阻塞式插入元素
,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列为空闲,或者元素被其他线程取出
。
InterruptedException异常
并返回。 //put操作将向队尾插入元素,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列不满。
//如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; // 唯一锁
// 可响应中断式地获取锁
lock.lockInterruptibly();
try {
//如果队列已满,则将当前线程包装为等待节点置入notFull的条件队列中。这里必须用while,防止虚假唤醒
while (count == items.length)
notFull.await();
// 队列非满,或者被消费者线程唤醒了,执行入队操作,往队尾写入一个元素,然后唤醒等待在notEmpty条件队列的首节点
enqueue(e);
} finally {
lock.unlock();// 解锁
}
}
响应中断
,当队列满了,就调用notFull.await() 阻塞等待
,等有消费者获取元素后继续执行;
enqueue(E)
。public boolean add(E e) {
return super.add(e);
}
//super.add() 的实现
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer(E)
,如果返回 false 就抛出异常。如果队列未满,则插入成功并返回true
, 如果队列已满则返回false。 public boolean offer(E e) {
checkNotNull(e); // 如果插入元素为null,则抛出NullPointerException异常
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列已满, 则返回false
if (count == items.length)
return false;
else {
// 否则则入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
可以看到 offer(E) 方法要先获取锁
,如果当前队列中元素已满,就立即返回 false,这点比 add() 友好一些;
enqueue(E)
入队:超时功能
,传入一个timeout
,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回false
。public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
//获取可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
//如果队列已满,但是时间到了,直接返回false
if (nanos <= 0)
return false;
//阻塞当前线程指定纳秒数,并更新剩余时间
nanos = notFull.awaitNanos(nanos);
}
//队列非满,或者在设定时间内被消费者线程唤醒了,执行入队操作,后唤醒等待在notEmpty条件队列的首节点
enqueue(e);
return true;
} finally {
lock.unlock();//释放锁
}
}
offer() 和 put()
方法很相似,不同之处在于需要设置等待超时时间
,超时未写入,就返回 false;否则调用enqueue(E)入队
,然后返回 true。核心思想:
- 获取元素,并将当前位置置null。
- 重新设置队头下标。
- 元素计数器递减。
- 更新迭代器中的元素数据,itrs默认情况下都是为null的,只有使用迭代器的时候才会实例化Itrs。
- 唤醒在条件队列notFull因写入操作(put)而被阻塞的一个线程。
//删除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
//获取要删除的对象
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//将数组中takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等,如果相等说明队列已空,重置takeIndex为0
if (++takeIndex == items.length) takeIndex = 0;
count--;//队列个数减1
// 更新迭代器中的元素数据,itrs只用在使用迭代器的时候才实例化
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
notFull.signal(); // 唤醒一个notFull上的等待线程(可以插入元素到队列了)
return x;
}
默认情况下dequeue()
方法会从队首移除元素(即 takeIndex 位置)
。
向后移动 takeIndex
,如果已经到队尾,就归零(takeIndex =0)
。结合前面添加元素时的归零,可以看到,其实 ArrayBlockingQueue 是个环形数组。itrs. elementDequeued()
,这个 itrs 是 ArrayBlockingQueue 的内部类 Itrs 的对象
,看起来像是个迭代器,实际上它的作用是保证循环数组迭代时的正确性,具体实现比较复杂,这里暂不介绍。 //从队列头部删除,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 可响应中断式地获取锁
lock.lockInterruptibly();
try {
//如果队列为空,则将当前写线程包装为等待节点加入notEmpty的条件队列中。这里必须用while,防止虚假唤醒
while (count == 0)
notEmpty.await();
// 队列非空,或者被生产者线程唤醒了,执行出队操作,出队时唤醒notEmpty的条件队列中的首节点
return dequeue();
} finally {
lock.unlock();// 释放锁
}
}
响应中断
,与 poll() 不同的是,如果队列为空会一直阻塞等待
,直到中断或者有元素,有元素时还是调用dequeue()
方法入队。返回null
。//poll方法,该方法获取并移除此队列的头元素,若队列为空,则返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果为空,返回null, 否则执行出队操作
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
dequeue()出队
:timeout
,获取元素超时会立即返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
// 队列仍为空,但是时间到了,必须返回了
if (nanos <= 0)
return null;
// 在条件队列里等着,但是需要更新时间
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
允许阻塞一段时间
,如果在阻塞一段时间还没有元素写入队列,就返回 null。直接获取队首元素
,只获取不出队。可能返回为null(队列为空)public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); //直接返回当前队列的头元素,但不删除
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
指的是equals方法判定相同
】的元素,移除成功返回true,如果队列为空或没有匹配元素,则返回false。 public boolean remove(Object o) {
if (o == null) return false;
//获取数组数据
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//如果此时队列不为null,这里是为了防止并发情况
if (count > 0) {
//获取下一个要添加元素时的索引
final int putIndex = this.putIndex;
//获取当前要被删除元素的索引
int i = takeIndex;
//执行循环查找要删除的元素
do {
// 找到了对应的元素的位置,removeAt删除该位置的元素
if (o.equals(items[i])) {
removeAt(i);//执行删除
return true;//删除成功返回true
}
//当前删除索引执行加1后判断是否与数组长度相等
//若为true,说明索引已到数组尽头,将i设置为0
if (++i == items.length)
i = 0;
} while (i != putIndex);
//到达区间[takeIndex, putIndex)的边界,说明所有非null元素都找遍了
}
return false;//没有找到元素
} finally {
lock.unlock();//解锁
}
}
//移除removeIndex位置的元素:根据索引删除元素,实际上是把删除索引之后的元素往前移动一个位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//如果刚好删除的是队首,那刚好是一个出队动作
if (removeIndex == takeIndex) {
//如果是直接删除
items[takeIndex] = null;
//当前队列头元素加1并判断是否与数组长度相等,若为true设置为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列元素减1
if (itrs != null)
itrs.elementDequeued();//更新迭代器中的数据
}
//其他情况
else {
//如果要删除的元素不在队列头部,
//那么只需循环迭代把删除元素后面的所有元素往前移动一个位置
//获取下一个要被添加的元素的索引,作为循环判断结束条件
final int putIndex = this.putIndex;
//执行循环
for (int i = removeIndex;;) {
//获取要删除节点索引的下一个索引
int next = i + 1;
//判断是否已为数组长度,如果是从数组头部(索引为0)开始找
if (next == items.length)
next = 0;
//如果查找的索引不等于要添加元素的索引,说明移除的不是队尾,后面的元素补充上来
if (next != putIndex) {
items[i] = items[next];//把后一个元素前移覆盖要删除的元
i = next;
} else {
//在removeIndex索引之后的元素都往前移动完毕后清空最后一个元素
items[i] = null;
//最后putIndex当然也得左移,i此时肯定是putIndex - 1
this.putIndex = i;
break;//结束循环
}
}
count--;//队列元素减1
if (itrs != null)
itrs.removedAt(removeIndex);//更新迭代器数据
}
notFull.signal();//唤醒添加线程
}
一波源码看下来,ArrayBlockingQueue 使用可重入锁 ReentrantLock保证线程安全,通过两个 Condition 实现生产者-消费者模型,看起来很简单的样子,这背后要感谢 ReentrantLock 和 Condition 的功劳!
获取到AQS独占锁
才能进行操作如果队列为空,则读线程
将会被包装为条件节点扔到读线程等待条件队列中阻塞
,等待写线程写入新的元素,并唤醒等待中的读线程,反之亦然。超高并发的环境
,由于生产者-消息者共用一把锁,可能出现性能瓶
颈。后续,我们会介绍另一种基于单链表实现的阻塞队列——LinkedBlockingQueue
,该队列的最大特点是使用了“两把锁”
,以提升吞吐量。
文章浏览阅读3.4k次,点赞8次,收藏42次。一、什么是内部类?or 内部类的概念内部类是定义在另一个类中的类;下面类TestB是类TestA的内部类。即内部类对象引用了实例化该内部对象的外围类对象。public class TestA{ class TestB {}}二、 为什么需要内部类?or 内部类有什么作用?1、 内部类方法可以访问该类定义所在的作用域中的数据,包括私有数据。2、内部类可以对同一个包中的其他类隐藏起来。3、 当想要定义一个回调函数且不想编写大量代码时,使用匿名内部类比较便捷。三、 内部类的分类成员内部_成员内部类和局部内部类的区别
文章浏览阅读118次。分布式系统要求拆分分布式思想的实质搭配要求分布式系统要求按照某些特定的规则将项目进行拆分。如果将一个项目的所有模板功能都写到一起,当某个模块出现问题时将直接导致整个服务器出现问题。拆分按照业务拆分为不同的服务器,有效的降低系统架构的耦合性在业务拆分的基础上可按照代码层级进行拆分(view、controller、service、pojo)分布式思想的实质分布式思想的实质是为了系统的..._分布式系统运维工具
文章浏览阅读174次。1.数据源准备2.数据处理step1:数据表处理应用函数:①VLOOKUP函数; ② CONCATENATE函数终表:step2:数据透视表统计分析(1) 透视表汇总不同渠道用户数, 金额(2)透视表汇总不同日期购买用户数,金额(3)透视表汇总不同用户购买订单数,金额step3:讲第二步结果可视化, 比如, 柱形图(1)不同渠道用户数, 金额(2)不同日期..._exce l趋势分析数据量
文章浏览阅读3.3k次。堡垒机可以为企业实现服务器、网络设备、数据库、安全设备等的集中管控和安全可靠运行,帮助IT运维人员提高工作效率。通俗来说,就是用来控制哪些人可以登录哪些资产(事先防范和事中控制),以及录像记录登录资产后做了什么事情(事后溯源)。由于堡垒机内部保存着企业所有的设备资产和权限关系,是企业内部信息安全的重要一环。但目前出现的以下问题产生了很大安全隐患:密码设置过于简单,容易被暴力破解;为方便记忆,设置统一的密码,一旦单点被破,极易引发全面危机。在单一的静态密码验证机制下,登录密码是堡垒机安全的唯一_horizon宁盾双因素配置
文章浏览阅读7.7k次,点赞4次,收藏16次。Chrome作为一款挺不错的浏览器,其有着诸多的优良特性,并且支持跨平台。其支持(Windows、Linux、Mac OS X、BSD、Android),在绝大多数情况下,其的安装都很简单,但有时会由于网络原因,无法安装,所以在这里总结下Chrome的安装。Windows下的安装:在线安装:离线安装:Linux下的安装:在线安装:离线安装:..._chrome linux debian离线安装依赖
文章浏览阅读153次。中国发达城市榜单每天都在刷新,但无非是北上广轮流坐庄。北京拥有最顶尖的文化资源,上海是“摩登”的国际化大都市,广州是活力四射的千年商都。GDP和发展潜力是衡量城市的数字指...
文章浏览阅读3.3k次。前言spark在java使用比较少,多是scala的用法,我这里介绍一下我在项目中使用的代码配置详细算法的使用请点击我主页列表查看版本jar版本说明spark3.0.1scala2.12这个版本注意和spark版本对应,只是为了引jar包springboot版本2.3.2.RELEASEmaven<!-- spark --> <dependency> <gro_使用java调用spark注册进去的程序
文章浏览阅读4.8k次。汽车零部件开发工具巨头V公司全套bootloader中UDS协议栈源代码,自己完成底层外设驱动开发后,集成即可使用,代码精简高效,大厂出品有量产保证。:139800617636213023darcy169_uds协议栈 源代码
文章浏览阅读4.6k次,点赞20次,收藏148次。AUTOSAR基础篇之OS(下)前言首先,请问大家几个小小的问题,你清楚:你知道多核OS在什么场景下使用吗?多核系统OS又是如何协同启动或者关闭的呢?AUTOSAR OS存在哪些功能安全等方面的要求呢?多核OS之间的启动关闭与单核相比又存在哪些异同呢?。。。。。。今天,我们来一起探索并回答这些问题。为了便于大家理解,以下是本文的主题大纲:[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JCXrdI0k-1636287756923)(https://gite_autosar 定义了 5 种多核支持类型
文章浏览阅读2.2k次,点赞6次,收藏14次。原因:自己写的头文件没有被加入到方案的包含目录中去,无法被检索到,也就无法打开。将自己写的头文件都放入header files。然后在VS界面上,右键方案名,点击属性。将自己头文件夹的目录添加进去。_vs2013打不开自己定义的头文件
文章浏览阅读3.3w次,点赞80次,收藏342次。此时,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从Redis 中查找相应的Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。此时,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从Redis 中查找相应的Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。当数据量很大时,count 的数量的指定可能会不起作用,Redis 会自动调整每次的遍历数目。_redis命令
文章浏览阅读449次,点赞3次,收藏3次。URP的设计目标是在保持高性能的同时,提供更多的渲染功能和自定义选项。与普通项目相比,会多出Presets文件夹,里面包含着一些设置,包括本色,声音,法线,贴图等设置。全局只有主光源和附加光源,主光源只支持平行光,附加光源数量有限制,主光源和附加光源在一次Pass中可以一起着色。URP:全局只有主光源和附加光源,主光源只支持平行光,附加光源数量有限制,一次Pass可以计算多个光源。可编程渲染管线:渲染策略是可以供程序员定制的,可以定制的有:光照计算和光源,深度测试,摄像机光照烘焙,后期处理策略等等。_urp渲染管线