synchronousqueue场景_【JUC】JDK1.8源码分析之SynchronousQueue(九)-程序员宅基地

技术标签: synchronousqueue场景  

一、前言

本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后理解线程池有很有好处,SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。

二、SynchronousQueue数据结构

由于SynchronousQueue的支持公平策略和非公平策略,所以底层可能两种数据结构:队列(实现公平策略)和栈(实现非公平策略),队列与栈都是通过链表来实现的。具体的数据结构如下

  说明:数据结构有两种类型,栈和队列;栈有一个头结点,队列有一个头结点和尾结点;栈用于实现非公平策略,队列用于实现公平策略。

三、SynchronousQueue源码分析

3.1 类的继承关系

public class SynchronousQueue extends AbstractQueue

implements BlockingQueue, java.io.Serializable {}

说明:SynchronousQueue继承了AbstractQueue抽象类,AbstractQueue定义了对队列的基本操作;同时实现了BlockingQueue接口,BlockingQueue表示阻塞型的队列,其对队列的操作可能会抛出异常;同时也实现了Searializable接口,表示可以被序列化。

3.2 类的内部类

SynchronousQueue的内部类框架图如下

说明:其中比较重要的类是左侧的三个类,Transferer是TransferStack栈和TransferQueue队列的公共类,定义了转移数据的公共操作,由TransferStack和TransferQueue具体实现,WaitQueue、LifoWaitQueue、FifoWaitQueue表示为了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遗留的,这里不做具体的讲解。下面着重看左侧的三个类。

① Transferer

abstract static class Transferer{/*** Performs a put or take.

*

*@parame if non-null, the item to be handed to a consumer;

* if null, requests that transfer return an item

* offered by producer.

*@paramtimed if this operation should timeout

*@paramnanos the timeout, in nanoseconds

*@returnif non-null, the item provided or received; if null,

* the operation failed due to timeout or interrupt --

* the caller can distinguish which of these occurred

* by checking Thread.interrupted.*/

//转移数据,put或者take操作

abstract E transfer(E e, boolean timed, longnanos);

}

View Code

说明:Transferer定义了transfer操作,用于take或者put数据。transfer方法由子类实现。

② TransfererStack

1. 类的继承关系

static final class TransferStack extends Transferer {}

说明:TransferStack继承Transferer抽象类,其实现了transfer方法。

2. 类的属性

static final class TransferStack extends Transferer{/** This extends Scherer-Scott dual stack algorithm, differing,

* among other ways, by using "covering" nodes rather than

* bit-marked pointers: Fulfilling operations push on marker

* nodes (with FULFILLING bit set in mode) to reserve a spot

* to match a waiting node.*/

/*Modes for SNodes, ORed together in node fields*/

/**Node represents an unfulfilled consumer*/

//表示消费数据的消费者

static final int REQUEST = 0;/**Node represents an unfulfilled producer*/

//表示生产数据的生产者

static final int DATA = 1;/**Node is fulfilling another unfulfilled DATA or REQUEST*/

//表示匹配另一个生产者或消费者

static final int FULFILLING = 2;/**The head (top) of the stack*/

//头结点

volatileSNode head;

}

View Code

说明:TransferStack有三种不同的状态,REQUEST,表示消费数据的消费者;DATA,表示生产数据的生产者;FULFILLING,表示匹配另一个生产者或消费者。任何线程对TransferStack的操作都属于上述3种状态中的一种。同时还包含一个head域,表示头结点。

3. 类的内部类

SNode类

1. 类的属性

static final classSNode {//下一个结点

volatile SNode next; //next node in stack//相匹配的结点

volatile SNode match; //the node matched to this//等待的线程

volatile Thread waiter; //to control park/unpark//元素项

Object item; //data; or null for REQUESTs//模式

intmode;//Note: item and mode fields don't need to be volatile//since they are always written before, and read after,//other volatile/atomic operations.//item域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读//Unsafe mechanics//反射机制

private static finalsun.misc.Unsafe UNSAFE;//match域的内存偏移地址

private static final longmatchOffset;//next域的偏移地址

private static final longnextOffset;static{try{

UNSAFE=sun.misc.Unsafe.getUnsafe();

Class> k = SNode.class;

matchOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("match"));

nextOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

}catch(Exception e) {throw newError(e);

}

}

}

View Code

说明:SNode类表示栈中的结点,使用了反射机制和CAS来保证原子性的改变相应的域值。

2. 类的构造函数

SNode(Object item) {this.item =item;

}

View Code

说明:该构造函数仅仅设置了SNode的item域,其他域为默认值。

3. 核心函数分析

3.1. tryMatch函数

