百木园-与人分享,
就是让自己快乐。

浅析AQS

1、AbstractQueue抽象队列、BlockingQueue阻塞队列、Deque双端队列

2、Queue FIFO先进先出,

      写入:队列满阻塞等待,取出:队列满阻塞等待生产

3、使用场景:多线程并发处理、线程池

4、阻塞队列(BlockingQueue)——四组API(添加、移除、判断队列首部的场景)

==1、会抛出异常

==2、有返回值,不抛出异常

==3、阻塞等待(一直阻塞等待)

==4、超时等待

=========抛出异常 add()\\remove()\\element()=========

//抛出异常
   public static void atest(){
       //数组类型的阻塞队列 要指定 队列的大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
       //添加操作 返回Boolean值
       System.out.println(blockingQueue.add(\"a\"));
       System.out.println(blockingQueue.add(\"b\"));
       System.out.println(blockingQueue.add(\"c\"));
       /*
       java.lang.IllegalStateException: Queue full
       队列已满 抛出异常
        */
//       System.out.println(blockingQueue.add(\"d\"));
       //检测队首元素
       System.out.println(\"队首元素\"+blockingQueue.element());
       //移除操作 返回移除的添加的元素
       System.out.println(blockingQueue.remove());
       System.out.println(blockingQueue.remove());
       System.out.println(blockingQueue.remove());
       /*
       java.util.NoSuchElementException
       元素为空 不能再移除 抛出异常
        */
//       System.out.println(blockingQueue.remove());
  }    public static void atest(){
       //数组类型的阻塞队列 要指定 队列的大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
       //添加操作 返回Boolean值
       System.out.println(blockingQueue.add(\"a\"));
       System.out.println(blockingQueue.add(\"b\"));
       System.out.println(blockingQueue.add(\"c\"));
       /*
       java.lang.IllegalStateException: Queue full
       队列已满 抛出异常
        */
//       System.out.println(blockingQueue.add(\"d\"));

       //移除操作 返回移除的添加的元素
       System.out.println(blockingQueue.remove());
       System.out.println(blockingQueue.remove());
       System.out.println(blockingQueue.remove());
       /*
       java.util.NoSuchElementException
       元素为空 不能再移除 抛出异常
        */
//       System.out.println(blockingQueue.remove());
  }

=========有返回值,不抛出异常 add()\\remove()\\element()=========

//  不抛出异常
   public static void btest(){
       ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
       //添加 返回Boolean值 添加成功 返回true
       System.out.println(blockingQueue.offer(\"a\"));
       System.out.println(blockingQueue.offer(\"b\"));
       System.out.println(blockingQueue.offer(\"c\"));
       // 队列大小为3   继续添加元素   不能继续添加 返回false
       System.out.println(blockingQueue.offer(\"d\"));

       //移除 返回元素
       System.out.println(blockingQueue.poll());
       System.out.println(blockingQueue.poll());
       System.out.println(blockingQueue.poll());
       // 队列在元素全部移除后 继续移除操作   不能继续移除 返回null
       System.out.println(blockingQueue.poll());
  }

=========阻塞等待(一直阻塞等待) add()\\remove()\\element()=========

//    一直阻塞等待    public static void ctest() throws InterruptedException {        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);        //添加元素        blockingQueue.put(\"a\");        blockingQueue.put(\"b\");        blockingQueue.put(\"c\");        // 队列已满  如果继续添加元素  则造成阻塞  一直阻塞等待  程序一直在等//        blockingQueue.put(\"d\");        //移除元素        System.out.println(blockingQueue.take());        System.out.println(blockingQueue.take());        System.out.println(blockingQueue.take());        //队列为空   如果继续移除元素  则造成阻塞  一直在等待元素来移除  程序一直在等//        System.out.println(blockingQueue.take());•    }

=========超时等待 add()\\remove()\\element()=========

//    等待超时退出
   public static void dtest() throws InterruptedException {
       ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
       //添加元素 offer(元素,时间,单位) offer方法的重载方法
       System.out.println(blockingQueue.offer(\"a\"));
       System.out.println(blockingQueue.offer(\"b\"));
       System.out.println(blockingQueue.offer(\"c\"));
       //队列已满 继续添加元素 等待超过2秒就结束程序
       System.out.println(blockingQueue.offer(\"d\", 2, TimeUnit.SECONDS));

       //移除元素 poll(时间,单位) poll方法的重载方法
       System.out.println(blockingQueue.poll());
       System.out.println(blockingQueue.poll());
       System.out.println(blockingQueue.poll());
       //队列为空 继续移除元素 等待超过2秒就结束程序
       System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
  }

5、同步队列

==没有容量(即不长时间存储元素)

    进去一个元素,必须等待取出(take())后才能再往里边继续添加(put())

==例子:

    BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();//同步队列
   //添加元素 线程
   new Thread(()->{
       try {
           System.out.println(Thread.currentThread().getName()+\"put 1\");
           blockingQueue.put(\"1\");
           System.out.println(Thread.currentThread().getName()+\"put 2\");
           blockingQueue.put(\"2\");
           System.out.println(Thread.currentThread().getName()+\"put 3\");
           blockingQueue.put(\"3\");
      } catch (InterruptedException e) {
           e.printStackTrace();
      }
  },\"线程1\").start();

   //移除元素 线程
   new Thread(()->{
       try {
           TimeUnit.SECONDS.sleep(2);
           System.out.println(Thread.currentThread().getName()+\"==>\"+blockingQueue.take());
           TimeUnit.SECONDS.sleep(2);
           System.out.println(Thread.currentThread().getName()+\"==>\"+blockingQueue.take());
           TimeUnit.SECONDS.sleep(2);
           System.out.println(Thread.currentThread().getName()+\"==>\"+blockingQueue.take());
      } catch (InterruptedException e) {
           e.printStackTrace();
      }
  },\"线程2\").start();
}

 

AQS

1、一个用来构建锁和同步器的框架,定义了锁的实现机制,并开放出扩展的地方,让子类去实现。(封装得很好,但又有子类扩展的地方)

例如:lock(加锁)时锁的内部机制:

      使用锁Lock时,AQS开放state字段,然子类可以根据state字段来决定是否能够获得锁,对于获取不到锁的线程,AQS会自动进行管理,无需子类锁关心。

2、使用AQS能够简单且高效地构造出大量应用广泛的同步器:ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等,都基于AQS。也可以自定义同步器。

3、AQS底层:

==(1)由 同步队列 + 条件队列 联合实现(CLH队列锁)

====一般情况下有同步队列(双向链表)组成,条件队列(单向链表)不是必须存在的,当程序中存在condition时,才会存在此列表。

====同步队列管理获取不到锁的线程的排队和释放

====条件队列是在一定场景下,对同步队列的补充(非必须的),如,获得锁的线程从空队列中拿数据(队列是空的,拿不到数据的),此时,条件队列会管理该线程,使线程阻塞。

==(2)核心思想:AQS内部维护一个CLH队列来管理。

====线程请求共享资源时,

     如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效线程,并且将共享资源设置为锁定状态;

     如果被请求的共享资源被占用,则需要CLH队列锁实现的机制来实现线程阻塞等待以及线程被唤醒使锁的分配:即将暂时获取不到锁的线程加入到队列中。(上边的同步队列)

==(3)CLH锁队列:

====是一个虚拟的双向队列(不存在队列实例,仅存在节点间的关联关系)

====AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个个节点(Node)实现锁的分配。

==(4)AQS中同步队列的工作流程和数据结构:(结合两图理解)

 

 

====工作过程:

(1)当前线程获取同步状态失败,同步器会将当线程及等待状态等信息构成一个Node节点,加入CLH队列中,放在队尾,同步器重新设置尾节点。

(2)加入队列后,会阻塞当前线程

(3)同步状态被释放并且同步器重新设置首节点,同步器唤醒等待队列中第一个节点,让其再次获取同步状态。

====AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。

====AQS使用CAS对该同步状态进行原子性操作实现对其值的修改

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

====状态信息通过protected类型的getState,setState,compareAndSetState进行操作

//返回同步状态的当前值
protected final int getState() {
   return state;
}
//设置同步状态的值
protected final void setState(int newState) {
   state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
   return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

4、AQS对资源的共享方式

定义了两种资源共享方式:

==(1)Exclusive(独占):又分为公平锁和非公平锁

       只有一个线程能执行,如ReentrantLock

====公平锁:按照线程在队列中的排队顺序, 先到者先拿到锁

====非公平锁:当前线程要获取锁时,无视队列内的顺序,直接去强锁,谁抢到了就是谁的。

==(2)Share(共享):

       多个线程可以同时执行,如Semaphore\\CyclicBarrier\\ReadWriteLock\\CountDownLatch

==(3)ReentrantReadWriteLock:组合两种资源共享方式的

====读锁运行多个线程同时执行对同一资源进行读操作

====写锁仅允许一个线程能执行对同一资源进行写操作

==(4)不同自定义的同步器有不同的共享资源方式,自定义同步器在实现时,只需要实现共享资源state的获取与释放方式即可。

 

5、AQS使用的设计模式

(1)基于模板设计模式的

(2)自定义同步器的方式:

==1、继承AbstractQueuedSynchronizer并重写指定的方法

        重写方法是指对于共享资源state的获取和释放

==2、将AQS组合在自定义同步组件的实现中,并调用其模板方法

       这些模板方法会调用上边继承重写的方法

====AQS提高的模板方法

protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

====除以上的方法外,AQS类中其它方法不能被重写,都为final

====例:

=======独占方式=======

ReentrantLock

(1)state初始化为0,表示未锁定状态;

(2)A线程lock()时,会调用tryAcquire()独占该锁并state+1;

(3)其它线程tryAcquire()时会失败,直到A线程unlock()到state=0,即释放,其它线程才能获取该锁。

(4)A线程释放前,A自己时可以重复获取此锁的(state++),即可重入

(5)A线程释放,一定要将state回归为state=0

=======共享方式=======

CountDownLatch,任务分为N个子线程进行执行

(1)state初始化为N(与线程数一致)

(2)N个子线程并行每执行countDown()一次,state CAS减一

(3)所有子线程都执行完成后即state=0,会unpark()主调用线程

(4)主调用线程从await()函数返回,继续后续的操作

 

6、AQS组件

(1)Semaphore(信号量):允许多个线程同时访问

====synchronized和ReentrantLock是一次只允许一个线程访问某个资源

(2)CountDownLatch(倒计时器):同步工具类,用来协调多个线程间的同步,通常用来控制线程等待,可以让某个线程等待直到倒计时结束,再开始执行。

(3)CyclicBarrier(循环栅栏):可以实现线程间的技术等待,功能更强大复杂。

== Cyclic(可循环使用)的Barrier(屏障):

====让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障(同步点)时,才会打开屏障,所有被拦截的线程此时才会继续执行。

====CyclicBarrier的默认构造方法:CyclicBarrier(int parties)

=======parties参数:表示屏障拦截的线程数量

=======每个线程调用await()方法告知CyclicBarrier已到达屏障,然后被阻塞。


来源:https://www.cnblogs.com/hexiayuliang666/p/16156939.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » 浅析AQS

相关推荐

  • 暂无文章