找回密码
 立即注册
首页 业界区 业界 Rust 中的 Tokio 线程同步机制

Rust 中的 Tokio 线程同步机制

溥价 2025-9-23 18:26:23
本文分享自天翼云开发者社区《Rust 中的 Tokio 线程同步机制》,作者:l****n
Rust 中的 Tokio 线程同步机制

在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio 是一个强大的异步运行时库,为 Rust 提供了多种线程同步机制。以下是一些常见的同步机制:

  • Mutex
  • RwLock
  • Barrier
  • Semaphore
  • Notify
  • oneshot 和 mpsc 通道
  • watch 通道
1. Mutex

Mutex(互斥锁)是最常见的同步原语之一,用于保护共享数据。它确保同一时间只有一个线程能够访问数据,从而避免竞争条件。
  1. use tokio::sync::Mutex;
  2. use std::sync::Arc;
  3. #[tokio::main]
  4. async fn main() {
  5.     let data = Arc::new(Mutex::new(0));
  6.     let mut handles = vec![];
  7.     for _ in 0..10 {
  8.         let data = data.clone();
  9.         let handle = tokio::spawn(async move {
  10.             let mut lock = data.lock().await;
  11.             *lock += 1;
  12.         });
  13.         handles.push(handle);
  14.     }
  15.     for handle in handles {
  16.         handle.await.unwrap();
  17.     }
  18.     println!("Result: {}", *data.lock().await);
  19. }
复制代码
2. RwLock

RwLock(读写锁)允许多线程同时读取数据,但只允许一个线程写入数据。它比 Mutex 更加灵活,因为在读取多于写入的场景下,它能提高性能。功能上,他是读写互斥、写写互斥、读读兼容。
  1. use tokio::sync::RwLock;
  2. use std::sync::Arc;
  3. #[tokio::main]
  4. async fn main() {
  5.     let data = Arc::new(RwLock::new(0));
  6.     let read_data = data.clone();
  7.     let read_handle = tokio::spawn(async move {
  8.         let lock = read_data.read().await;
  9.         println!("Read: {}", *lock);
  10.     });
  11.     let write_data = data.clone();
  12.     let write_handle = tokio::spawn(async move {
  13.         let mut lock = write_data.write().await;
  14.         *lock += 1;
  15.         println!("Write: {}", *lock);
  16.     });
  17.     read_handle.await.unwrap();
  18.     write_handle.await.unwrap();
  19. }
复制代码
3. Barrier

Barrier 是一种同步机制,允许多个线程在某个点上进行同步。当线程到达屏障时,它们会等待直到所有线程都到达,然后一起继续执行。
  1. use tokio::sync::Barrier;
  2. use std::sync::Arc;
  3. #[tokio::main]
  4. async fn main() {
  5.     let barrier = Arc::new(Barrier::new(3));
  6.     let mut handles = vec![];
  7.     for i in 0..3 {
  8.         let barrier = barrier.clone();
  9.         let handle = tokio::spawn(async move {
  10.             println!("Before wait: {}", i);
  11.             barrier.wait().await;
  12.             println!("After wait: {}", i);
  13.         });
  14.         handles.push(handle);
  15.     }
  16.     for handle in handles {
  17.         handle.await.unwrap();
  18.     }
  19. }
复制代码
4. Semaphore

Semaphore(信号量)是一种用于控制对资源访问的同步原语。它允许多个线程访问资源,但有一个最大并发数限制。
  1. #[tokio::test]
  2. async fn test_sem() {
  3.     let semaphore = Arc::new(Semaphore::new(3));
  4.     let mut handles = vec![];
  5.     for i in 0..5 {
  6.         let semaphore = semaphore.clone();
  7.         let handle = tokio::spawn(async move {
  8.             let permit = semaphore.acquire().await.unwrap();
  9.             let now = Local::now();
  10.             println!("Got permit: {} at {:?}", i, now);
  11.             println!(
  12.                 "Semaphore available permits before sleep: {}",
  13.                 semaphore.available_permits()
  14.             );
  15.             sleep(Duration::from_secs(5)).await;
  16.             drop(permit);
  17.             println!(
  18.                 "Semaphore available permits after sleep: {}",
  19.                 semaphore.available_permits()
  20.             );
  21.         });
  22.         handles.push(handle);
  23.     }
  24.     for handle in handles {
  25.         handle.await.unwrap();
  26.     }
  27. }
