国瑾瑶 发表于 2025-6-18 00:03:03

rust进阶.并发.Tokio.1.Tokio简介

学习要,工作也不能拉下,所以这一段时间关于rust的博文少了些。
rust要学习的内容还很多,但我觉得应该优先打好基础,这其中比较关注的是并发。
提到rust的并发,先回忆在书本有许多的内容:
1.并发和并行
2.通过信道(channel)共享进程间数据
关键库和方法
std::sync::mpscmpsc::channel()std::threadthread::spawn() 3.通过内存共享进程间数据关键库和方法
std::sync::{Arc, Mutex}std::threadstd::thread::JoinHandleJoinHandle::join() 4.Send和Sync特质Send 标记 trait 表明实现了 Send 的类型值的所有权可以在线程间传送。几乎所有的 Rust 类型都是Send 的,
 *   不过有一些例外,包括 Rc,这是因为Rc的fetch_add(引用计数)属于非原子操作Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用5.core::future::future --未来值
 * 1.future 是一个现在可能还没有准备好但将在未来某个时刻准备好的值
 * 2.Rust 提供了 Future trait 作为基础组件,这样不同的异步操作就可以在不同的数据结构上实现
 * 3.每一个实现了 Future 的类型会维护自己的进度状态信息和 “ready” 的定义
 * 4.async 关键字可以用于代码块和函数
 * 5.在一个 async 块或 async 函数中,可以使用 await 关键字来等待一个 future 准备就绪,这一过程称为 等待一个 future
 * 6.检查一个 future 并查看其值是否已经准备就绪的过程被称为 轮询(polling)
 * 7.在大多数情况下,编写异步 Rust 代码时,我们使用 async 和 await 关键字。
 *    Rust 将其编译为等同于使用 Future trait 的代码,这非常类似于将 for 循环编译为等同于使用 Iterator trait 的代码
 
虽然rust的标准库已经可以解决并发的问题,但是异步操作据说还是需要依赖于第三方的运行时来进行,这其中有几个可以选择:
运行时库异步操作支持Future 支持性能表现易用性生态兼容性适用场景Tokio多线程工作窃取调度器,支持百万级并发连接丰富组合子(join!, select!),兼容 async-std★★★★★(行业标杆)★★★☆(中等学习曲线)★★★★★(Hyper/Tonic 等主流框架)高性能网络服务(API 网关、实时通信)async-std类似标准库的异步 API(async_std::fs 等)与 std::future 完全兼容,支持 Futures Unordered★★★☆(轻量高效)★★★★★(零学习成本)★★★☆(基础工具链完善)快速原型开发、CLI 工具、轻量 Web 服务Smol轻量级任务模型(类似 goroutine)模块化设计,可通过 async-channel 扩展★★★★☆(低资源占用)★★★★☆(简洁 API)★★☆(需依赖 async-executors)微服务、IoT 设备、极简依赖链项目Glommio基于 io_uring 的线程亲和性调度器实验性 Future API,支持 FutureExt 扩展★★☆(延迟波动)★☆(底层 API)★☆(无成熟 HTTP 库)研究性项目、探索新技术边界rustasync/runtime底层抽象,支持自定义线程池/调度策略灵活但复杂,需直接操作 Runtime::builder()★★★☆(可调优)★☆(高级开发者专用)★★☆(需自行集成生态)深度定制需求、自定义异步框架从上表可以看出,tokio相对比较突出,尤其是性能和生态方面,而这时应用开发所需要关注的。
 
一、tokio发展历史

Tokio 的诞生与 Rust 异步生态的演进紧密相连:

[*]起源与整合(2016年)
Tokio 最初是 futures(异步操作基本 API)和 mio(跨平台非阻塞 IO 库)的整合库,名为 futures_mio。2016 年 8 月更名为 Tokio,计划构建一整套异步设施,核心组件包括 tokio-core(单线程模型)、tokio-io、tokio-timer 等。
[*]性能与易用性优化

[*]引入宏实现 async/await 语法(如 futures-await 库),提升异步代码的可读性。
[*]通过 tokio-proto 等模块提供上层协议支持,但后续因易用性问题被逐步优化。

