
"线程是雇佣员工,异步是让自己变成章鱼——八只手同时干活"
前面我们学了线程并发,现在来思考一个问题:
如果你想同时处理 10000 个网络连接,用线程会怎样?
这显然不现实。
那怎么办?异步编程登场了!
生活化类比:
想象你在等外卖:
同步方式(线程阻塞):
你:点外卖
你:站在门口等(阻塞)
30 分钟后:外卖到了
你:吃饭
异步方式(async/await):
你:点外卖
你:继续打游戏(不阻塞)
30 分钟后:外卖到了(回调/通知)
你:暂停游戏,吃饭
异步的核心思想是:在等待 I/O 的时候,去做别的事情,而不是干等着。
Rust 的异步编程基于 Future 和 async/await 语法。今天我们就来学习这些概念,以及背后的 Pin 和 Waker 机制。
Future 是一个可能还没完成的计算。它代表一个将来会有结果的值。
类比:
.await = 等外卖送到Future 有三个状态:
async 和 await 是 Rust 的语法糖,让异步代码写起来像同步代码。
// 同步代码
fn read_file() -> String {
// 阻塞读取
std::fs::read_to_string("file.txt").unwrap()
}
// 异步代码
async fn read_file() -> String {
// 非阻塞读取
tokio::fs::read_to_string("file.txt").await.unwrap()
}
关键区别:
async fn 返回一个 Future,不会立即执行.await 会"等待"Future 完成,但不阻塞线程Future 本身不会自己运行,需要异步运行时来驱动。
类比:
常见的 Rust 异步运行时:
Pin 是 Rust 异步编程中最难理解的概念之一。
为什么需要 Pin?
有些类型在内存中的位置是固定的,不能移动。比如:
类比:
Waker 是一个通知机制。当 Future 准备好时,用它来通知运行时。
工作流程:
Poll::Pending类比:
use std::time::Duration;
// 异步函数
async fn say_hello() {
println!("你好!");
}
// 带返回值的异步函数
async fn add(a: i32, b: i32) -> i32 {
a + b
}
// 异步函数可以 await 其他异步函数
async fn greet_and_add() -> i32 {
say_hello().await;
add(, ).await
}
#[tokio::main]
async fn main() {
greet_and_add().await;
}
注意: async fn 返回的是 Future,需要 .await 或运行时来执行。
use tokio::time::{sleep, Duration};
async fn task1() {
println!("任务 1 开始");
sleep(Duration::from_millis()).await;
println!("任务 1 完成");
}
async fn task2() {
println!("任务 2 开始");
sleep(Duration::from_millis()).await;
println!("任务 2 完成");
}
#[tokio::main]
async fn main() {
// 方式 1:顺序执行(总耗时 250ms)
// task1().await;
// task2().await;
// 方式 2:并发执行(总耗时 150ms)
tokio::join!(task1(), task2());
// 方式 3:选择第一个完成的
// tokio::select! {
// _ = task1() => println!("任务 1 先完成"),
// _ = task2() => println!("任务 2 先完成"),
// }
}
use tokio::time::{sleep, Duration};
async fn fetch_data(id: i32) -> String {
sleep(Duration::from_millis()).await;
format!("数据 {}", id)
}
#[tokio::main]
async fn main() {
// 并发获取多个数据
let results = tokio::join!(
fetch_data(),
fetch_data(),
fetch_data(),
);
println!("{:?}", results);
// 使用 futures crate
use futures::future::{join_all, try_join_all};
let futures = vec![
fetch_data(),
fetch_data(),
fetch_data(),
];
let results = join_all(futures).await;
println!("{:?}", results);
}
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
async fn read_file_async() -> std::io::Result<()> {
let file = File::open("Cargo.toml").await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
println!("{}", line);
}
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = read_file_async().await {
eprintln!("读取文件失败:{}", e);
}
}
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
// 一个简单的计时器 Future
struct Timer {
when: Instant,
}
impl Timer {
fn new(duration: Duration) -> Self {
Timer {
when: Instant::now() + duration,
}
}
}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.when {
println!("时间到!");
Poll::Ready(())
} else {
// 注册 Waker,时间到时通知
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
println!("开始计时 1 秒...");
Timer::new(Duration::from_secs()).await;
println!("计时完成!");
}
注意: 上面的实现会 busy-wait(不断轮询),实际应该用定时器注册真正的事件通知。
use std::pin::Pin;
// 自引用结构体
struct SelfReferential {
data: String,
ptr: *const u8, // 指向 data 的指针
}
impl SelfReferential {
fn new(data: String) -> Self {
let ptr = data.as_ptr();
SelfReferential { data, ptr }
}
fn get_ptr(&self) -> *const u8 {
self.ptr
}
}
// ❌ 错误:移动后指针失效
fn broken_example() {
let mut sr = SelfReferential::new("hello".to_string());
let ptr1 = sr.get_ptr();
// 移动 sr
let sr2 = sr; // sr 被移动了
// ptr1 现在指向无效的内存!
// println!("{:?}", *ptr1); // ❌ 未定义行为
}
// ✅ 正确:用 Pin 固定
fn fixed_example() {
use std::pin::Pin;
use std::boxed::Box;
// 把结构体放在堆上并固定
let mut pinned = Box::pin(SelfReferential::new("hello".to_string()));
let ptr = pinned.get_ptr();
// 即使移动 pinned,指针仍然有效
let pinned2 = pinned;
// 安全:结构体没有被移动
unsafe {
println!("{:?}", *ptr);
}
}
async fn fetch_data() -> String {
"data".to_string()
}
#[tokio::main]
async fn main() {
// ❌ 错误:忘记 await
// let data = fetch_data(); // data 是 Future,不是 String
// ✅ 正确
let data = fetch_data().await;
println!("{}", data);
}
fn sync_function() {
// ❌ 错误:不能在同步函数中 await
// let data = fetch_data().await;
}
// ✅ 正确:用运行时阻塞执行
fn sync_function() {
let rt = tokio::runtime::Runtime::new().unwrap();
let data = rt.block_on(fetch_data());
}
#[tokio::main]
async fn main() {
// ❌ 错误:阻塞整个运行时
// std::thread::sleep(Duration::from_secs(1));
// ✅ 正确:用异步 sleep
// tokio::time::sleep(Duration::from_secs(1)).await;
// 如果必须用阻塞操作,用 spawn_blocking
tokio::task::spawn_blocking(|| {
std::thread::sleep(Duration::from_secs());
}).await.unwrap();
}
use std::rc::Rc;
async fn bad_example() {
// ❌ Rc 不是 Send,不能跨线程
// let data = Rc::new(5);
// tokio::spawn(async move {
// println!("{}", data);
// });
// ✅ 用 Arc
use std::sync::Arc;
let data = Arc::new();
tokio::spawn(async move {
println!("{}", data);
}).await.unwrap();
}
use reqwest;
use tokio;
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?;
response.text().await
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://example.com",
"https://example.org",
"https://example.net",
];
// 并发请求所有 URL
let futures: Vec<_> = urls
.iter()
.map(|url| fetch_url(url))
.collect();
let results = futures::future::join_all(futures).await;
for (url, result) in urls.iter().zip(results) {
match result {
Ok(content) => println!("{}: {} bytes", url, content.len()),
Err(e) => println!("{} 失败:{}", url, e),
}
}
Ok(())
}
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器监听 127.0.0.1:8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("新连接:{}", addr);
// 为每个连接 spawn 一个任务
tokio::spawn(async move {
let (mut reader, mut writer) = socket.into_split();
let mut buf_reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match buf_reader.read_line(&mut line).await {
Ok() => break, // 连接关闭
Ok(_) => {
let response = format!("收到:{}", line);
writer.write_all(response.as_bytes()).await.unwrap();
}
Err(e) => {
eprintln!("读取错误:{}", e);
break;
}
}
}
});
}
}
use tokio::time::{timeout, Duration};
async fn slow_operation() -> String {
tokio::time::sleep(Duration::from_secs()).await;
"完成".to_string()
}
#[tokio::main]
async fn main() {
// 设置 2 秒超时
match timeout(Duration::from_secs(), slow_operation()).await {
Ok(result) => println!("操作完成:{}", result),
Err(_) => println!("操作超时!"),
}
}
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(); // 缓冲区大小 32
// 生产者
let producer = tokio::spawn(async move {
for i in .. {
tx.send(i).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis()).await;
}
});
// 消费者
let consumer = tokio::spawn(async move {
while let Some(value) = rx.recv().await {
println!("收到:{}", value);
}
});
producer.await.unwrap();
consumer.await.unwrap();
}

spawn_blocking 处理阻塞操作。下篇预告: 异步基础学完了,但实际开发中用什么库?Tokio 和 async-std 有什么区别?如何选择?下篇我们深入异步生态,学习主流框架和实战案例!