Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: advanced subscriber undeclartion deadlock #1685

Conversation

YuanYuYuan
Copy link
Contributor

Description

This PR solves the deadlock when undeclaring subscribers with the same keyexpr.

To Reproduce

Run the following modified zenoh-ext example.

use clap::{arg, Parser};
use zenoh::config::Config;
use zenoh_ext::AdvancedSubscriberBuilderExt;
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
    // Initiate logging
    zenoh::init_log_from_env_or("error");

    let (config, key_expr) = parse_args();

    println!("Opening session...");
    let session = zenoh::open(config).await.unwrap();

    println!("Declaring AdvancedSubscriber on {}", key_expr);

    let subscriber = session
        .declare_subscriber(key_expr.clone())
        .subscriber_detection()
        .callback(|_sample| {
            println!("recv");
        })
        .await
        .unwrap();

    let subscriber2 = session
        .declare_subscriber(key_expr.clone())
        .callback(|_sample| {
            println!("recv");
        })
        .await
        .unwrap();

    println!("Undeclare1 ... ");
    subscriber.undeclare().await.unwrap();
    println!("done");
    println!("Undeclare2 ... ");
    subscriber2.undeclare().await.unwrap();
    println!("done");

    std::thread::sleep(std::time::Duration::from_secs(2));
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
    #[arg(short, long, default_value = "demo/example/**")]
    /// The key expression to subscribe onto.
    key: String,
    #[command(flatten)]
    common: CommonArgs,
}

fn parse_args() -> (Config, String) {
    let args = Args::parse();
    (args.common.into(), args.key)
}

Analysis

#4  0x0000555556212525 in std::sync::rwlock::RwLock<zenoh::api::session::SessionState>::read<zenoh::api::session::SessionState> (self=0x555557dee0e8)
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/sync/rwlock.rs:210
#5  0x0000555556612673 in zenoh::api::session::SessionInner::undeclare_liveliness (self=0x555557dee0c0, tid=4) at zenoh/src/api/session.rs:1854
#6  0x00005555565c1c87 in zenoh::api::liveliness::LivelinessToken::undeclare_impl (self=0x555557ddf5a0) at zenoh/src/api/liveliness.rs:362
#7  0x0000555556307d22 in zenoh::api::liveliness::{impl#9}::drop (self=0x555557ddf5a0) at zenoh/src/api/liveliness.rs:377
#8  0x00005555562e87b7 in core::ptr::drop_in_place<zenoh::api::liveliness::LivelinessToken> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#9  0x00005555560a1f53 in core::ptr::drop_in_place<core::option::Option<zenoh::api::liveliness::LivelinessToken>> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#10 0x00005555560a1ad8 in core::ptr::drop_in_place<zenoh_ext::advanced_subscriber::State> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#11 0x00005555560a1f1b in core::ptr::drop_in_place<core::cell::UnsafeCell<zenoh_ext::advanced_subscriber::State>> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#12 0x00005555560a1f6f in core::ptr::drop_in_place<std::sync::mutex::Mutex<zenoh_ext::advanced_subscriber::State>> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#13 0x00005555560a66bf in alloc::sync::Arc<std::sync::mutex::Mutex<zenoh_ext::advanced_subscriber::State>, alloc::alloc::Global>::drop_slow<std::sync::mutex::Mutex<zenoh_ext::advanced_subscriber::State>, alloc::alloc::Global> (self=0x555557dfbdd0) at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/sync.rs:1752
#14 0x00005555560a21c2 in alloc::sync::{impl#33}::drop<std::sync::mutex::Mutex<zenoh_ext::advanced_subscriber::State>, alloc::alloc::Global> (self=0x555557dfbdd0)
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/sync.rs:2408
#15 0x00005555560a0e9b in core::ptr::drop_in_place<alloc::sync::Arc<std::sync::mutex::Mutex<zenoh_ext::advanced_subscriber::State>, alloc::alloc::Global>> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#16 0x00005555560a137b in core::ptr::drop_in_place<zenoh_ext::advanced_subscriber::{impl#23}::new::{closure_env#1}<(), zenoh::api::handlers::callback::Callback<zenoh::api::sample::Sample>>> () at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#17 0x00005555562da210 in core::ptr::drop_in_place<(dyn core::ops::function::Fn<(zenoh::api::sample::Sample), Output=()> + core::marker::Send + core::marker::Sync)> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#18 0x000055555620f2ea in alloc::sync::Arc<(dyn core::ops::function::Fn<(zenoh::api::sample::Sample), Output=()> + core::marker::Send + core::marker::Sync), alloc::alloc::Global>::drop_slow<(dyn core::ops::function::Fn<(zenoh::api::sample::Sample), Output=()> + core::marker::Send + core::marker::Sync), alloc::alloc::Global> (self=0x555557de3550)
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/sync.rs:1752
#19 0x00005555562f38f0 in alloc::sync::{impl#33}::drop<(dyn core::ops::function::Fn<(zenoh::api::sample::Sample), Output=()> + core::marker::Send + core::marker::Sync), alloc::alloc::Global> (self=0x555557de3550) at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/sync.rs:2408
#20 0x00005555562db8bb in core::ptr::drop_in_place<alloc::sync::Arc<(dyn core::ops::function::Fn<(zenoh::api::sample::Sample), Output=()> + core::marker::Send + core::marker::Sync), alloc::alloc::Global>> () at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#21 0x00005555562f0e3b in core::ptr::drop_in_place<zenoh::api::handlers::callback::Callback<zenoh::api::sample::Sample>> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#22 0x00005555562e886c in core::ptr::drop_in_place<zenoh::api::subscriber::SubscriberState> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#23 0x000055555620eb7f in alloc::sync::Arc<zenoh::api::subscriber::SubscriberState, alloc::alloc::Global>::drop_slow<zenoh::api::subscriber::SubscriberState, alloc::alloc::Global> (self=0x7fffffff7fe0) at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/sync.rs:1752
#24 0x00005555562f4192 in alloc::sync::{impl#33}::drop<zenoh::api::subscriber::SubscriberState, alloc::alloc::Global> (self=0x7fffffff7fe0)
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/sync.rs:2408
#25 0x00005555562ef06b in core::ptr::drop_in_place<alloc::sync::Arc<zenoh::api::subscriber::SubscriberState, alloc::alloc::Global>> ()
    at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ptr/mod.rs:498