[*]生态定位与标准化(2018年)
Rust 官方成立 net-wg 小组,将 futures 工作移交,并倡导“中立的 futures 提供基础能力,Tokio 专注 IO 相关接口”的设计。Tokio 虽未完全采纳此方案,但持续演进,成为事实标准的异步运行时。
[*]现状与影响
Tokio 凭借高性能(如支持百万级并发连接)、丰富工具链(异步文件/网络/定时器)和生态整合(如 Hyper、Axum 框架),成为 Rust 异步开发的首选方案。
命名由来:为什么叫 Tokio?

Tokio 的命名灵感来源于 “Tokyo(东京)+ IO” 的组合,寓意:

[*]高效与繁忙:东京作为国际化大都市,象征着高效处理海量任务的能力,与 Tokio 处理异步 IO 的目标契合。
[*]技术愿景:项目初期目标是构建像东京都市圈一样“繁忙却高效”的异步运行时。
Tokio 的字面意思


[*]日语语境:Tokio 在日语中没有实际含义,它是“东京”的西班牙语拼写变体。日语中“东京”的标准表达为平假名「とうきょう」或汉字「東京」。
[*]语言差异:西班牙语采用“Tokio”拼写东京,源于其语音规则(如保留词尾 -o),而日语官方罗马字转写为“Tokyo”。Tokio 这一拼写主要出现在西班牙语系国家或国际交流场景中。
总结

Tokio 的命名既体现了技术愿景(高效处理 IO),也蕴含了文化隐喻(东京的国际化形象)。其发展历程反映了 Rust 异步生态从底层整合到标准化、易用化的演进,最终成为高性能异步开发的基石。
二、tokio官方文档概要

以下内容来自tokio的文档。
通过: cargo doc --open --package tokio 能打开英文版本。
以下内容是对这个cargo doc的翻译。
 
2.1、tokio旅程

编写应用

tokio很适合用于编写应用,在这种情况下使用人不需要担忧所需要采用的tokio特性。
如果您不确定,那么我们建议你使用“full"以确保在构建应用过程中不会有什么阻滞。(是的,在这个年代磁盘已经不经不值钱了)
以下代码会安装tokio的所有模块:
tokio = { version = "1", features = ["full"] }
 
2.1.1、编写库

如果是编写库,自然以提供最轻便的单元包为目标。为了达成这个目标,你应该确保只使用了你需要的特性(模块)。如此,库的用户就可以不要启用一些无用的特性。
例如:
tokio = { version = "1", features = ["rt", "net"] }
 
2.1.2、使用任务(task)

rust的异步编程基于轻量、非阻塞的执行单元,这些执行单元称为tasks.
tokio::task模块提供了重要的工具一些和tasks一起工作:

[*]spawn函数和JoinHandle类型-前者负责基于tokio运行时调度一个新任务,后者则等待任务的输出
[*]用于异步任务中运行阻塞操作的函数
tokio::task模块在特性"rt"启用的时候才会提供。
tokio::sync包含了同步的功能,以便在需要的时候交流或者共享数据。这些功能包括:

[*]通道(oneshot,mpsc,watch和broadcast),用于在任务之间发送值
[*]非阻塞Mutex(互斥锁),用于控制对一个共享可变值得存取
[*]一个异步Barrier类型(屏障),用于多个任务在开始计算前进行同步
tokio::sync只有特性"sync"启用得时候才会提供。
 
tokio::time模块提供了用于追踪时间和调度工作的许多工具。包括设置任务的超时(timeouts),休眠(sleeping)将来要运行的工作,或者
以特定的间隔重复操作。
为了使用tokio:time,必须启用"time"特性。
 
最后,tokio提供了执行异步任务的运行时。大部分应用可以使用#宏,这样应用就可以基于tokio运行时运行。
然而,这个宏仅仅提供了基本的配置项。 作为一个替代,tokio:runtime模块提供了更多的强大API,这些api能够配置和管理运行时。
如果#宏无法满足需要,就应该使用这些api。
 
要使用这个运行时,必须启用"rt"或者"rt-multi-thread"特性,这样才可以分别开启当前线程的single-threaded scheduler(单线程调度器)和multi-thread scheduler(多线程调度器)。
runtime module documentation提供了更多的细节。 
此外,特性"macros"启用了#和#属性。
 
 
2.1.3、cpu绑定任务和阻塞代码

tokio能够基于一些线程同时运行许多任务(线程池),方式是重复唤起每个线程当前运行任务。
然而,这种唤起只能基于.await关键点,因此耗费较长运行时间的代码,如果没有遇到遇到.await,那么它们会阻止其它任务的运行。为了解决这个问题,tokio提供了两种线程:核心线程和阻塞线程。
 
