找回密码
 立即注册
首页 业界区 业界 【多线程】Java多线程与并发编程全解析

【多线程】Java多线程与并发编程全解析

零幸 2025-6-3 00:03:05
Java多线程与并发编程全解析

多线程编程是Java中最具挑战性的部分之一,它能够显著提升应用程序的性能和响应能力。本文将全面解析Java多线程与并发编程的核心概念、线程安全机制以及JUC工具类的使用,并提供完整的代码示例。
1. 线程的基本操作与生命周期

Java线程的生命周期包括新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)、等待(Waiting)、超时等待(Timed Waiting)和终止(Terminated)七个状态。
  1. public class ThreadLifecycleExample {
  2.     public static void main(String[] args) throws InterruptedException {
  3.         // 创建线程
  4.         Thread t = new Thread(() -> {
  5.             System.out.println("线程状态1: " + Thread.currentThread().getState()); // RUNNABLE
  6.             
  7.             try {
  8.                 // 线程休眠,进入TIMED_WAITING状态
  9.                 Thread.sleep(1000);
  10.                 System.out.println("线程状态2: " + Thread.currentThread().getState());
  11.                
  12.                 // 同步块,可能进入BLOCKED状态
  13.                 synchronized (ThreadLifecycleExample.class) {
  14.                     System.out.println("线程获得锁");
  15.                 }
  16.                
  17.                 // 线程等待,进入WAITING状态
  18.                 synchronized (ThreadLifecycleExample.class) {
  19.                     ThreadLifecycleExample.class.wait();
  20.                 }
  21.             } catch (InterruptedException e) {
  22.                 e.printStackTrace();
  23.             }
  24.             System.out.println("线程状态3: " + Thread.currentThread().getState()); // RUNNABLE
  25.         });
  26.         
  27.         System.out.println("线程状态0: " + t.getState()); // NEW
  28.         
  29.         // 启动线程
  30.         t.start();
  31.         System.out.println("线程状态4: " + t.getState()); // RUNNABLE或TIMED_WAITING
  32.         
  33.         // 主线程休眠
  34.         Thread.sleep(2000);
  35.         System.out.println("线程状态5: " + t.getState()); // 可能是WAITING或TERMINATED
  36.         
  37.         // 唤醒等待的线程
  38.         synchronized (ThreadLifecycleExample.class) {
  39.             ThreadLifecycleExample.class.notify();
  40.         }
  41.         
  42.         // 等待线程执行完毕
  43.         t.join();
  44.         System.out.println("线程状态6: " + t.getState()); // TERMINATED
  45.     }
  46. }
复制代码
2. 线程安全与同步机制

线程安全问题主要由竞态条件(Race Condition)和内存可见性问题引起。Java提供了多种同步机制来解决这些问题。
  1. import java.util.concurrent.locks.Lock;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. public class ThreadSafetyExample {
  4.     private static int counter = 0; // 共享资源
  5.     private static final Object lock = new Object(); // 锁对象
  6.     private static final Lock reentrantLock = new ReentrantLock(); // 可重入锁
  7.    
  8.     // 方式1: synchronized方法
  9.     public static synchronized void incrementSynchronized() {
  10.         counter++;
  11.     }
  12.    
  13.     // 方式2: synchronized块
  14.     public static void incrementBlock() {
  15.         synchronized (lock) {
  16.             counter++;
  17.         }
  18.     }
  19.    
  20.     // 方式3: ReentrantLock
  21.     public static void incrementReentrantLock() {
  22.         reentrantLock.lock();
  23.         try {
  24.             counter++;
  25.         } finally {
  26.             reentrantLock.unlock();
  27.         }
  28.     }
  29.    
  30.     // 方式4: 使用原子类
  31.     private static java.util.concurrent.atomic.AtomicInteger atomicCounter = new java.util.concurrent.atomic.AtomicInteger(0);
  32.     public static void incrementAtomic() {
  33.         atomicCounter.incrementAndGet();
  34.     }
  35.    
  36.     // 演示线程不安全的情况
  37.     public static void incrementUnsafe() {
  38.         counter++; // 非线程安全操作
  39.     }
  40.    
  41.     public static void main(String[] args) throws InterruptedException {
  42.         int threadCount = 1000;
  43.         Thread[] threads = new Thread[threadCount];
  44.         
  45.         // 测试非线程安全的方法
  46.         counter = 0;
  47.         for (int i = 0; i < threadCount; i++) {
  48.             threads[i] = new Thread(ThreadSafetyExample::incrementUnsafe);
  49.             threads[i].start();
  50.         }
  51.         for (Thread t : threads) t.join();
  52.         System.out.println("非线程安全计数器结果: " + counter); // 可能不等于1000
  53.         
  54.         // 测试原子类
  55.         for (int i = 0; i < threadCount; i++) {
  56.             threads[i] = new Thread(ThreadSafetyExample::incrementAtomic);
  57.             threads[i].start();
  58.         }
  59.         for (Thread t : threads) t.join();
  60.         System.out.println("原子类计数器结果: " + atomicCounter.get()); // 一定等于1000
  61.     }
  62. }