booleantryMatch(SNode s) {if (match == null &&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { //本结点的match域为null并且比较并替换match域成功//获取本节点的等待线程

Thread w =waiter;if (w != null) { //存在等待的线程//waiters need at most one unpark//将本结点的等待线程重新置为null

waiter = null;//unpark等待线程

LockSupport.unpark(w);

}return true;

}//如果match不为null或者CAS设置失败,则比较match域是否等于s结点,若相等,则表示已经完成匹配,匹配成功

return match ==s;

}

View Code

说明:将s结点与本结点进行匹配,匹配成功,则unpark等待线程。具体流程如下

① 判断本结点的match域是否为null,若为null,则进入步骤②,否则,进入步骤⑤

② CAS设置本结点的match域为s结点,若成功,则进入步骤③,否则,进入步骤⑤

③ 判断本结点的waiter域是否为null,若不为null,则进入步骤④,否则,进入步骤⑤

④ 重新设置本结点的waiter域为null,并且unparkwaiter域所代表的等待线程。进入步骤⑥

⑤ 比较本结点的match域是否为本结点,若是,则进入步骤⑥,否则,进入步骤⑦

⑥ 返回true

⑦ 返回false

4. 核心函数分析

4.1 isFulfilling函数

static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

View Code

说明:表示是否包含FULFILLING标记。

4.2 transfer函数

E transfer(E e, boolean timed, longnanos) {/** Basic algorithm is to loop trying one of three actions:

*

* 1. If apparently empty or already containing nodes of same

* mode, try to push node on stack and wait for a match,

* returning it, or null if cancelled.

*

* 2. If apparently containing node of complementary mode,

* try to push a fulfilling node on to stack, match

* with corresponding waiting node, pop both from

* stack, and return matched item. The matching or

* unlinking might not actually be necessary because of

* other threads performing action 3:

*

* 3. If top of stack already holds another fulfilling node,

* help it out by doing its match and/or pop

* operations, and then continue. The code for helping

* is essentially the same as for fulfilling, except

* that it doesn't return the item.*/SNode s= null; //constructed/reused as needed//根据e确定此次转移的模式(是put or take)

int mode = (e == null) ?REQUEST : DATA;for (;;) { //无限循环//保存头结点

SNode h =head;if (h == null || h.mode == mode) { //头结点为null或者头结点的模式与此次转移的模式相同//empty or same-mode

if (timed && nanos <= 0) { //设置了timed并且等待时间小于等于0,表示不能等待,需要立即操作//can't wait

if (h != null && h.isCancelled()) //头结点不为null并且头结点被取消

casHead(h, h.next); //重新设置头结点(弹出之前的头结点)//pop cancelled node

else //头结点为null或者头结点没有被取消//返回null

return null;

}else if (casHead(h, s = snode(s, e, h, mode))) { //生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点//Spins/blocks until node s is matched by a fulfill operation.//空旋或者阻塞直到s结点被FulFill操作所匹配

SNode m =awaitFulfill(s, timed, nanos);if (m == s) { //匹配的结点为s结点(s结点被取消)//wait was cancelled//清理s结点

clean(s);//返回

return null;

}if ((h = head) != null && h.next == s) //h重新赋值为head头结点,并且不为null;头结点的next域为s结点,表示有结点插入到s结点之前,完成了匹配//比较并替换head域(移除插入在s之前的结点和s结点)

casHead(h, s.next); //help s's fulfiller//根据此次转移的类型返回元素

return (E) ((mode == REQUEST) ?m.item : s.item);

}

}else if (!isFulfilling(h.mode)) { //没有FULFILLING标记,尝试匹配//try to fulfill

if (h.isCancelled()) //被取消//already cancelled//比较并替换head域(弹出头结点)

casHead(h, h.next); //pop and retry

else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点

for (;;) { //无限循环//loop until matched or waiters disappear//保存s的next结点

SNode m = s.next; //m is s's match

if (m == null) { //next域为null//all waiters are gone//比较并替换head域

casHead(s, null); //pop fulfill node//赋值s为null

s = null; //use new node next time

break; //restart main loop

}//m结点的next域

SNode mn =m.next;if (m.tryMatch(s)) { //尝试匹配,并且成功//比较并替换head域(弹出s结点和m结点)

casHead(s, mn); //pop both s and m//根据此次转移的类型返回元素

return (E) ((mode == REQUEST) ?m.item : s.item);

}else //匹配不成功//lost match//比较并替换next域(弹出m结点)

s.casNext(m, mn); //help unlink

}

}

}else { //头结点正在匹配//help a fulfiller//保存头结点的next域

SNode m = h.next; //m与h可以匹配//m is h's match

if (m == null) //next域为null//waiter is gone//比较并替换head域(m被其他结点匹配了,需要弹出h)

casHead(h, null); //pop fulfilling node

else { //next域不为null//获取m结点的next域

SNode mn =m.next;if (m.tryMatch(h)) //m与h匹配成功//help match//比较并替换head域(弹出h和m结点)

casHead(h, mn); //pop both h and m

else //匹配不成功//lost match//比较并替换next域(移除m结点)

h.casNext(m, mn); //help unlink

}

}

}

}

