首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >34-Rust 教程 - 异步生态

34-Rust 教程 - 异步生态

作者头像
LarryLan
发布2026-06-03 18:29:00
发布2026-06-03 18:29:00
10
举报

异步生态

异步编程的武器库:Tokio、async-std 和实战案例

🎬 引入

上篇咱们学了异步编程的基础(async/awaitFuture),但光有语法不够啊——你得有地方跑才行!

就像你学会了开车(async/await),但得有车(运行时)才能上路。

Rust 的异步生态里有几个主要的"车":

  • Tokio - 最流行,功能最全
  • async-std - 最像标准库,上手最简单
  • smol - 最轻量,适合嵌入式

今天咱们就聊聊这些运行时,以及怎么用它们写出靠谱的异步代码。

📌 核心概念

什么是异步运行时?

异步运行时是调度和执行 Future 的环境。它负责:

  • 调度任务(哪个 Future 先跑)
  • 处理 I/O(网络、文件)
  • 管理定时器
  • 线程池管理

生活化类比:

想象一个餐厅:

  • Future = 顾客点的菜(还没做好)
  • 运行时 = 厨房 + 厨师团队
  • 调度器 = 厨师长(决定先做哪道菜)

主要运行时对比

特性

Tokio

async-std

smol

流行度

⭐⭐⭐⭐⭐

⭐⭐⭐

⭐⭐

功能

最全

中等

轻量

学习曲线

陡峭

平缓

中等

性能

优秀

良好

优秀

生态

最丰富

中等

较小

适用场景

生产环境

学习/原型

嵌入式/CLI

Tokio 的核心组件

代码语言:javascript
复制
Tokio
├── Runtime(运行时)
│   ├── 线程池
│   └── 任务调度器
├── Task(任务)
│   ├── spawn
│   └── join
├── I/O
│   ├── TcpListener/TcpStream
│   ├── UdpSocket
│   └── File
├── Sync(同步原语)
│   ├── Mutex/RwLock
│   ├── Channel
│   └── Semaphore
└── Time(时间)
    ├── sleep
    ├── interval
    └── timeout

async-std 的特点

代码语言:javascript
复制
// 直接用 async-std 替换 std
use async_std::net::TcpListener;  // 替代 std::net
use async_std::fs::File;          // 替代 std::fs
use async_std::task;              // 任务管理

说人话:

async-std 的 API 设计和标准库几乎一样,学习成本低。

💻 代码示例

基础示例:Tokio Hello World

代码语言:javascript
复制
// Cargo.toml
[dependencies]
tokio = { version = "1.0", features = ["full"] }

// main.rs
#[tokio::main]
async fn main() {
    println!("Hello, Tokio!");
}

说人话:

#[tokio::main] 宏把 main 函数变成异步的,并创建运行时。

Tokio 多任务并发

代码语言:javascript
复制
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 并发执行多个任务
    let task1 = tokio::spawn(async {
        sleep(Duration::from_secs()).await;
        println!("任务 1 完成");
        
    });
    
    let task2 = tokio::spawn(async {
        sleep(Duration::from_secs()).await;
        println!("任务 2 完成");
        
    });
    
    let task3 = tokio::spawn(async {
        sleep(Duration::from_secs()).await;
        println!("任务 3 完成");
        
    });
    
    // 等待所有任务完成
    let (r1, r2, r3) = tokio::join!(task1, task2, task3);
    println!("结果:{:?}, {:?}, {:?}", r1, r2, r3);
}

输出:

代码语言:javascript
复制
任务 1 完成
任务 2 完成
任务 3 完成
结果:Ok(1), Ok(2), Ok(3)

说人话:

三个任务并发执行,总共只用了 3 秒(不是 6 秒)!

Tokio TCP 服务器

代码语言:javascript
复制
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("服务器监听在 127.0.0.1:8080");
    
    loop {
        let (socket, addr) = listener.accept().await?;
        println!("新连接:{}", addr);
        
        // 为每个连接 spawn 一个任务
        tokio::spawn(async move {
            handle_connection(socket).await.unwrap();
        });
    }
}

async fn handle_connection(socket: tokio::net::TcpStream) 
    -> Result<(), Box<dyn std::error::Error>> 
{
    let (reader, mut writer) = socket.into_split();
    let mut reader = BufReader::new(reader);
    
    let mut line = String::new();
    while reader.read_line(&mut line).await? >  {
        writer.write_all(line.as_bytes()).await?;
        line.clear();
    }
    
    Ok(())
}

async-std 示例

代码语言:javascript
复制
// Cargo.toml
[dependencies]
async-std = { version = "1.0", features = ["attributes"] }

// main.rs
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    println!("服务器监听在 127.0.0.1:8080");
    
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream.unwrap();
        task::spawn(async move {
            handle_connection(stream).await;
        });
    }
}

说人话:

async-std 的 API 和 Tokio 很像,但更接近标准库的风格。

异步 HTTP 客户端(reqwest)