核心线程用于运行异步代码,tokio默认为一个cpu内核唤起一个内核线程。但我们可以使用环境变量TOKIO_WORKER_THREADS覆盖默认值。
 
阻塞线程则是根据需要唤起,能用于运行阻塞代码。这些代码会阻止其它任务运行,并让它自己保持活跃(即使一段时间没有用),这个保持活跃的功能可以通过thread_keep_alive进行配置。
由于tokio无法换出一个阻塞的任务,就像它可以和异步代码一起工作(?),阻塞线程数上限非常大。这个上限可以同故宫Builder进行配置。
为了唤起一个阻塞任务,我们应该使用spawn_blocking函数。
#
async fn main() {
    // This is running on a core thread.

    let blocking_task = tokio::task::spawn_blocking(|| {
      // This is running on a blocking thread.
      // Blocking here is ok.
    });

    // We can wait for the blocking task like this:
    // If the blocking task panics, the unwrap below will propagate the
    // panic.
    blocking_task.await.unwrap();

如果我们的代码和cpu密切关联,那么我们应该会希望限制cpu上运行的线程数。因此我们应该用一个单独的线程池来处理和cpu关系密切的任务。例如,我们可以考虑使用rayon库。它也能创建额外的tokio运行时,用于处理cpu关系密切的任务,但是如果我们这么做,就应该谨慎一些,因为这些额外的运行时候只运行cpu密切的任务,如果运行IO密切的任务,那么表现不理想。
提示:如果使用rayon,我们可以创建一个通道,当rayon任务完成时,用于把结果发送回tokio。
 
2.1.4、异步IO

tokio能够调度和运行任务,而且也提供用于异步处理io的每一样东西。
tokio::io模块提供了tokio异步核心io功能,包括AsyncRead,AsyncWrite,AsyncBufRead特质。
此外,当启用了"io-util"特性后,tokio也提供了和这些特质相关的组合体和方法,并构建一个异步的组件给std::io。
 
tokio也包含了执行各种io的api,api和操作系统做异步的交互,它们包括:

[*]tokio::net-包含了非阻塞版本的TCP,UDP和UNIX Domain Socket(要求启用net特新)
[*]tokio::fs -类似std::fs,用于异步处理文件系统io,要求启用特性fs
[*]tokio::signal-用于异步处理unix和windows的型号,要求启用signal特性
[*]tokio::process-用于唤起和管理字进程,要求启用process特性
 
2.2、一个简单的示例

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
      let (mut socket, _) = listener.accept().await?;

      tokio::spawn(async move {
            let mut buf = ;

            // In a loop, read data from the socket and write the data back.
            loop {
                let n = match socket.read(&mut buf).await {
                  // socket closed
                  Ok(0) => return,
                  Ok(n) => n,
                  Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                  }
                };

                // Write the data back
                if let Err(e) = socket.write_all(&buf).await {
                  eprintln!("failed to write to socket; err = {:?}", e);
                  return;
                }
            }
      });
    }

这个例子使用的tokio的异步io,包括net和io。
每当接到一个socket连接,就回启动一个任务,这个任务回读取并写回内容到socket中。
 
2.3、特征/特性标记

分为不可靠和可靠两个部分。
注意,当前文章基于tokio的1.45.0版本。
编码名称启用说明是否稳定full所有全部 rt运行时tokio::spawn,当前线程调度器和非调度器工具✔rt-multi-thread多线程运行时更重的多线程,工作窃取调度器(work-stealing scheduler)?✔io-utilio工具基于IO的Ext特质等✔io-std标准IOStdout,Stdin和Stderr类型✔net网络tokio::net类型(TpcStream,UnixStream,UdpSocket),还有AsyncFd(Unix类系统,可用于linux)和PollAio(FreeBsd)✔time时间tokio::time类型,计时器✔process进程tokio::proecss类型✔macros宏tokio::main和tokio::test宏✔sync同步tokio:sync类型✔signal信号tokio::signal类型✔fs文件系统tokio::fs类型✔test-util测试工具tokio运行时测试框架✔parking_lot特车位? ✔tracing追踪要求启用构建标记tokio_unstable❌其它 

