
异步编程的武器库:Tokio、async-std 和实战案例
上篇咱们学了异步编程的基础(async/await、Future),但光有语法不够啊——你得有地方跑才行!
就像你学会了开车(async/await),但得有车(运行时)才能上路。
Rust 的异步生态里有几个主要的"车":
今天咱们就聊聊这些运行时,以及怎么用它们写出靠谱的异步代码。
异步运行时是调度和执行 Future 的环境。它负责:
生活化类比:
想象一个餐厅:
特性 | Tokio | async-std | smol |
|---|---|---|---|
流行度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
功能 | 最全 | 中等 | 轻量 |
学习曲线 | 陡峭 | 平缓 | 中等 |
性能 | 优秀 | 良好 | 优秀 |
生态 | 最丰富 | 中等 | 较小 |
适用场景 | 生产环境 | 学习/原型 | 嵌入式/CLI |
Tokio
├── Runtime(运行时)
│ ├── 线程池
│ └── 任务调度器
├── Task(任务)
│ ├── spawn
│ └── join
├── I/O
│ ├── TcpListener/TcpStream
│ ├── UdpSocket
│ └── File
├── Sync(同步原语)
│ ├── Mutex/RwLock
│ ├── Channel
│ └── Semaphore
└── Time(时间)
├── sleep
├── interval
└── timeout
// 直接用 async-std 替换 std
use async_std::net::TcpListener; // 替代 std::net
use async_std::fs::File; // 替代 std::fs
use async_std::task; // 任务管理
说人话:
async-std 的 API 设计和标准库几乎一样,学习成本低。
// Cargo.toml
[dependencies]
tokio = { version = "1.0", features = ["full"] }
// main.rs
#[tokio::main]
async fn main() {
println!("Hello, Tokio!");
}
说人话:
#[tokio::main] 宏把 main 函数变成异步的,并创建运行时。
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 并发执行多个任务
let task1 = tokio::spawn(async {
sleep(Duration::from_secs()).await;
println!("任务 1 完成");
});
let task2 = tokio::spawn(async {
sleep(Duration::from_secs()).await;
println!("任务 2 完成");
});
let task3 = tokio::spawn(async {
sleep(Duration::from_secs()).await;
println!("任务 3 完成");
});
// 等待所有任务完成
let (r1, r2, r3) = tokio::join!(task1, task2, task3);
println!("结果:{:?}, {:?}, {:?}", r1, r2, r3);
}
输出:
任务 1 完成
任务 2 完成
任务 3 完成
结果:Ok(1), Ok(2), Ok(3)
说人话:
三个任务并发执行,总共只用了 3 秒(不是 6 秒)!
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器监听在 127.0.0.1:8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("新连接:{}", addr);
// 为每个连接 spawn 一个任务
tokio::spawn(async move {
handle_connection(socket).await.unwrap();
});
}
}
async fn handle_connection(socket: tokio::net::TcpStream)
-> Result<(), Box<dyn std::error::Error>>
{
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
while reader.read_line(&mut line).await? > {
writer.write_all(line.as_bytes()).await?;
line.clear();
}
Ok(())
}
// Cargo.toml
[dependencies]
async-std = { version = "1.0", features = ["attributes"] }
// main.rs
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
println!("服务器监听在 127.0.0.1:8080");
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream.unwrap();
task::spawn(async move {
handle_connection(stream).await;
});
}
}
说人话:
async-std 的 API 和 Tokio 很像,但更接近标准库的风格。
// Cargo.toml
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
// main.rs
use reqwest;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct Post {
userId: u32,
id: u32,
title: String,
body: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// GET 请求
let client = reqwest::Client::new();
let response = client
.get("https://jsonplaceholder.typicode.com/posts/1")
.send()
.await?;
let post: Post = response.json().await?;
println!("{:?}", post);
// POST 请求
let new_post = serde_json::json!({
"title": "我的文章",
"body": "这是内容",
"userId":
});
let response = client
.post("https://jsonplaceholder.typicode.com/posts")
.json(&new_post)
.send()
.await?;
let created: Post = response.json().await?;
println!("创建成功:{:?}", created);
Ok(())
}
use reqwest;
use tokio::io::AsyncWriteExt;
use tokio::fs::File;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
];
// 并发下载
let tasks: Vec<_> = urls
.iter()
.map(|url| download_file(url))
.collect();
let results = futures::future::join_all(tasks).await;
for (url, result) in urls.iter().zip(results) {
match result {
Ok(_) => println!("✅ {} 下载成功", url),
Err(e) => println!("❌ {} 下载失败:{}", url, e),
}
}
Ok(())
}
async fn download_file(url: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let response = client.get(url).send().await?;
let bytes = response.bytes().await?;
let filename = url.split('/').last().unwrap();
let mut file = File::create(filename).await?;
file.write_all(&bytes).await?;
Ok(())
}
use tokio::time::{timeout, Duration, sleep};
use std::time::Instant;
#[tokio::main]
async fn main() {
// 超时处理
match timeout(Duration::from_secs(), async {
sleep(Duration::from_secs()).await;
"完成"
}).await {
Ok(result) => println!("结果:{}", result),
Err(_) => println!("超时了!"),
}
// 重试逻辑
let result = retry(, || async {
// 模拟可能失败的操作
if rand::random::<u32>() % == {
Err("随机失败")
} else {
Ok("成功")
}
}).await;
println!("重试结果:{:?}", result);
}
async fn retry<F, T, E>(max_retries: u32, mut f: F) -> Result<T, E>
where
F: FnMut() -> futures::future::BoxFuture<'static, Result<T, E>>,
{
let mut last_error = None;
for i in ..=max_retries {
match f().await {
Ok(result) => return Ok(result),
Err(e) => {
last_error = Some(e);
if i < max_retries {
println!("重试 {}/{}", i + , max_retries);
}
}
}
}
Err(last_error.unwrap())
}
#[tokio::main]
async fn main() {
// ❌ 这样会阻塞整个运行时
std::thread::sleep(std::time::Duration::from_secs());
// ✅ 用异步的 sleep
tokio::time::sleep(std::time::Duration::from_secs()).await;
}
问题: 同步的 sleep 会阻塞整个线程,其他任务都无法执行。
fn sync_function() {
// ❌ 不能直接调用异步函数
let result = async_function().await; // 编译错误
}
// ✅ 解决方案 1:用 tokio::block_on
fn sync_function() {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async_function());
}
// ✅ 解决方案 2:把函数也改成异步
async fn sync_function() {
let result = async_function().await;
}
async fn fetch_data() -> String {
"data".to_string()
}
#[tokio::main]
async fn main() {
// ❌ 忘记 await
let data = fetch_data(); // 这是 Future,不是 String!
// ✅ 正确
let data = fetch_data().await;
}
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_secs()).await;
println!("任务完成");
});
// 取消任务
handle.abort();
// 等待任务结束(会被取消)
let _ = handle.await;
}
注意: abort() 不会立即停止任务,只是标记为取消,任务会在下一个 .await 点被取消。
// ❌ 在库代码中创建运行时
pub fn my_library_function() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// ...
});
}
// ✅ 库代码应该是异步的
pub async fn my_library_function() {
// ...
}
记住:
#[tokio::main])use reqwest;
use scraper::{Html, Selector};
use tokio::sync::mpsc;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel();
let visited = Arc::new(Mutex::new(HashSet::new()));
// 初始 URL
tx.send("https://example.com".to_string()).await.unwrap();
// 工作线程
let mut handles = vec![];
for _ in .. {
let tx = tx.clone();
let visited = visited.clone();
let handle = tokio::spawn(async move {
while let Some(url) = rx.recv().await {
if visited.lock().await.contains(&url) {
continue;
}
visited.lock().await.insert(url.clone());
match fetch_and_parse(&url).await {
Ok(links) => {
for link in links {
let _ = tx.send(link).await;
}
}
Err(e) => eprintln!("错误 {}: {}", url, e),
}
}
});
handles.push(handle);
}
// 等待所有任务完成
for handle in handles {
let _ = handle.await;
}
}
async fn fetch_and_parse(url: &str) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let response = client.get(url).send().await?;
let html = response.text().await?;
let document = Html::parse_document(&html);
let selector = Selector::parse("a").unwrap();
let links: Vec<String> = document
.select(&selector)
.filter_map(|element| element.value().attr("href"))
.map(|href| url.join(href).unwrap_or_else(|_| url.to_string()))
.collect();
Ok(links)
}
use futures::{stream::{SplitSink, SplitStream}, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use std::collections::HashMap;
use std::sync::Arc;
type Tx = mpsc::UnboundedSender<String>;
type Rx = mpsc::UnboundedReceiver<String>;
#[tokio::main]
async fn main() {
let state = Arc::new(Mutex::new(HashMap::new()));
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
println!("WebSocket 服务器监听在 127.0.0.1:8080");
loop {
let (socket, addr) = listener.accept().await.unwrap();
let state = state.clone();
tokio::spawn(async move {
handle_connection(socket, state, addr).await;
});
}
}
async fn handle_connection(
socket: TcpStream,
state: Arc<Mutex<HashMap<String, Tx>>>,
addr: std::net::SocketAddr,
) {
// WebSocket 握手逻辑(简化)
// 实际项目用 tokio-tungstenite
let (tx, rx) = mpsc::unbounded_channel();
// 注册客户端
state.lock().await.insert(addr.to_string(), tx);
// 处理消息...
}
use tokio::time::{interval, Duration};
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel();
// 定时任务 1:每秒执行
let tx1 = tx.clone();
tokio::spawn(async move {
let mut int = interval(Duration::from_secs());
loop {
int.tick().await;
let _ = tx1.send("每秒任务");
}
});
// 定时任务 2:每 5 秒执行
let tx2 = tx.clone();
tokio::spawn(async move {
let mut int = interval(Duration::from_secs());
loop {
int.tick().await;
let _ = tx2.send("每 5 秒任务");
}
});
// 接收并处理
tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
println!("收到:{}", msg);
}
});
// 运行 30 秒
tokio::time::sleep(Duration::from_secs()).await;
}

金句回顾:
下篇预告:
咱们已经学了 unsafe Rust 的基础,但 Rust 的 unsafe 可不止这些。下篇深入聊聊FFI(外部函数接口),看看怎么和 C 代码互操作,调用系统 API!