我试图“连接”多个对等点,并同时“处理”它们。对于单个进程(称为“任务”),这很好。但是,对于多个任务,一旦对等方处理了流程,我希望使用它。在下面的设置中,我有3个任务,当一个对等点完成后,我希望第二个任务为所有其他对等点中止。这应该包括来自create_new_conn_fut未来的任何未来同行。
use futures::stream::StreamExt;
use rand::Rng;
pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
// simulate processing by sleeping
tokio::time::sleep(duration).await;
println!("task #{} done for {}", task_id, peer);
}
#[tokio::main]
async fn main() {
let peers = vec!["peer A", "peer B", "peer C"];
let peers = futures::stream::iter(peers);
let (tx, rx) = tokio::sync::mpsc::channel(100);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let rx = peers.chain(rx);
let handle_conn_fut = rx.for_each_concurrent(0,
|peer| async move {
let mut rng = rand::thread_rng();
println!("connecting to {}", peer);
process(peer, core::time::Duration::from_secs(1), "1").await;
process(peer, core::time::Duration::from_secs(rng.gen_range(5..15)), "2").await;
process(peer, core::time::Duration::from_secs(1), "3").await;
}
);
let create_new_conn_fut = async move {
for peer in ["peer D", "peer E"] {
tx.send(peer).await.unwrap();
}
};
// awaits all futures in parallell
futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}产出:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C
task #3 done for peer C
task #2 done for peer D
task #2 done for peer A
task #2 done for peer B
task #3 done for peer D
task #3 done for peer A
task #3 done for peer B
task #2 done for peer E
task #3 done for peer E我希望一旦一个对等点完成了任务2,并将所有未来的对等点重定向到只执行任务1和任务3,那么任务2就会被中止。
为了说明这一点,我有以下几点
A B .... E
↓ ↓ ↓
async task #1 async task #1 async task #1
↓ ↓ ↓
async task #2 async task #2 async task #2
↓ ↓ ↓
async task #3 async task #3 async task #3
↓ ↓ ↓
done done done我想短路async task #1一旦任何一个对等(A)已经完成它.所以考虑到那个前任。B首先完成它--它看起来如下:
A B .... E .... F (future peer)
↓ ↓ ↓ ↓
async task #1 async task #1 async task #1 async task #1
↓ ↓ ↓ ↓
↓ async task #2 ↓ ↓
↓ ↓ ↓ ↓
async task #3 async task #3 async task #3 async task #3
↓ ↓ ↓ ↓
done done done done所以我想要的输出是:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C <- will abort all other task #2
task #3 done for peer A
task #3 done for peer B
task #3 done for peer C
task #3 done for peer D
task #3 done for peer E我已经调查过futures::future::AbortHandle,但我认为这只是针对单一的未来--因为futures::stream::AbortRegistration没有克隆人的特性?
一个人将如何实现这样的事情?
发布于 2022-10-21 21:22:18
一种方法是使用带有分支的tokio::select来检查任务是否已完成或是否已被取消:
use futures::stream::StreamExt;
use rand::Rng;
use futures::future;
pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
// simulate processing by sleeping
tokio::time::sleep(duration).await;
println!("task #{} done for {}", task_id, peer);
}
#[tokio::main]
async fn main() {
let peers = vec!["peer A", "peer B", "peer C"];
let peers = futures::stream::iter(peers);
let (tx, rx) = tokio::sync::mpsc::channel(100);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let rx = peers.chain(rx);
let notify = std::sync::Arc::new(tokio::sync::Notify::new());
let done_with_task_2 = std::rc::Rc::new(std::cell::RefCell::new(false));
let handle_conn_fut = rx.for_each_concurrent(0, |peer| {
let notify = notify.clone();
let done_with_task_2 = done_with_task_2.clone();
async move {
let mut rng = rand::thread_rng();
println!("connecting to {}", peer);
process(peer, core::time::Duration::from_secs(1), "1").await;
// task #2
tokio::select! {
done = async {
if *done_with_task_2.borrow() {
future::ready(()).await
} else {
future::pending().await
}
} => {}
process = process(peer, core::time::Duration::from_secs(rng.gen_range(5..10)), "2") => {
notify.notify_waiters();
done_with_task_2.replace(true);
}
cancel = notify.notified() => {}
}
process(peer, core::time::Duration::from_secs(1), "3").await;
process(peer, core::time::Duration::from_secs(20), "4").await;
}
}
);
let create_new_conn_fut = async move {
for peer in ["peer D", "peer E"] {
tx.send(peer).await.unwrap();
}
// a new peer after 15 seconds
tokio::time::sleep(core::time::Duration::from_secs(15)).await;
tx.send("peer F").await.unwrap()
};
// awaits all futures in parallell
futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}产出:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer B
task #3 done for peer B
task #3 done for peer E
task #3 done for peer D
task #3 done for peer C
task #3 done for peer A
connecting to peer F
task #1 done for peer F
task #3 done for peer F
task #4 done for peer B
task #4 done for peer E
task #4 done for peer D
task #4 done for peer C
task #4 done for peer A
task #4 done for peer F然而,代码变得稍微冗长。有更好的方法来写这个吗?
人们可能使用futures::future::Either或使用shared未来。
https://stackoverflow.com/questions/74157880
复制相似问题