#26 0x000055555660e8a4 in zenoh::api::session::SessionInner::undeclare_subscriber_inner (self=0x7fffffffb1c0, sid=3, kind=zenoh::api::subscriber::SubscriberKind::Subscriber)
    at zenoh/src/api/session.rs:1635
#27 0x00005555560926b3 in zenoh::api::subscriber::Subscriber<()>::undeclare_impl<()> (self=0x7fffffffb1a0) at zenoh/src/api/subscriber.rs:216

Frame #26:

impl SessionInner {
    pub(crate) fn undeclare_subscriber_inner(
        self: &Arc<Self>,
        sid: Id,
        kind: SubscriberKind,
    ) -> ZResult<()> {
        dbg!();
        let mut state = zwrite!(self.state);
        ...
        if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) {
            ...
        }
        ...
   }
}

In the branch of "multiple subscribers on the same keyexpr", we don't drop the state guard (SessionState).
Then the drop of sub_state (SubscriberState) would trigger the drop of liveliness token in the backtrace that causes a deadlock when reading the SessionState again.

Copy link

PR missing one of the required labels: {'new feature', 'internal', 'dependencies', 'bug', 'breaking-change', 'enhancement', 'documentation'}

@YuanYuYuan YuanYuYuan added bug Something isn't working internal Changes not included in the changelog labels Dec 31, 2024
@YuanYuYuan YuanYuYuan merged commit f0bdbea into eclipse-zenoh:main Jan 6, 2025
15 of 16 checks passed
YuanYuYuan added a commit to ZettaScaleLabs/rmw_zenoh that referenced this pull request Jan 9, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))

---

Let me know if you'd like further refinements!
YuanYuYuan added a commit to ZettaScaleLabs/rmw_zenoh that referenced this pull request Jan 9, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))
YuanYuYuan added a commit to ZettaScaleLabs/rmw_zenoh that referenced this pull request Jan 9, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))
ahcorde pushed a commit to ros2/rmw_zenoh that referenced this pull request Jan 10, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))
mergify bot pushed a commit to ros2/rmw_zenoh that referenced this pull request Jan 10, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))

(cherry picked from commit 1f2d675)
mergify bot pushed a commit to ros2/rmw_zenoh that referenced this pull request Jan 10, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))

(cherry picked from commit 1f2d675)
ahcorde pushed a commit to ros2/rmw_zenoh that referenced this pull request Jan 10, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))

(cherry picked from commit 1f2d675)

Co-authored-by: Yuyuan Yuan <[email protected]>
ahcorde pushed a commit to ros2/rmw_zenoh that referenced this pull request Jan 10, 2025
This version includes the following fixes:
- Fix memory leak in string deserialization ([zenoh-cpp#363](eclipse-zenoh/zenoh-cpp#363))
- Fix SHM Garbage Collection (GC) policy ([zenoh#1696](eclipse-zenoh/zenoh#1696))
- Fix deadlock in advanced subscription undeclaration ([zenoh#1685](eclipse-zenoh/zenoh#1685))

(cherry picked from commit 1f2d675)

Co-authored-by: Yuyuan Yuan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working internal Changes not included in the changelog
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants