diff --git a/ecal/core/src/io/shm/ecal_memfile_sync.cpp b/ecal/core/src/io/shm/ecal_memfile_sync.cpp index 187faa5fee..ebc3ef5b6e 100644 --- a/ecal/core/src/io/shm/ecal_memfile_sync.cpp +++ b/ecal/core/src/io/shm/ecal_memfile_sync.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,6 +103,9 @@ namespace eCAL if (iter != m_event_handle_map.end()) { const SEventHandlePair event_pair = iter->second; + // fire acknowledge events, to unlock blocking send function + gSetEvent(event_pair.event_ack); + // close the snd and ack event gCloseEvent(event_pair.event_snd); gCloseEvent(event_pair.event_ack); m_event_handle_map.erase(iter); @@ -336,19 +339,26 @@ namespace eCAL // fire the publisher events // connected subscribers will read the content from the memory file - const std::lock_guard lock(m_event_handle_map_sync); + // we work on a copy of the event handle map, this is needed to .. + // 1. unlock a memory file sync via Disconnect(process_id) (ack event is set by the Disconnect in this case) + // 2. be able to add a new memory file sync via Connect(process_id) + EventHandleMapT event_handle_map_snapshot; + { + const std::lock_guard lock(m_event_handle_map_sync); + event_handle_map_snapshot = m_event_handle_map; + } // "eat" old acknowledge events :) if (m_attr.timeout_ack_ms != 0) { - for (const auto& event_handle : m_event_handle_map) + for (const auto& event_handle : event_handle_map_snapshot) { while (gWaitForEvent(event_handle.second.event_ack, 0)) {} } } // send sync (memory file update) event - for (const auto& event_handle : m_event_handle_map) + for (const auto& event_handle : event_handle_map_snapshot) { // send sync event gSetEvent(event_handle.second.event_snd); @@ -360,7 +370,7 @@ namespace eCAL // take start time for all acknowledge timeouts const auto start_time = std::chrono::steady_clock::now(); - for (auto& event_handle : m_event_handle_map) + for (auto& event_handle : event_handle_map_snapshot) { const auto time_since_start = std::chrono::steady_clock::now() - start_time; const auto time_to_wait = std::chrono::milliseconds(m_attr.timeout_ack_ms)- time_since_start; diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp index 99bd7cc58e..adb28414d1 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp @@ -83,11 +83,18 @@ namespace eCAL return sent; } - void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) + void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_, const std::string& /*conn_par_*/) { // we accept local connections only if (host_name_ != m_attributes.host_name) return; + // add or update the map with process id's and sets of topic ids + { + const std::lock_guard lock(m_process_id_topic_id_set_map_sync); + auto& topic_set = m_process_id_topic_id_set_map[process_id_]; + topic_set.insert(topic_id_); + } + for (auto& memory_file : m_memory_file_vec) { memory_file->Connect(std::to_string(process_id_)); @@ -97,6 +104,47 @@ namespace eCAL } } + void CDataWriterSHM::RemoveSubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_) + { + // we accept local disconnections only + if (host_name_ != m_attributes.host_name) return; + + // remove topic id from the id set for the given process id + bool memfile_has_subscriptions(true); + { + const std::lock_guard lock(m_process_id_topic_id_set_map_sync); + auto process_it = m_process_id_topic_id_set_map.find(process_id_); + + // this process id is connected to the memory file + if (process_it != m_process_id_topic_id_set_map.end()) + { + // remove it from the id set + process_it->second.erase(topic_id_); + + // this process id has no more connection to this memory file + if (process_it->second.empty()) + { + // we can remove the empty topic id set + m_process_id_topic_id_set_map.erase(process_it); + // and set the subscription state to false for later processing + memfile_has_subscriptions = false; + } + } + } + + // memory file is still connected to at least one topic id of this process id + // no need to Disconnect process id + if (memfile_has_subscriptions) return; + + for (auto& memory_file : m_memory_file_vec) + { + memory_file->Disconnect(std::to_string(process_id_)); +#ifndef NDEBUG + Logging::Log(log_level_debug1, std::string("CDataWriterSHM::RemoveSubscription - Memory FileName: ") + memory_file->GetName() + " to ProcessId " + std::to_string(process_id_)); +#endif + } + } + Registration::ConnectionPar CDataWriterSHM::GetConnectionParameter() { Registration::ConnectionPar connection_par; diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.h b/ecal/core/src/readwrite/shm/ecal_writer_shm.h index 2c3c122fda..407eacb820 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.h +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.h @@ -29,7 +29,10 @@ #include "readwrite/ecal_writer_base.h" #include +#include #include +#include +#include #include #include @@ -47,6 +50,7 @@ namespace eCAL bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override; void ApplySubscription(const std::string& host_name_, int32_t process_id_, const std::string& topic_id_, const std::string& conn_par_) override; + void RemoveSubscription(const std::string& host_name_, int32_t process_id_, const std::string& topic_id_) override; Registration::ConnectionPar GetConnectionParameter() override; @@ -58,5 +62,9 @@ namespace eCAL size_t m_write_idx = 0; std::vector> m_memory_file_vec; static const std::string m_memfile_base_name; + + using ProcessIDTopicIDSetT = std::map>; + std::mutex m_process_id_topic_id_set_map_sync; + ProcessIDTopicIDSetT m_process_id_topic_id_set_map; }; }