复制代码
3. JUC包中的并发工具类

JUC(java.util.concurrent)包提供了丰富的并发工具类,极大简化了多线程编程。
3.1 Executor框架与线程池

Executor框架是管理线程的核心组件,线程池是其主要实现。
  1. import java.util.concurrent.*;
  2. public class ExecutorFrameworkExample {
  3.     public static void main(String[] args) throws InterruptedException {
  4.         // 创建固定大小的线程池
  5.         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
  6.         
  7.         // 创建缓存线程池
  8.         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  9.         
  10.         // 创建单线程执行器
  11.         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  12.         
  13.         // 创建定时任务线程池
  14.         ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
  15.         
  16.         // 提交任务到固定大小线程池
  17.         for (int i = 0; i < 5; i++) {
  18.             final int taskId = i;
  19.             fixedThreadPool.submit(() -> {
  20.                 System.out.println("任务" + taskId + "在固定大小线程池执行");
  21.                 try {
  22.                     Thread.sleep(1000);
  23.                 } catch (InterruptedException e) {
  24.                     e.printStackTrace();
  25.                 }
  26.             });
  27.         }
  28.         
  29.         // 提交定时任务
  30.         scheduledExecutor.schedule(() -> {
  31.             System.out.println("延迟3秒执行的定时任务");
  32.         }, 3, TimeUnit.SECONDS);
  33.         
  34.         // 提交周期性任务
  35.         scheduledExecutor.scheduleAtFixedRate(() -> {
  36.             System.out.println("每2秒执行一次的周期性任务");
  37.         }, 1, 2, TimeUnit.SECONDS);
  38.         
  39.         // 关闭线程池
  40.         fixedThreadPool.shutdown();
  41.         cachedThreadPool.shutdown();
  42.         singleThreadExecutor.shutdown();
  43.         
  44.         // 等待定时任务执行一段时间后关闭
  45.         Thread.sleep(10000);
  46.         scheduledExecutor.shutdown();
  47.     }
  48. }
复制代码
3.2 CountDownLatch

CountDownLatch用于让一个或多个线程等待其他线程完成操作。
  1. import java.util.concurrent.CountDownLatch;
  2. public class CountDownLatchExample {
  3.     public static void main(String[] args) throws InterruptedException {
  4.         int workerCount = 5;
  5.         CountDownLatch latch = new CountDownLatch(workerCount);
  6.         
  7.         // 创建并启动工作线程
  8.         for (int i = 0; i < workerCount; i++) {
  9.             final int workerId = i;
  10.             new Thread(() -> {
  11.                 System.out.println("工作线程" + workerId + "开始执行");
  12.                 try {
  13.                     // 模拟工作耗时
  14.                     Thread.sleep((long) (Math.random() * 5000));
  15.                     System.out.println("工作线程" + workerId + "完成任务");
  16.                 } catch (InterruptedException e) {
  17.                     e.printStackTrace();
  18.                 } finally {
  19.                     // 计数减1
  20.                     latch.countDown();
  21.                 }
  22.             }).start();
  23.         }
  24.         
  25.         // 主线程等待所有工作线程完成
  26.         System.out.println("主线程等待所有工作线程完成...");
  27.         latch.await();
  28.         System.out.println("所有工作线程已完成,主线程继续执行");
  29.     }
  30. }
复制代码
3.3 CyclicBarrier

CyclicBarrier用于多个线程互相等待,直到所有线程都到达某个屏障点。
  1. import java.util.concurrent.BrokenBarrierException;
  2. import java.util.concurrent.CyclicBarrier;
  3. public class CyclicBarrierExample {
  4.     public static void main(String[] args) {
  5.         int threadCount = 3;
  6.         // 创建CyclicBarrier,当3个线程都到达屏障时执行回调
  7.         CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
  8.             System.out.println("所有线程都已到达屏障,继续执行");
  9.         });
  10.         
  11.         // 创建并启动线程
  12.         for (int i = 0; i < threadCount; i++) {
  13.             final int threadId = i;
  14.             new Thread(() -> {
  15.                 try {
  16.                     System.out.println("线程" + threadId + "正在执行前置任务");
  17.                     Thread.sleep((long) (Math.random() * 3000));
  18.                     System.out.println("线程" + threadId + "已到达屏障");
  19.                     
  20.                     // 等待其他线程到达屏障
  21.                     barrier.await();
  22.                     
  23.                     System.out.println("线程" + threadId + "继续执行后续任务");
  24.                 } catch (InterruptedException | BrokenBarrierException e) {
  25.                     e.printStackTrace();
  26.                 }
  27.             }).start();
  28.         }
  29.     }
  30. }