[*]
[*]Some methods on task::JoinSet
[*]runtime::RuntimeMetrics
[*]
[*]
[*]
[*]
要求启用构建标记tokio_unstable
❌特别注意:AsyncRead and AsyncWrite  这两个特质总是可用,即使没有申明(即这是最基本的部分)
 
如何在构建的时候设置tokio_unstable
在项目的 .cargo/config.toml中配置

rustflags = ["--cfg", "tokio_unstable"]此外,通过环境变量配置也可以。
windows
$Env:RUSTFLAGS="--cfg tokio_unstable"linux
export RUSTFLAGS="--cfg tokio_unstable" 
2.4、支持的平台

tokio目前支持以下平台:

[*]Linux
[*]Windows
[*]Andriod(API 级别21)
[*]macOS
[*]iOS
[*]FreeBs
未来,tokio还会支持这些平台。然而,将来的版本可能会调整要求,这些要求包括libc版本,api级别,或者特定的FreeBSD版本。
 
除了这些平台,tokio也会倾向于在mio包能够运行的平台上工作。 mio可以支持的平台参见:in mio’s documentation
然而,这些额外的平台,将来可能不被支持。
注意,Wine平台不同于windows。
2.4.1、wasm支持

tokio对于WASM平台的支持存在一些限制。
如果不启用tokio_unstable标记,那么可以支持以下特性:

[*]sync
[*]macros
[*]io-util
[*]rt
[*]time
如果企图支持其它特性,那么会导致编译失败。
time模块只会在那些支持timers(例如wasm32-wasi)的wasm平台上工作。
 
注意:如果运行时变得无限期空闲,那么它会立刻终止,而不是永久阻塞。 对于不支持time的平台,这意味着运行时任何时候都不会变得空闲。
 
2.4.2、不稳定的WASM支持

tokio有可以不稳定地支持一些wasm特性。这种情况下要求启用tokio_unstable标记。
tokio::net可以支持wasm32-wasi。然而,不是所有方法都支持网络类型,当WASI不支持创建创建新的套接字的时候。
因此,套接字必须通过FromRawFd特质创建。
 
2.5、单元项目

 
2.5.1、重新导出

pub use task::spawn;
 
2.5.2、模块

注:模块基本和特性对应
编码名称  说明fs文件异步文件工具ioio异步io中的特质,助手,和定义net网络TCP/UPD/Unix相关process进程异步进程管理runtime运行时tokio运行时signal信号异步信号处理stream流流相关sync同步异步上下文中同步操作task任务异步绿色线程time时间追踪时间的工具 
2.5.3、宏


[*]join-等待并发的多个分支,当所有分支完成就会返回
[*]pin-在栈上钉住一个值
[*]select-等待多个并发的分支,当第一个任务完成,则返回,并放弃其它分支
[*]task_local-定义一个新的本地键,类型是tokio::task::LocalKey
[*]try_join-等待多个并发的分支,如果所有的成功Ok(_)则返回,如果发现一个Err()也会返回
 
2.5.4、属性宏


[*]main-标记选定的运行时执行的异步函数。这个宏可以配置一个运行时,而不要求用户使用Runtime或者Builder
[*]test-类似main,但只用于测试环境。
 
2.6、单元包

相关单元包。
这个没有什么特别值得写得内容,罗列下:
bytes 
顾名思义,和自己操作有关的,例如
use bytes::{BytesMut, BufMut};

let mut buf = BytesMut::with_capacity(1024);
buf.put(&b"hello world"[..]);
buf.put_u16(1234);

let a = buf.split();
assert_eq!(a, b"hello world\x04\xD2"[..]);

buf.put(&b"goodbye world"[..]);

let b = buf.split();
assert_eq!(b, b"goodbye world"[..]);

assert_eq!(buf.capacity(), 998);bytes主要用于和网络操作有关的场景。 tokio利用了零拷贝的编程。
可以使用rust已有的类型来创建字节数组,例如&[]或者Vev,但推荐使用Bytes
 
cfg_if
是一个配置有关的宏,构建类似if/else的代码结构。
tokio由于需要需要兼顾多个平台,不可避免要存在不少的if/else.
示例:
cfg_if::cfg_if! {
    if # {
      fn foo() { /* unix specific functionality */ }
    } else if # {
      fn foo() { /* non-unix, 32-bit functionality */ }
    } else {
      fn foo() { /* fallback implementation */ }
    }

