线程基础

Callable 实现原理

Callable 接口按照 Runnable 方式写作一个可带返回参数的接口,即 call() 参照 run()。

其中有两点需要考虑:

  1. 有返参,但主线程如何从线程中获取返参
  2. Thread 执行方法中,只有 Runnable 类型 参数

所以此时:

  1. 构建一个 Future 接口,其中 get()方法是用于获取 call() 方法结果
  2. 构建一个 RunableFuture 接口,继承 Runnable 和 Future

这样,实现了 RunnableFuture 接口的实现类 FutureTask 可以解决以上问题

具体为:

  1. FutureTask 中 Callable 属性赋值
  2. 重写 run()方法,其中调用 callable 的 call() 方法获取到返回值,将返回值调用 set()方法赋值给 outcome 属性,同时将 state(
    即线程状态属性) 赋值为 normal
  3. 然后 futureTask 调用 get() 方法去拿取 outcome 的值,因为是异步的,此时需要判断 state 属性判断线程是否执行完成
  4. 如果没有执行完,则通过 awaitDone() 方法循环等待,可以设置超时时间,如果超时则报错,如果未设置,则一直循环获取
  5. 其中有一个 WaitNode 内部类,获取当前线程状态,从而循环判断 state 的值

创建线程方式

Thread 源码上写的就两种

  1. 继承 Thread 类
  2. 实现 Runnable 接口

future 本质也是实现了 runnable 接口,在 run 方法中调用了 callback 的 call() 方法

start 方法

​ 实际是调用了 start0() 本地方法,同时调用本地方法前会有检查,如执行两次 start 方法,会抛出异常

​ Thread 对象实则只是堆对象,真正调用线程是通过 Thread 的 start 方法去调用 start0()本地方法调用的,而 run() 方法对于线程类似于主线程的
main() 方法,让线程知道需要执行该方法。

线程终止方式

自然死亡

  1. run() 跑完

  2. 抛出未捕捉异常

自定义死亡

  1. 使用 stop()

    已弃用,不能确定线程状态,直接停止,线程可能没有跑完就停了

  2. 中断

    A 调用 B 的 interrupt()方法,设置 B 的中断标记位为 true,B 是否终止由 B 线程自行决定,B 通过 isInterrupted()
    来获取中断标志位,程序员可以在程序中自行编写结束代码

    不建议自定义中断标志位,因为线程如果有阻塞调用,线程不会获取到 cpu 资源,无法执行对应赋值方法,即线程无法第一时获取到自定义的中断标志位

    而 interrupt 调用时,线程阻塞方法自带了中断标志的检查,即修改中断标志位后,线程也会自己被唤醒,从而能执行中断方法

线程的生命状态

初始 - 运行中 - 就绪 - 阻塞 - 超时等待 - 等待 - 终止

image-20230307175119099

阻塞

只有被 关键字才会阻塞

yield 实例

ConcurrentHashMap 初始化时,会调用 yield() 方法

原因是在 ConcurrentHashMap 在初始化时没有生成数组,而是在 put 时才会生成数组,在多个线程调用 put
时,为了让数组最快生成,则让一个线程生成,其他线程让出 cpu 资源。

线程的调度

协同式调度

线程自己控制,如果阻塞可能就一直阻塞,无法让给其他线程

抢占式调度

系统控制,线程执行时间不可控,阻塞后,其他线程可以抢占 cpu 资源

java 是抢占式调度,通过 ** 线程优先级 **,在就绪状态时,优先级越高的线程越容易执行

线程实现

内核实现

语言启动,则硬件线程和操作系统是 1:1 实现,cpu 一个线程对应一个操作系统线程

用户实现

操作系统实现逻辑线程,可以让硬件线程和操作系统作为 1:N,一个 cpu 对应操作系统的多个逻辑线程

golang 对应

混合实现

即实现多个逻辑线程同时,可以对应多个硬件线程,即 M:N

协程

适用于 IO 密集型(高并发),计算密集型反而会降速

有栈协程

无栈协程

java 的协程——Quasar

1
2
3
4
5
6
<!-- 引入 -->
<dependency>
<groupId>co.parlleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.9</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 使用
public void quasarTest(){
CountDownLatch count=new CountDownLatch(10000);
StopWatch stopWatch=new StopWatch();
stopWatch.start();
IntStream.range(0,10000).forEach(i->new Fiber(){
@Override
protected String run()throws SuspendExecution,InterruptedException{
// Quasar 中 Thread 和 Fiber 都被称为 Strand,Fiber 不能调用 Thread.sleep() 休眠
Strand.sleep(1000);
count.countDown();
return"aa";
}
}.start());
count.await();
stopWatch.stop;
}
// jdk19 引入新的协程——虚拟线程,暂不稳定
// 引用 quasar 时,需要添加启动项
// -javaagent:jar 包地址 \quasar-core-0.7.9.jar

