首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在处理期货中的元素时中止中间期货::Stream::Stream并发

在处理期货中的元素时中止中间期货::Stream::Stream并发
EN

Stack Overflow用户
提问于 2022-10-21 18:32:46
回答 1查看 26关注 0票数 1

我试图“连接”多个对等点,并同时“处理”它们。对于单个进程(称为“任务”),这很好。但是,对于多个任务,一旦对等方处理了流程,我希望使用它。在下面的设置中,我有3个任务,当一个对等点完成后,我希望第二个任务为所有其他对等点中止。这应该包括来自create_new_conn_fut未来的任何未来同行。

代码语言:javascript
复制
use futures::stream::StreamExt;
use rand::Rng;

pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
    // simulate processing by sleeping
    tokio::time::sleep(duration).await;
    println!("task #{} done for {}", task_id, peer);
}

#[tokio::main]
async fn main() {
    let peers = vec!["peer A", "peer B", "peer C"];
    let peers = futures::stream::iter(peers);

    let (tx, rx) = tokio::sync::mpsc::channel(100);

    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let rx = peers.chain(rx);
    
    let handle_conn_fut = rx.for_each_concurrent(0,
        |peer| async move {
            let mut rng = rand::thread_rng();

            println!("connecting to {}", peer);
            process(peer, core::time::Duration::from_secs(1), "1").await;

            process(peer, core::time::Duration::from_secs(rng.gen_range(5..15)), "2").await;

            process(peer, core::time::Duration::from_secs(1), "3").await;
        }
    );
    
    let create_new_conn_fut = async move {
        for peer in ["peer D", "peer E"] {
            tx.send(peer).await.unwrap();
        }
    };

    // awaits all futures in parallell
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

产出:

代码语言:javascript
复制
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C
task #3 done for peer C
task #2 done for peer D
task #2 done for peer A
task #2 done for peer B
task #3 done for peer D
task #3 done for peer A
task #3 done for peer B
task #2 done for peer E
task #3 done for peer E

我希望一旦一个对等点完成了任务2,并将所有未来的对等点重定向到只执行任务1和任务3,那么任务2就会被中止。

为了说明这一点,我有以下几点

代码语言:javascript
复制
     A                B           ....          E
     ↓                ↓                         ↓
async task #1    async task #1             async task #1
     ↓                ↓                         ↓
async task #2    async task #2             async task #2
     ↓                ↓                         ↓
async task #3    async task #3             async task #3
     ↓                ↓                         ↓
    done             done                      done

我想短路async task #1一旦任何一个对等(A)已经完成它.所以考虑到那个前任。B首先完成它--它看起来如下:

代码语言:javascript
复制
     A                B           ....          E          ....           F (future peer)
     ↓                ↓                         ↓                         ↓
async task #1    async task #1             async task #1             async task #1 
     ↓                ↓                         ↓                         ↓
     ↓           async task #2                  ↓                         ↓
     ↓                ↓                         ↓                         ↓
async task #3    async task #3             async task #3             async task #3
     ↓                ↓                         ↓                         ↓
    done             done                      done                      done

所以我想要的输出是:

代码语言:javascript
复制
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C    <- will abort all other task #2
task #3 done for peer A
task #3 done for peer B
task #3 done for peer C
task #3 done for peer D
task #3 done for peer E

我已经调查过futures::future::AbortHandle,但我认为这只是针对单一的未来--因为futures::stream::AbortRegistration没有克隆人的特性?

一个人将如何实现这样的事情?

EN

回答 1

Stack Overflow用户

发布于 2022-10-21 21:22:18

一种方法是使用带有分支的tokio::select来检查任务是否已完成或是否已被取消:

代码语言:javascript
复制
use futures::stream::StreamExt;
use rand::Rng;
use futures::future;

pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
    // simulate processing by sleeping
    tokio::time::sleep(duration).await;
    println!("task #{} done for {}", task_id, peer);
}

#[tokio::main]
async fn main() {
    let peers = vec!["peer A", "peer B", "peer C"];
    let peers = futures::stream::iter(peers);

    let (tx, rx) = tokio::sync::mpsc::channel(100);

    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let rx = peers.chain(rx);

    let notify = std::sync::Arc::new(tokio::sync::Notify::new());
    let done_with_task_2 = std::rc::Rc::new(std::cell::RefCell::new(false));
    
    let handle_conn_fut = rx.for_each_concurrent(0, |peer| {
        let notify = notify.clone();
        let done_with_task_2 = done_with_task_2.clone();

        async move {
            let mut rng = rand::thread_rng();

            println!("connecting to {}", peer);
            process(peer, core::time::Duration::from_secs(1), "1").await;

            // task #2
            tokio::select! {
                done = async { 
                    if *done_with_task_2.borrow() {
                        future::ready(()).await
                    } else {
                        future::pending().await
                    }
                } => {}
                process = process(peer, core::time::Duration::from_secs(rng.gen_range(5..10)), "2") => {
                    notify.notify_waiters();
                    done_with_task_2.replace(true);
                }
                cancel = notify.notified() => {}
            }

            process(peer, core::time::Duration::from_secs(1), "3").await;

            process(peer, core::time::Duration::from_secs(20), "4").await;
        }
    }
    );
    
    let create_new_conn_fut = async move {
        for peer in ["peer D", "peer E"] {
            tx.send(peer).await.unwrap();
        }
        // a new peer after 15 seconds
        tokio::time::sleep(core::time::Duration::from_secs(15)).await;
        tx.send("peer F").await.unwrap()
    };

    // awaits all futures in parallell
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

产出:

代码语言:javascript
复制
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer B
task #3 done for peer B
task #3 done for peer E
task #3 done for peer D
task #3 done for peer C
task #3 done for peer A
connecting to peer F
task #1 done for peer F
task #3 done for peer F
task #4 done for peer B
task #4 done for peer E
task #4 done for peer D
task #4 done for peer C
task #4 done for peer A
task #4 done for peer F

然而,代码变得稍微冗长。有更好的方法来写这个吗?

人们可能使用futures::future::Either或使用shared未来。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74157880

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档