学习要,工作也不能拉下,所以这一段时间关于rust的博文少了些。
rust要学习的内容还很多,但我觉得应该优先打好基础,这其中比较关注的是并发。
提到rust的并发,先回忆在书本有许多的内容:
1.并发和并行
2.通过信道(channel)共享进程间数据
关键库和方法
std::sync::mpscmpsc::channel()std::threadthread::spawn() 3.通过内存共享进程间数据关键库和方法
std::sync::{Arc, Mutex}std::threadstd::thread::JoinHandleJoinHandle::join() 4.Send和Sync特质Send 标记 trait 表明实现了 Send 的类型值的所有权可以在线程间传送。几乎所有的 Rust 类型都是Send 的,
* 不过有一些例外,包括 Rc,这是因为Rc的fetch_add(引用计数)属于非原子操作Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用5.core::future::future --未来值
* 1.future 是一个现在可能还没有准备好但将在未来某个时刻准备好的值
* 2.Rust 提供了 Future trait 作为基础组件,这样不同的异步操作就可以在不同的数据结构上实现
* 3.每一个实现了 Future 的类型会维护自己的进度状态信息和 “ready” 的定义
* 4.async 关键字可以用于代码块和函数
* 5.在一个 async 块或 async 函数中,可以使用 await 关键字来等待一个 future 准备就绪,这一过程称为 等待一个 future
* 6.检查一个 future 并查看其值是否已经准备就绪的过程被称为 轮询(polling)
* 7.在大多数情况下,编写异步 Rust 代码时,我们使用 async 和 await 关键字。
* Rust 将其编译为等同于使用 Future trait 的代码,这非常类似于将 for 循环编译为等同于使用 Iterator trait 的代码
虽然rust的标准库已经可以解决并发的问题,但是异步操作据说还是需要依赖于第三方的运行时来进行,这其中有几个可以选择:
运行时库异步操作支持Future 支持性能表现易用性生态兼容性适用场景Tokio多线程工作窃取调度器,支持百万级并发连接丰富组合子(join!, select!),兼容 async-std★★★★★(行业标杆)★★★☆(中等学习曲线)★★★★★(Hyper/Tonic 等主流框架)高性能网络服务(API 网关、实时通信)async-std类似标准库的异步 API(async_std::fs 等)与 std::future 完全兼容,支持 Futures Unordered★★★☆(轻量高效)★★★★★(零学习成本)★★★☆(基础工具链完善)快速原型开发、CLI 工具、轻量 Web 服务Smol轻量级任务模型(类似 goroutine)模块化设计,可通过 async-channel 扩展★★★★☆(低资源占用)★★★★☆(简洁 API)★★☆(需依赖 async-executors)微服务、IoT 设备、极简依赖链项目Glommio基于 io_uring 的线程亲和性调度器实验性 Future API,支持 FutureExt 扩展★★☆(延迟波动)★☆(底层 API)★☆(无成熟 HTTP 库)研究性项目、探索新技术边界rustasync/runtime底层抽象,支持自定义线程池/调度策略灵活但复杂,需直接操作 Runtime::builder()★★★☆(可调优)★☆(高级开发者专用)★★☆(需自行集成生态)深度定制需求、自定义异步框架从上表可以看出,tokio相对比较突出,尤其是性能和生态方面,而这时应用开发所需要关注的。
一、tokio发展历史
Tokio 的诞生与 Rust 异步生态的演进紧密相连:
- 起源与整合(2016年)
Tokio 最初是 futures(异步操作基本 API)和 mio(跨平台非阻塞 IO 库)的整合库,名为 futures_mio。2016 年 8 月更名为 Tokio,计划构建一整套异步设施,核心组件包括 tokio-core(单线程模型)、tokio-io、tokio-timer 等。
- 性能与易用性优化
- 引入宏实现 async/await 语法(如 futures-await 库),提升异步代码的可读性。
- 通过 tokio-proto 等模块提供上层协议支持,但后续因易用性问题被逐步优化。
- 生态定位与标准化(2018年)
Rust 官方成立 net-wg 小组,将 futures 工作移交,并倡导“中立的 futures 提供基础能力,Tokio 专注 IO 相关接口”的设计。Tokio 虽未完全采纳此方案,但持续演进,成为事实标准的异步运行时。
- 现状与影响
Tokio 凭借高性能(如支持百万级并发连接)、丰富工具链(异步文件/网络/定时器)和生态整合(如 Hyper、Axum 框架),成为 Rust 异步开发的首选方案。
命名由来:为什么叫 Tokio?
Tokio 的命名灵感来源于 “Tokyo(东京)+ IO” 的组合,寓意:
- 高效与繁忙:东京作为国际化大都市,象征着高效处理海量任务的能力,与 Tokio 处理异步 IO 的目标契合。
- 技术愿景:项目初期目标是构建像东京都市圈一样“繁忙却高效”的异步运行时。
Tokio 的字面意思
- 日语语境:Tokio 在日语中没有实际含义,它是“东京”的西班牙语拼写变体。日语中“东京”的标准表达为平假名「とうきょう」或汉字「東京」。
- 语言差异:西班牙语采用“Tokio”拼写东京,源于其语音规则(如保留词尾 -o),而日语官方罗马字转写为“Tokyo”。Tokio 这一拼写主要出现在西班牙语系国家或国际交流场景中。
总结
Tokio 的命名既体现了技术愿景(高效处理 IO),也蕴含了文化隐喻(东京的国际化形象)。其发展历程反映了 Rust 异步生态从底层整合到标准化、易用化的演进,最终成为高性能异步开发的基石。
二、tokio官方文档概要
以下内容来自tokio的文档。
通过: cargo doc --open --package tokio 能打开英文版本。
以下内容是对这个cargo doc的翻译。
2.1、tokio旅程
编写应用
tokio很适合用于编写应用,在这种情况下使用人不需要担忧所需要采用的tokio特性。
如果您不确定,那么我们建议你使用“full"以确保在构建应用过程中不会有什么阻滞。(是的,在这个年代磁盘已经不经不值钱了)
以下代码会安装tokio的所有模块:
tokio = { version = "1", features = ["full"] }
2.1.1、编写库
如果是编写库,自然以提供最轻便的单元包为目标。为了达成这个目标,你应该确保只使用了你需要的特性(模块)。如此,库的用户就可以不要启用一些无用的特性。
例如:
tokio = { version = "1", features = ["rt", "net"] }
2.1.2、使用任务(task)
rust的异步编程基于轻量、非阻塞的执行单元,这些执行单元称为tasks.
tokio::task模块提供了重要的工具一些和tasks一起工作:
- spawn函数和JoinHandle类型-前者负责基于tokio运行时调度一个新任务,后者则等待任务的输出
- 用于异步任务中运行阻塞操作的函数
tokio::task模块在特性"rt"启用的时候才会提供。
tokio::sync包含了同步的功能,以便在需要的时候交流或者共享数据。这些功能包括:
- 通道(oneshot,mpsc,watch和broadcast),用于在任务之间发送值
- 非阻塞Mutex(互斥锁),用于控制对一个共享可变值得存取
- 一个异步Barrier类型(屏障),用于多个任务在开始计算前进行同步
tokio::sync只有特性"sync"启用得时候才会提供。
tokio::time模块提供了用于追踪时间和调度工作的许多工具。包括设置任务的超时(timeouts),休眠(sleeping)将来要运行的工作,或者
以特定的间隔重复操作。
为了使用tokio:time,必须启用"time"特性。
最后,tokio提供了执行异步任务的运行时。大部分应用可以使用#[tokio::main]宏,这样应用就可以基于tokio运行时运行。
然而,这个宏仅仅提供了基本的配置项。 作为一个替代,tokio:runtime模块提供了更多的强大API,这些api能够配置和管理运行时。
如果#[tokio:main]宏无法满足需要,就应该使用这些api。
要使用这个运行时,必须启用"rt"或者"rt-multi-thread"特性,这样才可以分别开启当前线程的single-threaded scheduler(单线程调度器)和multi-thread scheduler(多线程调度器)。
runtime module documentation提供了更多的细节。
此外,特性"macros"启用了#[tokio::main]和#[tokio::test]属性。
2.1.3、cpu绑定任务和阻塞代码
tokio能够基于一些线程同时运行许多任务(线程池),方式是重复唤起每个线程当前运行任务。
然而,这种唤起只能基于.await关键点,因此耗费较长运行时间的代码,如果没有遇到遇到.await,那么它们会阻止其它任务的运行。为了解决这个问题,tokio提供了两种线程:核心线程和阻塞线程。
核心线程用于运行异步代码,tokio默认为一个cpu内核唤起一个内核线程。但我们可以使用环境变量TOKIO_WORKER_THREADS覆盖默认值。
阻塞线程则是根据需要唤起,能用于运行阻塞代码。这些代码会阻止其它任务运行,并让它自己保持活跃(即使一段时间没有用),这个保持活跃的功能可以通过thread_keep_alive进行配置。
由于tokio无法换出一个阻塞的任务,就像它可以和异步代码一起工作(?),阻塞线程数上限非常大。这个上限可以同故宫Builder进行配置。
为了唤起一个阻塞任务,我们应该使用spawn_blocking函数。- #[tokio::main]
- async fn main() {
- // This is running on a core thread.
- let blocking_task = tokio::task::spawn_blocking(|| {
- // This is running on a blocking thread.
- // Blocking here is ok.
- });
- // We can wait for the blocking task like this:
- // If the blocking task panics, the unwrap below will propagate the
- // panic.
- blocking_task.await.unwrap();
- }
复制代码
如果我们的代码和cpu密切关联,那么我们应该会希望限制cpu上运行的线程数。因此我们应该用一个单独的线程池来处理和cpu关系密切的任务。例如,我们可以考虑使用rayon库。它也能创建额外的tokio运行时,用于处理cpu关系密切的任务,但是如果我们这么做,就应该谨慎一些,因为这些额外的运行时候只运行cpu密切的任务,如果运行IO密切的任务,那么表现不理想。
提示:如果使用rayon,我们可以创建一个通道,当rayon任务完成时,用于把结果发送回tokio。
2.1.4、异步IO
tokio能够调度和运行任务,而且也提供用于异步处理io的每一样东西。
tokio::io模块提供了tokio异步核心io功能,包括AsyncRead,AsyncWrite,AsyncBufRead特质。
此外,当启用了"io-util"特性后,tokio也提供了和这些特质相关的组合体和方法,并构建一个异步的组件给std::io。
tokio也包含了执行各种io的api,api和操作系统做异步的交互,它们包括:
- tokio::net-包含了非阻塞版本的TCP,UDP和UNIX Domain Socket(要求启用net特新)
- tokio::fs -类似std::fs,用于异步处理文件系统io,要求启用特性fs
- tokio::signal-用于异步处理unix和windows的型号,要求启用signal特性
- tokio::process-用于唤起和管理字进程,要求启用process特性
2.2、一个简单的示例
- use tokio::net::TcpListener;
- use tokio::io::{AsyncReadExt, AsyncWriteExt};
- #[tokio::main]
- async fn main() -> Result<(), Box<dyn std::error::Error>> {
- let listener = TcpListener::bind("127.0.0.1:8080").await?;
- loop {
- let (mut socket, _) = listener.accept().await?;
- tokio::spawn(async move {
- let mut buf = [0; 1024];
- // In a loop, read data from the socket and write the data back.
- loop {
- let n = match socket.read(&mut buf).await {
- // socket closed
- Ok(0) => return,
- Ok(n) => n,
- Err(e) => {
- eprintln!("failed to read from socket; err = {:?}", e);
- return;
- }
- };
- // Write the data back
- if let Err(e) = socket.write_all(&buf[0..n]).await {
- eprintln!("failed to write to socket; err = {:?}", e);
- return;
- }
- }
- });
- }
- }
复制代码
这个例子使用的tokio的异步io,包括net和io。
每当接到一个socket连接,就回启动一个任务,这个任务回读取并写回内容到socket中。
2.3、特征/特性标记
分为不可靠和可靠两个部分。
注意,当前文章基于tokio的1.45.0版本。
编码 | 名称 | 启用说明 | 是否稳定 | full | 所有 | 全部 | | rt | 运行时 | tokio::spawn,当前线程调度器和非调度器工具 | ✔ | rt-multi-thread | 多线程运行时 | 更重的多线程,工作窃取调度器(work-stealing scheduler)? | ✔ | io-util | io工具 | 基于IO的Ext特质等 | ✔ | io-std | 标准IO | Stdout,Stdin和Stderr类型 | ✔ | net | 网络 | tokio::net类型(TpcStream,UnixStream,UdpSocket),还有AsyncFd(Unix类系统,可用于linux)和PollAio(FreeBsd) | ✔ | time | 时间 | tokio::time类型,计时器 | ✔ | process | 进程 | tokio::proecss类型 | ✔ | macros | 宏 | tokio::main和tokio::test宏 | ✔ | sync | 同步 | tokio:sync类型 | ✔ | signal | 信号 | tokio::signal类型 | ✔ | fs | 文件系统 | tokio::fs类型 | ✔ | test-util | 测试工具 | tokio运行时测试框架 | ✔ | parking_lot | 特车位? | | ✔ | tracing | 追踪 | 要求启用构建标记tokio_unstable | ❌ | 其它 | |
- [task::Builder]
- Some methods on task::JoinSet
- runtime::RuntimeMetrics
- [runtime::Builder:
n_task_spawn]
- [runtime::Builder:
n_task_terminate]
- [runtime::Builder::unhandled_panic]
- [runtime::TaskMeta]
要求启用构建标记tokio_unstable
| ❌ | 特别注意:AsyncRead and AsyncWrite 这两个特质总是可用,即使没有申明(即这是最基本的部分)
如何在构建的时候设置tokio_unstable
在项目的 .cargo/config.toml中配置- [build]
- rustflags = ["--cfg", "tokio_unstable"]
复制代码 此外,通过环境变量配置也可以。
windows- $Env:RUSTFLAGS="--cfg tokio_unstable"
复制代码 linux- export RUSTFLAGS="--cfg tokio_unstable"
复制代码
2.4、支持的平台
tokio目前支持以下平台:
- Linux
- Windows
- Andriod(API 级别21)
- macOS
- iOS
- FreeBs
未来,tokio还会支持这些平台。然而,将来的版本可能会调整要求,这些要求包括libc版本,api级别,或者特定的FreeBSD版本。
除了这些平台,tokio也会倾向于在mio包能够运行的平台上工作。 mio可以支持的平台参见:in mio’s documentation
然而,这些额外的平台,将来可能不被支持。
注意,Wine平台不同于windows。
2.4.1、wasm支持
tokio对于WASM平台的支持存在一些限制。
如果不启用tokio_unstable标记,那么可以支持以下特性:
- sync
- macros
- io-util
- rt
- time
如果企图支持其它特性,那么会导致编译失败。
time模块只会在那些支持timers(例如wasm32-wasi)的wasm平台上工作。
注意:如果运行时变得无限期空闲,那么它会立刻终止,而不是永久阻塞。 对于不支持time的平台,这意味着运行时任何时候都不会变得空闲。
2.4.2、不稳定的WASM支持
tokio有可以不稳定地支持一些wasm特性。这种情况下要求启用tokio_unstable标记。
tokio::net可以支持wasm32-wasi。然而,不是所有方法都支持网络类型,当WASI不支持创建创建新的套接字的时候。
因此,套接字必须通过FromRawFd特质创建。
2.5、单元项目
2.5.1、重新导出
pub use task::spawn;
2.5.2、模块
注:模块基本和特性对应
编码 | 名称 | 说明 | fs | 文件 | 异步文件工具 | io | io | 异步io中的特质,助手,和定义 | net | 网络 | TCP/UPD/Unix相关 | process | 进程 | 异步进程管理 | runtime | 运行时 | tokio运行时 | signal | 信号 | 异步信号处理 | stream | 流 | 流相关 | sync | 同步 | 异步上下文中同步操作 | task | 任务 | 异步绿色线程 | time | 时间 | 追踪时间的工具 |
2.5.3、宏
- join-等待并发的多个分支,当所有分支完成就会返回
- pin-在栈上钉住一个值
- select-等待多个并发的分支,当第一个任务完成,则返回,并放弃其它分支
- task_local-定义一个新的本地键,类型是tokio::task:
ocalKey
- try_join-等待多个并发的分支,如果所有的成功Ok(_)则返回,如果发现一个Err()也会返回
2.5.4、属性宏
- main-标记选定的运行时执行的异步函数。这个宏可以配置一个运行时,而不要求用户使用Runtime或者Builder
- test-类似main,但只用于测试环境。
2.6、单元包
相关单元包。
这个没有什么特别值得写得内容,罗列下:
bytes
顾名思义,和自己操作有关的,例如- use bytes::{BytesMut, BufMut};
- let mut buf = BytesMut::with_capacity(1024);
- buf.put(&b"hello world"[..]);
- buf.put_u16(1234);
- let a = buf.split();
- assert_eq!(a, b"hello world\x04\xD2"[..]);
- buf.put(&b"goodbye world"[..]);
- let b = buf.split();
- assert_eq!(b, b"goodbye world"[..]);
- assert_eq!(buf.capacity(), 998);
复制代码 bytes主要用于和网络操作有关的场景。 tokio利用了零拷贝的编程。
可以使用rust已有的类型来创建字节数组,例如&[]或者Vev,但推荐使用Bytes
cfg_if
是一个配置有关的宏,构建类似if/else的代码结构。
tokio由于需要需要兼顾多个平台,不可避免要存在不少的if/else.
示例:- cfg_if::cfg_if! {
- if #[cfg(unix)] {
- fn foo() { /* unix specific functionality */ }
- } else if #[cfg(target_pointer_width = "32")] {
- fn foo() { /* non-unix, 32-bit functionality */ }
- } else {
- fn foo() { /* fallback implementation */ }
- }
- }
复制代码
lock_api
和锁相关的api。
大体可以看作是多标准rust锁有关类型的封装.好处在于省掉了不少繁复的操作,例如:
示例:- use lock_api::{RawMutex, Mutex, GuardSend};
- use std::sync::atomic::{AtomicBool, Ordering};
- // 1. Define our raw lock type
- pub struct RawSpinlock(AtomicBool);
- // 2. Implement RawMutex for this type
- unsafe impl RawMutex for RawSpinlock {
- const INIT: RawSpinlock = RawSpinlock(AtomicBool::new(false));
- // A spinlock guard can be sent to another thread and unlocked there
- type GuardMarker = GuardSend;
- fn lock(&self) {
- // Note: This isn't the best way of implementing a spinlock, but it
- // suffices for the sake of this example.
- while !self.try_lock() {}
- }
- fn try_lock(&self) -> bool {
- self.0
- .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
- .is_ok()
- }
- unsafe fn unlock(&self) {
- self.0.store(false, Ordering::Release);
- }
- }
- // 3. Export the wrappers. This are the types that your users will actually use.
- pub type Spinlock<T> = lock_api::Mutex<RawSpinlock, T>;
- pub type SpinlockGuard<'a, T> = lock_api::MutexGuard<'a, RawSpinlock, T>;
复制代码
mio
看起来有点像minio,但和minio不同,mio用于构建非阻塞的IO应用,而且它的实现比较快,层级比较低。能尽量减少操作系统的负荷。
mio包括多个子模块,包括event,features,guide,net,windows。
parking_lot
提供了比rust标准库更小更快更灵活的实现,包括Mutex,RwLock,Condvar,Once等.
看起来有点像lock_api
parking_lot_core
和parking_lot有关的内容。
pin_project_lite
轻量版本的pin-project。
proc_macro2
过程宏的包装器。
例如:- extern crate proc_macro;
- #[proc_macro_derive(MyDerive)]
- pub fn my_derive(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
- let input = proc_macro2::TokenStream::from(input);
- let output: proc_macro2::TokenStream = {
- /* transform input */
- };
- proc_macro::TokenStream::from(output)
- }
复制代码
quote
宏,用于把rust语法树数据结构转为源码。- let tokens = quote! {
- struct SerializeWith #generics #where_clause {
- value: &'a #field_ty,
- phantom: core::marker::PhantomData<#item_ty>,
- }
- impl #generics serde::Serialize for SerializeWith #generics #where_clause {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: serde::Serializer,
- {
- #path(self.value, serializer)
- }
- }
- SerializeWith {
- value: #value,
- phantom: core::marker::PhantomData::<#item_ty>,
- }
- };
复制代码
scopeguard
范围守卫,允许匿名函数在范围之外运行。
看起来像歪门邪道!- extern crate scopeguard;
- fn f() {
- let _guard = scopeguard::guard((), |_| {
- println!("Hello Scope Exit!");
- });
- // rest of the code here.
- // Here, at the end of `_guard`'s scope, the guard's closure is called.
- // It is also called if we exit this scope through unwinding instead.
- }
复制代码
smallvec
顾名思义,是小型的向量,但它的大小可变。可以提升性能,针对只需要很少数据的场景。
socket2
用于创建和使用套接字,和网络编程相关。
但它的缺陷是不够通用,因为要求尽量使用操作系统的已有的能力。- use std::net::{SocketAddr, TcpListener};
- use socket2::{Socket, Domain, Type};
- // Create a TCP listener bound to two addresses.
- let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;
- socket.set_only_v6(false)?;
- let address: SocketAddr = "[::1]:12345".parse().unwrap();
- socket.bind(&address.into())?;
- socket.listen(128)?;
- let listener: TcpListener = socket.into();
- // ...
复制代码
syn
名字容易联想到synchronize,其实应该是syntax。
用于解析rust的代码流为语法树,进场和其它宏相关。
例如:- use proc_macro::TokenStream;
- use quote::quote;
- use syn::{parse_macro_input, DeriveInput};
- #[proc_macro_derive(MyMacro)]
- pub fn my_macro(input: TokenStream) -> TokenStream {
- // Parse the input tokens into a syntax tree
- let input = parse_macro_input!(input as DeriveInput);
- // Build the output, possibly using quasi-quotation
- let expanded = quote! {
- // ...
- };
- // Hand the output tokens back to the compiler
- TokenStream::from(expanded)
- }
复制代码
tokio
运行时,用于编写可靠的网络应用,同时还不需要牺牲速度。
它是一个事件驱动,非阻塞的平台,能用于编写异步应用。
其它
tokio_macros,unicode_ident,windows_sys,windows_targets,windows_x86_64_msvc.
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |