滤冽 发表于 2025-6-9 17:59:42

百万架构师第四十五课:并发编程的基础|JavaGuide

原文链接
课程目标

1. 多线程的发展历史
2. 线程的应用
3. 并发编程的基础
4. 线程安全的问题
特定的指令,计算机不会存储指令,把指令写下来,一次性读取指令,批处理。
然后我们需要把批处理进行隔离、保存它的进度。
进程 —> 线程
单核CPU 只有可能会有一个进程去执行。
什么情况下应该使用多线程

线程出现的目的是什么?解决进程中多任务的实时性的问题?其实简单来说,就是解决“阻塞”的问题。阻塞的意思就是程序运行到某个函数或过程后等待某些事件发生而暂时停止 CPU 占用的情况,也就是说会使得 CPU 闲置。还有一些场景就是比如对于一个函数中的运算逻辑的性能问题,我们可以 通过多线程的技术,使得一个函数中的多个逻辑运算通过多线程技术达到一个并行执行,从而提高性能。
CPU 架构图解:


所以,多线程最终解决的就是“等待”的问题,所以简单总结的使用场景

[*]通过并行计算提高程序执行性能
[*]需要等待网络、I/O响应导致耗费大量的执行时间,可以采用异步线程的方式来减少阻塞
Tomcat 7 以前的 I/O 模型

多线程的应用场景

[*]客户端阻塞 如果客户端只有一个线程,这个线程发起读取文件的操作必须等待 IO 流返回,线程(客户端)才能做其他的事
[*]线程级别阻塞(BIO) : 客户端只有一个线程情况下,会导致整个客户端阻塞。那么我们可以使用多线程,一部分线程在等待 IO 操作返回的同时其他线程可以继续做其他的事。此时从客户端角度来说,客户端没有闲着。
tomcat 模型:


多个客户端都是阻塞的,我只有处理完一个请求才能接收下一个请求。然后客户端就会阻塞。所以 Tomcat 采用了多线程的技术。利用了多线程的技术实现了非阻塞。

如何应用多线程

在 JAVA 中有多个方式来实现多线程。继承 Thread 类、实现 Runable 接口、使用 ExecutorService 、Callable、Future 实现带返回结果的多线程。

[*]Thread
[*]Runable
[*]Callable / Future 可以实现带返回值的线程
继承 Thread 类创建线程

​        Thread 类本质上是实现了 Runable 接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过 Thread 类的 start() 方法。 start() 方法是一个 native 方法。它会启动一个新线程,并执行 run() 方法。这种实现多线程很简单,通过自己的类直接 extends Thread , 并重写 run() 方法,就可以启动新线程并执行自己定义的 run() 方法。
public class MyThread extends Thread{
    public static void main(String[] args) {
      new MyThread().start();
      new MyThread().start();
    }
    @Override
    public void run() {
      System.out.println("MyThrea run().....");
    }
}实现 Runable 接口创建线程

​        如果自己的类已经继承了另一个类,就无法直接继承 Thread,此时,可以实现 Runable 接口。
public class RunableDemo implements Runnable {
    public static void main(String[] args) {
      new Thread(new RunableDemo()).start();
      new Thread(new RunableDemo()).start();
    }
    @Override
    public void run() {
      System.out.println("[" + Thread.currentThread().getName() + "]" + "runable My run().....");
    }
}runable My run().....
runable My run().....错误的写法:

public class RunableDemo implements Runnable {
//    public static void main(String[] args) {
//      new Thread(new RunableDemo()).start();
//      new Thread(new RunableDemo()).start();
//    }

    public static void main(String[] args) {
      new RunableDemo().run();
      new RunableDemo().run();
    }
    @Override
    public void run() {
      System.out.println("[" + Thread.currentThread().getName() + "]" + "runable My run().....");
    }
}runable My run().....
runable My run().....实现 Callable 接口通过 FutureTask 包装器来创建 Thread 线程

​        有的时候,我们可能需要让异步执行的线程在执行完以后,提供一个返回值到当前的主线程,主线程需要这个值进行后续的逻辑处理,那么这个时候,就需要带返回值的线程了。
/***
* 当你想要异步的线程执行你的某一个逻辑,那么在这个运行结束以后
* 我想要拿到子线程运行的结果
*/
public class CallableDemo implements Callable<String> {
    public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newSingleThreadExecutor();

      CallableDemo callableDemo = new CallableDemo();

      Future<String> future = executorService.submit(callableDemo);
      /***
         * 这里可以写其他的业务
         * 去写其他东西
         */
      String returnValue = future.get(); // 这个地方在阻塞
      System.out.println(returnValue);
      executorService.shutdown();
    }

    @Override
    public String call() throws Exception {
      return "darain" + 1;
    }
}如何把多线程用得优雅

合理地利用异步操作,可以大大地提升程序的处理性能,下面这个案例,如何看过 zookeeper 源码的同学应该看到过。
通过阻塞队列以及多线程的方式,实现对请求的异步化处理,提升处理性能。
模仿多个线程处理同一个请求

@Data
public class Request {
    private String name;
}public interface RequestProcessor {
    void processorRequest(Request requset);
}@RequiredArgsConstructor
public class PrintProcessor extends Thread implements RequestProcessor {
    LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();
    private final RequestProcessor nextProcess;

    @Override
    public void processorRequest(Request requset) {
      linkedBlockingQueue.add(requset);
    }