lock_api
和锁相关的api。
大体可以看作是多标准rust锁有关类型的封装.好处在于省掉了不少繁复的操作,例如:
示例:
use lock_api::{RawMutex, Mutex, GuardSend};
use std::sync::atomic::{AtomicBool, Ordering};

// 1. Define our raw lock type
pub struct RawSpinlock(AtomicBool);

// 2. Implement RawMutex for this type
unsafe impl RawMutex for RawSpinlock {
    const INIT: RawSpinlock = RawSpinlock(AtomicBool::new(false));

    // A spinlock guard can be sent to another thread and unlocked there
    type GuardMarker = GuardSend;

    fn lock(&self) {
      // Note: This isn't the best way of implementing a spinlock, but it
      // suffices for the sake of this example.
      while !self.try_lock() {}
    }

    fn try_lock(&self) -> bool {
      self.0
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    unsafe fn unlock(&self) {
      self.0.store(false, Ordering::Release);
    }
}

// 3. Export the wrappers. This are the types that your users will actually use.
pub type Spinlock<T> = lock_api::Mutex<RawSpinlock, T>;
pub type SpinlockGuard<'a, T> = lock_api::MutexGuard<'a, RawSpinlock, T>; 
mio
看起来有点像minio,但和minio不同,mio用于构建非阻塞的IO应用,而且它的实现比较快,层级比较低。能尽量减少操作系统的负荷。
mio包括多个子模块,包括event,features,guide,net,windows。
 
parking_lot
提供了比rust标准库更小更快更灵活的实现,包括Mutex,RwLock,Condvar,Once等.
看起来有点像lock_api
 
parking_lot_core
和parking_lot有关的内容。
 
pin_project_lite
轻量版本的pin-project。
 
proc_macro2
过程宏的包装器。
例如:
extern crate proc_macro;

#
pub fn my_derive(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
    let input = proc_macro2::TokenStream::from(input);

    let output: proc_macro2::TokenStream = {
      /* transform input */
    };

    proc_macro::TokenStream::from(output)

quote
宏,用于把rust语法树数据结构转为源码。
let tokens = quote! {
    struct SerializeWith #generics #where_clause {
      value: &'a #field_ty,
      phantom: core::marker::PhantomData<#item_ty>,
    }

    impl #generics serde::Serialize for SerializeWith #generics #where_clause {
      fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
      where
            S: serde::Serializer,
      {
            #path(self.value, serializer)
      }
    }

    SerializeWith {
      value: #value,
      phantom: core::marker::PhantomData::<#item_ty>,
    }
}; 
scopeguard
范围守卫,允许匿名函数在范围之外运行。
看起来像歪门邪道!
extern crate scopeguard;

fn f() {
    let _guard = scopeguard::guard((), |_| {
      println!("Hello Scope Exit!");
    });

    // rest of the code here.

    // Here, at the end of `_guard`'s scope, the guard's closure is called.
    // It is also called if we exit this scope through unwinding instead.

smallvec
顾名思义,是小型的向量,但它的大小可变。可以提升性能,针对只需要很少数据的场景。
 
socket2
用于创建和使用套接字,和网络编程相关。
但它的缺陷是不够通用,因为要求尽量使用操作系统的已有的能力。
use std::net::{SocketAddr, TcpListener};
use socket2::{Socket, Domain, Type};

// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;

socket.set_only_v6(false)?;
let address: SocketAddr = "[::1]:12345".parse().unwrap();
socket.bind(&address.into())?;
socket.listen(128)?;

let listener: TcpListener = socket.into();
// ... 
 
syn
名字容易联想到synchronize,其实应该是syntax。
用于解析rust的代码流为语法树,进场和其它宏相关。
例如:
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, DeriveInput};

#
pub fn my_macro(input: TokenStream) -> TokenStream {
    // Parse the input tokens into a syntax tree
    let input = parse_macro_input!(input as DeriveInput);

    // Build the output, possibly using quasi-quotation
    let expanded = quote! {
      // ...
    };

    // Hand the output tokens back to the compiler
    TokenStream::from(expanded)

tokio
运行时,用于编写可靠的网络应用,同时还不需要牺牲速度。
它是一个事件驱动,非阻塞的平台,能用于编写异步应用。
 
其它
tokio_macros,unicode_ident,windows_sys,windows_targets,windows_x86_64_msvc.
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: rust进阶.并发.Tokio.1.Tokio简介