代码语言:javascript
复制
// Cargo.toml
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

// main.rs
use reqwest;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Post {
    userId: u32,
    id: u32,
    title: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // GET 请求
    let client = reqwest::Client::new();
    let response = client
        .get("https://jsonplaceholder.typicode.com/posts/1")
        .send()
        .await?;
    
    let post: Post = response.json().await?;
    println!("{:?}", post);
    
    // POST 请求
    let new_post = serde_json::json!({
        "title": "我的文章",
        "body": "这是内容",
        "userId": 
    });
    
    let response = client
        .post("https://jsonplaceholder.typicode.com/posts")
        .json(&new_post)
        .send()
        .await?;
    
    let created: Post = response.json().await?;
    println!("创建成功:{:?}", created);
    
    Ok(())
}

并发下载多个文件

代码语言:javascript
复制
use reqwest;
use tokio::io::AsyncWriteExt;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://example.com/file1.txt",
        "https://example.com/file2.txt",
        "https://example.com/file3.txt",
    ];
    
    // 并发下载
    let tasks: Vec<_> = urls
        .iter()
        .map(|url| download_file(url))
        .collect();
    
    let results = futures::future::join_all(tasks).await;
    
    for (url, result) in urls.iter().zip(results) {
        match result {
            Ok(_) => println!("✅ {} 下载成功", url),
            Err(e) => println!("❌ {} 下载失败:{}", url, e),
        }
    }
    
    Ok(())
}

async fn download_file(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let response = client.get(url).send().await?;
    let bytes = response.bytes().await?;
    
    let filename = url.split('/').last().unwrap();
    let mut file = File::create(filename).await?;
    file.write_all(&bytes).await?;
    
    Ok(())
}

超时和重试

代码语言:javascript
复制
use tokio::time::{timeout, Duration, sleep};
use std::time::Instant;

#[tokio::main]
async fn main() {
    // 超时处理
    match timeout(Duration::from_secs(), async {
        sleep(Duration::from_secs()).await;
        "完成"
    }).await {
        Ok(result) => println!("结果:{}", result),
        Err(_) => println!("超时了!"),
    }
    
    // 重试逻辑
    let result = retry(, || async {
        // 模拟可能失败的操作
        if rand::random::<u32>() %  ==  {
            Err("随机失败")
        } else {
            Ok("成功")
        }
    }).await;
    
    println!("重试结果:{:?}", result);
}

async fn retry<F, T, E>(max_retries: u32, mut f: F) -> Result<T, E>
where
    F: FnMut() -> futures::future::BoxFuture<'static, Result<T, E>>,
{
    let mut last_error = None;
    
    for i in ..=max_retries {
        match f().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                last_error = Some(e);
                if i < max_retries {
                    println!("重试 {}/{}", i + , max_retries);
                }
            }
        }
    }
    
    Err(last_error.unwrap())
}

🐛 常见坑点

坑点 1:阻塞异步运行时

代码语言:javascript
复制
#[tokio::main]
async fn main() {
    // ❌ 这样会阻塞整个运行时
    std::thread::sleep(std::time::Duration::from_secs());
    
    // ✅ 用异步的 sleep
    tokio::time::sleep(std::time::Duration::from_secs()).await;
}

问题: 同步的 sleep 会阻塞整个线程,其他任务都无法执行。

坑点 2:在同步上下文中调用异步代码

代码语言:javascript
复制
fn sync_function() {
    // ❌ 不能直接调用异步函数
    let result = async_function().await;  // 编译错误
}

// ✅ 解决方案 1:用 tokio::block_on
fn sync_function() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let result = rt.block_on(async_function());
}

// ✅ 解决方案 2:把函数也改成异步
async fn sync_function() {
    let result = async_function().await;
}

坑点 3:忘记 await

代码语言:javascript
复制
async fn fetch_data() -> String {
    "data".to_string()
}

#[tokio::main]
async fn main() {
    // ❌ 忘记 await
    let data = fetch_data();  // 这是 Future,不是 String!
    
    // ✅ 正确
    let data = fetch_data().await;
}

坑点 4:任务取消问题

代码语言:javascript
复制
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_secs()).await;
        println!("任务完成");
    });
    
    // 取消任务
    handle.abort();
    
    // 等待任务结束(会被取消)
    let _ = handle.await;
}

注意: abort() 不会立即停止任务,只是标记为取消,任务会在下一个 .await 点被取消。

坑点 5:运行时选择错误

代码语言:javascript
复制
// ❌ 在库代码中创建运行时
pub fn my_library_function() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        // ...
    });
}

// ✅ 库代码应该是异步的
pub async fn my_library_function() {
    // ...
}

