原子性操作

原子变量类 (Atomics)是基于CAS实现的能够保障对共享变量进行read-modify-write更新操作的原子性和可见性的一组工具类。所谓的read-modify-write更新操作,是指对共享变量的更新不是一个简单的赋值操作,而是变量的新值依赖于变量的旧值,例如自增操作 “count++”。由于volatile无法保障自增操作的原子性,而原子变量类的内部实现通常借助一个volatile变量并保障对该变量的read-modify-write更新操作的原子性,因此它可以被看作增强型的volatile变量。原子变量类一共有12个,可以被分为4组:
img

AtomicInteger

主要为因为在多线程下,i++不是一个原子性的操作,所以java提供了一个AtomicInteger

原理

  • Compare And Set(比较并交换)

  • cas(value,expected,newValue)

    • cpu指令级别的执行(原子性)
    1
    if(value == expected){	value = newValue}
  • ABA问题

    如果知识一个值是没有影响的,但是如果是一个对象,可能会造成影响,这个值(地址)指向的是一个对象,这个对象没有发生改变,但是对象里面的引用发生了改变 ,这个时候如果恰好有对应的逻辑操作,就会出现问题

Unsafe

一、Unsafe介绍

1、Unsafe简介

Unsafe类相当于是一个java语言中的后门类,提供了硬件级别的原子操作,所以在一些并发编程中被大量使用。jdk已经作出说明,该类对程序员而言不是一个安全操作,在后续的jdk升级过程中,可能会禁用该类。所以这个类的使用是一把双刃剑,实际项目中谨慎使用,以免造成jdk升级不兼容问题。

2、Unsafe Api

这里并不系统讲解Unsafe的所有功能,只介绍和接下来内容相关的操作

arrayBaseOffset:获取数组的基础偏移量

arrayIndexScale:获取数组中元素的偏移间隔,要获取对应所以的元素,将索引号和该值相乘,获得数组中指定角标元素的偏移量

getObjectVolatile:获取对象上的属性值或者数组中的元素

getObject:获取对象上的属性值或者数组中的元素,已过时

putOrderedObject设置对象的属性值或者数组中某个角标的元素,更高效

putObjectVolatile设置对象的属性值或者数组中某个角标的元素

putObject:设置对象的属性值或者数组中某个角标的元素,已过时

3、代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Test02 {

public static void main(String[] args) throws Exception {
Integer[] arr = {2,5,1,8,10};

//获取Unsafe对象
Unsafe unsafe = getUnsafe();
//获取Integer[]的基础偏移量
int baseOffset = unsafe.arrayBaseOffset(Integer[].class);
//获取Integer[]中元素的偏移间隔
int indexScale = unsafe.arrayIndexScale(Integer[].class);

//获取数组中索引为2的元素对象
Object o = unsafe.getObjectVolatile(arr, (2 * indexScale) + baseOffset);
System.out.println(o); //1

//设置数组中索引为2的元素值为100
unsafe.putOrderedObject(arr,(2 * indexScale) + baseOffset,100);

System.out.println(Arrays.toString(arr));//[2, 5, 100, 8, 10]
}

//反射获取Unsafe对象
public static Unsafe getUnsafe() throws Exception {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
}
}
3.1、图解说明

juc

CountDownLatch

CyclicBarrier

基础

1
public class MyCyclicBarrier {    public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier = new CyclicBarrier(N);        for (int i = 0; i < N; i++) {            new Writer(barrier).start();        }    }    private static class Writer extends Thread {        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }        @Override        public void run() {            System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");            try {                Thread.sleep(5000);      //以睡眠来模拟写入数据操作                System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");                cyclicBarrier.await();            } catch (InterruptedException | BrokenBarrierException e) {                e.printStackTrace();            }            System.out.println("所有线程写入完毕,继续处理其他任务...");        }    }}
1
线程Thread-1正在写入数据...线程Thread-0正在写入数据...线程Thread-2正在写入数据...线程Thread-3正在写入数据...线程Thread-2写入数据完毕,等待其他线程写入完毕线程Thread-0写入数据完毕,等待其他线程写入完毕线程Thread-1写入数据完毕,等待其他线程写入完毕线程Thread-3写入数据完毕,等待其他线程写入完毕所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...

额外操作

线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable方法,其他线程如果没有后续则死亡。

