首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将自我传递到线程中

将自我传递到线程中
EN

Stack Overflow用户
提问于 2022-04-13 18:59:38
回答 1查看 222关注 0票数 2

我有以下代码:

代码语言:javascript
复制
use std::error::Error;
use std::collections::HashMap;
use std::fmt::{Display, Formatter, Error as FmtError};
use std::sync::{Arc, Mutex};
use std::thread;

use amiquip::{ConsumerMessage};
use serde::Deserialize;

use crate::rabbit_mq;
use crate::rabbit_mq::RmqChannel;
use super::agv::*;

pub type AGVControllerError = Box<dyn Error + Send + Sync + 'static>;
pub type AGVControllerResult<T> = Result<T, AGVControllerError>;


pub struct AGVController<'a> {
    agv_map: HashMap<&'a str, AGV>,
    //rmq: &'a mut rabbit_mq::RMQ
    rmq: Arc<Mutex<rabbit_mq::RMQ>>
}

impl<'a> AGVController<'a> {
    // Create a new AGV Controller
    //pub fn new(rmq: &'a mut rabbit_mq::RMQ) -> Self {
    pub fn new(rmq: Arc<Mutex<rabbit_mq::RMQ>>) -> Self {
        Self {
            agv_map: HashMap::new(),
            rmq
        }
    }

    // Listen for messages related to the agv controller.
    pub fn listen(&'static mut self, routing_key: &'static str) -> AGVControllerResult<()> {
        let mut rmq_channel = self.rmq.lock().unwrap().create_channel()?;

        thread::spawn(move || {
            //TODO: need to handle any error returned here
            //listen_on_consumer(routing_key, rmq_channel).unwrap();
            let consumer = rmq_channel.create_consumer(routing_key).unwrap();
            let test = self.agv_map.get("rr");
            for (i, message) in consumer.receiver().iter().enumerate() {
                match message {
                    ConsumerMessage::Delivery(delivery) => {
                        let body = String::from_utf8_lossy(&delivery.body);
                        println!("({:>3}) Received [{}]", i, body);
                        consumer.ack(delivery).unwrap();
                    }
                    other => {
                        println!("Consumer ended: {:?}", other);
                        break;
                    }
                }
            }
        });

        return Ok(());
    }
}

因此,在listen函数中,我从rmq创建了一个消费者,我只是在消息出现时读取消息。一切都很好。但是,当消息被读取时,我需要访问结构信息、数据和函数。让自己的参数使用“静态”似乎可以修复它。这可以吗?静态意味着只要程序存在,它就会保持活动,对吗?这似乎是正确的,因为我希望只要线程运行,它就会活着。我只想确保我做的正确。

更新:

所以我试着克隆自我并在线程中使用它,但是我得到了:

代码语言:javascript
复制
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/agv/agv_controller.rs:47:35
    |
47  |         let mut self_clone = self.clone();
    |                                   ^^^^^
    |
note: first, the lifetime cannot outlive the lifetime `'a` as defined here...
   --> src/agv/agv_controller.rs:35:6
    |
35  | impl<'a> AGVController<'a> {
    |      ^^
note: ...so that the types are compatible
   --> src/agv/agv_controller.rs:47:35
    |
47  |         let mut self_clone = self.clone();
    |                                   ^^^^^
    = note: expected `&agv_controller::AGVController<'_>`
               found `&agv_controller::AGVController<'a>`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/agv/agv_controller.rs:48:23: 66:10]` will meet its required lifetime bounds...
   --> src/agv/agv_controller.rs:48:9
    |
48  |         thread::spawn(move || {
    |         ^^^^^^^^^^^^^
note: ...that is required by this bound
   --> /Users/conordowney/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/mod.rs:646:15
    |
646 |     F: Send + 'static,
    |               ^^^^^^^
EN

回答 1

Stack Overflow用户

发布于 2022-04-13 19:52:29

为了让它在我的机器上编译,我不得不把你的程序砍掉(为了将来的参考,一个可以放在操场上的minimal reproducable example是非常有用的),但是我想我找到了根本原因。

它不喜欢您试图将Mutex中的某个内容传递到另一个线程,因此您应该克隆Arc (其主要目的是克隆),然后将其锁定在线程中。

我不知道AGV是什么,但是看起来您只是在从地图中读取,所以希望解包/deref允许它安全地移动线程。如果您需要完整的地图,您应该能够替换对.clone()的调用。

代码语言:javascript
复制
use std::error::Error;
use std::collections::HashMap;
use std::fmt::{Display, Formatter, Error as FmtError};
use std::sync::{Arc, Mutex};
use std::thread;

use amiquip::{ConsumerMessage};
use serde::Deserialize;

use crate::rabbit_mq{self, RmqChannel};
use super::agv::*;

pub type AGVControllerError = Box<dyn Error + Send + Sync + 'static>;
pub type AGVControllerResult<T> = Result<T, AGVControllerError>;


pub struct AGVController<'a> {
    agv_map: HashMap<&'a str, AGV>,
    rmq: Arc<Mutex<rabbit_mq::RMQ>>
}

impl<'a> AGVController<'a> {
    // Create a new AGV Controller
    pub fn new(rmq: Arc<Mutex<rabbit_mq::RMQ>>) -> Self {
        Self {
            agv_map: HashMap::new(),
            rmq
        }
    }

    // Listen for messages related to the agv controller.
    pub fn listen(&self, routing_key: &'static str) -> AGVControllerResult<()> {
        //let mut rmq_channel = self.rmq.lock().unwrap().create_channel()?;
        let rmq_channel_mtx = self.rmq.clone();
        let rr = *self.agv_map.get("rr").unwrap()

        thread::spawn(move || {
            let rmq_channel = rmq_channel_mtx.lock().unwrap();
            let consumer = rmq_channel.create_consumer(routing_key).unwrap();

            let test = rr;

            for (i, message) in consumer.receiver().iter().enumerate() {
                match message {
                    ConsumerMessage::Delivery(delivery) => {
                        let body = String::from_utf8_lossy(&delivery.body);
                        println!("({:>3}) Received [{}]", i, body);
                        consumer.ack(delivery).unwrap();
                    }
                    other => {
                        println!("Consumer ended: {:?}", other);
                        break;
                    }
                }
            }
        });

        return Ok(());
    }
}

附带注意,除非您真的需要一个&str,否则我发现在像HashMap这样的结构中处理拥有的String通常更容易,而且您不会到处都需要那些生存期注释。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71862627

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档