SynchronousQueue source code analysis (based on Java 8)



1. Introduction

SynchronousQueue is a member of the BlockingQueue family. Unlike other members, it has the following characteristics:

1. The queue has no capacity at all: every put must wait for a consumer to take the element before the next put can proceed.
2. Methods such as peek, contains, clear, and isEmpty are effectively no-ops: the queue never holds an element, so peek always returns null and isEmpty always returns true.
3. The queue supports a fair mode (TransferQueue, FIFO) and an unfair mode (TransferStack, LIFO, the default).
4. With TransferQueue, the queue always contains a dummy node.
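To see the first two characteristics in action, here is a minimal sketch. The class name ZeroCapacityDemo and its helper methods are made up for this illustration; only the SynchronousQueue calls come from the JDK.

```java
import java.util.concurrent.SynchronousQueue;

class ZeroCapacityDemo {

    /** Returns true if the queue behaves as "always empty" while no consumer is waiting. */
    static boolean looksEmptyWithoutConsumer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        boolean offered = q.offer("x");          // nobody is waiting in take(), so this fails
        return !offered
                && q.peek() == null              // peek never sees an element
                && q.isEmpty()                   // isEmpty is always true
                && !q.contains("x")              // contains is always false
                && q.size() == 0;                // size is always zero
    }

    /** Hands one element from a producer thread to the calling thread. */
    static String handOff() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                q.put("hello");                  // blocks until the take() below arrives
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String v = q.take();                 // rendezvous with the producer
            producer.join();
            return v;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(looksEmptyWithoutConsumer());
        System.out.println(handOff());
    }
}
```

The put only completes because a take arrives on another thread; the element never sits in the queue, which is why the inspection methods have nothing to report.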

2. Constructor

    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order
     */
    public SynchronousQueue(boolean fair) {
        // The fair flag decides whether waiting thread nodes are kept in a queue or a stack
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

As the code shows, TransferStack is the default internal container for waiting nodes; pass fair = true to get the FIFO TransferQueue instead.
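The effect of the fair flag can be observed with a small experiment. This is only a sketch: FairnessDemo and drainOrder are hypothetical names, and the trick of polling Thread.getState() until each producer is parked is an assumption used here to fix the arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {

    /** Parks three producers (values 1, 2, 3) one after another, then takes three times. */
    static List<Integer> drainOrder(boolean fair) {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(fair);
        try {
            for (int i = 1; i <= 3; i++) {
                final int value = i;
                Thread producer = new Thread(() -> {
                    try {
                        q.put(value);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.setDaemon(true);
                producer.start();
                // Wait until this producer is parked inside put(), so producers arrive in order
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                order.add(q.take());
            }
            return order;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("fair:    " + drainOrder(true));
        System.out.println("nonfair: " + drainOrder(false));
    }
}
```

In fair mode the values come out in arrival order (1, 2, 3). In nonfair mode the API leaves the order unspecified; with the stack-based implementation analyzed here the most recent producer is usually served first.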

3. Fair Mode TransferQueue

    /**
     * This is a classic linked queue with the following characteristics:
     * 1. The queue keeps two references, head and tail.
     * 2. A dummy node is created when the queue is initialized.
     * 3. The head is always a dummy (sentinel) node, so operations always
     *    act on head.next, the second node in the queue (AQS is designed the same way).
     */

    /** Head node */
    transient volatile QNode head;
    /** Tail node */
    transient volatile QNode tail;

    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     *
     * cleanMe is the predecessor of a node whose thread was interrupted or timed out.
     * It is a marker: cleanMe.next is the node waiting to be deleted.
     * When it is used: when the node to delete is the tail of the queue.
     * Why: normally a node B can be unlinked directly with A.casNext(B, B.next).
     * But when B is the last node in the queue, one thread may be unlinking B while
     * another thread appends a node after B, and the appended node could easily be lost.
     * cleanMe plays much the same role as the marker nodes that
     * ConcurrentSkipListMap inserts when deleting.
     */
    transient volatile QNode cleanMe;

    TransferQueue() {
        // Create the dummy node; the queue always contains one.
        // Having a dummy node spares the code many special-case if checks.
        QNode h = new QNode(null, false);
        head = h;
        tail = h;
    }

    /**
     * Advances the head node and sets h.next = h to help GC,
     * the same trick used in ConcurrentLinkedQueue.
     */
    void advanceHead(QNode h, QNode nh) {
        if (h == head && unsafe.compareAndSwapObject(this, headOffset, h, nh)) {
            h.next = h; // forget old next, help GC
        }
    }

    /** Tries to CAS nt in as the new tail node */
    void advanceTail(QNode t, QNode nt) {
        if (tail == t) {
            unsafe.compareAndSwapObject(this, tailOffset, t, nt);
        }
    }

    /** CAS the cleanMe reference */
    boolean casCleanMe(QNode cmp, QNode val) {
        return cleanMe == cmp && unsafe.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    }

The code shows that TransferQueue is a dual queue that is initialized with a single dummy node. The most unusual part is cleanMe, a marker for a node that must be deleted because its thread was interrupted or timed out. When the node to delete is at the tail of the queue, it is not unlinked directly; instead, its predecessor is recorded as cleanMe so that the node can be unlinked on a later pass. This is similar to the marker nodes in ConcurrentSkipListMap: it prevents a newly inserted node from being lost when an insertion and a deletion happen at the same position at the same time. If this is unclear, have a look at ConcurrentSkipListMap.
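The analysis in the following sections relies on a few QNode helpers (casNext, casItem, tryCancel, isCancelled, isOffList) whose source is not listed in this article. Below is a rough sketch of what such a node could look like. It is not the JDK code: the class name QNodeSketch is made up, and AtomicReferenceFieldUpdater is swapped in for the Unsafe calls used by the real class.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class QNodeSketch {
    volatile QNodeSketch next;   // next node in the queue
    volatile Object item;        // CAS'ed to or from null when matched
    volatile Thread waiter;      // thread to unpark once matched
    final boolean isData;        // true for a producer node

    // Assumption: field updaters stand in for the Unsafe-based CAS of the real implementation
    private static final AtomicReferenceFieldUpdater<QNodeSketch, QNodeSketch> NEXT =
            AtomicReferenceFieldUpdater.newUpdater(QNodeSketch.class, QNodeSketch.class, "next");
    private static final AtomicReferenceFieldUpdater<QNodeSketch, Object> ITEM =
            AtomicReferenceFieldUpdater.newUpdater(QNodeSketch.class, Object.class, "item");

    QNodeSketch(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    boolean casNext(QNodeSketch cmp, QNodeSketch val) {
        return next == cmp && NEXT.compareAndSet(this, cmp, val);
    }

    boolean casItem(Object cmp, Object val) {
        return item == cmp && ITEM.compareAndSet(this, cmp, val);
    }

    /** Cancels the wait by pointing item at the node itself. */
    void tryCancel(Object cmp) {
        ITEM.compareAndSet(this, cmp, this);
    }

    boolean isCancelled() {
        return item == this;
    }

    /** A node whose next points to itself has been taken off the list (see advanceHead). */
    boolean isOffList() {
        return next == this;
    }

    /** Small self-check: a cancelled node can no longer be matched. */
    static boolean demo() {
        Object e = new Object();
        QNodeSketch cancelled = new QNodeSketch(e, true);
        cancelled.tryCancel(e);
        QNodeSketch matched = new QNodeSketch(e, true);
        boolean taken = matched.casItem(e, null);
        return cancelled.isCancelled()
                && !cancelled.casItem(e, null)
                && taken
                && !matched.isCancelled()
                && !matched.isOffList();
    }
}
```

The two self-references are the key conventions: item == this means "cancelled" and next == this means "already off the list".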

3. Fair mode TransferQueue transfer method

The main logic of this method:

1. If the queue is empty, or the tail node has the same mode as the caller, append a node to the queue and wait until it times out, is interrupted, or is matched by another thread. On timeout or interrupt, awaitFulfill returns the node itself. On a successful match, it returns either null (when the waiting thread is a producer) or the transferred value (when the waiting thread is a consumer).
2. If the queue is not empty and the node at head.next has the complementary mode, exchange the data with that node and dequeue it by calling advanceHead.

Now look at the code of transfer:

    /**
     * Puts or takes an item.
     * This is the main method.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt
     */
    @Override
    E transfer(E e, boolean timed, long nanos) {
        /**
         * Basic algorithm is to loop trying to take either of
         * two actions:
         *
         * 1. If queue apparently empty or holding same-mode nodes,
         *    try to add node to queue of waiters, wait to be
         *    fulfilled (or cancelled) and return matching item.
         *
         * 2. If queue apparently contains waiting items, and this
         *    call is of complementary mode, try to fulfill by CAS'ing
         *    item field of waiting node and dequeuing it, and then
         *    returning matching item.
         *
         * In each case, along the way, check for and try to help
         * advance head and tail on behalf of other stalled/slow
         * threads.
         *
         * The loop starts off with a null check guarding against
         * seeing uninitialized head or tail values. This never
         * happens in current SynchronousQueue, but could if
         * callers held non-volatile/final ref to the
         * transferer. The check is here anyway because it places
         * null checks at top of loop, which is usually faster
         * than having them implicitly interspersed.
         *
         * In short, this producer/consumer method handles two situations:
         *
         * 1. If the queue is empty, or the tail node has the same mode as the caller,
         *    append a node to the queue and wait until timeout, interrupt, or a match.
         *    On timeout or interrupt, awaitFulfill returns the node itself.
         *    On a match it returns null (the waiter is a producer) or the
         *    transferred value (the waiter is a consumer).
         *
         * 2. If the queue is not empty and head.next has the complementary mode,
         *    exchange the data with it and dequeue it with advanceHead.
         */
        QNode s = null; // constructed/reused as needed
        boolean isData = (e != null); // 1. e != null distinguishes a producer from a consumer

        for (;;) {
            QNode t = tail;
            QNode h = head;
            if (t == null || h == null) { // 2. Not initialized yet, try again
                continue; // spin
            }
            // 3. The queue is empty, or the tail node has the same mode as the caller
            //    (note that this compares with the tail node, whereas the matching
            //    below is performed against head.next)
            if (h == t || t.isData == isData) {
                QNode tn = t.next;
                if (t != tail) { // 4. tail has changed, try again
                    continue;
                }
                if (tn != null) { // 5. Another thread appended a node, so help advance tail
                    advanceTail(t, tn);
                    continue;
                }
                // 6. This is a timed call whose time has already run out, so return null
                //    at once. See SynchronousQueue.poll(): such a call only returns a
                //    value when a matching thread is already waiting in the queue.
                if (timed && nanos <= 0) {
                    return null;
                }
                if (s == null) {
                    s = new QNode(e, isData); // 7. Build the QNode
                }
                if (!t.casNext(null, s)) { // 8. Append the new node to the queue
                    continue;
                }

                advanceTail(t, s); // 9. Advance the tail node
                // 10. Call awaitFulfill: if the node is head.next it spins for a while,
                //     otherwise it blocks directly, until another thread matches it
                //     or the thread itself is interrupted or times out
                Object x = awaitFulfill(s, e, timed, nanos);
                // 11. x == s means the wait of node s timed out or its thread was
                //     interrupted; otherwise x == null (s is a producer) or x is the
                //     transferred value (s is a consumer)
                if (x == s) {
                    // 12. Clean up node s. If s is not the last node of the list, it is
                    //     unlinked directly with CAS. If s is the last node, first unlink
                    //     the node recorded by a previous cleanMe (if cleanMe != null),
                    //     then record the predecessor of s as cleanMe so that s is
                    //     deleted next time
                    clean(t, s);
                    return null;
                }

                if (!s.isOffList()) { // 13. Node s is not off the list yet
                    // 14. Advance the head node. advanceHead is called here because
                    //     reaching this point means s has been matched, and matching
                    //     always happens at head.next
                    advanceHead(t, s);
                    if (x != null) { // and forget fields
                        s.item = s;
                    }
                    s.waiter = null; // 15. Release the thread reference
                }
                return (x != null) ? (E) x : e;

            } else {
                // 16. Complementary mode: perform the match. Matching starts from
                //     head.next (the queue is built with a dummy node and the head is
                //     always a dummy node, just as in AQS)
                QNode m = h.next; // 17. Get head.next, the node to match
                if (t != tail || m == null || h != head) {
                    continue; // 18. Inconsistent read: other threads changed the queue
                }

                /**
                 * Producer/consumer matching:
                 * 1. Read the item of m (m is the next node of head).
                 * 2. Check that isData and x are complementary; only a producer
                 *    and a consumer can be paired.
                 * 3. x == m means node m has been cancelled (see QNode#tryCancel).
                 * 4. m.casItem exchanges the data between producer and consumer
                 *    (the CAS may fail under contention).
                 * 5. Whether or not the CAS succeeds, dequeue the h node.
                 *
                 * Question: why dequeue h rather than m?
                 * Answer: h is always the dummy node during matching and the real
                 * data node is head.next, so m becomes the new dummy head.
                 */
                Object x = m.item;
                if (isData == (x != null) || // 19. Modes do not match (another thread may have matched m already)
                    x == m ||                // 20. The thread of m was interrupted or timed out
                    !m.casItem(x, e)) {      // 21. CAS the item of the waiting node (the waiter may be a consumer or a producer)
                    advanceHead(h, m);       // 22. Advance the head node and retry (in particular when step 21 failed)
                    continue;
                }

                advanceHead(h, m); // 23. Data exchanged successfully, advance the head node
                // 24. Wake the thread waiting on m; in awaitFulfill it sees that the
                //     item has changed (x != e) and returns
                LockSupport.unpark(m.waiter);
                // 25. If m is a producer, x != null and the consumer returns x;
                //     if m is a consumer, x == null and the producer returns its own e
                return (x != null) ? (E) x : e;
            }
        }
    }

OK, let's sort out the general process:

1. At the start the queue is empty, so the thread is wrapped in a QNode and waits in awaitFulfill, spinning first and then parking, until it times out, is interrupted, or is matched by another thread.
2. If the next thread has the same mode as the tail node, it repeats step 1. Otherwise it transfers the data (step 21 in the code) and unparks the waiting thread.
3. The waiting thread wakes up, returns from awaitFulfill, and finally returns the result.
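The matching rule in this process can be tried out without any threads. The following is a deliberately simplified, single-threaded model, not the real algorithm: DualQueueModel and Waiter are hypothetical names, and a call that would block in the real queue simply enqueues itself and returns null.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DualQueueModel {

    private static final class Waiter {
        final Object item;
        final boolean isData;
        Waiter(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }
    }

    private final Deque<Waiter> waiters = new ArrayDeque<>();

    /**
     * e != null acts as a producer, e == null as a consumer.
     * Returns the matched item, or null if the caller had to join the queue.
     */
    Object transfer(Object e) {
        boolean isData = (e != null);
        Waiter last = waiters.peekLast();
        if (last == null || last.isData == isData) {
            // Empty queue or same mode as the tail: enqueue (the real code would now wait)
            waiters.addLast(new Waiter(e, isData));
            return null;
        }
        // Complementary mode: match the oldest waiter, like head.next in TransferQueue
        Waiter m = waiters.pollFirst();
        return isData ? e : m.item;
    }

    /** Two producers arrive, three consumers follow, then one more producer. */
    static String demo() {
        DualQueueModel q = new DualQueueModel();
        List<Object> results = new ArrayList<>();
        results.add(q.transfer("a"));   // producer waits
        results.add(q.transfer("b"));   // producer waits
        results.add(q.transfer(null));  // consumer matches the oldest producer
        results.add(q.transfer(null));  // consumer matches the next producer
        results.add(q.transfer(null));  // queue is empty again, consumer waits
        results.add(q.transfer("c"));   // producer matches the waiting consumer
        return results.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a complementary call always matches instead of enqueuing, the queue only ever holds waiters of one mode at a time, which is why comparing with the tail node is enough to pick the branch.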

4. Fair mode TransferQueue awaitFulfill

    /**
     * Spins/blocks until node s is fulfilled.
     *
     * Main logic: if the node is head.next, spin for a while; otherwise call
     * LockSupport.park/parkNanos() until another thread wakes it up.
     *
     * @param s the waiting node
     * @param e the comparison value for checking match
     * @param timed true if timed wait
     * @param nanos timeout value
     * @return matched item, or s if cancelled
     */
    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        // 1. Compute the deadline (only meaningful when timed is true)
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread(); // 2. Get the current thread
        // 3. Spin only when the current node is head.next; anything else would waste CPU
        int spins = ((head.next == s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) { // loop until success
            // 4. If the thread is interrupted, set item = this; transfer checks
            //    the return value (step 11 in transfer)
            if (w.isInterrupted()) {
                s.tryCancel(e);
            }
            Object x = s.item;
            // 5. After a wake-up by a matching thread, an interrupt, or a timeout,
            //    x != e holds, so return directly
            if (x != e) {
                return x;
            }
            if (timed) {
                nanos = deadline - System.nanoTime();
                // 6. Timed out: change the item of the node and continue;
                //    the next iteration reaches step 5 and returns
                if (nanos <= 0L) {
                    s.tryCancel(e);
                    continue;
                }
            }
            if (spins > 0) { // 7. Count the spins down one by one
                --spins;
            } else if (s.waiter == null) {
                s.waiter = w;
            } else if (!timed) { // 8. Park without a timeout
                LockSupport.park(this);
            } else if (nanos > spinForTimeoutThreshold) { // 9. Spins are used up, park with a timeout
                LockSupport.parkNanos(this, nanos);
            }
        }
    }

Sorting out the logic:

1. Compute the deadline (if timed is true).
2. Check whether the current node is head.next (the queue has a dummy head node, as in AQS). If it is, assign a spin count; other nodes have no need to spin and would only waste resources.
3. Spin, then block once the spin count is used up, until another thread wakes this one or the thread is interrupted (on interrupt, the node itself is returned).
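The spin-then-park pattern described above can be isolated into a small standalone class. This is a minimal sketch under stated assumptions, not the JDK code: SpinThenParkCell, MAX_SPINS, and the CANCELLED sentinel are hypothetical, and an AtomicReference replaces the Unsafe-based item field.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class SpinThenParkCell {
    static final Object CANCELLED = new Object();          // hypothetical cancellation marker
    private static final int MAX_SPINS = 64;               // assumed spin budget
    private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; // nanos below which parking is not worth it

    private final AtomicReference<Object> item = new AtomicReference<>(); // null means "not fulfilled yet"
    private volatile Thread waiter;

    /** Waits for fulfill(); returns the value, or CANCELLED on timeout or interrupt. */
    Object await(boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = MAX_SPINS;
        for (;;) {
            if (w.isInterrupted()) {
                item.compareAndSet(null, CANCELLED);
            }
            Object x = item.get();
            if (x != null) {
                return x;                                  // fulfilled or cancelled
            }
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    item.compareAndSet(null, CANCELLED);   // the next iteration returns
                    continue;
                }
            }
            if (spins > 0) {
                --spins;                                   // spin before blocking
            } else if (waiter == null) {
                waiter = w;                                // publish the thread, then re-check item
            } else if (!timed) {
                LockSupport.park(this);
            } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) {
                LockSupport.parkNanos(this, nanos);
            }
        }
    }

    /** Hands a value to the waiter; returns false if the wait was already cancelled or fulfilled. */
    boolean fulfill(Object value) {
        if (!item.compareAndSet(null, value)) {
            return false;
        }
        LockSupport.unpark(waiter);                        // unpark(null) is a no-op
        return true;
    }

    static boolean timesOut() {
        return new SpinThenParkCell().await(true, 1_000_000L) == CANCELLED;
    }

    static Object handOff() {
        SpinThenParkCell cell = new SpinThenParkCell();
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            cell.fulfill("hello");
        });
        t.start();
        return cell.await(false, 0L);
    }
}
```

Note the ordering in await: the waiter field is set in one iteration and the item is checked again before parking, so a fulfill that runs before the waiter is published is still noticed.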

5. Fair Mode TransferQueue clean

    /**
     * Gets rid of cancelled node s with original predecessor pred.
     * Cleans up a node whose thread was interrupted or timed out while waiting.
     */
    void clean(QNode pred, QNode s) {
        s.waiter = null; // forget thread // 1. Clear the thread reference
        /*
         * At any given time, exactly one node on list cannot be
         * deleted - the last inserted node. To accommodate this,
         * if we cannot delete s, we save its predecessor as
         * "cleanMe", deleting the previously saved version
         * first. At least one of node s or the node previously
         * saved can always be deleted, so this always terminates.
         *
         * At any time while the program runs, the last inserted node cannot be
         * deleted (deleted here means unlinked directly with CAS, because doing
         * so risks losing other nodes).
         * When node s is the last node, its predecessor is saved as cleanMe and
         * the actual cleanup is performed next time.
         */
        // 2. Loop only while pred.next == s; step 3 below can change pred.next
        while (pred.next == s) { // Return early if already unlinked
            QNode h = head;
            QNode hn = h.next; // Absorb cancelled first node as head
            // 3. If hn was interrupted or timed out, advance the head. If h is pred,
            //    the loop condition pred.next == s no longer holds and the loop exits
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }
            QNode t = tail; // Ensure consistent read for tail
            // 4. The queue is empty, which means other threads have already removed
            //    the node (note that the dummy node is always present)
            if (t == h)
                return;
            QNode tn = t.next;
            if (t != tail) // 5. Another thread changed tail, start over
                continue;
            if (tn != null) {
                advanceTail(t, tn); // 6. Help advance the tail
                continue;
            }
            // 7. If s is not the tail node, unlink it directly with CAS
            //    (deleting in the middle of the queue carries no risk)
            if (s != t) { // If not tail, try to unsplice
                QNode sn = s.next;
                if (sn == s || pred.casNext(s, sn))
                    return;
            }
            QNode dp = cleanMe; // 8. s is the tail node of the queue, so cleanMe comes into play
            if (dp != null) { // Try unlinking previous cancelled node
                // 9. cleanMe is not null: unlink the node left over from the
                //    previous call, which is node d here
                QNode d = dp.next;
                QNode dn;
                // 10. Three special cases lead straight to clearing cleanMe:
                //     (1) d == null: the node recorded for deletion (cleanMe.next) is already gone;
                //     (2) d == dp: the cleanMe node itself was dequeued by advanceHead;
                //     (3) !d.isCancelled(): the originally cancelled node has already been unlinked
                // 11. Otherwise, if d is not the tail and dn is still on the list, unlink d
                //     (the node cancelled last time) with CAS; in effect cleanMe is now
                //     cleaning a node in the middle of the queue
                if (d == null ||               // d is gone or
                    d == dp ||                 // d is off list or
                    !d.isCancelled() ||        // d not cancelled or
                    (d != t &&                 // d not tail and
                     (dn = d.next) != null &&  //   has successor
                     dn != d &&                //   that is on list
                     dp.casNext(d, dn)))       // d unspliced
                    // 12. Clear cleanMe. If dp == pred, node s is already taken care of,
                    //     so return. Otherwise loop again; the else branch below then
                    //     records pred as cleanMe and returns
                    casCleanMe(dp, null);
                if (dp == pred)
                    return; // s is already saved node
            } else if (casCleanMe(null, pred))
                // cleanMe was null: record pred as cleanMe so that s is cleaned next time
                return; // Postpone cleaning s
        }
    }

The clean method is the hardest part of the whole analysis, for two reasons:

1. Reasoning about the concurrency is difficult.
2. The purpose of the cleanMe node is not obvious.

This method is called when a node's thread is interrupted or times out while waiting. There are two cases to consider:

1. The node to delete is not the tail of the queue. It is unlinked directly with pred.casNext(s, s.next) (similar to ConcurrentLinkedQueue).
2. The node to delete is the tail of the queue.
   1) If cleanMe == null, the predecessor pred is recorded as cleanMe in preparation for the next deletion.
   2) If cleanMe != null, first unlink the node left over from last time, then set cleanMe to null, and then assign pred to cleanMe.

This brings the marker nodes of ConcurrentSkipListMap to mind again. Both marker and cleanMe exist to keep nodes from being lost when an insertion and a deletion race at the same position in a concurrent environment.
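The bookkeeping in these two cases can be traced with a toy, single-threaded model. This is only a sketch of the idea, not the JDK algorithm: DeferredUnlinkModel and its methods are hypothetical, there is no CAS or retry loop, and the model assumes that pred is still linked when cancel is called.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredUnlinkModel {

    static final class Node {
        final String name;
        Node next;
        boolean cancelled;
        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("dummy");
    Node tail = head;
    Node cleanMe; // predecessor of a cancelled node that could not be unlinked yet

    Node append(String name) {
        Node n = new Node(name);
        tail.next = n;
        tail = n;
        return n;
    }

    void cancel(Node pred, Node s) {
        s.cancelled = true;
        if (pred.next != s) {
            return;                     // already unlinked
        }
        if (s != tail) {
            pred.next = s.next;         // case 1: interior node, unlink right away
            return;
        }
        Node dp = cleanMe;              // case 2: s is the tail, deletion must be deferred
        if (dp != null) {
            Node d = dp.next;
            if (d != null && d.cancelled && d != tail) {
                dp.next = d.next;       // unlink the node left over from last time
            }
        }
        cleanMe = pred;                 // remember where s hangs for the next call
    }

    List<String> linkedNames() {
        List<String> names = new ArrayList<>();
        for (Node n = head.next; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    /** Cancels tail A, appends B and C, then cancels tail C. */
    static String scenario() {
        DeferredUnlinkModel q = new DeferredUnlinkModel();
        Node predA = q.tail;
        Node a = q.append("A");
        q.cancel(predA, a);             // A is the tail: stays linked, cleanMe = dummy
        String first = q.linkedNames().toString();
        q.append("B");
        Node predC = q.tail;
        Node c = q.append("C");
        q.cancel(predC, c);             // unlinks A via the old cleanMe, then cleanMe = B
        return first + " then " + q.linkedNames() + ", cleanMe=" + q.cleanMe.name;
    }

    public static void main(String[] args) {
        System.out.println(scenario());
    }
}
```

In the scenario, the cancelled tail A stays on the list until a later cancellation finds it in the middle of the list and unlinks it; the newly cancelled tail C is deferred in turn.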