只需要在创建CyclicBarrier的时候,提供一个后续运行的run的方法

1
CyclicBarrier barrier = new CyclicBarrier(n, () -> System.out.println("当前线程"+Thread.currentThread().getName()+"统一执行方法"));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
// 只有一个线程会执行这个方法,其他线程因为程序运行完毕而死亡
当前线程Thread-1统一执行方法
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

等待指定的时间

在4个线程中故意让最后一个线程启动延迟,在前面三个线程都到达了barrier之后,等待了指定的时间发现第四个线程还没有达到barrier,就抛出异常并继续执行后面的任务,不在等待第四个线程,但是第四个线程在时间到了后还是会继续执行。

1
public class MyCyclicBarrierAwaitTime {    static AtomicInteger stock = new AtomicInteger(0);    public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier = new CyclicBarrier(N);        for (int i = 0; i < N; i++) {            if (i < N - 1)                new Writer(barrier).start();            else {                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                new Writer(barrier).start();            }        }    }    static class Writer extends Thread {        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }        @Override        public void run() {            System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");            try {                Thread.sleep(5000);      //以睡眠来模拟写入数据操作                System.out.println("线程" + Thread.currentThread().getName() + "库存销售数量:" + stock.getAndAdd(1));                try {                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);                } catch (TimeoutException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            } catch (InterruptedException e) {                e.printStackTrace();            } catch (Exception e) {                System.out.println("    线程" + Thread.currentThread().getName() + "等待超时");            }            System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");        }    }}
1
线程Thread-0正在写入数据...线程Thread-2正在写入数据...线程Thread-1正在写入数据...线程Thread-0库存销售数量:1线程Thread-2库存销售数量:2线程Thread-1库存销售数量:0线程Thread-3正在写入数据...    线程Thread-1等待超时    线程Thread-2等待超时Thread-2所有线程写入完毕,继续处理其他任务...Thread-1所有线程写入完毕,继续处理其他任务...Thread-0所有线程写入完毕,继续处理其他任务...java.util.concurrent.TimeoutException	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)	at top.thread1.mycyclibarrier.MyCyclicBarrierAwaitTime$Writer.run(MyCyclicBarrierAwaitTime.java:51)线程Thread-3库存销售数量:3    线程Thread-3等待超时Thread-3所有线程写入完毕,继续处理其他任务...

CyclicBarrier重用

初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。

1
public class MyCyclicBarrierAwaitTime {    static AtomicInteger stock = new AtomicInteger(0);    public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier = new CyclicBarrier(N);        for (int i = 0; i < N; i++) {            if (i < N - 1)                new Writer(barrier).start();            else {                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                new Writer(barrier).start();            }        }    }    static class Writer extends Thread {        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }        @Override        public void run() {            System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");            try {                Thread.sleep(5000);      //以睡眠来模拟写入数据操作                System.out.println("线程" + Thread.currentThread().getName() + "库存销售数量:" + stock.getAndAdd(1));                try {                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);                } catch (TimeoutException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            } catch (InterruptedException e) {                e.printStackTrace();            } catch (Exception e) {                System.out.println("    线程" + Thread.currentThread().getName() + "等待超时");            }            System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");        }    }}
1
Thread-3所有线程写入完毕,继续处理其他任务...Thread-2所有线程写入完毕,继续处理其他任务...Thread-1所有线程写入完毕,继续处理其他任务...Thread-0所有线程写入完毕,继续处理其他任务...CyclicBarrier重用线程Thread-4正在写入数据...线程Thread-5正在写入数据...线程Thread-6正在写入数据...线程Thread-7正在写入数据...

Semaphore

被称为信号量,或者许可证

可以控制同时访问的线程数,通过acquire()获取一个许可,如果没有就等待,而release释放一个许可,只有拿到许可证的线程才能接着向下执行

