首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >33-Rust 教程 - 异步编程基础

33-Rust 教程 - 异步编程基础

作者头像
LarryLan
发布2026-06-03 18:27:51
发布2026-06-03 18:27:51
00
举报

异步编程基础

"线程是雇佣员工,异步是让自己变成章鱼——八只手同时干活"

🎬 引入

前面我们学了线程并发,现在来思考一个问题:

如果你想同时处理 10000 个网络连接,用线程会怎样?

  • 10000 个线程
  • 每个线程占用栈内存(默认 2MB)
  • 总共需要 20GB 内存!
  • 操作系统调度开销巨大

这显然不现实。

那怎么办?异步编程登场了!

生活化类比:

想象你在等外卖:

同步方式(线程阻塞):

代码语言:javascript
复制
你:点外卖
你:站在门口等(阻塞)
30 分钟后:外卖到了
你:吃饭

异步方式(async/await):

代码语言:javascript
复制
你:点外卖
你:继续打游戏(不阻塞)
30 分钟后:外卖到了(回调/通知)
你:暂停游戏,吃饭

异步的核心思想是:在等待 I/O 的时候,去做别的事情,而不是干等着

Rust 的异步编程基于 Futureasync/await 语法。今天我们就来学习这些概念,以及背后的 Pin 和 Waker 机制。

📌 核心概念

什么是 Future?

Future 是一个可能还没完成的计算。它代表一个将来会有结果的值。

类比:

  • Future = 外卖订单
  • .await = 等外卖送到
  • 结果 = 热腾腾的饭菜

Future 有三个状态:

  1. Pending:还没完成(外卖还在路上)
  2. Ready:完成了,有结果(外卖到了)
  3. Cancelled:取消了(你退单了)

async/await 是什么?

asyncawait 是 Rust 的语法糖,让异步代码写起来像同步代码。

代码语言:javascript
复制
// 同步代码
fn read_file() -> String {
    // 阻塞读取
    std::fs::read_to_string("file.txt").unwrap()
}

// 异步代码
async fn read_file() -> String {
    // 非阻塞读取
    tokio::fs::read_to_string("file.txt").await.unwrap()
}

关键区别:

  • async fn 返回一个 Future,不会立即执行
  • .await 会"等待"Future 完成,但不阻塞线程

异步运行时(Async Runtime)

Future 本身不会自己运行,需要异步运行时来驱动。

类比:

  • Future = 外卖订单
  • 异步运行时 = 外卖平台调度系统

常见的 Rust 异步运行时:

  • Tokio:最流行,功能全面
  • async-std:标准库的异步版本
  • smol:轻量级

Pin 是什么?

Pin 是 Rust 异步编程中最难理解的概念之一。

为什么需要 Pin?

有些类型在内存中的位置是固定的,不能移动。比如:

  • 自引用结构体(结构体里有指向自己的指针)
  • Future 内部可能有自引用

类比:

  • 普通值 = 可以随便搬家的家具
  • Pin 的值 = 固定在地上的保险箱(不能移动)

Waker 是什么?

Waker 是一个通知机制。当 Future 准备好时,用它来通知运行时。

工作流程:

  1. Future 还没准备好,返回 Poll::Pending
  2. Future 注册一个 Waker
  3. 当数据准备好时,调用 Waker 通知运行时
  4. 运行时重新调度这个 Future

类比:

  • Waker = 外卖员的电话
  • 外卖到了,打电话通知你
  • 你收到通知,去取外卖

💻 代码示例

基础 async/await

代码语言:javascript
复制
use std::time::Duration;

// 异步函数
async fn say_hello() {
    println!("你好!");
}

// 带返回值的异步函数
async fn add(a: i32, b: i32) -> i32 {
    a + b
}

// 异步函数可以 await 其他异步函数
async fn greet_and_add() -> i32 {
    say_hello().await;
    add(, ).await
}

#[tokio::main]
async fn main() {
    greet_and_add().await;
}

注意: async fn 返回的是 Future,需要 .await 或运行时来执行。

并发执行多个 Future

代码语言:javascript
复制
use tokio::time::{sleep, Duration};

async fn task1() {
    println!("任务 1 开始");
    sleep(Duration::from_millis()).await;
    println!("任务 1 完成");
}

async fn task2() {
    println!("任务 2 开始");
    sleep(Duration::from_millis()).await;
    println!("任务 2 完成");
}