View Code

说明:此函数用于生产或者消费一个元素,并且transfer函数调用了awaitFulfill函数,之后会通过一个例子给出流程。

4.3 awaitFulfill函数

SNode awaitFulfill(SNode s, boolean timed, longnanos) {/** When a node/thread is about to block, it sets its waiter

* field and then rechecks state at least one more time

* before actually parking, thus covering race vs

* fulfiller noticing that waiter is non-null so should be

* woken.

*

* When invoked by nodes that appear at the point of call

* to be at the head of the stack, calls to park are

* preceded by spins to avoid blocking when producers and

* consumers are arriving very close in time. This can

* happen enough to bother only on multiprocessors.

*

* The order of checks for returning out of main loop

* reflects fact that interrupts have precedence over

* normal returns, which have precedence over

* timeouts. (So, on timeout, one last check for match is

* done before giving up.) Except that calls from untimed

* SynchronousQueue.{poll/offer} don't check interrupts

* and don't wait at all, so are trapped in transfer

* method rather than calling awaitFulfill.*/

//根据timed标识计算截止时间

final long deadline = timed ? System.nanoTime() + nanos : 0L;//获取当前线程

Thread w =Thread.currentThread();//根据s确定空旋等待的时间

int spins = (shouldSpin(s) ?(timed? maxTimedSpins : maxUntimedSpins) : 0);for (;;) { //无限循环,确保操作成功

if (w.isInterrupted()) //当前线程被中断//取消s结点

s.tryCancel();//获取s结点的match域

SNode m =s.match;if (m != null) //m不为null,存在匹配结点//返回m结点

returnm;if (timed) { //设置了timed//确定继续等待的时间

nanos = deadline -System.nanoTime();if (nanos <= 0L) { //继续等待的时间小于等于0,等待超时//取消s结点

s.tryCancel();//跳过后面的部分,继续

continue;

}

}if (spins > 0) //空旋等待的时间大于0//确实是否还需要继续空旋等待

spins = shouldSpin(s) ? (spins-1) : 0;else if (s.waiter == null) //等待线程为null//设置waiter线程为当前线程

s.waiter = w; //establish waiter so can park next iter

else if (!timed) //没有设置timed标识//禁用当前线程并设置了阻塞者

LockSupport.park(this);else if (nanos > spinForTimeoutThreshold) //继续等待的时间大于阈值//禁用当前线程,最多等待指定的等待时间,除非许可可用

LockSupport.parkNanos(this, nanos);

}

}

View Code

说明:此函数表示当前线程自旋或阻塞,直到结点被匹配。awaitFulfill函数调用了shouldSpin函数

4.4 shouldSpin函数

booleanshouldSpin(SNode s) {//获取头结点

SNode h =head;//s为头结点或者头结点为null或者h包含FULFILLING标记,返回true

return (h == s || h == null ||isFulfilling(h.mode));

}

View Code

说明:此函数表示是当前结点所包含的线程(当前线程)进行空旋等待,有如下情况需要进行空旋等待

① 当前结点为头结点

② 头结点为null

③ 头结点正在匹配中

4.5 clean函数

voidclean(SNode s) {//s结点的item设置为null

s.item = null; //forget item//waiter域设置为null

s.waiter = null; //forget thread

/** At worst we may need to traverse entire stack to unlink

* s. If there are multiple concurrent calls to clean, we

* might not see s if another thread has already removed

* it. But we can stop when we see any node known to

* follow s. We use s.next unless it too is cancelled, in

* which case we try the node one past. We don't check any

* further because we don't want to doubly traverse just to

* find sentinel.*/

//获取s结点的next域

SNode past =s.next;if (past != null && past.isCancelled()) //next域不为null并且next域被取消//重新设置past

past =past.next;//Absorb cancelled nodes at head

SNode p;while ((p = head) != null && p != past && p.isCancelled()) //从栈顶头结点开始到past结点(不包括),将连续的取消结点移除//比较并替换head域(弹出取消的结点)

casHead(p, p.next);//Unsplice embedded nodes

while (p != null && p != past) { //移除上一步骤没有移除的非连续的取消结点//获取p的next域

SNode n =p.next;if (n != null && n.isCancelled()) //n不为null并且n被取消//比较并替换next域

p.casNext(n, n.next);else

//设置p为n

p =n;

}

}

View Code

说明:此函数用于移除从栈顶头结点开始到该结点(不包括)之间的所有已取消结点。

③ TransferQueue

1. 类的继承关系

static final class TransferQueue extends Transferer {}