复制代码
3.4 Semaphore

Semaphore用于控制同时访问某个资源的线程数量。
  1. import java.util.concurrent.Semaphore;
  2. public class SemaphoreExample {
  3.     private static final int MAX_PERMITS = 3; // 最多允许3个线程同时访问
  4.     private static final Semaphore semaphore = new Semaphore(MAX_PERMITS);
  5.    
  6.     public static void main(String[] args) {
  7.         // 创建10个线程,但最多只允许3个同时执行
  8.         for (int i = 0; i < 10; i++) {
  9.             final int threadId = i;
  10.             new Thread(() -> {
  11.                 try {
  12.                     // 获取许可
  13.                     semaphore.acquire();
  14.                     System.out.println("线程" + threadId + "获取到许可,开始执行");
  15.                     
  16.                     // 模拟执行任务
  17.                     Thread.sleep((long) (Math.random() * 5000));
  18.                     
  19.                     System.out.println("线程" + threadId + "执行完毕,释放许可");
  20.                     // 释放许可
  21.                     semaphore.release();
  22.                 } catch (InterruptedException e) {
  23.                     e.printStackTrace();
  24.                 }
  25.             }).start();
  26.         }
  27.     }
  28. }
复制代码
3.5 Exchanger

Exchanger用于两个线程之间交换数据。
  1. import java.util.concurrent.Exchanger;
  2. public class ExchangerExample {
  3.     public static void main(String[] args) {
  4.         Exchanger<String> exchanger = new Exchanger<>();
  5.         
  6.         // 生产者线程
  7.         new Thread(() -> {
  8.             try {
  9.                 String dataToSend = "来自生产者的数据";
  10.                 System.out.println("生产者发送: " + dataToSend);
  11.                
  12.                 // 交换数据
  13.                 String receivedData = exchanger.exchange(dataToSend);
  14.                 System.out.println("生产者收到: " + receivedData);
  15.             } catch (InterruptedException e) {
  16.                 e.printStackTrace();
  17.             }
  18.         }).start();
  19.         
  20.         // 消费者线程
  21.         new Thread(() -> {
  22.             try {
  23.                 String dataToSend = "来自消费者的数据";
  24.                 System.out.println("消费者发送: " + dataToSend);
  25.                
  26.                 // 交换数据
  27.                 String receivedData = exchanger.exchange(dataToSend);
  28.                 System.out.println("消费者收到: " + receivedData);
  29.             } catch (InterruptedException e) {
  30.                 e.printStackTrace();
  31.             }
  32.         }).start();
  33.     }
  34. }
复制代码
3.6 Future和CompletableFuture

Future用于异步获取计算结果,CompletableFuture是Future的增强版,提供了更丰富的异步编程功能。
  1. import java.util.concurrent.*;
  2. public class FutureExample {
  3.     public static void main(String[] args) throws InterruptedException, ExecutionException {
  4.         ExecutorService executor = Executors.newSingleThreadExecutor();
  5.         
  6.         // 使用Future
  7.         Future<Integer> future = executor.submit(() -> {
  8.             // 模拟耗时计算
  9.             Thread.sleep(2000);
  10.             return 1 + 2;
  11.         });
  12.         
  13.         // 主线程可以做其他事情
  14.         System.out.println("主线程继续执行");
  15.         
  16.         // 获取异步计算结果
  17.         if (future.isDone()) {
  18.             System.out.println("计算已完成,结果: " + future.get());
  19.         } else {
  20.             System.out.println("计算未完成,等待...");
  21.             System.out.println("计算结果: " + future.get()); // 阻塞直到计算完成
  22.         }
  23.         
  24.         // 使用CompletableFuture
  25.         CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
  26.             try {
  27.                 Thread.sleep(1000);
  28.             } catch (InterruptedException e) {
  29.                 e.printStackTrace();
  30.             }
  31.             return 3 + 4;
  32.         });
  33.         
  34.         // 链式调用,处理计算结果
  35.         completableFuture
  36.             .thenApply(result -> result * 2)
  37.             .thenAccept(finalResult -> System.out.println("CompletableFuture最终结果: " + finalResult));
  38.         
  39.         // 多任务组合
  40.         CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> 10);
  41.         CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 20);
  42.         
  43.         CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
  44.         
  45.         CompletableFuture<Integer> combinedResult = allTasks.thenApply(v -> {
  46.             try {
  47.                 return task1.get() + task2.get();
  48.             } catch (InterruptedException | ExecutionException e) {
  49.                 e.printStackTrace();
  50.                 return 0;
  51.             }
  52.         });
  53.         
  54.         System.out.println("组合任务结果: " + combinedResult.get());
  55.         
  56.         executor.shutdown();
  57.     }
  58. }