    @Override
    public void run() {
      while (true) {
            try {
                Request requset = linkedBlockingQueue.take();
                out.println("[" + Thread.currentThread().getName() + "] " + "print Data:" + requset);
                nextProcess.processorRequest(requset);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
}@RequiredArgsConstructor
public class SaveProcessor extends Thread implements RequestProcessor {

    LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();

    @Override
    public void processorRequest(Request requset) {
      linkedBlockingQueue.add(requset);
    }

    @Override
    public void run() {
      while (true) {
            try {
                Request requset = linkedBlockingQueue.take();
                System.out.println("[" + Thread.currentThread().getName() + "] " + "save data:" + requset);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
}/***
* 我们去处理的时候,用异步线程去处理。
* 当我们把一个请求丢过来的时候,不是直接去处理,而是通过异步线程去处理。
* zookeeper 就是类似的处理,一方面,你可以通过你的处理把职责划分开。
* 一方面你可以通过异步线程的处理去提升你程序的性能
* 合理地利用你 CPU 的资源
*
* 这个和 zookeeper 里边非常像
*/
public class Demo {
    private final PrintProcessor printProcessor;

    public Demo() {
      SaveProcessor saveProcessor = new SaveProcessor();
      saveProcessor.start();
      printProcessor = new PrintProcessor(saveProcessor);
      printProcessor.start();
    }

    public static void main(String[] args) {
      Request requset = new Request();
      requset.setName("darian");
      new Demo().doTest(requset);
    }

    public void doTest(Request request) {
      printProcessor.processorRequest(request);
    }
}
就像一个链表一样地,上一个对象的引用指向下一个对象。是不会乱序的。
线程的基础知识

​        线程作为操作系统调度的最小单元,并且能够让多线程同时执行,极大地提高了程序的性能,在多核的环境下的优势更加明显。但是在多线程的使用过程中如果对它的特性和原理不够了解的话,就容易造成各种问题。
线程的状态(六种)

JAVA 线程既然能够创建,那么也会销毁,所以线程是存在生命周期的。那么我们接下来从线程的生命周期开始去了解线程。
线程一共六种状态
(NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED)
NEW

​        初始状态,线程被构建,但是还没有调用 #start 方法
RUNNABLE

​        运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一称为 "运行中"
BLOCKED

阻塞,表示线程进如等待状态,也就是线程因为某种原因放弃了 CPU 的使用权,阻塞也分为几种情况。

[*]等待阻塞 运行的线程调用了 #wait 方法,JVM 会把当前线程放到等待队列
[*]同步阻塞synchronized ,运行的线程在获取对象的同步锁时,若该同步锁被其他线程占用了,那么 JVM 会把当前的线程放入到锁池中。
[*]其他阻塞sleep / join运行的线程执行 Thread.sleep()或者 t.join 方法,或者发出了 I/O 请求时, JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、io 处理完毕则线程恢复。
WAITING
等待   (waiting) 是我们的线程调用了一个 #wait 方法,实际上也会变成一个阻塞。就是我们没有办法继续去运行线程了。
TIME_WAITING

​       超时等待状态,超时以后自动返回
TERMINATED

​        终止状态,表示当前线程执行完毕
线程运行状态图:


线程的运行状态有两种状态,
不存在就绪的状态,只是为了描述它的一个状态。
打开 Thread 类,搜索 state 有哪些状态,它写得很清楚。
当运行中的线程的时间片被 CPU 抢占的时候,那么它又会变成一个就绪状态。
线程执行完就是终止。
synchroninzed 就是让这个线程获得锁。获得锁,就意味着,其他线程在调用这个方法的时候,它会阻塞。当我们获得锁的时候。比如说我们现在有两个线程。第一个 T1 线程访问同步代码块。同步代码块里面,首先它会获得一个锁。当 T2 线程进来以后,它是没有办法获得锁的。
线程状态:

public class ThreadStatusDemo {
    public static void main(String[] args) {
      new Thread(() -> {
            while (true) {
                try {
                  TimeUnit.SECONDS.sleep(100);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
            }
      }, "timewaiting").start();

      new Thread(() -> {
            while (true) {   // 我们在一个循环里边获得一个锁
                synchronized (ThreadStatusDemo.class) {
                  try {
                        // 然后调用 wait()方法,是因为它调用 wait 方法之前必须要获得锁
                        ThreadStatusDemo.class.wait();
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                }
            }
      }, "waiting").start();


      new Thread(new blockDemo(), "blockDemo-0").start();
      new Thread(new blockDemo(), "blockDemo-1").start();
    }

    static class blockDemo extends Thread {
      @Override
      public void run() {
            synchronized (blockDemo.class) {
                while (true) {
                  try { // 100 秒,一直让它阻塞
                        TimeUnit.SECONDS.sleep(100);
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                }
            }
      }
    }
}通过相应的命令显示线程状态:


[*]打开终端或者命令提示符,键入 JPS ,(JDK 1.5 提供的要给显示当前所有的 JAVA 进程 PID 的命令),可以获得相应进程的 PID
[*]根据上一步骤获得的 PID,继续输入jstack+ pid(jstack 时 JAVA 虚拟机自带的一种堆栈跟踪工具。jstack 会打印出给定的 JAVA 进程 ID 或 core file 或远程调试服务的 java 堆栈信息)
我们在写线程的时候,最好定义一个名称。我们去查看问题的时候,有利于我们去排查问题。
阻塞状态,blocked 当 synchronized 加锁的情况下,两个线程同时去访问一个方法,这个时候,就会存在 阻塞。
JPS 是 JDK 1.5 以后,显示所有 JAVA 进程的命令。
jstack 30112可以查看线程的状态。

[*]blockDemo-0 获得锁,变成了一个 TIMED_WAITING 的状态。#sleep
[*]blockDemo-1 没有拿到锁(on object monitor)
[*]TIMED_WAITING#sleep 方法
"DestroyJavaVM" #18 prio=5 os_prio=0 tid=0x0000000002a02800 nid=0x697c waiting on condition
   java.lang.Thread.State: RUNNABLE

"blockDemo-1" #17 prio=5 os_prio=0 tid=0x0000000029066800 nid=0x5f74 waiting for monitor entry
   java.lang.Thread.State: BLOCKED (on object monitor)

"blockDemo-0" #15 prio=5 os_prio=0 tid=0x0000000029065800 nid=0x36f8 waiting on condition
   java.lang.Thread.State: TIMED_WAITING (sleeping)

"waiting" #13 prio=5 os_prio=0 tid=0x0000000029061000 nid=0x7bd0 in Object.wait()
   java.lang.Thread.State: WAITING (on object monitor)

"timewaiting" #12 prio=5 os_prio=0 tid=0x0000000029094800 nid=0x6bd4 waiting on condition
   java.lang.Thread.State: TIMED_WAITING (sleeping)

"Service Thread" #11 daemon prio=9 os_prio=0 tid=0x0000000027540800 nid=0x8310 runnable
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #10 daemon prio=9 os_prio=2 tid=0x000000002747e000 nid=0x5344 waiting on condition
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x000000002747d000 nid=0x24c0 waiting on condition
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x0000000027475800 nid=0x7c30 waiting on condition
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x0000000027474800 nid=0x5c78 waiting on condition
   java.lang.Thread.State: RUNNABLE

"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x000000002745c800 nid=0xde0 runnable
   java.lang.Thread.State: RUNNABLE

"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x00000000273ba000 nid=0x3434 waiting on condition
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x0000000027411800 nid=0x839c runnable
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x00000000273a3800 nid=0x79bc in Object.wait()
   java.lang.Thread.State: WAITING (on object monitor)

"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x0000000002afa000 nid=0x4e7c in Object.wait()
   java.lang.Thread.State: WAITING (on object monitor)我们很多时候,要多发现线程的信息。
线程的启动和终止

你怎么去启动一个线程?终止?
#startnative 方法,告诉 JVM 去启动一个线程。然后调用 #run 方法去执行。
#stop 方法是不建议使用的。 @Deprecated !!它就像我们在 Linux 系统中,kill 命令一样,就是我不知道我当前这个线程是不是还在运行,有没有还没处理完的。没有处理完的话,我强制关闭,就会出现一些数据问题,和一些不可预测地问题出现。 #susped, #resume 。
怎么样优雅的关闭?我们关闭 Tomcat 也好,关闭一些进程也好,我们都会提供一些优雅的方式去关闭。一些指令去执行,一般的中间件都会做一个操作,一般都会先去阻止后续的请求进来,然后等待正在运行的线程执行完以后优雅地停止掉。
#interrupt 优雅中断的方式。

​        当其他线程通过调用当前线程的 #interrupt 方法,表示向当前线程打个招呼,告诉他可以中断线程的执行了,至于什么时候中断,取决于当前线程自己。线程通过检查自身是或否被中断来进行响应,可以通过 isIntrrupted() 来判断是否被中断。
实现线程终止的逻辑:
public class InterruptDemo {
    private static int i;
    public static void main(String[] args) throws InterruptedException {
      Thread thread = new Thread(() -> {
            // 我去判断是否中断这个线程
            while (!Thread.currentThread().isInterrupted()) {
                i++;
            }
            System.out.println(i);
      }, "interruptDemo");
      thread.start();
      TimeUnit.SECONDS.sleep(1);
      // 通过线程的 interrupt设置标识为 true
      System.out.println(thread.isInterrupted());
      thread.interrupt();
      System.out.println(thread.isInterrupted());
    }
}这种通过表示为或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止。因此更加安全和优雅。
Thread.interrupted

通过 interrupt,设置了一个标识告诉线程可以终止运行了。线程中还提供了静态方法 Thread.interrupted() 对设置中断标识的线程复位。比如在线程,外边的线程调用 thread.interrupt 来设置中断标识,而在线程里边,又通过 Thread.interrupted 把线程的标识进行了复位。
public static void interrupt1() throws InterruptedException {
    Thread thread = new Thread(() -> {
      while (true) {
            boolean interrupted = Thread.currentThread().isInterrupted();
            if (interrupted) {
                System.out.println("before:" + interrupted);
                Thread.interrupted(); // 对线程进行复位,中断标识为 false
                System.out.println("after:" + Thread.currentThread().isInterrupted());
            }
      }
    });
    thread.start();
    TimeUnit.SECONDS.sleep(1);
    thread.interrupt(); // 设置中断标识为 true
}before:true
after:false其他的线程复位

​        除了通过 Thread.interrupted 方法对线程中断标识进行复位以外,还有一种被动复位的场景,就是对抛出 interruptedException 异常的方法,在 interruptedException 抛出之前, JVM 会先把线程的中断标识位清除,然后会抛出 InterruptedException 这个时候,如果调用 #isInterrupted 方法,将会返回 false。
public static void interrupt2() throws InterruptedException {
    Thread thread = new Thread(() -> {
      while (true) {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // 抛出该异常,会将复位表示设置为 false
                e.printStackTrace();
            }
      }
    });
    thread.start();
    thread.interrupt(); // 将复位表示设置为 true
    TimeUnit.SECONDS.sleep(1);
    System.out.println("before:" + thread.isInterrupted());
    TimeUnit.SECONDS.sleep(1);
    System.out.println("after:" + thread.isInterrupted());
}
通过指令的方式,volatile boolean isStop = false; 这样的一个方式,也是可以的。通过内存的可见。
interrupt 和我们设置标志变量的方式是一样的。
java.lang.Thread#interrupt

[*]java.lang.Thread#interrupt0   native 方法
thread.cpp
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
debug_only(check_for_dangling_thread_pointer(thread);)
// Note:If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}os_linux.cpp
void os::interrupt(Thread* thread) {
    assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer");
    OSThread* osthread = thread->osthread();
    if (!osthread->interrupted()) {
      osthread->set_interrupted(true);
      // More than one thread can get here with the same value of osthread,
      // resulting in multiple notifications. We do, however, want the store
      // to interrupted() to be visible to other threads before we execute unpark().
      OrderAccess::fence();
      ParkEvent * const slp = thread->_SleepEvent ;
      if (slp != NULL) slp->unpark() ;
    }
    // For JSR166. Unpark even if interrupt status already was set
    if (thread->is_Java_thread())
      ((JavaThread*)thread)->parker()->unpark();
    ParkEvent * ev = thread->_ParkEvent ;
    if (ev != NULL) ev->unpark() ;
}内存屏障 fence() ,让标志位改变,让所有线程看见,和 volatile 一个意思。
unpark() 线程。
其实就是通过 unpark 去唤醒
Thread#interrupted是一个静态方法,对设置的中断标识的线程进行复位。
线程的停止方法之 2

​        除了通过 #interrupt 标识去中断线程以外,我们可以通过 :
​        定义一个 volatile 修饰的成员变量,来控制线程的终止。这实际上是应用了 volatile 实现多线程之间的共享变量可见性这一特点来实现的。
public class ThreadStopDemo3 {
    // 这种和 interrupted 方式是一样的。
    private static volatile boolean stop = true;

    public static void main(String[] args) throws InterruptedException {
      Thread thread = new Thread(() -> {
            int i = 0;
            while (!stop) {
                i++;
            }
      });
      thread.start();
      System.out.println("begin start thread");
      Thread.sleep(1000);
      stop = true;
    }
}线程的安全性


[*]可见性
[*]原子性
[*]有序性
认识这三个问题。
可见性

/***
* 可见性问题
*/
public class VisableDemo {
    // 加上 volatile 之后,才可以停止。
    private volatile static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
      Thread thread = new Thread(() -> {
            int i = 0;
            while (!stop) {
                i++;
            }
      });
      thread.start();
      TimeUnit.SECONDS.sleep(1);
      stop = true;
    }
}原子性

/*** * */public class AutomicDemo {    private static int count = 0;    public static void main(String[] args) throws InterruptedException {      for (int i = 0; i < 1000; i++) {            new Thread(AutomicDemo::inc).start();      }      Thread.sleep(4000);      System.out.println("y运行结果:" + count);      // y运行结果:952    (
页: [1]
查看完整版本: 百万架构师第四十五课:并发编程的基础|JavaGuide