说明:TransferQueue继承Transferer抽象类,其实现了transfer方法。

2. 类的属性

static final class TransferQueue extends Transferer{/** This extends Scherer-Scott dual queue algorithm, differing,

* among other ways, by using modes within nodes rather than

* marked pointers. The algorithm is a little simpler than

* that for stacks because fulfillers do not need explicit

* nodes, and matching is done by CAS'ing QNode.item field

* from non-null to null (for put) or vice versa (for take).*/

/**Head of queue*/

//队列的头结点

transient volatileQNode head;/**Tail of queue*/

//队列的尾结点

transient volatileQNode tail;/*** Reference to a cancelled node that might not yet have been

* unlinked from queue because it was the last inserted node

* when it was cancelled.*/

//指向一个取消的结点,当一个结点是最后插入队列时,当被取消时,它可能还没有离开队列

transient volatileQNode cleanMe;

}

View Code

说明:队列存在一个头结点和一个尾节点,分别指示队头和队尾,还包含了一个指示取消结点的域。

3. 类的内部类

QNode类

QNode的源码如下

static final classQNode {//下一个结点

volatile QNode next; //next node in queue//元素项

volatile Object item; //CAS'ed to or from null//等待线程

volatile Thread waiter; //to control park/unpark//是否为数据

final booleanisData;//构造函数

QNode(Object item, booleanisData) {//初始化item域

this.item =item;//初始化isData域

this.isData =isData;

}//比较并替换next域

booleancasNext(QNode cmp, QNode val) {return next == cmp &&UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

}//比较并替换item域

booleancasItem(Object cmp, Object val) {return item == cmp &&UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);

}/*** Tries to cancel by CAS'ing ref to this as item.*/

//取消本结点,将item域设置为自身

voidtryCancel(Object cmp) {

UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);

}//是否被取消

booleanisCancelled() {//item域是否等于自身

return item == this;

}/*** Returns true if this node is known to be off the queue

* because its next pointer has been forgotten due to

* an advanceHead operation.*/

//是否不在队列中

booleanisOffList() {//next与是否等于自身

return next == this;

}//Unsafe mechanics//反射机制

private static finalsun.misc.Unsafe UNSAFE;//item域的偏移地址

private static final longitemOffset;//next域的偏移地址

private static final longnextOffset;static{try{

UNSAFE=sun.misc.Unsafe.getUnsafe();

Class> k = QNode.class;

itemOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("item"));

nextOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

}catch(Exception e) {throw newError(e);

}

}

}

View Code

说明:QNode表示队列中的结点,并且通过反射和CAS原子性的修改对应的域值。

4. 类的构造函数

TransferQueue() {//初始化一个哨兵结点

QNode h = new QNode(null, false); //initialize to dummy node.//设置头结点

head =h;//设置尾结点

tail =h;

}

View Code

说明:该构造函数用于初始化一个队列,并且初始化了一个哨兵结点,头结点与尾节点均指向该哨兵结点。

5. 核心函数分析

5.1 transfer函数

E transfer(E e, boolean timed, longnanos) {/*Basic algorithm is to loop trying to take either of

* two actions:

*

* 1. If queue apparently empty or holding same-mode nodes,

* try to add node to queue of waiters, wait to be

* fulfilled (or cancelled) and return matching item.

*

* 2. If queue apparently contains waiting items, and this

* call is of complementary mode, try to fulfill by CAS'ing

* item field of waiting node and dequeuing it, and then

* returning matching item.

*

* In each case, along the way, check for and try to help

* advance head and tail on behalf of other stalled/slow

* threads.

*

* The loop starts off with a null check guarding against

* seeing uninitialized head or tail values. This never

* happens in current SynchronousQueue, but could if

* callers held non-volatile/final ref to the

* transferer. The check is here anyway because it places

* null checks at top of loop, which is usually faster

* than having them implicitly interspersed.*/QNode s= null; //constructed/reused as needed//确定此次转移的类型(put or take)

boolean isData = (e != null);for (;;) { //无限循环,确保操作成功//获取尾结点

QNode t =tail;//获取头结点

QNode h =head;if (t == null || h == null) //看到未初始化的头尾结点//saw uninitialized value//跳过后面的部分,继续

continue; //spin

if (h == t || t.isData == isData) { //头结点与尾结点相等或者尾结点的模式与当前结点模式相同//empty or same-mode//获取尾结点的next域

QNode tn =t.next;if (t != tail) //t不为尾结点,不一致,重试//inconsistent read

continue;if (tn != null) { //tn不为null,有其他线程添加了tn结点//lagging tail//设置新的尾结点为tn

advanceTail(t, tn);//跳过后面的部分,继续

continue;

}if (timed && nanos <= 0) //设置了timed并且等待时间小于等于0,表示不能等待,需要立即操作//can't wait//返回null

return null;if (s == null) //s为null//新生一个结点并赋值给s

s = newQNode(e, isData);if (!t.casNext(null, s)) //设置t结点的next域不成功//failed to link in//跳过后面的部分,继续

continue;//设置新的尾结点

advanceTail(t, s); //swing tail and wait//Spins/blocks until node s is fulfilled//空旋或者阻塞直到s结点被匹配

Object x =awaitFulfill(s, e, timed, nanos);if (x == s) { //x与s相等,表示已经取消//wait was cancelled//清除

clean(t, s);//返回null

return null;

}if (!s.isOffList()) { //s结点还没离开队列//not already unlinked//设置新的头结点

advanceHead(t, s); //unlink if head

if (x != null) //x不为null//and forget fields//设置s结点的item

s.item =s;//设置s结点的waiter域为null

s.waiter = null;

}return (x != null) ?(E)x : e;

}else { //模式互补//complementary-mode//获取头结点的next域(匹配的结点)

QNode m = h.next; //node to fulfill

if (t != tail || m == null || h != head) //t不为尾结点或者m为null或者h不为头结点(不一致)//跳过后面的部分,继续

continue; //inconsistent read//获取m结点的元素域

Object x =m.item;if (isData == (x != null) || //m结点被匹配//m already fulfilled

x == m || //m结点被取消//m cancelled

!m.casItem(x, e)) { //CAS操作失败//lost CAS

advanceHead(h, m); //队列头结点出队列,并重试//dequeue and retry

continue;

}//匹配成功,设置新的头结点

advanceHead(h, m); //successfully fulfilled//unpark m结点对应的等待线程

LockSupport.unpark(m.waiter);return (x != null) ?(E)x : e;

}

}

}