守护线程

支持工作,如 GC 等

1
2
3
4
UserThread userThread=new UserThread();// 死循环
userThread.setDaemon(true);// 设置为守护线程
userThread.start();// 如未设置守护线程,main 线程结束后仍然一直运行
// 守护线程在 main 线程停止后会自动死亡

线程间通信

管道的输入输出流

案例:** 数据库 ** 统计 ** 报表 ** 生成 **excel 文件 ** 上传给 ** 远端 **,
使用管道则可以不用在本地生成 excel 文件,直接上传给远端

具体实现为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// PipedOutputStream 和 PipedInputStream(字节)、PipedReader 和 PipedWriter(字符)
public void test(){
PipedWriter out=new PipedWriter();
PipedReader in=new PipedReader();
out.connect(in);
Thread printThread=new Thread(new Print(in),"PrintThread");
printThread.start();
int receive=0
try{
// 控制台输入
while((receive=System.in.read())!=-1){
out.write(receive)
}
}finally{
out.close();
}
}

static class Print implements Runnable {
private PipedRead in;

public Print(PipedReader in) {
this.in = in;
}

@Override
public void run() {
int receive = 0;
try {
// 实际业务中可以将输入流直接通过网络通信写出
while ((receive = in.read()) != -1) {
sout((char) receive)
}
} catch (Exception e) {
}
}
}

join()

保证三个线程依次跑完,可以用 join() 方法

synchronized 内置锁

强制将并行变为串行,只能传入对象实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 对象锁
public void Block(){
synchronized (this){
count++;
}
// 等价
synchronized (obj){
count++;
}
}
// 等价
public synchronized void Block(){
count++;
}

// 类锁
public synchronized static void Block(){
count++;
}

// sout() 方法底层适用了 synchronized 关键字

1.8 之后引入了轻量级偏向锁,如果竞争少的情况下会 cas 自旋拿锁

对象锁和类锁

对象锁:用于对象实例,或者对象实例方法

类锁:写在静态方法,或者类的 class 对象上

1
2
3
4
5
6
7
// 想要锁生效,两线程调用的锁对象必须是同一对象
// 错误案例:锁对象在一直重新赋值
Integer i=0;
synchronized(i){
i++;// 在这儿累加时,i 被重新赋值,所以锁对象一直在改变
}

volatile

保证了不同线程对变量操作的可见性,线程修改后立即可见

适用场景:一个线程写,多个线程读

不是锁,无法保证串行

等待 / 通知机制

wait() 未设置超时时间,设置线程为等待

wait(Long time) 设置超时时间,设置线程为超时等待

等待和通知范式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 等待方:
加锁(对象){
// 超时机制
// 这里设置等待时长
// 这里设置超时时刻
// while 会判断等待时长是否小于 0,小于 0 则不等待了
while(条件 A 不满足){
// 等待时添加等待时长,如果中途唤醒,则会重新赋值等待时长
对象.wait 方法(等待方法,会释放锁)
// 唤醒后,通过超时时刻 - 当前时间,重新赋值等待时长
}
业务逻辑
}
通知方:
加锁(对象){
业务逻辑,改变条件 A;
对象.notify 方法(通知方法)
}
1
2
// 实际案例

面试题

方法和锁

yield()、sleep() 线程方法,不会释放锁

wait(),notify() 对象方法,wait()会释放锁,notify() 通知作用,对锁无影响

为什么 wait()和 notify() 需要在同步块里调用

如果不这么设计,会导致 LostWeakUp, 丢失唤醒信号。

即不在同步块时,A 可能会在 B 执行 notify()之后再 wait(),这样就会丢失 B 的 notify() 信号,从而一直 wait。

同步块则保证了 A 执行了 wait()方法之后释放了锁,B 才能执行 notify() 通知方法

为什么要循环中检查等待条件

因为多线程 wait()时,当 notifyAll() 通知所有线程后,多个线程不一定就都能满足条件,如果不满足,则还需要进入等待状态

比如生产者生产一条消息被多个消费者竞争,始终只有一个消费者线程抢占到锁拿到该消息,抢占到消息 (即抢占到锁) 的消费者线程执行完后则释放锁让其他线程执行,其他未抢占到消息的线程抢到锁执行后,则仍然继续等待,所以不代表唤醒后就一定能执行,每次唤醒后还需要重新判断是否抢占到资源,如果没有,则还需要执行等待方法

CompletableFuture

future 的缺点

  1. 调用 get() 方法会一直阻塞直到计算完成,没有提供完成通知的方法,也不具有回调函数的方法
  2. 链式调用和结果聚合处理时,需要调用的多个 future 进行处理,
  3. 没有提供异常处理方法

CompletableFuture 针对以上痛点都有自己的设计