复制代码
最终的结果如下
  1. Got permit: 0 at 2024-08-08T21:03:04.374666+08:00
  2. Semaphore available permits before sleep: 2
  3. Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00
  4. Semaphore available permits before sleep: 1
  5. Got permit: 2 at 2024-08-08T21:03:04.375563+08:00
  6. Semaphore available permits before sleep: 0
  7. Semaphore available permits after sleep: 0
  8. Semaphore available permits after sleep: 0
  9. Semaphore available permits after sleep: 1
  10. Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00
  11. Semaphore available permits before sleep: 1
  12. Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00
  13. Semaphore available permits before sleep: 1
  14. Semaphore available permits after sleep: 2
  15. Semaphore available permits after sleep: 3
复制代码
5. Notify

Notify 是一种用于线程间通知的简单机制。它允许一个线程通知其他线程某些事件的发生。
  1. use tokio::sync::Notify;
  2. use std::sync::Arc;
  3. #[tokio::main]
  4. async fn main() {
  5.     let notify = Arc::new(Notify::new());
  6.     let notify_clone = notify.clone();
  7.     let handle = tokio::spawn(async move {
  8.         notify_clone.notified().await;
  9.         println!("Received notification");
  10.     });
  11.     notify.notify_one();
  12.     handle.await.unwrap();
  13. }
复制代码
6. oneshot 和 mpsc 通道

oneshot 通道用于一次性发送消息,而 mpsc 通道则允许多个生产者发送消息到一个消费者。一般地onshot用于异常通知、启动分析等功能。mpsc用于实现异步消息同步
oneshot
  1. use tokio::sync::oneshot;
  2. #[tokio::main]
  3. async fn main() {
  4.     let (tx, rx) = oneshot::channel();
  5.     tokio::spawn(async move {
  6.         tx.send("Hello, world!").unwrap();
  7.     });
  8.     let message = rx.await.unwrap();
  9.     println!("Received: {}", message);
  10. }
复制代码
mpsc
  1. use tokio::sync::mpsc;
  2. #[tokio::main]
  3. async fn main() {
  4.     let (tx, mut rx) = mpsc::channel(32);
  5.     tokio::spawn(async move {
  6.         tx.send("Hello, world!").await.unwrap();
  7.     });
  8.     while let Some(message) = rx.recv().await {
  9.         println!("Received: {}", message);
  10.     }
  11. }
复制代码
7. watch 通道

watch 通道用于发送和接收共享状态的更新。它允许多个消费者监听状态的变化。
  1. use tokio::sync::watch;
  2. #[tokio::main]
  3. async fn main() {
  4.     let (tx, mut rx) = watch::channel("initial");
  5.     tokio::spawn(async move {
  6.         tx.send("updated").unwrap();
  7.     });
  8.     while rx.changed().await.is_ok() {
  9.         println!("Received: {}", *rx.borrow());
  10.     }
  11. }
复制代码
watch通道​:

  • 用于广播状态更新,一个生产者更新状态,多个消费者获取最新状态。
  • 适合配置变更、状态同步等场景。
mpsc通道​:

  • 用于传递消息队列,多个生产者发送消息,一个消费者逐条处理。
  • 适合任务队列、事件驱动等场景。
总结

Rust 中的 Tokio 提供了丰富的线程同步机制,可以根据具体需求选择合适的同步原语。常用的同步机制包括:

  • Mutex:互斥锁,保护共享数据。
  • RwLock:读写锁,允许并发读,写时独占。
  • Barrier:屏障,同步多个线程在某一点。
  • Semaphore:信号量,控制并发访问资源。
  • Notify:通知机制,用于线程间通知。
  • oneshot 和 mpsc 通道:消息传递机制。
  • watch 通道:状态更新机制。
通过这些同步机制,可以在 Rust 中编写高效、安全的并发程序。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

前天 02:59

举报

分享、互助 让互联网精神温暖你我
您需要登录后才可以回帖 登录 | 立即注册