View Code

说明:此函数用于生产或者消费一个元素,并且transfer函数调用了awaitFulfill函数,之后会通过一个例子给出流程。

5.2 awaitFulfill函数

Object awaitFulfill(QNode s, E e, boolean timed, longnanos) {/*Same idea as TransferStack.awaitFulfill*/

//根据timed标识计算截止时间

final long deadline = timed ? System.nanoTime() + nanos : 0L;//获取当前线程

Thread w =Thread.currentThread();//计算空旋时间

int spins = ((head.next == s) ?(timed? maxTimedSpins : maxUntimedSpins) : 0);for (;;) { //无限循环,确保操作成功

if (w.isInterrupted()) //当前线程被中断//取消

s.tryCancel(e);//获取s的元素域

Object x =s.item;if (x != e) //元素不为e//返回

returnx;if (timed) { //设置了timed//计算继续等待的时间

nanos = deadline -System.nanoTime();if (nanos <= 0L) { //继续等待的时间小于等于0//取消

s.tryCancel(e);//跳过后面的部分,继续

continue;

}

}if (spins > 0) //空旋时间大于0//减少空旋时间

--spins;else if (s.waiter == null) //等待线程为null//设置等待线程

s.waiter =w;else if (!timed) //没有设置timed标识//禁用当前线程并设置了阻塞者

LockSupport.park(this);else if (nanos > spinForTimeoutThreshold) //继续等待的时间大于阈值//禁用当前线程,最多等待指定的等待时间,除非许可可用

LockSupport.parkNanos(this, nanos);

}

}

View Code

说明:此函数表示当前线程自旋或阻塞,直到结点被匹配。

5.3 clean函数

voidclean(QNode pred, QNode s) {//设置等待线程为null

s.waiter = null; //forget thread

/** At any given time, exactly one node on list cannot be

* deleted -- the last inserted node. To accommodate this,

* if we cannot delete s, we save its predecessor as

* "cleanMe", deleting the previously saved version

* first. At least one of node s or the node previously

* saved can always be deleted, so this always terminates.*/

/** 在任何时候,最后插入的结点不能删除,为了满足这个条件

* 如果不能删除s结点,我们将s结点的前驱设置为cleanMe结点

* 删除之前保存的版本,至少s结点或者之前保存的结点能够被删除

* 所以最后总是会结束*/

while (pred.next == s) { //pred的next域为s//Return early if already unlinked//获取头结点

QNode h =head;//获取头结点的next域

QNode hn = h.next; //Absorb cancelled first node as head

if (hn != null && hn.isCancelled()) { //hn不为null并且hn被取消//设置新的头结点

advanceHead(h, hn);//跳过后面的部分,继续

continue;

}//获取尾结点,保证对尾结点的读一致性

QNode t = tail; //Ensure consistent read for tail

if (t == h) //尾结点为头结点,表示队列为空//返回

return;//获取尾结点的next域

QNode tn =t.next;if (t != tail) //t不为尾结点,不一致,重试//跳过后面的部分,继续

continue;if (tn != null) { //tn不为null//设置新的尾结点

advanceTail(t, tn);//跳过后面的部分,继续

continue;

}if (s != t) { //s不为尾结点,移除s//If not tail, try to unsplice

QNode sn =s.next;if (sn == s || pred.casNext(s, sn)) //

return;

}//获取cleanMe结点

QNode dp =cleanMe;if (dp != null) { //dp不为null,断开前面被取消的结点//Try unlinking previous cancelled node//获取dp的next域

QNode d =dp.next;

QNode dn;if (d == null || //d is gone or

d == dp || //d is off list or

!d.isCancelled() || //d not cancelled or

(d != t && //d not tail and

(dn = d.next) != null && //has successor

dn != d && //that is on list

dp.casNext(d, dn))) //d unspliced

casCleanMe(dp, null);if (dp ==pred)return; //s is already saved node

} else if (casCleanMe(null, pred))return; //Postpone cleaning s

}

}