#[tokio::main]
async fn main() {
    // 方式 1:顺序执行(总耗时 250ms)
    // task1().await;
    // task2().await;
    
    // 方式 2:并发执行(总耗时 150ms)
    tokio::join!(task1(), task2());
    
    // 方式 3:选择第一个完成的
    // tokio::select! {
    //     _ = task1() => println!("任务 1 先完成"),
    //     _ = task2() => println!("任务 2 先完成"),
    // }
}

Future 的组合

代码语言:javascript
复制
use tokio::time::{sleep, Duration};

async fn fetch_data(id: i32) -> String {
    sleep(Duration::from_millis()).await;
    format!("数据 {}", id)
}

#[tokio::main]
async fn main() {
    // 并发获取多个数据
    let results = tokio::join!(
        fetch_data(),
        fetch_data(),
        fetch_data(),
    );
    
    println!("{:?}", results);
    
    // 使用 futures crate
    use futures::future::{join_all, try_join_all};
    
    let futures = vec![
        fetch_data(),
        fetch_data(),
        fetch_data(),
    ];
    
    let results = join_all(futures).await;
    println!("{:?}", results);
}

异步 I/O 示例

代码语言:javascript
复制
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};

async fn read_file_async() -> std::io::Result<()> {
    let file = File::open("Cargo.toml").await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    
    while let Some(line) = lines.next_line().await? {
        println!("{}", line);
    }
    
    Ok(())
}

#[tokio::main]
async fn main() {
    if let Err(e) = read_file_async().await {
        eprintln!("读取文件失败:{}", e);
    }
}

实现一个简单的 Future

代码语言:javascript
复制
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// 一个简单的计时器 Future
struct Timer {
    when: Instant,
}

impl Timer {
    fn new(duration: Duration) -> Self {
        Timer {
            when: Instant::now() + duration,
        }
    }
}

impl Future for Timer {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.when {
            println!("时间到!");
            Poll::Ready(())
        } else {
            // 注册 Waker,时间到时通知
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    println!("开始计时 1 秒...");
    Timer::new(Duration::from_secs()).await;
    println!("计时完成!");
}

注意: 上面的实现会 busy-wait(不断轮询),实际应该用定时器注册真正的事件通知。

Pin 的使用示例

代码语言:javascript
复制
use std::pin::Pin;

// 自引用结构体
struct SelfReferential {
    data: String,
    ptr: *const u8,  // 指向 data 的指针
}

impl SelfReferential {
    fn new(data: String) -> Self {
        let ptr = data.as_ptr();
        SelfReferential { data, ptr }
    }
    
    fn get_ptr(&self) -> *const u8 {
        self.ptr
    }
}

// ❌ 错误:移动后指针失效
fn broken_example() {
    let mut sr = SelfReferential::new("hello".to_string());
    let ptr1 = sr.get_ptr();
    
    // 移动 sr
    let sr2 = sr;  // sr 被移动了
    
    // ptr1 现在指向无效的内存!
    // println!("{:?}", *ptr1);  // ❌ 未定义行为
}

// ✅ 正确:用 Pin 固定
fn fixed_example() {
    use std::pin::Pin;
    use std::boxed::Box;
    
    // 把结构体放在堆上并固定
    let mut pinned = Box::pin(SelfReferential::new("hello".to_string()));
    let ptr = pinned.get_ptr();
    
    // 即使移动 pinned,指针仍然有效
    let pinned2 = pinned;
    
    // 安全:结构体没有被移动
    unsafe {
        println!("{:?}", *ptr);
    }
}

🐛 常见坑点

坑点 1:忘记 await

代码语言:javascript
复制
async fn fetch_data() -> String {
    "data".to_string()
}

#[tokio::main]
async fn main() {
    // ❌ 错误:忘记 await
    // let data = fetch_data();  // data 是 Future,不是 String
    
    // ✅ 正确
    let data = fetch_data().await;
    println!("{}", data);
}

坑点 2:在同步上下文中调用异步函数

代码语言:javascript
复制
fn sync_function() {
    // ❌ 错误:不能在同步函数中 await
    // let data = fetch_data().await;
}

// ✅ 正确:用运行时阻塞执行
fn sync_function() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let data = rt.block_on(fetch_data());
}

坑点 3:阻塞异步运行时

代码语言:javascript
复制
#[tokio::main]
async fn main() {
    // ❌ 错误:阻塞整个运行时
    // std::thread::sleep(Duration::from_secs(1));
    
    // ✅ 正确:用异步 sleep
    // tokio::time::sleep(Duration::from_secs(1)).await;
    
    // 如果必须用阻塞操作,用 spawn_blocking
    tokio::task::spawn_blocking(|| {
        std::thread::sleep(Duration::from_secs());
    }).await.unwrap();
}

坑点 4:Send trait 问题