1
public class MySemaphore {    public static void main(String[] args) {        int N = 8;            //工人数        Semaphore semaphore = new Semaphore(5); //机器数目        for (int i = 0; i < N; i++) {            new Worker(i, semaphore).start();        }    }    static class Worker extends Thread {        private int num;        private Semaphore semaphore;        public Worker(int num, Semaphore semaphore) {            this.num = num;            this.semaphore = semaphore;        }        @Override        public void run() {            try {                semaphore.acquire();                System.out.println("工人" + this.num + "占用一个机器在生产...");                Thread.sleep(2000);                System.out.println("工人" + this.num + "释放出机器");                semaphore.release();            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}
1
工人1占用一个机器在生产...工人0占用一个机器在生产...工人3占用一个机器在生产...工人2占用一个机器在生产...工人5占用一个机器在生产...工人0释放出机器工人2释放出机器工人3释放出机器工人1释放出机器工人7占用一个机器在生产...工人5释放出机器工人4占用一个机器在生产...工人6占用一个机器在生产...工人4释放出机器工人7释放出机器工人6释放出机器

Phaser

具有设置多重屏障的功能,相当于CountDownLatch和CyclicBarrier的结合体。

例如:

我们需要控制线程的启动时机,让线程同时启动,我们通常使用CountDownLatch实现,可以用Phasert代替;

我们系统某些任务在某一个阶段同时执行,然后在所有任务执行完成后,同时进入下一个阶段继续执行,可以使用CyclicBarrier和reset实现,但是也可以使用Phaser代替;

Phaser()

无参构造方法,创建一个Phaser对象。默认parties个数为0。后续可以通过register()/bulkRegister()方法来修改新的parties。在每个Phaser实例内部,会持有几个状态数据:终止状态、已经注册的parties个数、当前phase下已到达的parties个数、当前phaser阶段数。

Phaser(int parties)

有参构造方法,一个Phaser对象,并初始一定数量的parties,相当于在初始化Phaser实例后,在register此数量的parties。

arrive()

一直阻塞,等待当前phase下其他parties到达。parties必须大于0,如果使用Phaser()创建实例,parties没有被register具体的值,调用此方法将会抛出异常。此方法同时返回当前phase周期数,如果Phaser实例已经终止,则返回负数。

awaitAdvance(int phase)

阻塞方法,等待phase周期数下其他所有的prties都到达,参数指定了当前阻塞的phase周期阶段数。如果指定的phase与phaser实例当前的phase不一致,会立即返回。可以这样来使用awaitAdvance(arrive())

1
public class MyPhaser {    static Random r = new Random();    static MarriagePhase phaser =  new MarriagePhase();    public static void main(String[] args) {        phaser.bulkRegister(7);        for (int i = 0; i < 5; i++) {            new Thread(new Person("P"+i)).start();        }        new Thread(new Person("新娘")).start();        new Thread(new Person("新郎")).start();    }    private static class MarriagePhase extends Phaser {        @Override        protected boolean onAdvance(int phase, int registeredParties) {            switch (phase){                case 0:                    System.out.println("所有人都到齐了");                    return false;                case 1:                    System.out.println("所有人都吃完了");                    return false;                case 2:                    System.out.println("所有人都离开了");                    return false;                case 3:                    System.out.println("婚礼结束了");                    return false;                default:                    return true;            }        }    }    private static class Person implements Runnable {        String name;        public Person(String name) {            this.name = name;        }        @Override        public void run() {            // phrase中的阶段控制主要是这里面的阶段控制            arrive();            eat();            leave();            hug();        }        public void arrive(){            try {                Thread.sleep(r.nextInt(1000));                System.out.println(name+"到达现场");                // 阶段控制实际上是这个产生效果,arriveAndAwaitAdvance说明到达了                phaser.arriveAndAwaitAdvance();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        public void eat(){            try {                Thread.sleep(r.nextInt(1000));                System.out.println(name+"吃完");                phaser.arriveAndAwaitAdvance();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        public void leave(){            try {                Thread.sleep(r.nextInt(1000));                System.out.println(name+"离开");                phaser.arriveAndAwaitAdvance();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        public void hug(){            try {                if ("新郎".equals(name) || "新娘".equals(name)){                    Thread.sleep(r.nextInt(1000));                    System.out.println(name+"拥抱");                    phaser.arriveAndAwaitAdvance();                }else {                    //                    phaser.arriveAndDeregister();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}
1
P0到达现场新郎到达现场P1到达现场P2到达现场新娘到达现场P3到达现场P4到达现场所有人都到齐了P1吃完P0吃完新娘吃完P3吃完新郎吃完P2吃完P4吃完所有人都吃完了P4离开P3离开P2离开新娘离开P1离开P0离开新郎离开所有人都离开了新郎拥抱新娘拥抱婚礼结束了

phrase源码分析文章推荐:https://segmentfault.com/a/1190000019685090?utm_source=tag-newest

exchanger

并发容器总结

JDK 提供的这些容器大部分在 java.util.concurrent 包中。