View Code

说明:此函数用于移除已经被取消的结点。

3.3 类的属性

public class SynchronousQueue extends AbstractQueue

implements BlockingQueue, java.io.Serializable {//版本序列号

private static final long serialVersionUID = -3223113410248163686L;//可用的处理器

static final int NCPUS =Runtime.getRuntime().availableProcessors();//最大空旋时间

static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;//无限时的等待的最大空旋时间

static final int maxUntimedSpins = maxTimedSpins * 16;//超时空旋等待阈值

static final long spinForTimeoutThreshold = 1000L;//用于序列化

privateReentrantLock qlock;privateWaitQueue waitingProducers;privateWaitQueue waitingConsumers;

}

View Code

说明:SynchronousQueue类的属性包含了空旋等待时间相关的属性。

3.4 类的构造函数

1. SynchronousQueue()型构造函数

publicSynchronousQueue() {//非公平策略(先进后出)

this(false);

}

View Code

说明:该构造函数用于创建一个具有非公平访问策略的 SynchronousQueue。

2. SynchronousQueue(boolean)型构造函数

public SynchronousQueue(booleanfair) {//根据指定的策略生成不同的结构

transferer = fair ? new TransferQueue() : new TransferStack();

}

View Code

说明:创建一个具有指定公平策略的 SynchronousQueue。

3.5 核心函数分析

在分析了TransferStack和TransferQueue的相关函数后,SynchronousQueue的函数的分析就非常简单。

//将指定元素添加到此队列,如有必要则等待另一个线程接收它

public void put(E e) throwsInterruptedException {//e为null则抛出异常

if (e == null) throw newNullPointerException();if (transferer.transfer(e, false, 0) == null) { //进行转移操作//中断当前线程

Thread.interrupted();throw newInterruptedException();

}

}//将指定元素插入到此队列,如有必要则等待指定的时间,以便另一个线程接收它

public boolean offer(E e, longtimeout, TimeUnit unit)throwsInterruptedException {//e为null则抛出异常

if (e == null) throw newNullPointerException();if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) //进行转移操作

return true;if (!Thread.interrupted()) //当前线程没有被中断//返回

return false;throw newInterruptedException();

}//如果另一个线程正在等待以便接收指定元素,则将指定元素插入到此队列

public booleanoffer(E e) {//e为null则抛出异常

if (e == null) throw newNullPointerException();return transferer.transfer(e, true, 0) != null; //进行转移操作

}//获取并移除此队列的头,如有必要则等待另一个线程插入它

public E take() throwsInterruptedException {//进行转移操作

E e = transferer.transfer(null, false, 0);if (e != null)returne;

Thread.interrupted();throw newInterruptedException();

}//获取并移除此队列的头,如有必要则等待指定的时间,以便另一个线程插入它

public E poll(long timeout, TimeUnit unit) throwsInterruptedException {

E e= transferer.transfer(null, true, unit.toNanos(timeout));if (e != null || !Thread.interrupted()) //元素不为null或者当前线程没有被中断

returne;throw newInterruptedException();

}//如果另一个线程当前正要使用某个元素,则获取并移除此队列的头

publicE poll() {return transferer.transfer(null, true, 0);

}//始终返回 true

public booleanisEmpty() {return true;

}//始终返回 0

public intsize() {return 0;

}//始终返回 0

public intremainingCapacity() {return 0;

}//不执行任何操作

public voidclear() {

}//始终返回false

public booleancontains(Object o) {return false;

}//始终返回false

public booleanremove(Object o) {return false;

}//除非给定 collection 为空,否则返回 false

public boolean containsAll(Collection>c) {returnc.isEmpty();

}//始终返回 false

public boolean removeAll(Collection>c) {return false;

}//始终返回 false

public boolean retainAll(Collection>c) {return false;

}//始终返回 null

publicE peek() {return null;

}//返回一个空迭代器,其中 hasNext 始终返回 false