复制代码
4. 线程池的原理与最佳实践

线程池通过复用线程减少线程创建和销毁的开销,提高性能。
  1. import java.util.concurrent.*;
  2. public class ThreadPoolBestPractice {
  3.     public static void main(String[] args) {
  4.         // 自定义线程池配置
  5.         ThreadPoolExecutor executor = new ThreadPoolExecutor(
  6.             5,                   // 核心线程数
  7.             10,                  // 最大线程数
  8.             60,                  // 线程空闲时间
  9.             TimeUnit.SECONDS,
  10.             new LinkedBlockingQueue<>(100), // 任务队列
  11.             Executors.defaultThreadFactory(), // 线程工厂
  12.             new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
  13.         );
  14.         
  15.         // 提交任务
  16.         for (int i = 0; i < 20; i++) {
  17.             final int taskId = i;
  18.             executor.submit(() -> {
  19.                 System.out.println("任务" + taskId + "由线程" + Thread.currentThread().getName() + "执行");
  20.                 try {
  21.                     Thread.sleep(1000);
  22.                 } catch (InterruptedException e) {
  23.                     e.printStackTrace();
  24.                 }
  25.             });
  26.         }
  27.         
  28.         // 监控线程池状态
  29.         System.out.println("线程池状态: 核心线程数=" + executor.getCorePoolSize() +
  30.                           ", 最大线程数=" + executor.getMaximumPoolSize() +
  31.                           ", 当前线程数=" + executor.getPoolSize() +
  32.                           ", 活跃线程数=" + executor.getActiveCount() +
  33.                           ", 队列任务数=" + executor.getQueue().size());
  34.         
  35.         // 关闭线程池
  36.         executor.shutdown(); // 不再接受新任务,但会执行完已提交的任务
  37.         
  38.         try {
  39.             // 等待所有任务完成
  40.             if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
  41.                 executor.shutdownNow(); // 强制关闭
  42.             }
  43.         } catch (InterruptedException e) {
  44.             executor.shutdownNow();
  45.         }
  46.         
  47.         System.out.println("线程池已关闭");
  48.     }
  49. }
复制代码
5. 并发集合类

JUC包提供了多种线程安全的集合类,替代了传统的同步集合。
  1. import java.util.*;
  2. import java.util.concurrent.*;
  3. public class ConcurrentCollectionExample {
  4.     public static void main(String[] args) throws InterruptedException {
  5.         // ConcurrentHashMap示例
  6.         ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
  7.         
  8.         // 多个线程同时操作map
  9.         Thread t1 = new Thread(() -> {
  10.             for (int i = 0; i < 1000; i++) {
  11.                 concurrentMap.put("key" + i, i);
  12.             }
  13.         });
  14.         
  15.         Thread t2 = new Thread(() -> {
  16.             for (int i = 0; i < 1000; i++) {
  17.                 concurrentMap.get("key" + i);
  18.             }
  19.         });
  20.         
  21.         t1.start();
  22.         t2.start();
  23.         t1.join();
  24.         t2.join();
  25.         
  26.         System.out.println("ConcurrentHashMap大小: " + concurrentMap.size());
  27.         
  28.         // CopyOnWriteArrayList示例
  29.         CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
  30.         
  31.         Thread writer = new Thread(() -> {
  32.             for (int i = 0; i < 100; i++) {
  33.                 list.add("element" + i);
  34.                 try {
  35.                     Thread.sleep(10);
  36.                 } catch (InterruptedException e) {
  37.                     e.printStackTrace();
  38.                 }
  39.             }
  40.         });
  41.         
  42.         Thread reader = new Thread(() -> {
  43.             for (int i = 0; i < 20; i++) {
  44.                 System.out.println("List内容: " + list);
  45.                 try {
  46.                     Thread.sleep(100);
  47.                 } catch (InterruptedException e) {
  48.                     e.printStackTrace();
  49.                 }
  50.             }
  51.         });
  52.         
  53.         writer.start();
  54.         reader.start();
  55.         writer.join();
  56.         reader.join();
  57.         
  58.         // ConcurrentLinkedQueue示例
  59.         ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
  60.         
  61.         // 生产者线程
  62.         Thread producer = new Thread(() -> {
  63.             for (int i = 0; i < 10; i++) {
  64.                 queue.offer("item" + i);
  65.                 System.out.println("生产: " + "item" + i);
  66.                 try {
  67.                     Thread.sleep(200);
  68.                 } catch (InterruptedException e) {
  69.                     e.printStackTrace();
  70.                 }
  71.             }
  72.         });
  73.         
  74.         // 消费者线程
  75.         Thread consumer = new Thread(() -> {
  76.             while (true) {
  77.                 String item = queue.poll();
  78.                 if (item != null) {
  79.                     System.out.println("消费: " + item);
  80.                 } else if (producer.getState() == Thread.State.TERMINATED) {
  81.                     break; // 生产者已结束且队列为空
  82.                 }
  83.                 try {
  84.                     Thread.sleep(300);
  85.                 } catch (InterruptedException e) {
  86.                     e.printStackTrace();
  87.                 }
  88.             }
  89.         });
  90.         
  91.         producer.start();
  92.         consumer.start();
  93.         producer.join();
  94.         consumer.join();
  95.     }
  96. }