记住:

  • 应用层:创建运行时(#[tokio::main]
  • 库代码:只提供异步函数,让调用者决定运行时

🎯 实战案例

案例 1:异步 Web 爬虫

代码语言:javascript
复制
use reqwest;
use scraper::{Html, Selector};
use tokio::sync::mpsc;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel();
    let visited = Arc::new(Mutex::new(HashSet::new()));
    
    // 初始 URL
    tx.send("https://example.com".to_string()).await.unwrap();
    
    // 工作线程
    let mut handles = vec![];
    for _ in .. {
        let tx = tx.clone();
        let visited = visited.clone();
        
        let handle = tokio::spawn(async move {
            while let Some(url) = rx.recv().await {
                if visited.lock().await.contains(&url) {
                    continue;
                }
                visited.lock().await.insert(url.clone());
                
                match fetch_and_parse(&url).await {
                    Ok(links) => {
                        for link in links {
                            let _ = tx.send(link).await;
                        }
                    }
                    Err(e) => eprintln!("错误 {}: {}", url, e),
                }
            }
        });
        
        handles.push(handle);
    }
    
    // 等待所有任务完成
    for handle in handles {
        let _ = handle.await;
    }
}

async fn fetch_and_parse(url: &str) -> Result<Vec<String>, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let response = client.get(url).send().await?;
    let html = response.text().await?;
    
    let document = Html::parse_document(&html);
    let selector = Selector::parse("a").unwrap();
    
    let links: Vec<String> = document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"))
        .map(|href| url.join(href).unwrap_or_else(|_| url.to_string()))
        .collect();
    
    Ok(links)
}

案例 2:WebSocket 聊天服务器

代码语言:javascript
复制
use futures::{stream::{SplitSink, SplitStream}, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use std::collections::HashMap;
use std::sync::Arc;

type Tx = mpsc::UnboundedSender<String>;
type Rx = mpsc::UnboundedReceiver<String>;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(HashMap::new()));
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    
    println!("WebSocket 服务器监听在 127.0.0.1:8080");
    
    loop {
        let (socket, addr) = listener.accept().await.unwrap();
        let state = state.clone();
        
        tokio::spawn(async move {
            handle_connection(socket, state, addr).await;
        });
    }
}

async fn handle_connection(
    socket: TcpStream,
    state: Arc<Mutex<HashMap<String, Tx>>>,
    addr: std::net::SocketAddr,
) {
    // WebSocket 握手逻辑(简化)
    // 实际项目用 tokio-tungstenite
    
    let (tx, rx) = mpsc::unbounded_channel();
    
    // 注册客户端
    state.lock().await.insert(addr.to_string(), tx);
    
    // 处理消息...
}

案例 3:定时任务调度器

代码语言:javascript
复制
use tokio::time::{interval, Duration};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel();
    
    // 定时任务 1:每秒执行
    let tx1 = tx.clone();
    tokio::spawn(async move {
        let mut int = interval(Duration::from_secs());
        loop {
            int.tick().await;
            let _ = tx1.send("每秒任务");
        }
    });
    
    // 定时任务 2:每 5 秒执行
    let tx2 = tx.clone();
    tokio::spawn(async move {
        let mut int = interval(Duration::from_secs());
        loop {
            int.tick().await;
            let _ = tx2.send("每 5 秒任务");
        }
    });
    
    // 接收并处理
    tokio::spawn(async move {
        while let Ok(msg) = rx.recv().await {
            println!("收到:{}", msg);
        }
    });
    
    // 运行 30 秒
    tokio::time::sleep(Duration::from_secs()).await;
}

🧠 思维导图

34-异步生态
34-异步生态

📝 小结

金句回顾:

  1. 运行时是异步的基础——选对运行时很重要
  2. Tokio 最流行——生态好,功能全
  3. async-std 最像 std——学习成本低
  4. 别阻塞运行时——用异步的 I/O 和 sleep
  5. 库代码要异步——让应用层决定运行时

下篇预告:

咱们已经学了 unsafe Rust 的基础,但 Rust 的 unsafe 可不止这些。下篇深入聊聊FFI(外部函数接口),看看怎么和 C 代码互操作,调用系统 API!

🔗 参考资料

  • Tokio 官方文档
  • async-std
  • Rust Async Book
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Larry的Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 异步生态
    • 🎬 引入
    • 📌 核心概念
      • 什么是异步运行时?
      • 主要运行时对比
      • Tokio 的核心组件
      • async-std 的特点
    • 💻 代码示例
      • 基础示例:Tokio Hello World
      • Tokio 多任务并发
      • Tokio TCP 服务器
      • async-std 示例
      • 异步 HTTP 客户端(reqwest)
      • 并发下载多个文件
      • 超时和重试
    • 🐛 常见坑点
      • 坑点 1:阻塞异步运行时
      • 坑点 2:在同步上下文中调用异步代码
      • 坑点 3:忘记 await
      • 坑点 4:任务取消问题
      • 坑点 5:运行时选择错误
    • 🎯 实战案例
      • 案例 1:异步 Web 爬虫
      • 案例 2:WebSocket 聊天服务器
      • 案例 3:定时任务调度器
    • 🧠 思维导图
    • 📝 小结
    • 🔗 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档