public Iteratoriterator() {returnCollections.emptyIterator();

}//

public Spliteratorspliterator() {returnSpliterators.emptySpliterator();

}//返回一个 0 长度的数组

publicObject[] toArray() {return new Object[0];

}//将指定数组的第 0 个元素设置为 null(如果该数组有非 0 的长度)并返回它

public T[] toArray(T[] a) {if (a.length > 0)

a[0] = null;returna;

}//移除此队列中所有可用的元素,并将它们添加到给定 collection 中

public int drainTo(Collection super E>c) {if (c == null)throw newNullPointerException();if (c == this)throw newIllegalArgumentException();int n = 0;for (E e; (e = poll()) != null;) {

c.add(e);++n;

}returnn;

}//最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中

public int drainTo(Collection super E> c, intmaxElements) {if (c == null)throw newNullPointerException();if (c == this)throw newIllegalArgumentException();int n = 0;for (E e; n < maxElements && (e = poll()) != null;) {

c.add(e);++n;

}returnn;

}

View Code

说明:SynchronousQueue的函数很大程度都是依托于TransferStack或TransferQueue的transfer函数,所以,了解transfer函数就可以了解SynchronousQueue的原理。

四、示例

下面通过一个示例来详细了解SynchronousQueue的使用。

packagecom.hust.grid.leesf.collections;importjava.util.concurrent.SynchronousQueue;importjava.util.concurrent.TimeUnit;public classSynchronousQueueDemo {public static voidmain(String[] args) {

SynchronousQueue queue = new SynchronousQueue();

Producer p1= new Producer("p1", queue, 10);

Producer p2= new Producer("p2", queue, 50);

Consumer c1= new Consumer("c1", queue);

Consumer c2= new Consumer("c2", queue);

c1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c2.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p2.start();

}static class Producer extendsThread {private SynchronousQueuequeue;private intn;public Producer(String name, SynchronousQueue queue, intn) {super(name);this.queue =queue;this.n =n;

}public voidrun() {

System.out.println(getName()+ " offer result " +queue.offer(n));

}

}static class Consumer extendsThread {private SynchronousQueuequeue;public Consumer(String name, SynchronousQueuequeue) {super(name);this.queue =queue;

}public voidrun() {try{

System.out.println(getName()+ " take result " +queue.take());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

View Code

运行结果(某一次)

p1 offer result truec2 take result10p2 offer resulttruec1 take result50

说明:该示例中,有两个生产者p1、p2和两个消费者c1、c2,按照c1、c2、p1、p2的顺序启动,并且每个线程启动后休眠100ms,则可能有如下的时序图

说明:时序图中,c1线程的take操作早于c2线程的take操作早于p1线程的offer操作早于p2线程的offer操作。

根据示例源码可知,此SynchronousQueue采用非公平策略,即底层采用栈结构。

① c1执行take操作,主要的函数调用如下

说明:其中,c1线程进入awaitFulfill后,会空旋等待,直到空旋时间消逝,会调用LockSupport.park函数,会禁用当前线程(c1),直至许可可用。

② c1执行take操作,主要的函数调用如下

说明:其中,c2线程进入awaitFulfill后,会空旋等待,直到空旋时间消逝,会调用LockSupport.park函数,会禁用当前线程(c2),直至许可可用。并且此时栈中有两个节点,c2线程所在的结点和c1线程所在的结点。

③ p1线程执行offer(10)操作,主要的函数调用如下

说明:在执行offer(10)操作后,c2线程所在的结点与头结点进行了匹配(头结点生产数据,c2线程所在的结点消费数据),c2线程被unpark,可以继续运行,而c1线程还是被park中(非公平策略)。

③ c2线程被unpark后,继续运行,主要函数调用如下(由于c2线程是在awaitFulfill函数中被park的,所以,恢复也是在awaitFulfill函数中)

说明:c2线程从unpark恢复时,结构如上图所示,先从awaitFulfill函数中返回,然后再从transfer函数中返回10,再从take函数中返回10。

④ p2线程执行offer(50)操作,主要的函数调用如下

说明:在执行offer(50)操作后,c1线程所在的结点与头结点进行了匹配(头结点生产数据,c1线程所在的结点消费数据),c1线程被unpark,可以继续运行。

⑤ c1线程被unpark后,继续运行,主要函数调用如下(由于c1线程是在awaitFulfill函数中被park的,所以,恢复也是在awaitFulfill函数中)

说明:c1线程从unpark恢复时,结构如上图所示,先从awaitFulfill函数中返回,然后再从transfer函数中返回50,再从take函数中返回50。

上述是使用非公平策略的结果(首先匹配c2线程所在的结点,之后再匹配c1线程所在结点)。

修改示例,改用公平策略。

packagecom.hust.grid.leesf.collections;importjava.util.concurrent.SynchronousQueue;importjava.util.concurrent.TimeUnit;public classSynchronousQueueDemo {public static voidmain(String[] args) {

SynchronousQueue queue = new SynchronousQueue(true);

Producer p1= new Producer("p1", queue, 10);

Producer p2= new Producer("p2", queue, 50);

Consumer c1= new Consumer("c1", queue);

Consumer c2= new Consumer("c2", queue);

c1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c2.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p2.start();

}static class Producer extendsThread {private SynchronousQueuequeue;private intn;public Producer(String name, SynchronousQueue queue, intn) {super(name);this.queue =queue;this.n =n;

}public voidrun() {

System.out.println(getName()+ " offer result " +queue.offer(n));

}

}static class Consumer extendsThread {private SynchronousQueuequeue;public Consumer(String name, SynchronousQueuequeue) {super(name);this.queue =queue;

}public voidrun() {try{

System.out.println(getName()+ " take result " +queue.take());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

View Code

运行结果(某一次)

p1 offer result truec1 take result10p2 offer resulttruec2 take result50

说明:从运行结果可知,c1线程会比c2线程先匹配(因为采用公平策略,先入队列先匹配,所以c1先得到匹配,然后再匹配c2)。具体的流程图与非公平策略类似,在此不再累赘。

当再次修改源码,还是使用非公平策略,只是改变c1、c2、p1、p2之间的启动顺序。更改为p1->c1->p2->c2。

packagecom.hust.grid.leesf.collections;importjava.util.concurrent.SynchronousQueue;importjava.util.concurrent.TimeUnit;public classSynchronousQueueDemo {public static voidmain(String[] args) {

SynchronousQueue queue = new SynchronousQueue();

Producer p1= new Producer("p1", queue, 10);

Producer p2= new Producer("p2", queue, 50);

Consumer c1= new Consumer("c1", queue);

Consumer c2= new Consumer("c2", queue);

p1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p2.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c2.start();

}static class Producer extendsThread {private SynchronousQueuequeue;private intn;public Producer(String name, SynchronousQueue queue, intn) {super(name);this.queue =queue;this.n =n;

}public voidrun() {

System.out.println(getName()+ " offer result " +queue.offer(n));

}

}static class Consumer extendsThread {private SynchronousQueuequeue;public Consumer(String name, SynchronousQueuequeue) {super(name);this.queue =queue;

}public voidrun() {try{

System.out.println(getName()+ " take result " +queue.take());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

View Code

运行结果(某一次)

p1 offer result falsep2 offer resulttruec1 take result50

说明:此时,只有c1线程得到了匹配,p1线程存放元素,直接返回的false,因为此时没有消费者线程等待,而p2线程与c1线程进行了匹配,p2线程存放元素成功,c1线程获取元素成功,并且此时,c2线程还是处于park状态,此时应用程序无法正常结束。所以,可知,必须要先有取操作,然后存操作,两者才能正确的匹配,若先是存操作,然后再是取操作,此时无法匹配成功,会阻塞,取操作期待下一个存操作进行匹配。

五、总结

SynchronousQueue的源码就分析到这里,SynchronousQueue适合一对一的匹配场景,没有容量,无法缓存。有了这个基础,之后会方便分析线程池框架的源码,谢谢各位园友的观看~

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39836530/article/details/111777826

智能推荐

攻防世界_难度8_happy_puzzle_攻防世界困难模式攻略图文-程序员宅基地

文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文

达梦数据库的导出(备份)、导入_达梦数据库导入导出-程序员宅基地

文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作  导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释:   cwy_init/init_123..._达梦数据库导入导出

js引入kindeditor富文本编辑器的使用_kindeditor.js-程序员宅基地

文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js

STM32学习过程记录11——基于STM32G431CBU6硬件SPI+DMA的高效WS2812B控制方法-程序员宅基地

文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6

计算机网络-数据链路层_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输

软件测试工程师移民加拿大_无证移民,未受过软件工程师的教育(第1部分)-程序员宅基地

文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...

随便推点

Thinkpad X250 secure boot failed 启动失败问题解决_安装完系统提示secureboot failure-程序员宅基地

文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure

C++如何做字符串分割(5种方法)_c++ 字符串分割-程序员宅基地

文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割

2013第四届蓝桥杯 C/C++本科A组 真题答案解析_2013年第四届c a组蓝桥杯省赛真题解答-程序员宅基地

文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答

基于供需算法优化的核极限学习机(KELM)分类算法-程序员宅基地

文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。

metasploitable2渗透测试_metasploitable2怎么进入-程序员宅基地

文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入

Python学习之路:从入门到精通的指南_python人工智能开发从入门到精通pdf-程序员宅基地

文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf

推荐文章

热门文章

相关标签