`

AbstractQueuedSynchronizer

 
阅读更多

AbstractQueuedSynchronizer 是多线程实现同步的基类,采用先进先出线程队列,队列中存有某些线程,这些线程由于某些状态或条件不满足(例如未获得锁)而进入阻塞状态,当条件满足时(例如持有锁的线程释放了锁)先入队列的阻塞线程被唤醒,它再次判断某些状态或条件是否满足而继续执行或再次进入阻塞状态。 

 

 AbstractQueuedSynchronizer 支持两种锁机制,共享锁,独占锁。

 共享锁:同一时刻允许多个线程获得锁。 

 独占锁:同一时刻只允许一条线程能获得锁。

 

成员变量:private volatile int state,表示某种状态条件,由子类定义其意义。

AQS 提供了对state 操作的方法:

    1.getState() 返回state的值

    2.setState(int state) 设定state的值 

    3. 原子的设定state的值:

/**
*原子的设定state的值,一般在循环中使用。
* expect 期望值
* update 更新值
* 
* 如果expect =state,则更新state=update 并返回真,否则返回假
*/ 
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

 

 

独占模式相关方法:

 

/**
*  模板方法tryAcquire返回真,表示某种状态或条件满足(主要是根据state判断),当前线程可以继续执行,
*  模板方法tryAcquire返回假,表示某种状态或条件不满足(主要是根据state判断),线程加入FIFO队列,当前线程阻塞。直至线程被唤醒(release方法被调用并线程处于队列中head之后的第一个节点位置(FIFO)),
*  当前线程会再次判断某种状态或条件是否满足,从而执行或是阻塞
*/
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//addWaiter(Node.EXCLUSIVE)生成等待节点并存入队列
            selfInterrupt();
}
public final void  acquireInterruptibly(int agr){
	......同上,但支持中断
}
 final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {//无限循环
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//线程被唤醒后,如果该节点是head之后的第一个节点(FIFO),则再次判断再次调用tryAcquire判断某种状态或条件是否满足,如果满足当前线程可以继续执行,否则再次进入阻塞状态直至再次被唤醒。
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//调用LockSupport.park(this)阻塞当前线程
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

/**
*运行中的线程,调用tryRelease方法
*模板方法tryRelease返回真,把阻塞队列中某个线程恢复。
*模板方法tryRelease返回假,则退出。
*/
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//调用LockSupport.unpark 把阻塞队中head之后第一个阻塞线程恢复
            return true;
        }
        return false;
    }
            
 tryAcquire 和 tryRelease方法采用模板模式由子类实现。

 

 

 

ReentrantLock 类使用独占模式

   state=0表示独占锁未被占用,其它线程可以争抢独占锁,抢到锁的线程更新state+=1,表明锁被占用

   state>0 表示独占锁被某线程占用,某它相要获得独占锁的线程只能进入阻塞状态,直至持有锁的线程完全释放锁(更新state=0)。 

ReentrantLock中非公平锁源码分析:

 

  final void lock() {
            if (compareAndSetState(0, 1))//如果state=0,即锁未被占用,则获得锁,将state改为1 ,使其它线程不能够获得锁而阻塞
                setExclusiveOwnerThread(Thread.currentThread());//设置获得独占锁的线程
            else//未抢到锁
                acquire(1);//内部调用tryAcquire判断是否能够获得锁,能够获得锁,则继续执行,不能获得锁则阻塞
        }
    /**
    *获取锁的操作方法 
    * 返回真 ,获得锁
    * 返回假,没有获得锁
    */   
 final boolean tryAcquire(int acquires) {//
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//锁未占用
                if (compareAndSetState(0, acquires)) {//占用锁,改变state值其它线程就不能获得锁了。
                    setExclusiveOwnerThread(current);//设定获占锁的线程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//当前线程已经占用了锁,
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        
        public void unlock() {//释放锁
        	sync.release(1);//内部调用tryRelease来判断是否能释放锁
    		}
                    
        /**
        *
        *返回真,锁被释放
        *返回假,锁未被释放
        */
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {//c==0多重锁完释放,否则还是当前线程独占锁,只不过锁定次数-1了。
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }        
 

 

共享模式的相关方法:

/**
*  当前线程加入队列,如果当前线程位于队列head之后的第一个并tryAcquireShared方法返回值>=0则运行,否则阻塞。
*
*  模板方法tryAcquireShared返回值=0,表示某种状态或条件满足(主要是根据state判断),当前线程可以继续执行
*  模板方法tryAcquireShared返回值>0,表示某种状态或条件满足(主要是根据state判断),当前线程可以继续执行,并允许更多的线程运行在共享模式下:将队列中位于当前线程之后的线程唤醒,后续线程如果条件或状态满足可以运行,否则再次进入阻塞状态
*  
*  模板方法tryAcquireShared返回值<0,表示某种状态或条件不满足(主要是根据state判断),当前线程阻塞。直至线程被唤醒(releaseShared方法被调用,并且线程处于队列中head之后的第一个节点位置(FIFO)),
*  当前线程会再次判断某种状态或条件是否满足,从而执行或是阻塞
*/
       
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//共享模式、加入队列
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//位于头部
                    int r = tryAcquireShared(arg);//某种状态或条件是否满足
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//是否唤醒其它线程
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt());//调用LockSupport.park(this)阻塞当前线程
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }  


public final void  acquireSharedInterruptibly(int agr){
	......同上,但支持中断
}

/**
*运行中的线程,把阻塞线程唤醒
*
*模板方法tryReleaseShared返回真,把阻塞队列中先进入的线程恢复。
*模板方法tryReleaseShared返回假,则退出。
*/
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//调用LockSupport.unpark 把阻塞队中head之后第一个阻塞线程恢复
            return true;
        }
        return false;
    }

 

tryAcquireShared 和 tryReleaseShared方法由子类实现。

 

FutureTask 使用共享模式

   state =1 表示任务线程正在运行,调用get方法的线程只能阻塞,只至线程完成(设定state=2)或线程已取消(设定state=4)

   state =2 表示任务线程已运行完成,

   state =4 表示任务线程已取消,

 

FutureTask 源码分析:

/**
        *get方法返回 Callable任务的结果,如果Callable任务未完成,则线程阻塞直至任务运行完成,获取结果返回
        *
        /
        public V get() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;//返回
        }
        protected int tryAcquireShared(int ignore) {
          return ((getState() & (RAN | CANCELLED)) != 0 &&  runner == null)?1:-1;
          //Callable任务已运行完或取消,则返回1,GET方法可以继续运行
          //Callable任务未运行完成,则返回-1,get方法阻塞,直至任务运行完成,阻塞线程被唤醒
 
        }
        /**
        * 线程任务执行方法
        */
        public void run() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());//调用callable任务,保存返回结果
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
   void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {//任务状态改成已完成
                    result = v;//任务运行结果赋值
                    releaseShared(0);//释放阻塞线程
                    done();
		    return;
                }
            }
        }
        
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;//返回真,唤醒调用GET方法阻塞线程返回任务结果
            
        }

 

    

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics