Dowemo

We have already analyzed how AbstractQueuedSynchronizer implements exclusive mode. This article continues with shared-mode acquire. Comparing exclusive-mode acquire with shared-mode acquire deepens the understanding of AbstractQueuedSynchronizer.

Acquiring the synchronization state in shared mode

Before diving in, let's look at how it is used. Knowing how to use it makes it easier to understand why it works the way it does.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class MyReadLock implements Lock {
    private final Sync sync;

    public MyReadLock(int count) {
        sync = new Sync(count);
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must be greater than 0");
            }
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int current = getState();
                int newCount = current - arg;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                int newCount = current + arg;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        public Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        // A non-negative result means the shared acquire succeeded
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

In fact, it's almost the same as the exclusive-mode lock: only the synchronizer methods called internally change, while the external interface stays the same.
The count passed in controls how many threads can hold the synchronization state at the same time (that is, the degree of concurrency). Here count, or state, can be understood as the number of available resources.

public void lock() {
    sync.acquireShared(1);
}

Each acquire of the synchronization state passes 1 as the argument (you are free to choose another value depending on your use case). Now let's look at acquireShared.

acquireShared
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

Again, this first calls the tryAcquireShared method that we overrode.

@Override
protected int tryAcquireShared(int arg) {
    for (;;) {
        int current = getState();
        int newCount = current - arg;
        if (newCount < 0 || compareAndSetState(current, newCount)) {
            return newCount;
        }
    }
}

This is the tryAcquireShared that we overrode. In exclusive mode, the method overridden by the subclass (tryAcquire) returns a boolean that says whether the acquire succeeded. In shared mode, tryAcquireShared returns an int, and the caller checks whether it is less than 0.
In tryAcquireShared we loop forever: first read the current state, then decide whether the "resource" can be acquired:
(1) If newCount is greater than or equal to 0, a resource is available, so state is updated with CAS. If the CAS fails, the loop retries; if it succeeds, newCount is returned. Any non-negative return value signals success, but returning the number of remaining resources is the most useful choice.
(2) If newCount < 0, the synchronization state cannot be acquired, and a negative number is returned.
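The return-value convention described above can be tried out without AQS at all. The following is a minimal sketch that models the same loop on an AtomicInteger; the class name PermitCounter is hypothetical and exists only for illustration, it is not part of the JDK or of the article's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of the tryAcquireShared loop; not AQS itself.
class PermitCounter {
    private final AtomicInteger state;

    PermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    // Returns the remaining permits (>= 0) on success, or a negative number on failure.
    int tryAcquireShared(int arg) {
        for (;;) {
            int current = state.get();
            int newCount = current - arg;
            // Either not enough is left (fail without writing), or the CAS
            // succeeded; a failed CAS simply goes around the loop again.
            if (newCount < 0 || state.compareAndSet(current, newCount)) {
                return newCount;
            }
        }
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(2);
        System.out.println(counter.tryAcquireShared(1)); // 1
        System.out.println(counter.tryAcquireShared(1)); // 0
        System.out.println(counter.tryAcquireShared(1)); // -1
    }
}
```

With two permits, the first two calls succeed and report what is left, and the third reports failure with a negative value, which is exactly what acquireShared tests for.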

doAcquireShared
/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    // Shared mode: the node is created with Node.SHARED
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Let's analyze what this code does:

(1) addWaiter, as in exclusive mode, creates a node for every thread whose tryAcquireShared returned a value < 0, this time in shared mode, and appends it to the tail of the synchronization queue.
(2) Only a node whose predecessor is the head node tries to acquire the synchronization state.
(3) Otherwise, it determines whether the current thread needs to be suspended.
Most of the acquire logic is the same in shared mode and in exclusive mode. The biggest difference comes after a successful acquire: exclusive mode simply sets the node as head, whereas shared mode calls setHeadAndPropagate. Let's see what setHeadAndPropagate does.
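The enqueue-then-wake behaviour described above can be observed from the outside. The sketch below is an illustration under assumptions: SharedQueueDemo and CountingSync are hypothetical names, and CountingSync simply repeats the counting logic of the article's Sync class. It uses the public AQS method hasQueuedThreads() to detect that the second acquirer's node has been added to the synchronization queue.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SharedQueueDemo {
    // Minimal counting synchronizer, same logic as the article's Sync class.
    static final class CountingSync extends AbstractQueuedSynchronizer {
        CountingSync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int current = getState();
                int newCount = current - arg;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + arg)) {
                    return true;
                }
            }
        }
    }

    // Returns true if the second acquirer was queued first and finished after the release.
    static boolean run() throws InterruptedException {
        CountingSync sync = new CountingSync(1);
        sync.acquireShared(1);                  // main takes the only resource
        Thread worker = new Thread(() -> {
            sync.acquireShared(1);              // tryAcquireShared < 0, so the thread is enqueued
            sync.releaseShared(1);
        });
        worker.start();
        while (!sync.hasQueuedThreads()) {      // wait until the worker's node is in the queue
            Thread.sleep(10);
        }
        boolean wasQueued = worker.isAlive();
        sync.releaseShared(1);                  // releasing lets the queued worker acquire
        worker.join(5000);
        return wasQueued && !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued, then woken: " + run());
    }
}
```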

setHeadAndPropagate
/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

What is propagate? It is the return value of our tryAcquireShared, that is, the number of resources left.
(1) First, the node that acquired the synchronization state is set as head.
(2) Up to this point it is the same as exclusive mode: once a woken node acquires the state, it is set as head. The difference is what happens next. After the node has become head, if its successor is a shared node, doReleaseShared is called to try to wake that successor, which propagates the shared state backward through the queue.
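The effect of propagation can be seen with a one-shot gate: a single release lets every waiting shared node through. This is a sketch under assumptions, not the article's code; PropagationDemo and Gate are hypothetical names, and getQueueLength() is the public AQS method that estimates the number of queued threads.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class PropagationDemo {
    // One-shot gate: closed while state == 0, open for everyone once state == 1.
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1;    // positive value: successors may also succeed
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }

    // Starts the given number of waiters, releases once, and counts how many finished.
    static int run(int waiters) throws InterruptedException {
        Gate gate = new Gate();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> gate.acquireShared(0));
            threads[i].start();
        }
        while (gate.getQueueLength() < waiters) {   // wait until all waiters are queued
            Thread.sleep(10);
        }
        gate.releaseShared(0);                      // one release only
        int finished = 0;
        for (Thread t : threads) {
            t.join(5000);
            if (!t.isAlive()) {
                finished++;
            }
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads released by one call: " + run(3));
    }
}
```

Only the first queued thread is woken by the release itself. The others get through because each thread that acquires the state becomes head and, seeing a shared successor, wakes it in turn.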
To better understand the whole process, let's analyze doReleaseShared directly.

doReleaseShared
/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

In exclusive mode we established that:
(1) Before a thread is suspended, the state of its predecessor node is set to SIGNAL.
The same is true in shared mode.

If the head node's state is SIGNAL, its successor needs to be woken up, so the head's state is first reset to the initial value 0 and the successor is then woken.

Once woken, the successor thread tries to acquire the synchronization state. If it succeeds, it goes through setHeadAndPropagate and may call doReleaseShared itself.
Suppose thread A wakes thread B. If B has not acquired the synchronization state, or has acquired it but has not yet set the new head, A finds that head has not changed and exits the loop.
If B has acquired the synchronization state and set itself as the new head, A finds that head has changed, so it runs the loop again and continues with the subsequent node.
If the head node's waitStatus is 0, either because that is its initial value or because another thread is releasing the state at the same moment and has already reset it, the method tries to set it to PROPAGATE with CAS.
In practice this is more complex because of concurrency, and you need to consider in which situations the method can be called.
The flow chart below helps with understanding. It only covers the complex part, it is not fully coherent, and some places may be handled imprecisely.

(Figure: flow chart; image not available)

Releasing the synchronization state in shared mode

It seems we have already covered most of this above, but let's go through it once more to get the complete picture.

public void unlock() {
    sync.releaseShared(1);
}

We again start from our own method.

/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

The synchronizer calls the tryReleaseShared method we overrode:

protected boolean tryReleaseShared(int arg) {
    for (;;) {
        int current = getState();
        int newCount = current + arg;
        if (compareAndSetState(current, newCount)) {
            return true;
        }
    }
}

After the CAS succeeds, execution enters doReleaseShared(), which is why we analyzed it first; it isn't repeated here.
Our tryReleaseShared actually has a problem: it allows repeated releases, so state can grow beyond the original count. This is easy to fix: make sure releases never push state above the original count.
When analyzing source code, pay attention to the comments; they really help the analysis.
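One possible way to guard against the repeated-release problem mentioned above is to remember the original count and reject any release that would exceed it. This is a sketch of one option, not the article's implementation: BoundedSync, its max field and the available() helper are hypothetical, and throwing IllegalMonitorStateException is a design choice made here for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical variant of the article's Sync class that refuses over-release.
class BoundedSync extends AbstractQueuedSynchronizer {
    private final int max;   // the original count; state may never exceed it

    BoundedSync(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be greater than 0");
        }
        max = count;
        setState(count);
    }

    @Override
    protected int tryAcquireShared(int arg) {
        for (;;) {
            int current = getState();
            int newCount = current - arg;
            if (newCount < 0 || compareAndSetState(current, newCount)) {
                return newCount;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int arg) {
        for (;;) {
            int current = getState();
            int newCount = current + arg;
            if (newCount > max) {
                // More is being given back than was ever taken
                throw new IllegalMonitorStateException("released more than was acquired");
            }
            if (compareAndSetState(current, newCount)) {
                return true;
            }
        }
    }

    // Helper for inspection only
    int available() {
        return getState();
    }

    public static void main(String[] args) {
        BoundedSync sync = new BoundedSync(2);
        sync.acquireShared(1);
        sync.releaseShared(1);
        try {
            sync.releaseShared(1);   // nothing is held any more
        } catch (IllegalMonitorStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```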

Condition interface

Every Java object has a set of monitor methods (defined on java.lang.Object), mainly wait(), notify() and notifyAll(). Used together with the synchronized keyword, they implement the wait/notify pattern. The Condition interface provides monitor methods similar to those of Object; used together with a Lock, they implement the same wait/notify pattern.

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

await()
The current thread waits until it is signalled or interrupted. Calling this method releases the lock held by the current thread. When the waiting thread returns from the method, it has reacquired the lock.

await(long time, TimeUnit unit)
The current thread waits until it is signalled, interrupted, or the specified waiting time elapses. Returns false if the waiting time has elapsed before returning from this method; otherwise returns true.

awaitNanos(long nanosTimeout)
Equivalent to await(long time, TimeUnit unit), except that the waiting time nanosTimeout is given in nanoseconds. The return value is an estimate of the number of nanoseconds remaining; if the wait timed out, a value less than or equal to 0 is returned. This value can be used to decide whether to wait again, and for how long.

awaitUninterruptibly()
The current thread waits until it is signalled; this method is not sensitive to interrupts.

awaitUntil(Date deadline)
The current thread waits until it is signalled, interrupted, or the deadline is reached. Returns true if it was notified before the deadline; otherwise returns false.

signal()
Wakes up one waiting thread. If several threads are waiting on this condition, one of them is selected. That thread must reacquire the lock before it can return from await.

signalAll()
Wakes up all waiting threads. If any threads are waiting on this condition, they are all woken up. Each thread must reacquire the lock before it can return from await.
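The remark about reusing the awaitNanos return value is easiest to see in a timed wait loop. The following is a minimal sketch; the TimedFlag class and its method names are hypothetical and serve only as an illustration, while the loop shape follows the usual idiom for awaitNanos.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: a flag that callers can wait for with a timeout.
class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition raised = lock.newCondition();
    private boolean flag;

    void raise() {
        lock.lock();
        try {
            flag = true;
            raised.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the flag was raised within the timeout, false otherwise.
    boolean awaitRaised(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false;                   // time budget used up
                }
                // awaitNanos returns the time still left, so a wakeup that
                // arrives too early does not restart the full timeout.
                nanos = raised.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedFlag flag = new TimedFlag();
        System.out.println(flag.awaitRaised(50, TimeUnit.MILLISECONDS)); // false
        flag.raise();
        System.out.println(flag.awaitRaised(50, TimeUnit.MILLISECONDS)); // true
    }
}
```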

Let's write a demo. The classic fit is producer/consumer with a bounded queue: the producer puts data into the queue and the consumer takes data out; when the queue is full the producer waits, and when the queue is empty the consumer waits.

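import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class FoodQueue<T> {
    // Queue capacity
    private int size;
    // The list acts as the queue
    private List<T> food;
    // Lock
    private Lock lock = new ReentrantLock();
    // Condition that keeps the queue size from dropping below 0
    private Condition notEmpty = lock.newCondition();
    // Condition that keeps the queue size from exceeding size
    private Condition notFull = lock.newCondition();

    public FoodQueue(int size) {
        this.size = size;
        food = new ArrayList<T>();
    }

    public void product(T t) throws Exception {
        lock.lock();
        try {
            // If the queue is full, nothing more can be produced; wait for a consumer to take data
            while (size == food.size()) {
                notFull.await();
            }
            // There is a free slot now, so put the data in
            food.add(t);
            // The queue is no longer empty, so notify a consumer
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T consume() throws Exception {
        lock.lock();
        try {
            // The queue is empty; wait for a producer to produce data
            while (food.size() == 0) {
                notEmpty.await();
            }
            // A producer has produced data, so take one item
            T t = food.remove(0);
            // Notify a producer that it can continue producing
            notFull.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
}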

Implementation analysis of Condition

The implementation class of Condition is ConditionObject, an inner class of the synchronizer AbstractQueuedSynchronizer. Because operations on a Condition require holding the associated lock, it makes sense for it to be tied to the synchronizer.
Each Condition object contains a queue (the wait queue). A Condition also has the concept of a node: a node is constructed when a thread is placed in the wait queue, and its definition reuses the synchronizer's Node class (described in the previous article).
Here is an excerpt of ConditionObject from the synchronizer:

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    public ConditionObject() { }

    private Node addConditionWaiter() {
    }
    private void unlinkCancelledWaiters() {
    }
    public final void signal() {
    }
    public final void signalAll() {
    }
    public final void await() throws InterruptedException {
    }
    ...
}

Wait queue

The wait queue is a FIFO queue. Each node in it holds a reference to a thread, namely a thread waiting on the Condition object. If a thread calls Condition.await(), it releases the lock, is wrapped in a node that joins the wait queue, and enters the waiting state.
A Condition contains one wait queue and holds references to its first node (firstWaiter) and its tail node (lastWaiter).
(Figure: structure of the wait queue; image not available)

Waiting (await)
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

(1) If the thread has been interrupted, throw InterruptedException.
(2) Create a node bound to the current thread and add it to the wait queue.
(3) Release the lock (state) held by the thread.
(4) As long as the node is not in the synchronization queue, keep the thread suspended.
(5) After the thread wakes up, reacquire the synchronization state (the lock).
(6) After the thread wakes up, if its node is not the tail of the wait queue (nextWaiter is not null), walk the wait queue and remove cancelled nodes.
That is the general flow. Now let's look at each step in detail:

Add node to wait queue

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

If the state of lastWaiter is not CONDITION, that thread's wait has been cancelled, so the node needs to be removed from the wait queue.

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

This is straightforward: it traverses the wait queue and removes the nodes of cancelled threads from it.
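To watch the unlinking logic work on concrete data, here is a stand-alone toy model of the wait queue. It is a sketch under assumptions: ConditionQueueModel and its simplified Node (which carries a name instead of a thread) are hypothetical, and the constants mirror the waitStatus values of the JDK 8 source quoted in this article.

```java
// Hypothetical single-threaded model of the condition wait queue.
class ConditionQueueModel {
    static final int CONDITION = -2;   // node is waiting on the condition
    static final int CANCELLED = 1;    // node's wait was cancelled

    static final class Node {
        final String name;             // stands in for the waiting thread
        int waitStatus = CONDITION;
        Node nextWaiter;

        Node(String name) {
            this.name = name;
        }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Appends a node at the tail, like addConditionWaiter without the cleanup step.
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Same traversal as unlinkCancelledWaiters in the article.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;             // last node known to be still waiting
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Comma-separated names from head to tail, for inspection.
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ConditionQueueModel queue = new ConditionQueueModel();
        Node a = queue.add("a");
        queue.add("b");
        Node c = queue.add("c");
        a.waitStatus = CANCELLED;
        c.waitStatus = CANCELLED;
        queue.unlinkCancelledWaiters();
        System.out.println(queue.names()); // b
    }
}
```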

Releasing the lock held by the thread (synchronization state)

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

fullyRelease reads the entire state and releases all of it. A thread may have called lock() more than once (reentrancy), so the whole state is released in one go, which lets other threads acquire the lock afterwards.
Think about this question: does Condition support shared locks? My inference is that it does not. Why? In fullyRelease we see that the entire state is released, but with a shared lock the state also reflects acquisitions by other threads, so releasing all of it would release their share as well.
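The full release of a reentrantly held lock can be observed with ReentrantLock, which is built on AbstractQueuedSynchronizer. In this sketch (FullReleaseDemo is a hypothetical name), the waiter locks twice before waiting; the main thread can still take the lock while the waiter is waiting, and the waiter sees its hold count restored after it wakes up.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullReleaseDemo {
    // Returns the hold count the waiter observes right after its wait ends.
    static int run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        int[] holdsAfterAwait = new int[1];
        boolean[] signalled = new boolean[1];      // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                           // reentrant: two holds
            try {
                while (!signalled[0]) {
                    ready.awaitUninterruptibly();  // gives up both holds at once
                }
                holdsAfterAwait[0] = lock.getHoldCount();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        for (;;) {
            lock.lock();   // succeeds during the wait only because the waiter released everything
            try {
                if (lock.hasWaiters(ready)) {
                    signalled[0] = true;
                    ready.signal();
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(10);
        }
        waiter.join(5000);
        return holdsAfterAwait[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("hold count after await: " + run()); // 2
    }
}
```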

Determining whether the current node is in the synchronization queue:

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}

(1) If the current node's state is CONDITION, or its predecessor prev is null, it is not in the synchronization queue.
(2) If the current node's next is not null, it is in the synchronization queue: next and prev are only used by the synchronization queue, while the wait queue does not use them.
(3) A non-null prev alone does not prove that the node is in the synchronization queue; see the following code:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

This is the method that adds a node to the synchronization queue. It first sets node.prev = pred and then sets tail with CAS. The CAS may fail, which leaves the node with its predecessor set even though the node has not yet joined the synchronization queue. That is why there is a findNodeFromTail method: it searches backward from the tail node, and if the current node is found, it is in the synchronization queue.

(4) After the thread is woken up, it reacquires the synchronization state (the amount it released before being suspended) through acquireQueued, which was analyzed in the previous article.

Summary

(1) The thread already holds the lock before calling await(), so no CAS is needed to guarantee thread safety.
(2) After await() is called, the thread releases all of its holds on the lock and is then suspended.
(3) After the thread is woken up, it rejoins the competition for the lock. Before being suspended, the thread is not in the synchronization queue but in the condition's wait queue.
(4) Returning from await() means that the thread has reacquired the lock.

Notification ( signal )
/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

(1) If the current thread does not hold the lock exclusively, an exception is thrown. This suggests that Condition is not applicable to shared mode, although I have not verified this.
(2) Take the first waiting node and wake it up.
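The ownership check in signal() can be exercised directly. The sketch below (SignalOwnershipDemo and SharedOnlySync are hypothetical names) calls signal() in three situations: without holding a ReentrantLock, while holding it, and on a ConditionObject created by a synchronizer that, like the article's shared-mode Sync, never overrides isHeldExclusively(). The last case relies on the default AbstractQueuedSynchronizer.isHeldExclusively() throwing UnsupportedOperationException.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOwnershipDemo {
    // Shared-only synchronizer: it does not override isHeldExclusively().
    static final class SharedOnlySync extends AbstractQueuedSynchronizer {
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // Returns the simple name of the exception thrown by signal(), or "none".
    static String signalOutcome(Condition condition) {
        try {
            condition.signal();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    static String withoutLock() {
        return signalOutcome(new ReentrantLock().newCondition());
    }

    static String withLock() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            return signalOutcome(condition);
        } finally {
            lock.unlock();
        }
    }

    static String sharedOnly() {
        return signalOutcome(new SharedOnlySync().newCondition());
    }

    public static void main(String[] args) {
        System.out.println("without lock: " + withoutLock());
        System.out.println("with lock:    " + withLock());
        System.out.println("shared only:  " + sharedOnly());
    }
}
```

In the shared-only case the call fails before any node is touched, which is consistent with the inference that Condition is meant for exclusive mode.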

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

(1) Reset firstWaiter to point to the nextWaiter of the first waiter.
(2) If that nextWaiter is null, the queue contained only one waiter, so lastWaiter is set to null.
Because the first waiter is about to be signalled, it no longer belongs to the wait queue, so its nextWaiter is set to null and transferForSignal is executed.

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

From the comment we can see that this method transfers a node from the condition queue to the AbstractQueuedSynchronizer synchronization queue (which answers the question raised above).

To summarize what the method does:

(1) Try to change the node's waitStatus from CONDITION to 0; if that fails, return false.
(2) Call enq to add the current node to the AbstractQueuedSynchronizer synchronization queue.
(3) Set the predecessor's waitStatus to SIGNAL with CAS; if the predecessor is cancelled or the CAS fails, wake the node's thread right away.
Returning true means the transfer succeeded.
An important conclusion follows: being signalled does not mean that the code after await() runs immediately; the node is merely appended to the tail of the AbstractQueuedSynchronizer synchronization queue.
(4) If transferForSignal fails and returns false, the next node is tried. firstWaiter has already been reset at that point, so the failed node has been removed from the wait queue. Here is a question: if the thread was cancelled, the transfer fails and removing the node is correct. But what if the transfer failed for a thread that was not cancelled? Would that thread ever wake up, other than by being interrupted?
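The conclusion that a signalled thread does not continue immediately can be checked with a small experiment. In this sketch (SignalOrderDemo and the event strings are hypothetical), the signalling thread keeps working while it still holds the lock, and the waiter can only resume after the lock has been released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOrderDemo {
    // Returns the order in which the events happened.
    static List<String> run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        List<String> events = new ArrayList<>();   // only touched while holding the lock
        boolean[] signalled = new boolean[1];      // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    ready.awaitUninterruptibly();
                }
                events.add("waiter resumed");
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(ready)) {
                    signalled[0] = true;
                    ready.signal();                // moves the waiter to the sync queue
                    events.add("signal sent");
                    events.add("signaller still working");
                    break;                         // the lock is released in finally
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(10);
        }
        waiter.join(5000);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The waiter's entry always comes last, because after signal() its node sits in the synchronization queue and must win the lock before await can return.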

That completes the analysis of await() and signal(). It is simpler than the exclusive-mode analysis. Condition has several other methods as well, such as timed waits, uninterruptible waits, and waking all nodes; they work along the same lines.

Implementation principle of java operation

Java concurrency - understanding volatile and synchronized

Java concurrency - AbstractQueuedSynchronizer (synchronizer) - exclusive mode




Copyright © 2011 Dowemo All rights reserved.