复制代码
6. 原子操作类

原子操作类基于CAS(Compare-And-Swap)实现,提供了高效的线程安全操作。
  1. import java.util.concurrent.atomic.*;
  2. public class AtomicExample {
  3.     public static void main(String[] args) throws InterruptedException {
  4.         // AtomicInteger示例
  5.         AtomicInteger atomicInteger = new AtomicInteger(0);
  6.         
  7.         // 多个线程同时递增
  8.         Thread[] threads = new Thread[10];
  9.         for (int i = 0; i < 10; i++) {
  10.             threads[i] = new Thread(() -> {
  11.                 for (int j = 0; j < 1000; j++) {
  12.                     atomicInteger.incrementAndGet(); // 原子递增
  13.                 }
  14.             });
  15.             threads[i].start();
  16.         }
  17.         
  18.         // 等待所有线程完成
  19.         for (Thread t : threads) {
  20.             t.join();
  21.         }
  22.         
  23.         System.out.println("AtomicInteger最终值: " + atomicInteger.get()); // 应输出10000
  24.         
  25.         // AtomicReference示例
  26.         AtomicReference<String> atomicReference = new AtomicReference<>("初始值");
  27.         
  28.         Thread t1 = new Thread(() -> {
  29.             boolean updated = atomicReference.compareAndSet("初始值", "新值1");
  30.             System.out.println("线程1更新结果: " + updated);
  31.         });
  32.         
  33.         Thread t2 = new Thread(() -> {
  34.             boolean updated = atomicReference.compareAndSet("初始值", "新值2");
  35.             System.out.println("线程2更新结果: " + updated);
  36.         });
  37.         
  38.         t1.start();
  39.         t2.start();
  40.         t1.join();
  41.         t2.join();
  42.         
  43.         System.out.println("AtomicReference最终值: " + atomicReference.get());
  44.         
  45.         // LongAdder示例 - 高并发场景下比AtomicLong更高效
  46.         LongAdder longAdder = new LongAdder();
  47.         
  48.         Thread[] adderThreads = new Thread[20];
  49.         for (int i = 0; i < 20; i++) {
  50.             adderThreads[i] = new Thread(() -> {
  51.                 for (int j = 0; j < 10000; j++) {
  52.                     longAdder.increment();
  53.                 }
  54.             });
  55.             adderThreads[i].start();
  56.         }
  57.         
  58.         // 等待所有线程完成
  59.         for (Thread t : adderThreads) {
  60.             t.join();
  61.         }
  62.         
  63.         System.out.println("LongAdder最终值: " + longAdder.sum());
  64.     }
  65. }
复制代码
总结

Java多线程与并发编程是一个复杂但强大的领域,掌握这些核心概念和工具能够帮助你编写高效、安全且易于维护的多线程应用程序。
关键要点回顾:

  • 线程的生命周期和基本操作
  • 线程安全与同步机制(synchronized、ReentrantLock、原子类)
  • JUC包中的并发工具类(Executor框架、CountDownLatch、CyclicBarrier等)
  • 线程池的原理和最佳实践
  • 并发集合类(ConcurrentHashMap、CopyOnWriteArrayList等)
  • 原子操作类(AtomicInteger、LongAdder等)
通过合理使用这些工具和技术,可以有效解决多线程编程中的各种挑战,如竞态条件、内存可见性和线程管理等问题。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册