  • ConcurrentHashMap : 线程安全的 HashMap
  • CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector
  • ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。
  • BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
  • ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。

ConcurrentHashMap

我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装我们的 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本—— ConcurrentHashMap 的诞生。

ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

CopyOnWriteArrayList

CopyOnWriteArrayList 简介

1
2
3
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。

这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?

CopyOnWriteArrayList 是如何做到的?

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

CopyOnWriteArrayList 读取和写入源码简单分析

CopyOnWriteArrayList 读取操作的实现

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

1
/** The array, accessed only via getArray/setArray. */    private transient volatile Object[] array;    public E get(int index) {        return get(getArray(), index);    }    @SuppressWarnings("unchecked")    private E get(Object[] a, int index) {        return (E) a[index];    }    final Object[] getArray() {        return array;    }

3.3.2 CopyOnWriteArrayList 写入操作的实现

CopyOnWriteArrayList 写入操作 add()方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 内部代码我们就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。

ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。

BlockingQueue

BlockingQueue 简介

上面我们己经提到了 ConcurrentLinkedQueue 作为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue。阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。下面是 BlockingQueue 的相关实现类:

BlockingQueue 的实现类

下面主要介绍一下 3 个常见的 BlockingQueue 的实现类:ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueueBlockingQueue 接口的有界队列实现类,底层采用数组来实现。

1
public class ArrayBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable{}

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。

ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码:

1
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE

相关构造方法:

1
/**     *某种意义上的无界队列     * Creates a {@code LinkedBlockingQueue} with a capacity of     * {@link Integer#MAX_VALUE}.     */    public LinkedBlockingQueue() {        this(Integer.MAX_VALUE);    }    /**     *有界队列     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.     *     * @param capacity the capacity of this queue     * @throws IllegalArgumentException if {@code capacity} is not greater     *         than zero     */    public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;        last = head = new Node<E>(null);    }

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

推荐文章: 《解读 Java 并发队列 BlockingQueue》

ConcurrentSkipListMap

下面这部分内容参考了极客时间专栏《数据结构与算法之美》以及《实战 Java 高并发程序设计》。

为了引出 ConcurrentSkipListMap,先带着大家简单理解一下跳表。

对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

跳表的本质是同时维护了多个链表,并且链表是分层的,

2级索引跳表

最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。

跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。

在跳表中查找元素18

查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。

从上面很容易看出,跳表是一种利用空间换时间的算法。

使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap

常用的并发工具类

闭锁:CountDownLatch

​ 闭锁允许一个线程或多个线程等待特定的情况,同步完成线程中其他任务

回环栅栏:CyclicBarrier

​ CycliBarrier和CountDownLatch都可以协同多个线程,让指定数量的线程等待其他所有的线程都满足某个条件之后继续执行。CyclicBarrier可以重复使用(reset),而CountDownLatch只能使用一次,如果还需要使用,必须重新new一个CountDownLatch对象。构造方法CyclicBarrier(int,Runnable)所有线程达到屏障后,执行Runnable。

信号量:Semaphore

​ 信号量用来访问同时访问特定资源的线程数量。

交换者:Exchanger

​ Exchanger交换者用于在两个线程之间传输数据,被调用后等待另一个线程到达交换点,然后相互交换数据。

实现线程通信

实现一个容器,提供两个方法add、size,线程1添加10个元素到容器,线程2监控元素的个数,当个数为5个时,线程2提供并结束。

方案1使用volatile

1
public class MyContainerVolatile {     ArrayList<Object> list = new ArrayList<>();    public void add(Object o) {        list.add(o);    }    public int size() {        return list.size();    }    public static void main(String[] args) {        MyContainerVolatile myContainerVolatile = new MyContainerVolatile();        new Thread(() -> {            for (int i = 0; i < 10; i++) {                myContainerVolatile.add(new Object());                System.out.println("add-" + i);                try {                    Thread.sleep(500);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }, "t1").start();        new Thread(() -> {            while (true) {                if (myContainerVolatile.size() == 5) {                    break;                }            }            System.out.println("t2结束");        }, "t2").start();    }}

image-20210909101839474

如果没有添加volatile,那么第二个线程会一直等待,因为第二个线程一直在进行计算,它不会去住内存中再次读取数据(除非cpu有空闲)

所以需要添加一个volatile

1
volatile ArrayList<Object> list = new ArrayList<>();
1
add-0add-1add-2add-3add-4t2结束add-5add-6add-7add-8add-9

优势:保证的线程的通讯

劣势:线程2的cpu在一直空转,浪费性能。

方案2使用wait和notify

当线程执行wait()方法时候,会释放当前的锁,然后让出CPU,进入等待状态。

当 notify/notifyAll() 被执行时,会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。

1
/*wait会释放锁,notify则不会。t1中notify唤醒t2,本线程不会释放锁,会一直执行下去直至被wait或synchronized代码块结束*/public class MyContainerVolatile2 {    volatile ArrayList<Object> list = new ArrayList<>();    public void add(Object o) {        list.add(o);    }    public int size() {        return list.size();    }    public static void main(String[] args) {        MyContainerVolatile myContainerVolatile = new MyContainerVolatile();        final Object lock = new Object();        new Thread(() -> {            synchronized (lock) {                System.out.println(" ***线程t2启动*** ");                if (myContainerVolatile.size() != 5) {                    try {                        System.out.println(" ***线程t2阻塞了*** ");                        lock.wait();                        System.out.println(" ***线程t2继续往下运行了*** ");                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }                System.out.println(" ***线程t2唤醒了t1*** ");                lock.notify();                System.out.println(" ***线程t2结束了*** ");            }        }, "t2").start();        new Thread(() -> {            System.out.println(" ***线程t1启动*** ");            synchronized (lock) {                for (int i = 0; i < 10; i++) {                    myContainerVolatile.add(new Object());                    System.out.println("add-" + i);                    if (myContainerVolatile.size() == 5) {                        lock.notify();                        try {                            lock.wait();                        } catch (InterruptedException e) {                            e.printStackTrace();                        }                    }                    try {                        Thread.sleep(500);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        }, "t1").start();    }}
1
***线程t2启动***  ***线程t2阻塞了***  ***线程t1启动*** add-0add-1add-2add-3add-4 ***线程t2继续往下运行了***  ***线程t2唤醒了t1***  ***线程t2结束了*** add-5add-6add-7add-8add-9Process finished with exit code 0

方案3使用CountDownLatch的awaitcountdown方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class MyContainerVolatile3 {
volatile ArrayList<Object> list = new ArrayList<>();

public void add(Object o) {
list.add(o);
}

public int size() {
return list.size();
}

public static void main(String[] args) {
MyContainerVolatile3 myContainerVolatile = new MyContainerVolatile3();
CountDownLatch latch = new CountDownLatch(1);

new Thread(() -> {
System.out.println(" ***线程t2启动*** ");
if (myContainerVolatile.size() != 5) {
try {
System.out.println(" ***线程t2阻塞了*** ");
latch.await();
System.out.println(" ***线程t2继续往下运行了*** ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(" ***线程t2唤醒了t1*** ");
System.out.println(" ***线程t2结束了*** ");

}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
System.out.println(" ***线程t1启动*** ");
for (int i = 0; i < 10; i++) {
myContainerVolatile.add(new Object());
System.out.println("add-" + i);
if (myContainerVolatile.size() == 5) {
latch.countDown();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 ***线程t2启动*** 
***线程t2阻塞了***
***线程t1启动***
add-0
add-1
add-2
add-3
add-4
***线程t2继续往下运行了***
***线程t2唤醒了t1***
***线程t2结束了***
add-5
add-6
add-7
add-8
add-9

实现线程安全队列

方案1wait和notify实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/*写一个固定容量的同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。
* wait和notifyAll方法来实习。*/
public class ProducerCustomerWaitNotifyAll {
final private LinkedList<Object> list = new LinkedList<Object>();
final private int MAX = 10; //最多十个元素

public synchronized void put(Object t) {
while (list.size() == MAX) {
try {
System.out.println("生产者:" + Thread.currentThread().getName() + "被阻塞了,队列满了");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
System.out.println("生产者:" + Thread.currentThread().getName() + "生产了了一个,当前队列中还剩余" + list.size() + " 唤醒了消费线程");
this.notifyAll(); //通知消费者进程进行消费
//notify只叫醒一个,叫醒的可能还是生产者,所有线程一直wait,程序就卡死了,所以用notifyAll
}

public synchronized Object get() {
Object t = null;
while (list.size() == 0) { //
try {
System.out.println(" 消费者:" + Thread.currentThread().getName() + "被阻塞了,队列空了");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = list.removeFirst();
System.out.println(" 消费者:" + Thread.currentThread().getName() + "消费了一个,当前队列中还剩余" + list.size() + " 唤醒了生产线程");
this.notifyAll(); //通知生产者进程进行生产
return t;
}

public static void main(String[] args) {
ProducerCustomerWaitNotifyAll pc = new ProducerCustomerWaitNotifyAll();
for (int i = 0; i < 10; i++) { //10个消费者
new Thread(() -> {
for (int j = 0; j < 5; j++) { //每个消费者最多消费5个
pc.get();
}
}, "c" + i).start();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for (int i = 0; i < 2; i++) { //2个生产者
new Thread(() -> {
for (int j = 0; j < 25; j++) { //每个生产者最多生产25个
pc.put(Thread.currentThread().getName() + " " + j);
}
}, "p" + i).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
      消费者:c0被阻塞了,队列空了
消费者:c3被阻塞了,队列空了
消费者:c5被阻塞了,队列空了
消费者:c4被阻塞了,队列空了
消费者:c2被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c9被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
消费者:c9消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c9被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
消费者:c2被阻塞了,队列空了
消费者:c4被阻塞了,队列空了
消费者:c5被阻塞了,队列空了
消费者:c3被阻塞了,队列空了
消费者:c0被阻塞了,队列空了
生产者:p1生产了了一个,当前队列中还剩余1 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余2 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余3 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余4 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余5 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余6 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余7 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余8 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余9 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余10 唤醒了消费线程
生产者:p1被阻塞了,队列满了
消费者:c0消费了一个,当前队列中还剩余9 唤醒了生产线程
消费者:c0消费了一个,当前队列中还剩余8 唤醒了生产线程
消费者:c0消费了一个,当前队列中还剩余7 唤醒了生产线程
消费者:c0消费了一个,当前队列中还剩余6 唤醒了生产线程
消费者:c0消费了一个,当前队列中还剩余5 唤醒了生产线程
消费者:c3消费了一个,当前队列中还剩余4 唤醒了生产线程
消费者:c3消费了一个,当前队列中还剩余3 唤醒了生产线程
消费者:c3消费了一个,当前队列中还剩余2 唤醒了生产线程
消费者:c3消费了一个,当前队列中还剩余1 唤醒了生产线程
消费者:c3消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c5被阻塞了,队列空了
消费者:c4被阻塞了,队列空了
消费者:c2被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c9被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
消费者:c9消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c9被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
消费者:c2被阻塞了,队列空了
消费者:c4被阻塞了,队列空了
消费者:c5被阻塞了,队列空了
生产者:p1生产了了一个,当前队列中还剩余1 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余2 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余3 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余4 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余5 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余6 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余7 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余8 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余9 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余10 唤醒了消费线程
生产者:p1被阻塞了,队列满了
消费者:c5消费了一个,当前队列中还剩余9 唤醒了生产线程
消费者:c5消费了一个,当前队列中还剩余8 唤醒了生产线程
消费者:c5消费了一个,当前队列中还剩余7 唤醒了生产线程
消费者:c5消费了一个,当前队列中还剩余6 唤醒了生产线程
消费者:c5消费了一个,当前队列中还剩余5 唤醒了生产线程
消费者:c4消费了一个,当前队列中还剩余4 唤醒了生产线程
消费者:c4消费了一个,当前队列中还剩余3 唤醒了生产线程
消费者:c4消费了一个,当前队列中还剩余2 唤醒了生产线程
消费者:c4消费了一个,当前队列中还剩余1 唤醒了生产线程
消费者:c4消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c2被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c9被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
消费者:c9消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c9被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
消费者:c2被阻塞了,队列空了
生产者:p1生产了了一个,当前队列中还剩余1 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余2 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余3 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余4 唤醒了消费线程
生产者:p1生产了了一个,当前队列中还剩余5 唤醒了消费线程
消费者:c2消费了一个,当前队列中还剩余4 唤醒了生产线程
消费者:c2消费了一个,当前队列中还剩余3 唤醒了生产线程
消费者:c2消费了一个,当前队列中还剩余2 唤醒了生产线程
消费者:c2消费了一个,当前队列中还剩余1 唤醒了生产线程
消费者:c2消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c1被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c9被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
消费者:c9消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c9被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
消费者:c1被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余2 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余3 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余4 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余5 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余6 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余7 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余8 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余9 唤醒了消费线程
消费者:c1消费了一个,当前队列中还剩余8 唤醒了生产线程
消费者:c1消费了一个,当前队列中还剩余7 唤醒了生产线程
消费者:c1消费了一个,当前队列中还剩余6 唤醒了生产线程
消费者:c1消费了一个,当前队列中还剩余5 唤醒了生产线程
消费者:c1消费了一个,当前队列中还剩余4 唤醒了生产线程
消费者:c8消费了一个,当前队列中还剩余3 唤醒了生产线程
消费者:c8消费了一个,当前队列中还剩余2 唤醒了生产线程
消费者:c8消费了一个,当前队列中还剩余1 唤醒了生产线程
消费者:c8消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c8被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
消费者:c9被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
消费者:c9消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c6被阻塞了,队列空了
消费者:c7被阻塞了,队列空了
消费者:c8被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余2 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余3 唤醒了消费线程
消费者:c8消费了一个,当前队列中还剩余2 唤醒了生产线程
消费者:c7消费了一个,当前队列中还剩余1 唤醒了生产线程
消费者:c7消费了一个,当前队列中还剩余0 唤醒了生产线程
消费者:c7被阻塞了,队列空了
消费者:c6被阻塞了,队列空了
生产者:p0生产了了一个,当前队列中还剩余1 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余2 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余3 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余4 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余5 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余6 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余7 唤醒了消费线程
生产者:p0生产了了一个,当前队列中还剩余8 唤醒了消费线程
消费者:c6消费了一个,当前队列中还剩余7 唤醒了生产线程
消费者:c6消费了一个,当前队列中还剩余6 唤醒了生产线程
消费者:c6消费了一个,当前队列中还剩余5 唤醒了生产线程
消费者:c6消费了一个,当前队列中还剩余4 唤醒了生产线程
消费者:c6消费了一个,当前队列中还剩余3 唤醒了生产线程
消费者:c7消费了一个,当前队列中还剩余2 唤醒了生产线程
消费者:c7消费了一个,当前队列中还剩余1 唤醒了生产线程
消费者:c7消费了一个,当前队列中还剩余0 唤醒了生产线程

Process finished with exit code 0

方案2Lock和Condition实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class ProducerCustomerLock {
final private LinkedList<Object> list = new LinkedList<>();
final private int MAX = 10;
private int count = 0;

private Lock lock = new ReentrantLock();

private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();


public synchronized void put(Object t) {
lock.lock();
try {
while (list.size() == MAX) {
System.out.println("生产者:" + Thread.currentThread().getName() + "被阻塞了,容器满了");
producer.await();
}
list.add(t);
System.out.println("生产者:" + Thread.currentThread().getName() + "生产了了一个,当前容器中还剩余" + list.size() + " 唤醒了消费线程");
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public synchronized Object get() {
Object obj = null;
lock.lock();
try {
while (list.size() == 0) { //
System.out.println(" 消费者:" + Thread.currentThread().getName() + "被阻塞了,容器空了");
consumer.await();
}
obj = list.removeFirst();
System.out.println(" 消费者:" + Thread.currentThread().getName() + "消费了一个,当前容器中还剩余" + list.size() + " 唤醒了生产线程");
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return obj;
}

public static void main(String[] args) {
ProducerCustomerWaitNotifyAll pc = new ProducerCustomerWaitNotifyAll();
for (int i = 0; i < 10; i++) { //10个消费者
new Thread(() -> {
for (int j = 0; j < 5; j++) { //每个消费者最多消费5个
pc.get();
}
}, "c" + i).start();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for (int i = 0; i < 2; i++) { //2个生产者
new Thread(() -> {
for (int j = 0; j < 25; j++) { //每个生产者最多生产25个
pc.put(Thread.currentThread().getName() + " " + j);
}
}, "p" + i).start();
}
}
}

实现两个线程交替执行