多线程进阶-JUC
Table of Contents generated with DocToc (opens new window)
- 2、JUC(java.util.concurrent)
# 2、JUC(java.util.concurrent)
java默认有两个线程:main和gc
并发:多个线程操作同一个资源类,将资源类丢入线程。编写代码时,当资源类就是一个纯粹的对象,包含属性和操作方法,然后new线程去调用资源类的方法
- 解耦
public class ThreadTest { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(() -> { for (int i = 0; i < 40; i++) { ticket.sell(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 40; i++) { ticket.sell(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 40; i++) { ticket.sell(); } }, "C").start(); } } //独立的资源类 class Ticket { private int number = 30; public synchronized void sell() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了票:" + (number--) + ",剩余:" + number); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Lock相关
# java锁的种类
- 公平锁/非公平锁
- 可重入锁
- 独享锁/共享锁
- 互斥锁/读写锁
- 乐观锁/悲观锁
- 分段锁
- 偏向锁/轻量级锁/重量级锁
- 自旋锁
# 1.公平锁/非公平锁
1.公平锁是指多个线程按照申请锁的顺序来获取锁。 2.非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。 对于Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。 对于synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。
# 2.可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。对于Java ReentrantLock而言, 其名字是Reentrant Lock即是重新进入锁。对于synchronized而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。
synchronized void setA() throws Exception{ Thread.sleep(1000); setB(); } synchronized void setB() throws Exception{ Thread.sleep(1000); }
1
2
3
4
5
6
7上面的代码就是一个可重入锁的一个特点,如果不是可重入锁的话,setB可能不会被当前线程执行,可能造成死锁。
# 3.独享锁/共享锁
独享锁是指该锁一次只能被一个线程所持有;共享锁是指该锁可被多个线程所持有。
对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。读锁的共享锁可保证并发读是非常高效的,读写、写读 、写写的过程是互斥的。独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。对于synchronized而言,当然是独享锁。
# 4.互斥锁/读写锁
上面说到的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。互斥锁在Java中的具体实现就是ReentrantLock;读写锁在Java中的具体实现就是ReadWriteLock。
# 5.乐观锁/悲观锁
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。
悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。比如Java里面的同步原语synchronized关键字的实现就是悲观锁。
乐观锁:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS(Compare and Swap 比较并交换)实现的。
# 6.分段锁
分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作,ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7与JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。当需要put元素的时候,并不是对整个HashMap进行加锁,而是先通过hashcode来知道他要放在那一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入。但是,在统计size的时候,可就是获取HashMap全局信息的时候,就需要获取所有的分段锁才能统计。
分段锁的设计目的是细化锁的粒度,当操作不需要更新整个数组的时候,就仅仅针对数组中的一项进行加锁操作。
# 7.偏向锁/轻量级锁/重量级锁
这三种锁是指锁的状态,并且是针对synchronized。在Java 5通过引入锁升级的机制来实现高效synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。
# 8.自旋锁
在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
CAS中常用自旋锁
# 锁的使用
# 1.AQS
AbstractQueuedSynchronized 抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…
AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
# 2.CAS
CAS(Compare and Swap 比较并交换)是乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。
CAS操作中包含三个操作数——需要读写的内存位置(V)、进行比较的预期原值(A)和拟写入的新值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,否则处理器不做任何操作。无论哪种情况,它都会在CAS 指令之前返回该位置的值(在CAS的一些特殊情况下将仅返回CAS是否成功,而不提取当前值)。CAS有效地说明了“ 我认为位置V应该包含值A;如果包含该值,则将 B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可”。这其实和乐观锁的冲突检查 + 数据更新的原理是一样的。
JAVA对CAS的支持:
在JDK1.5中新增java.util.concurrent包就是建立在CAS之上的。相对于对于synchronized 这种阻塞算法,CAS是非阻塞算法的一种常见实现。所以java.util.concurrent在性能上有了很大的提升。
以java.util.concurrent包中的AtomicInteger为例,看一下在不使用锁的情况下是如何保证线程安全的。主要理解 getAndIncrement方法,该方法的作用相当于 ++i 操作。
public class AtomicInteger extends Number implements java.io.Serializable { private volatile int value; public final int get() { return value; } public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 3.示例
synchronized验证可重入锁
//import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable{ //private ReentrantLock reentrantLock = new ReentrantLock(); public synchronized void get(){ System.out.println("2 enter thread name-->" + Thread.currentThread().getName()); //reentrantLock.lock(); System.out.println("3 get thread name-->" + Thread.currentThread().getName()); set(); //reentrantLock.unlock(); System.out.println("5 leave run thread name-->" + Thread.currentThread().getName()); } public synchronized void set(){ //reentrantLock.lock(); System.out.println("4 set thread name-->" + Thread.currentThread().getName()); //reentrantLock.unlock(); } @Override public void run() { System.out.println("1 run thread name-->" + Thread.currentThread().getName()); get(); } public static void main(String[] args){ Test test = new Test(); for(int i = 0; i < 10; i++){ new Thread(test, "thread-" + i).start(); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35运行结果:
1 run thread name-->thread-0 1 run thread name-->thread-3 1 run thread name-->thread-2 1 run thread name-->thread-1 2 enter thread name-->thread-0 3 get thread name-->thread-0 1 run thread name-->thread-4 4 set thread name-->thread-0 5 leave run thread name-->thread-0 2 enter thread name-->thread-4 3 get thread name-->thread-4 1 run thread name-->thread-7 4 set thread name-->thread-4 1 run thread name-->thread-6 1 run thread name-->thread-5 5 leave run thread name-->thread-4 1 run thread name-->thread-8 1 run thread name-->thread-9 2 enter thread name-->thread-1 3 get thread name-->thread-1 4 set thread name-->thread-1 5 leave run thread name-->thread-1 2 enter thread name-->thread-2 3 get thread name-->thread-2 4 set thread name-->thread-2 5 leave run thread name-->thread-2 2 enter thread name-->thread-3 3 get thread name-->thread-3 4 set thread name-->thread-3 5 leave run thread name-->thread-3 2 enter thread name-->thread-9 3 get thread name-->thread-9 4 set thread name-->thread-9 5 leave run thread name-->thread-9 2 enter thread name-->thread-8 3 get thread name-->thread-8 4 set thread name-->thread-8 5 leave run thread name-->thread-8 2 enter thread name-->thread-5 3 get thread name-->thread-5 4 set thread name-->thread-5 5 leave run thread name-->thread-5 2 enter thread name-->thread-6 3 get thread name-->thread-6 4 set thread name-->thread-6 5 leave run thread name-->thread-6 2 enter thread name-->thread-7 3 get thread name-->thread-7 4 set thread name-->thread-7 5 leave run thread name-->thread-7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51ReentrantLock
ReentrantLock既可以构造公平锁又可以构造非公平锁,默认为非公平锁,将上面的代码改为用ReentrantLock实现,再次运行。
import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable{ //默认为非公平锁 private ReentrantLock reentrantLock = new ReentrantLock(); //公平锁 private ReentrantLock reentrantLock = new ReentrantLock(true); public void get(){ System.out.println("2 enter thread name-->" + Thread.currentThread().getName()); reentrantLock.lock(); System.out.println("3 get thread name-->" + Thread.currentThread().getName()); set(); reentrantLock.unlock(); System.out.println("5 leave run thread name-->" + Thread.currentThread().getName()); } public void set(){ reentrantLock.lock(); System.out.println("4 set thread name-->" + Thread.currentThread().getName()); reentrantLock.unlock(); } @Override public void run() { System.out.println("1 run thread name-->" + Thread.currentThread().getName()); get(); } public static void main(String[] args){ Test test = new Test(); for(int i = 0; i < 10; i++){ new Thread(test, "thread-" + i).start(); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38公平锁在多个线程想要同时获取锁的时候,会发现再排队,按照先来后到的顺序进行。
ReentrantReadWriteLock
读写锁的性能都会比排它锁要好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock
特性 说明 公平性选择 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平 重进入 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁 锁降级 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁 import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test { public static void main(String[] args){ for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { Cache.put("key", new String(Thread.currentThread().getName() + " joke")); } }, "threadW-"+ i).start(); new Thread(new Runnable() { @Override public void run() { System.out.println(Cache.get("key")); } }, "threadR-"+ i).start(); new Thread(new Runnable() { @Override public void run() { Cache.clear(); } }, "threadC-"+ i).start(); } } } class Cache { static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); // 获取一个key对应的value public static final Object get(String key) { r.lock(); try { System.out.println("get " + Thread.currentThread().getName()); return map.get(key); } finally { r.unlock(); } } // 设置key对应的value,并返回旧有的value public static final Object put(String key, Object value) { w.lock(); try { System.out.println("put " + Thread.currentThread().getName()); return map.put(key, value); } finally { w.unlock(); } } // 清空所有的内容 public static final void clear() { w.lock(); try { System.out.println("clear " + Thread.currentThread().getName()); map.clear(); } finally { w.unlock(); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72运行结果:
clear threadC-0 get threadR-2 null put threadW-2 get threadR-0 threadW-2 joke get threadR-3 threadW-2 joke clear threadC-1 put threadW-3 clear threadC-3 clear threadC-2 get threadR-1 null put threadW-1 put threadW-0 put threadW-4 get threadR-4 threadW-4 joke clear threadC-4 get threadR-5 null put threadW-5 put threadW-6 get threadR-6 threadW-6 joke get threadR-7 threadW-6 joke put threadW-7 clear threadC-6 put threadW-8 get threadR-8 threadW-8 joke clear threadC-8 get threadR-9 null clear threadC-5 clear threadC-9 clear threadC-7 put threadW-9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Lock接口的实现类
# ReentrantLock类
- 公平锁:可以按照线程到来的顺序执行
- 非公平锁:线程可以插队(默认)
上述Ticket类使用Lock实现
class Ticket {
private int number = 30;
Lock lock = new ReentrantLock();
public void sell() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了票:" + (number--) + ",剩余:" + number);
}
}catch (Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# synchronized与Lock的异同?
- synchronized是内置的java关键字,而Lock是一个java类(接口)
- synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁
- 当线程2所需的锁被线程1获取时,使用Lock锁就不会让线程2一直等待下去:tryLock()和tryLock(long time,TimeUnit timeUnit)可以尝试获取锁
- synchronized是可重入锁,是不可以中断的,非公平的;
- synchronized适合锁少量的代码,Lock适合锁大量的代码
- Lock是显式锁(手动开启和关闭锁,别忘记关闭锁),synchronized是隐式锁,出了作用域自动释放
- Lock只有代码块锁,synchronized有代码块锁和方法锁
- 使用Lock锁,JVM将花费较少的时间来调度线程,性能更好。并且具有更好的扩展性(提供更多的子类)
- 优先使用顺序:Lock→同步代码块(已经进入了方法体,分配了相应资源)→同步方法(在方法体之外)
# 生产者消费者问题复习
- synchronized实现
public class WaitNotify {
public static void main(String[] args) {
Number number = new Number();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
number.incr();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
number.decr();
}
}, "B").start();
}
}
class Number {
private Integer number = 0;
public synchronized void incr() {
//不该A操作 等待
if (number != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println(Thread.currentThread().getName() + "->" + number);
//操作完后 通知其他线程
notifyAll();
}
public synchronized void decr() {
//不该B操作 等待
if (number == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(Thread.currentThread().getName() + "->" + number);
//操作完后 通知其他线程
notifyAll();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 虚假唤醒问题
这里有一个问题:当存在大于2个线程时,使用if去判断线程是否该等待这个条件时,会造成虚假唤醒
虚假唤醒的理解:在多线程的情况下,当多个线程执行了wait()方法后,需要其它线程执行notify()或者notifyAll()方法去唤醒,假如被阻塞的多个线程都被唤醒,但实际情况是被唤醒的线程中有一部分线程是不应该被唤醒的,那么对于这些不应该被唤醒的线程而言就是虚假唤醒。
说明:在上述例子中,假如线程AC执行+1,线程BD执行-1,当A执行+1前,C和BD均阻塞在if方法体的最开始部分(已经进入了if方法体中),A执行完+1操作后,number变为1,唤醒其他线程,线程C、BD均被唤醒,此时B(-1)得到操作权,B(-1)回到就绪状态,number变成0,然后此时可能D(-1)比C(+1)先得到操作权,但是它并没有判断此时的number是不是1(因为if中的语句只执行一次),
它又对number-1,number就变成了-1。
总结:出现虚假唤醒的原因是从阻塞态到就绪态再到运行态没有进行判断,我们只需要让其每次得到操作权时都进行判断就可以了
官方建议:wait()应该总是出现在循环中(使用while()来判断)
class Number { private Integer number = 0; public synchronized void incr() { //使用while 防止虚假唤醒问题 while (number != 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number++; System.out.println(Thread.currentThread().getName() + "->" + number); //操作完后 通知其他线程 notifyAll(); } public synchronized void decr() { //使用while 防止虚假唤醒问题 while (number == 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number--; System.out.println(Thread.currentThread().getName() + "->" + number); //操作完后 通知其他线程 notifyAll(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 使用Lock及其相关的await()、signal()、signalAll()方法替代synchronized及wait()、notify()、此方法相当于notifyAll()
private final Lock lock = new ReentrantLock(); //通过Lock接口的实例可以获取Condition对象 private final Condition condition = lock.newCondition(); //此方法相当于wait() condition.await(); //此方法相当于notifyAll() condition.signalAll();
1
2
3
4
5
6
7
通过一个Lock获取多个Condition对象实现精准唤醒线程:
思路:给执行不同任务的线程分别指定一个Condition对象,线程在使用await()时,使用自己的Condition对象调用;在signal()时,可以用别的线程的Condition对象调用,这样就实现了一个线程精准唤醒另一个线程。
代码示例:
/**
* @author zdk
* @date 2021/11/7 19:41
* A执行完调用B B执行完调用C C执行完调用A,即ABC三线程顺序循环执行
*/
public class LockCondition {
public static void main(String[] args) {
Number number = new Number();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
number.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
number.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
number.printC();
}
}, "C").start();
}
}
class Number {
private Integer number = 1;
private final Lock lock = new ReentrantLock();
private final Condition condition1 = lock.newCondition();
private final Condition condition2 = lock.newCondition();
private final Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
//业务,判断->执行->通知
while (number != 1) {
//等待
condition1.await();
}
//A执行
System.out.println(Thread.currentThread().getName() + ":AAAA");
//唤醒指定的 B
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
//业务,判断->执行->通知
while (number != 2) {
//等待
condition2.await();
}
//A执行
System.out.println(Thread.currentThread().getName() + ":BBBB");
//唤醒指定的 C
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
//业务,判断->执行->通知
while (number != 3) {
//等待
condition3.await();
}
//A执行
System.out.println(Thread.currentThread().getName() + ":CCCC");
//唤醒指定的 A
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# 集合的线程安全
//创建ArrayList集合
public class CollectionThread {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println("list = " + list);
},String.valueOf(i)).start();
}
}
}
//会出现ConcurrentModificationException 并发修改异常
//此方法没有synchronized声明 是线程不安全的
boolean add(E e);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
出现ConcurrentModificationException的原因:输出list时,调用了ArrayList的toString()方法,而ArrayList中的toString方法是继承自AbstractCollection的,里面是调用了Iterator迭代器就行输出的,而List的迭代器是有fail-fast机制的(防止并发操作出现错误),ConcurrentModificationException是由迭代器抛出的。迭代器遍历调用next方法,next方法调用了一个方法来判断“预期修改次数”和“实际修改次数”,迭代器遍历List的同时add数据的话“实际修改次数”会自增加一,这样的话“预期修改次数”和“实际修改次数”就不想等,所以抛出并发修改异常。
要修改为线程安全的做法:
Vector
Vector下的add普遍都是线程安全的
public synchronized boolean add(E e) { modCount++; add(e, elementData, elementCount); return true; }
1
2
3
4
5
6在改变代码时,只需要将其修改为
List<String> list = new Vector<>();
但此方法用的比较少,因为在jdk 1.0的版本适用Collections
Collections类中的很多方法都是static静态 其中有一个方法是返回指定列表支持的同步(线程安全的)列表为
synchronizedList(List <T> list)
具体代码为List<String> list = Collections.synchronizedList(new ArrayList<>());
1Collections的几个static方法可以让我们得到线程安全的Lits、Set、Map
不过此方法也比较古老,很少使用
所以我们使用以下三个类:CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap来实现集合的线程安全
CopyOnWriteArrayList
List<String> list = new CopyOnWriteArrayList<>();
1涉及的底层原理为写时复制技术
读的时候并发(多个线程操作)
写的时候独立,先复制相同的空间到某个区域,将其写到新区域,旧新合并,并且读新区域(每次加新内容都写到新区域,覆盖合并之前旧区域,读取新区域添加的内容)
源码:
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14CopyOnWriteArraySet
使用方法同上的CopyOnWriteArrayList
ConcurrentHashMap
ConcurrentHashMap实现线程安全二队原理与1、2两个类不同
在ConcurrentHashMap没有出现以前,jdk使用hashtable来实现线程安全,但是hashtable是将整个hash表锁住,所以效率很低下。
ConcurrentHashMap将数据分别放到多个Segment中,默认16个,每一个Segment中又包含了多个HashEntry列表数组。
向 ConcurrentHashMap 中插入数据或者读取数据,首先都要将相应的 Key 映射到对应的 Segment,因此不用锁定整个类, 只要对单个的 Segment 操作进行上锁操作就可以了。理论上如果有 n 个 Segment,那么最多可以同时支持 n 个线程的并发访问,从而大大提高了并发访问的效率。
对于一个key,需要经过三次hash操作,才能最终定位这个元素的位置,这三次hash分别为:
- 对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);
- 将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素的放在哪个Segment;
- 将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),通过h3能够确定该元素放置在哪个HashEntry。
每一个Segment都拥有一个锁,当进行写操作时,只需要锁定一个Segment,而其它Segment中的数据是可以访问的。
https://blog.csdn.net/dianzijinglin/article/details/80997935 hashtable解析
https://www.cnblogs.com/dolphin0520/p/3932905.html 1.6版本concurrentHashMap实现
https://blog.csdn.net/jianghuxiaojin/article/details/52006118
https://www.jianshu.com/p/d10256f0ebea 1.8版本concurrentHashMap实现
# Callable
与Runnable接口相比,Callable功能更强大
- 相比run()方法,call()可以有返回值
- call()方法可以抛出异常
- Callable支持泛型的返回值
- 需要借助FutureTask类,比如获取返回结果
- Future接口
- 可以对具体Runnable、Callable任务的执行结果进行取消、查询是否完成、获取结果等
- FutureTask是Futrue接口的唯一的实现类
- FutureTask同时实现了Runnable,Future接口。它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值
关系说明:
new Thread(new Runnable()).start();
new Thread(new FutureTask<V>()).start();//因为FutureTask也是Runnable接口的一个实现类
new Thread(newFutureTask<V>(new Callable())).start();//FutureTask<V>的构造方法可以传入Callable接口
2
3
4
代码示例:
//1.创建一个实现了Callable接口的实现类
class NumberCallable implements Callable<Integer> {
//2.实现call()方法,将线程要执行的操作声明在call()中
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
if (i % 2 == 0) {
sum += i;
}
}
return sum;
}
}
/**
* @author zdk
* @date 2021/11/6 17:25
*/
public class CallableTest {
public static void main(String[] args) {
//3.创建Callable接口实现类的对象
NumberCallable numberCallable = new NumberCallable();
//4.将此Callable接口实现类的对象作为参数传递到FutureTask的构造器中,创建FutureTask对象
FutureTask<Integer> futureTask = new FutureTask<>(numberCallable);
//5.将FutureTask的对象作为参数传递到Thread的构造器中,创建Thread对象,调用其start()
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
try {
//6.可以通过FutureTask对象的get()方法获取call()方法的返回值
//get()返回值即为FutureTask构造器参数Callable实现类重写的call()的返回值
//get()会产生阻塞
Integer sum = futureTask.get();
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
new Thread(futureTask,"A").start(); new Thread(futureTask,"B").start();
- 启动两个一样的线程,实际只会输出一个sum,是因为在多线程并发的情况下使用Callable,运行结果会做缓存处理,提高效率
- 使用get()获取返回值时可能会阻塞,所以一般放在程序最后执行,或者采用异步调用获取方式
# 常用的辅助类
# CountDownLatch
两个常用的方法:
- await():使得当前线程在锁存器计数至0之前一直等待(除非线程被中断)
- coutDown():递减锁存器的计数,如果计数减为0,将释放所有等待的线程
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await() 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句
具体步骤可以演化为定义一个类,减1操作,并等待到0,为0执行结果
代码示例:
如果不加 CountDownLatch类,会出现线程混乱执行,小张们还未走完 就已经锁门了
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 走了");
countDownLatch.countDown();
}, "小张" + i).start();
}
try {
//等待计数器归零 然后再向下执行
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有人都走完了 关门");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# CyclicBarrier(篱栅)
该类是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点,在设计一组固定大小的线程的程序中,这些线程必须互相等待,这个类很有用,因为barrier在释放等待线程后可以重用,所以称为循环barrier
代码示例:七个葫芦娃救yeye
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () ->
System.out.println("集齐了7个葫芦娃,可以救yeye了"));
for (int i = 0; i < 7; i++) {
new Thread(()->{
System.out.println("集齐"+Thread.currentThread().getName());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},"葫芦娃"+(i+1)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
注意:如果将for循环的次数改为15,会有两次集齐输出,然后会一直阻塞
因为后面只有1个线程未执行了,而cyclicBarrier.await()仍然需要等到有7个线程执行完。
阅读源码发现,CyclicBarrier内部执行await()时调用dowait()方法,仍是减操作
# Semaphore(信号量)
Semaphore是一个计数信号量,从概念上讲,信号量维护了一个许可集,如有必要,在许可可用前会阻塞每一个acquire(),然后再获取该许可。每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动
- 构造方法:
Semaphore(int permits)
创建具有给定的许可数和非公平的公平设置的Semaporepublic Semaphore(int permits) { sync = new NonfairSync(permits); }
1
2
3public Semaphore(int permits, boolean fair)
创建具有给定的许可数和可选择的公平设置的Semaporepublic Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
1
2
3
- 常用的方法:
acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断release()
释放一个许可,将其返回给信号量- 一般
acquire()
都会抛出异常,release
在finally
中执行
代码示例
public class SemaphoreDemo { public static void main(String[] args) { //定义信号量及允许的大小 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(()->{ try { //当前线程获取信号量,如果没获取到,等待 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "进入饭店吃饭"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "吃完离开了饭店"); }catch (InterruptedException e){ e.printStackTrace(); }finally { //当前线程释放它所拿到的信号量 semaphore.release(); } },"顾客"+(i+1)).start(); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23Semaphore的用处
主要用于多个共享资源互斥的使用的场景,如并发限流,控制最大线程数
# 读写锁(ReadWriteLock接口)
概念:
- ReadWriteLock维护一对关联的locks,一个用于只读操作,一个用于写入。
- 读操作可以由多个线程同时执行,而写操作同时只能有一个线程执行
- ReadWriteLock有且仅有一个实现类 ReentrantReadWriteLock:可重入读写锁
- 原则上读写问题使用synchronized和Lock可以解决,但是粒度较读写锁大。
- 写锁相当于独占锁,读锁相当于共享锁
相比于普通互斥锁:
- 相比互斥锁,更加细粒度
- 在理论上,通过使用读写锁允许的并发性增加将导致性能改进超过使用互斥锁
- 读写锁是否会提高使用互斥锁的性能,取决于数据被读取的频率与被修改的频率相比,读取和写入操作的持续时间以及数据的争用。即是,将尝试同时读取或写入数据的线程数。例如,最初填充数据的集合,然后经常被修改的频繁搜索(例如某种目录)是使用读写锁的理想候选。 然而,如果更新变得频繁,那么数据的大部分时间将被专门锁定,并且并发性增加很少。不建议使用读写锁。简单理解就是 经常被修改的读取的数据,建议使用读写锁,对于不长变动而且 并发很少的情况,不建议使用读写锁。
独占锁(写锁):一次只能被一个线程持有
共享锁(读锁):可以被多个线程同时持有
ReadWriteLock中:
读-读:可以共存
读-写:不能共存
写-写:不能共存
自定义缓存不加锁情况:
public class ReadWriteLockDemo { public static void main(String[] args) { MyCache cache = new MyCache(); // MyCacheWriteReadLock cache = new MyCacheWriteReadLock(); // MyCacheSynchronized cache = new MyCacheSynchronized(); // MyCacheLock cache = new MyCacheLock(); // 模拟4个线程写入缓存 for (int i = 1; i <= 4; i++) { final int temp = i; new Thread(()->{ cache.save(temp +"",temp+""); },String.valueOf(i)).start(); } // 模拟4个线程读取缓存 for (int i = 1; i <= 4; i++) { final int temp = i; new Thread(()->{ cache.get(temp +""); },String.valueOf(i)).start(); } } } /** * 模拟缓存 写入、读取 不加任何锁 */ class MyCache { private volatile Map<String,String> cache = new HashMap<>(); public void save(String key,String value){ System.out.println(Thread.currentThread().getName() + "===》开始写入数据"); cache.put(key,value); System.out.println(Thread.currentThread().getName() + "===》写入成功了 " + key + "---" + value); } public void get(String key){ System.out.println(Thread.currentThread().getName() + "===》开始获取数据"); String result = cache.get(key); System.out.println(Thread.currentThread().getName() + "===》获取成功了 " + result); } } 输出: 1===》开始写入数据 ======> 正确的逻辑应该是写入然后读取,但是不加锁的时候,读取之前被 多个线程同时更新了缓存 4===》开始写入数据 4===》写入成功了 4---4 3===》开始写入数据 2===》开始写入数据 2===》写入成功了 2---2 3===》写入成功了 3---3 1===》写入成功了 1---1 1===》开始获取数据 2===》开始获取数据 2===》获取成功了 2 3===》开始获取数据 3===》获取成功了 3 4===》开始获取数据 1===》获取成功了 1 4===》获取成功了 4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64加上读写锁:
- // 读写锁对象 ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
- // 加入写锁,保证只能同时有一个线程来写,// 释放写锁 reentrantReadWriteLock.writeLock().lock(); reentrantReadWriteLock.writeLock().unlock();
- // 加上读锁,保证读的时候不能有其他线程写 // 释放读锁 reentrantReadWriteLock.readLock().lock(); reentrantReadWriteLock.readLock().unlock();
/** * 2. 模拟缓存 加读写锁 写入、读取 */ class MyCacheWriteReadLock { private volatile Map<String,String> cache = new HashMap<>(); /** * 读写锁 */ ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); public void save(String key,String value){ // 加入写锁,保证只能同时有一个线程来写 reentrantReadWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "===》开始写入数据"); cache.put(key,value); System.out.println(Thread.currentThread().getName() + "===》写入成功了 " + key + "---" + value); } catch (Exception e) { e.printStackTrace(); } finally { // 释放写锁 reentrantReadWriteLock.writeLock().unlock(); } } public void get(String key){ // 加上读锁,保证读的时候不能有其他线程写 reentrantReadWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "===》开始获取数据"); String result = cache.get(key); System.out.println(Thread.currentThread().getName() + "===》获取成功了 " + result); } catch (Exception e) { e.printStackTrace(); } finally { // 释放读锁 reentrantReadWriteLock.readLock().unlock(); } } } 输出: 2===》开始写入数据 2===》写入成功了 2---2 3===》开始写入数据 3===》写入成功了 3---3 1===》开始写入数据 1===》写入成功了 1---1 4===》开始写入数据 4===》写入成功了 4---4 1===》开始获取数据 1===》获取成功了 1 4===》开始获取数据 3===》开始获取数据 3===》获取成功了 3 2===》开始获取数据 2===》获取成功了 2 4===》获取成功了 4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62# 加上Lock锁和synchronized情况:
/** * 3. 模拟缓存 加synchronized 写入、读取 */ class MyCacheSynchronized { private volatile Map<String,String> cache = new HashMap<>(); public synchronized void save(String key,String value){ System.out.println(Thread.currentThread().getName() + "===》开始写入数据"); cache.put(key,value); System.out.println(Thread.currentThread().getName() + "===》写入成功了 " + key + "---" + value); } public synchronized void get(String key){ System.out.println(Thread.currentThread().getName() + "===》开始获取数据"); String result = cache.get(key); System.out.println(Thread.currentThread().getName() + "===》获取成功了 " + result); } } /** * 4. 模拟缓存 加Lock锁 写入、读取 */ class MyCacheLock { private volatile Map<String,String> cache = new HashMap<>(); Lock lock = new ReentrantLock(); public void save(String key,String value){ lock.lock(); try { System.out.println(Thread.currentThread().getName() + "===》开始写入数据"); cache.put(key,value); System.out.println(Thread.currentThread().getName() + "===》写入成功了 " + key + "---" + value); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void get(String key){ System.out.println(Thread.currentThread().getName() + "===》开始获取数据"); String result = cache.get(key); System.out.println(Thread.currentThread().getName() + "===》获取成功了 " + result); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 阻塞队列(BlockingQueue)
- 阻塞队列是共享队列(多线程操作),一端输入,一端输出
- 队列的容量不是无限的,满了之后就会进入阻塞,取出也同理
- 当队列是空的,从队列中获取元素的操作将会别阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
- 试图从空的队列中获取元素的线程将会被阻塞,知道其他线程往空的队列中插入新的元素
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
# 四组API
方式 | 抛出异常 | 不抛出异常、有返回值 | 一直阻塞等待 | 超时等待(设置等待的时间,超时则不再等待) |
---|---|---|---|---|
添加 | add(E e) | offer() | put(E e) | offer(E e,long time,TimeUnit timeUnit) |
移除 | remove() | poll() | take() | poll(long time,TimeUnit timeUnit) |
判断队列首部元素 | element() | peek() |
代码示例:
public class BlockingQueueDemo01 {
public static void main(String[] args) throws InterruptedException {
// test1();
// test2();
// test3();
test4();
}
public static void test1(){
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//element查看队首元素 失败抛出异常
System.out.println(blockingQueue.element());
//peek查看队首元素 失败不抛出异常
System.out.println(blockingQueue.peek());
//IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//remove移除队列首元素 返回移除的值,如果没有,抛出异常NoSuchElementException
// System.out.println(blockingQueue.remove());
}
public static void test2(){
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//offer加入元素 失败返回false 不抛出异常
System.out.println(blockingQueue.offer("d"));
System.out.println("====");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//poll移除队列首元素 失败返回null 不抛出异常
System.out.println(blockingQueue.poll());
}
public static void test3() throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//加入元素 抛出异常 队列满后一直阻塞等待
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");
//获取队列首元素 不抛出异常 队列空时一直等待阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
public static void test4() throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//offer加入元素(超时等待),等待两秒,如果仍不能加入,则返回false 不抛出异常 退出
System.out.println(blockingQueue.offer("d", 2,TimeUnit.SECONDS));
System.out.println("====");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//poll移除队列首元素(超时等待) 超时失败返回null 不抛出异常 退出
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 阻塞队列分类
# 1.ArrayBlockingQueue
- 由数组结构组成的有界阻塞队列
- ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,无法并行
# 2.LinkedBlockingQueue
由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
- 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
# 3.DelayQueue
使用优先级队列实现的延迟无界阻塞队列
- DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,
因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞
# 4.PriorityBlockingQueue
基于优先级的阻塞队列 支持优先级排序的无界阻塞队列
不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者
# 5.SynchronousQueue
一种无缓冲的等待队列 相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区) 不存储元素的阻塞队列,也即单个元素的队列
同步队列:加入一个元素必须等待取出之后,才能再往里面放入一个元素
存:put() 取:take()
声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。 公平模式和非公平模式的区别: • 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
• 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者
而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理
# 6.LinkedTransferQueue
由链表结构组成的无界阻塞 TransferQueue 队列 由链表组成的无界阻塞队列
- 预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,生成一个节点(节点元素为 null)入队,消费者线程被等待在这个节点上,生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回
# 7.LinkedBlockingDeque
由链表结构组成的双向阻塞队列 阻塞有两种情况
- 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException异常
- 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
# 线程池
连接池:连接池是创建和管理一个连接的缓冲池的技术,这些连接准备好被任何需要它们的线程使用
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度
特点:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以进行统一分配,调优和监控。
要点:三大方法、七大参数、四种拒绝策略
public class ThreadPoolDemo01 {
public static void main(String[] args) {
//单个线程
// ExecutorService executorService = Executors.newSingleThreadExecutor();
// ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 200; i++) {
executorService.execute(()-> System.out.println(Thread.currentThread().getName()+" over"));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 七大参数源码
public ThreadPoolExecutor(int corePoolSize, //核心线程数
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间数
TimeUnit unit, //超时时间单位
BlockingQueue<Runnable> workQueue, //工作阻塞队列
ThreadFactory threadFactory, //创建线程的工厂
RejectedExecutionHandler handler //拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int corePoolSize
, 常驻线程数量(核心)int maximumPoolSize
,最大线程数量long keepAliveTime,TimeUnit unit
,线程存活时间BlockingQueue<Runnable> workQueue
,阻塞队列(排队的线程放入)ThreadFactory threadFactory
,线程工厂,用于创建线程RejectedExecutionHandler handler
拒绝策略(线程满了且阻塞队列满了如何处理)
# 四种拒绝策略类:
抛异常
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
谁调用找谁
CallerRunsPolicy:"调用者运行":一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。例如如果当前线程池和阻塞队列均达到了最大容量,仍有多的线程希望执行,则会使用主线程(main)执行
抛弃最久执行当前
DiscardOldestPolicy:抛弃队列中等待最久的任务(尝试和最早的竞争),然后把当前任务加入队列中,尝试再次提交当前任务,
失败也不会抛出异常
不理不问
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略
线程池抽象:
代码示例
:
@Test
public void test(){
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
//10个线程,但最大线程数只有5,且用于等待的阻塞队列容量为3
//总的能容纳的线程数为8 如果如下 超过 则会出现 RejectedExecutionException
//因为设定的拒绝策略为AbortPolicy
try {
for (int i = 0; i < 9; i++) {
threadPoolExecutor.execute(()-> System.out.println(Thread.currentThread().getName()+" 处理"));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# CPU密集型和IO密集型
池的大小如何定义:
CPU密集型:CPU多少核心,即可同时执行多少条线程
可以通过 Runtime.getRuntime().availableProcessors();代码获取CPU核心数
IO密集型:判断程序中十分耗IO的线程数量,将最大线程数设为大于此数量
# ForkJoin(分支合并)
从jdk1.7开始存在。可以并行执行任务,提高效率。建议在大数据量时使用
ForkJoin特点:工作窃取
举例:A线程任务执行了一部分时,B线程任务已执行完,这时B线程就会将A线程的任务"窃取"过来自己完成。
概念:
将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果
- ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务
- ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行
- RecursiveTask: 继承后可以实现递归(自己调自己)调用的任务
创建分支合并对象,通过该对象调用内部方法
demo:
public class ForkJoinDemo {
/**
* ForkJoin 150-220ms
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void test1() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
RecursiveTask<Long> demoTest = new ForkJoinDemoTest(0L, 1000000000L);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> submit = forkJoinPool.submit(demoTest);
long result = submit.get();
long end = System.currentTimeMillis();
System.out.println("结果:"+result+" 花费时间(ms):"+(end-start));
}
/**
* stream并行流方式 150ms左右
*/
@Test
public void test2(){
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 1_0000_00000).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("结果:"+sum+" 花费时间(ms):"+(end-start));
}
}
class ForkJoinDemoTest extends RecursiveTask<Long> {
private final Integer temp = 10000;
private long start;
private long end;
public ForkJoinDemoTest(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end - start) <= temp){
long sum = 0L;
for (long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else{
long mid = (start+end)/2;
ForkJoinDemoTest task1 = new ForkJoinDemoTest(start, mid);
ForkJoinDemoTest task2 = new ForkJoinDemoTest(mid, end);
task1.fork();
task2.fork();
return task1.join() + task2.join();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 异步回调
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息
类中的具体引用类以及接口:
CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类
- 异步调用没有返回值的方法:runAsync()
- 异步调用有返回值的方法:supplyAsync()
- 主线程调用get()方法会阻塞
demo:
public class CompletableFutureDemo {
/**
* 无返回值
*/
@Test
public void noReturnValue() throws ExecutionException, InterruptedException {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步任务执行完成");
});
System.out.println("151515");
//主线程调用 产生阻塞 返回null
System.out.println(voidCompletableFuture.get());
}
@Test
public void hasReturnValue() throws ExecutionException, InterruptedException {
CompletableFuture<Boolean> booleanCompletableFuture = CompletableFuture.supplyAsync(()->{
System.out.println("正在执行异步操作");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 1/0;
return true;
});
//whenComplete是当上述异步任务执行成功后的回调
//exceptionally是当异步任务发生异常时的回调
Boolean result = booleanCompletableFuture.whenComplete((u, t) -> {
System.out.println("u = " + u);
System.out.println("t = " + t);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return false;
}).get();
System.out.println("result = " + result);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
注:whenComplete方法的参数是一个BiConsumer<? super T, ? super Throwable>类型的消费型接口
# Future和CompletableFuture
对比这两种方法,前者仍为同步,后者为异步。
Future在java里,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Future,在Future里面有isDone方法来帕努单任务是否处理结束,还有get方法可以一直阻塞,直到任务结束然后获取结果,但总体来说这种方式还是同步的,因为需要客户端不断阻塞等待或者不但轮询才能知道任务是否完成。
Future相较于CompletableFuture的缺点
不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
不支持进一步的非阻塞调用
通过Future的get方法会一直阻塞到任务完成,但是想在获取任务结果之后执行额外的任务就不行了,因为Future不支持回调函数,所以无法实现这个功能
不支持链式调用
对于Future 执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的
不支持多个Future合并
比如有10个Future并行执行,我们想在所有的Future运行完毕后,执行某些函数,是没办法通过Future实现的
不支持异常处理
Future没有任何与异常处理相关的API,所以在异步运行时,出现问题不好定位