代码语言:javascript
复制
use std::rc::Rc;

async fn bad_example() {
    // ❌ Rc 不是 Send,不能跨线程
    // let data = Rc::new(5);
    // tokio::spawn(async move {
    //     println!("{}", data);
    // });
    
    // ✅ 用 Arc
    use std::sync::Arc;
    let data = Arc::new();
    tokio::spawn(async move {
        println!("{}", data);
    }).await.unwrap();
}

🎯 实战案例

案例 1:并发 HTTP 请求

代码语言:javascript
复制
use reqwest;
use tokio;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;
    response.text().await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://example.com",
        "https://example.org",
        "https://example.net",
    ];
    
    // 并发请求所有 URL
    let futures: Vec<_> = urls
        .iter()
        .map(|url| fetch_url(url))
        .collect();
    
    let results = futures::future::join_all(futures).await;
    
    for (url, result) in urls.iter().zip(results) {
        match result {
            Ok(content) => println!("{}: {} bytes", url, content.len()),
            Err(e) => println!("{} 失败:{}", url, e),
        }
    }
    
    Ok(())
}

案例 2:异步 TCP 服务器

代码语言:javascript
复制
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("服务器监听 127.0.0.1:8080");
    
    loop {
        let (socket, addr) = listener.accept().await?;
        println!("新连接:{}", addr);
        
        // 为每个连接 spawn 一个任务
        tokio::spawn(async move {
            let (mut reader, mut writer) = socket.into_split();
            let mut buf_reader = BufReader::new(reader);
            let mut line = String::new();
            
            loop {
                line.clear();
                match buf_reader.read_line(&mut line).await {
                    Ok() => break,  // 连接关闭
                    Ok(_) => {
                        let response = format!("收到:{}", line);
                        writer.write_all(response.as_bytes()).await.unwrap();
                    }
                    Err(e) => {
                        eprintln!("读取错误:{}", e);
                        break;
                    }
                }
            }
        });
    }
}

案例 3:带超时的异步操作

代码语言:javascript
复制
use tokio::time::{timeout, Duration};

async fn slow_operation() -> String {
    tokio::time::sleep(Duration::from_secs()).await;
    "完成".to_string()
}

#[tokio::main]
async fn main() {
    // 设置 2 秒超时
    match timeout(Duration::from_secs(), slow_operation()).await {
        Ok(result) => println!("操作完成:{}", result),
        Err(_) => println!("操作超时!"),
    }
}

案例 4:异步通道

代码语言:javascript
复制
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel();  // 缓冲区大小 32
    
    // 生产者
    let producer = tokio::spawn(async move {
        for i in .. {
            tx.send(i).await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis()).await;
        }
    });
    
    // 消费者
    let consumer = tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("收到:{}", value);
        }
    });
    
    producer.await.unwrap();
    consumer.await.unwrap();
}

🧠 思维导图

33-异步编程基础
33-异步编程基础

📝 小结

  1. Future 代表将来的值,有 Pending 和 Ready 两种状态。
  2. async/await 是语法糖,让异步代码写起来像同步代码。
  3. 异步运行时驱动 Future,常见的有 Tokio 和 async-std。
  4. Pin 固定内存位置,防止自引用结构体失效。
  5. Waker 是通知机制,Future 准备好时通知运行时。
  6. 不要阻塞异步运行时,用 spawn_blocking 处理阻塞操作。

下篇预告: 异步基础学完了,但实际开发中用什么库?Tokio 和 async-std 有什么区别?如何选择?下篇我们深入异步生态,学习主流框架和实战案例!

🔗 参考资料

  • Rust Async Book
  • Tokio 文档
  • Future trait 文档
  • Pin 和 Unpin
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Larry的Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 异步编程基础
    • 🎬 引入
    • 📌 核心概念
      • 什么是 Future?
      • async/await 是什么?
      • 异步运行时(Async Runtime)
      • Pin 是什么?
      • Waker 是什么?
    • 💻 代码示例
      • 基础 async/await
      • 并发执行多个 Future
      • Future 的组合
      • 异步 I/O 示例
      • 实现一个简单的 Future
      • Pin 的使用示例
    • 🐛 常见坑点
      • 坑点 1:忘记 await
      • 坑点 2:在同步上下文中调用异步函数
      • 坑点 3:阻塞异步运行时
      • 坑点 4:Send trait 问题
    • 🎯 实战案例
      • 案例 1:并发 HTTP 请求
      • 案例 2:异步 TCP 服务器
      • 案例 3:带超时的异步操作
      • 案例 4:异步通道
    • 🧠 思维导图
    • 📝 小结
    • 🔗 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档