From 1fb9ba5920d81fcb6d1733283f0890191b2867ee Mon Sep 17 00:00:00 2001 From: DownerCase Date: Fri, 15 Nov 2024 15:11:28 +0000 Subject: [PATCH 1/2] Extract handle map logic --- CMakeLists.txt | 5 +++- ecal/msg/datatype.go | 8 +++++++ ecal/publisher/cgo_wrapping.go | 20 ++++++++++++++++ ecal/publisher/publisher.cpp | 33 +++++++------------------ ecal/publisher/publisher.go | 41 +++++++++++++------------------ internal/handle_map.hpp | 44 ++++++++++++++++++++++++++++++++++ 6 files changed, 101 insertions(+), 50 deletions(-) create mode 100644 ecal/msg/datatype.go create mode 100644 ecal/publisher/cgo_wrapping.go create mode 100644 internal/handle_map.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ed9c95..a070324 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,9 @@ project(ecal-go LANGUAGES C CXX) find_package(eCAL CONFIG REQUIRED) +add_library(utils INTERFACE) +target_include_directories(utils INTERFACE .) + add_library(ecal_go_core) target_sources(ecal_go_core PRIVATE ./ecal/core.h @@ -15,7 +18,7 @@ target_sources(ecal_go_publisher PRIVATE ./ecal/publisher/publisher.h ./ecal/publisher/publisher.cpp ) -target_link_libraries(ecal_go_publisher PRIVATE eCAL::core) +target_link_libraries(ecal_go_publisher PRIVATE eCAL::core utils) # Subpackages that use cgo set(subpackages diff --git a/ecal/msg/datatype.go b/ecal/msg/datatype.go new file mode 100644 index 0000000..8f7775d --- /dev/null +++ b/ecal/msg/datatype.go @@ -0,0 +1,8 @@ +package msg + +type DataType struct { + Name string + Encoding string + Descriptor []byte +} + diff --git a/ecal/publisher/cgo_wrapping.go b/ecal/publisher/cgo_wrapping.go new file mode 100644 index 0000000..bd05186 --- /dev/null +++ b/ecal/publisher/cgo_wrapping.go @@ -0,0 +1,20 @@ +// Implementation for cgo preamble functions +package publisher + +// #include "publisher.h" +//bool GoPublisherCreate( +// uintptr_t handle, +// _GoString_ topic, +// _GoString_ name, _GoString_ encoding, +// const char* const descriptor, size_t descriptor_len +//) { +// return PublisherCreate( +// handle, +// _GoStringPtr(topic), _GoStringLen(topic), +// _GoStringPtr(name), _GoStringLen(name), +// _GoStringPtr(encoding), _GoStringLen(encoding), +// descriptor, descriptor_len +// ); +//} +import "C" + diff --git a/ecal/publisher/publisher.cpp b/ecal/publisher/publisher.cpp index 27f12e2..ff774d8 100644 --- a/ecal/publisher/publisher.cpp +++ b/ecal/publisher/publisher.cpp @@ -1,38 +1,23 @@ #include "publisher.h" -#include -#include - #include -namespace { -std::unordered_map> publishers; - -eCAL::CPublisher *const getPublisher(uintptr_t handle) { - const auto publisher = publishers.find(handle); - if (publisher == publishers.end()) { - return nullptr; - } - return publisher->second.get(); -} +#include "internal/handle_map.hpp" +namespace { +handle_map publishers; } // namespace const void *NewPublisher() { - auto publisher = std::make_unique(); - const auto handle = publisher.get(); - const auto [new_pub, added] = publishers.emplace( - reinterpret_cast(handle), - std::move(publisher) - ); - if (!added) { + const auto [it, added] = publishers.emplace(); + if(!added) { return nullptr; } - return handle; + return it->second.get(); } bool DestroyPublisher(uintptr_t handle) { - return publishers.erase(handle) == 1; + return publishers.erase(handle); } bool PublisherCreate( @@ -46,7 +31,7 @@ bool PublisherCreate( const char *const datatype_descriptor, size_t datatype_descriptor_len ) { - auto *publisher = getPublisher(handle); + auto *publisher = publishers.find(handle); if (publisher == nullptr) { return false; } @@ -59,7 +44,7 @@ bool PublisherCreate( } void PublisherSend(uintptr_t handle, void *buf, size_t len) { - auto *publisher = getPublisher(handle); + auto *publisher = publishers.find(handle); if (publisher == nullptr) { return; } diff --git a/ecal/publisher/publisher.go b/ecal/publisher/publisher.go index c67cccd..14b5690 100644 --- a/ecal/publisher/publisher.go +++ b/ecal/publisher/publisher.go @@ -1,42 +1,30 @@ package publisher // #cgo LDFLAGS: -lecal_core -// #include "publisher.h" -// bool GoPublisherCreate( -// uintptr_t handle, -// _GoString_ topic, -// _GoString_ name, -// _GoString_ encoding, -// const char* const descriptor, -// size_t descriptor_len -// ) { -// return PublisherCreate( -// handle, -// _GoStringPtr(topic), _GoStringLen(topic), -// _GoStringPtr(name), _GoStringLen(name), -// _GoStringPtr(encoding), _GoStringLen(encoding), -// descriptor, descriptor_len -// ); -// } +// #cgo CPPFLAGS: -I${SRCDIR}/../../ +//#include "publisher.h" +//bool GoPublisherCreate( +// uintptr_t handle, +// _GoString_ topic, +// _GoString_ name, _GoString_ encoding, +// const char* const descriptor, size_t descriptor_len +//); import "C" import "unsafe" import ( "errors" - "fmt" + + "github.com/DownerCase/ecal-go/ecal/msg" ) +type DataType = msg.DataType + type Publisher struct { Messages chan []byte handle C.uintptr_t stopped bool } -type DataType struct { - Name string - Encoding string - Descriptor []byte -} - func New() (*Publisher, error) { ptr := C.NewPublisher() if ptr == nil { @@ -49,7 +37,6 @@ func New() (*Publisher, error) { } func (p *Publisher) Delete() { - fmt.Println("Deleting publisher") if !p.stopped { p.stopped = true close(p.Messages) @@ -81,6 +68,10 @@ func (p *Publisher) Create(topic string, datatype DataType) error { return nil } +func (p *Publisher) IsStopped() bool { + return p.stopped +} + func (p *Publisher) sendMessages() { for msg := range p.Messages { C.PublisherSend(p.handle, unsafe.Pointer(&msg[0]), C.size_t(len(msg))) diff --git a/internal/handle_map.hpp b/internal/handle_map.hpp new file mode 100644 index 0000000..7874559 --- /dev/null +++ b/internal/handle_map.hpp @@ -0,0 +1,44 @@ +#ifndef ECAL_GO_HANDLE_MAP_HPP +#define ECAL_GO_HANDLE_MAP_HPP + +#include +#include +#include +#include +#include +#include + +template class handle_map { +private: + std::unordered_map> handles; +public: + using iterator = typename decltype(handles)::iterator; + template + std::pair emplace(Args &&...args) { + // Create a new T to get it's address to use as the key + auto instance = std::make_unique(std::forward(args)...); + const auto &handle = *instance; + static_assert( + std::is_same_v, + "Unable to take reference" + ); + // Store in the map + return this->handles.emplace( + reinterpret_cast(&handle), + std::move(instance) + ); + } + + bool erase(std::uintptr_t handle) { return this->handles.erase(handle) == 1; } + + T *find(std::uintptr_t handle) { + const auto elem = this->handles.find(handle); + if (elem == handles.end()) { + return nullptr; + } + return elem->second.get(); + } + +}; + +#endif From 8f8f3ef94fe244701a8a0945918e2ff843db7785 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Fri, 15 Nov 2024 15:12:34 +0000 Subject: [PATCH 2/2] Implement basic eCAL subscriber --- CMakeLists.txt | 8 +++ ecal/subscriber/cgo_wrapping.go | 19 +++++++ ecal/subscriber/subscriber.cpp | 82 +++++++++++++++++++++++++++ ecal/subscriber/subscriber.go | 89 ++++++++++++++++++++++++++++++ ecal/subscriber/subscriber.h | 38 +++++++++++++ ecal/subscriber/subscriber_test.go | 64 +++++++++++++++++++++ main.go | 18 +++++- 7 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 ecal/subscriber/cgo_wrapping.go create mode 100644 ecal/subscriber/subscriber.cpp create mode 100644 ecal/subscriber/subscriber.go create mode 100644 ecal/subscriber/subscriber.h create mode 100644 ecal/subscriber/subscriber_test.go diff --git a/CMakeLists.txt b/CMakeLists.txt index a070324..64babf6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,10 +20,18 @@ target_sources(ecal_go_publisher PRIVATE ) target_link_libraries(ecal_go_publisher PRIVATE eCAL::core utils) +add_library(ecal_go_subscriber) +target_sources(ecal_go_subscriber PRIVATE + ./ecal/subscriber/subscriber.h + ./ecal/subscriber/subscriber.cpp +) +target_link_libraries(ecal_go_subscriber PRIVATE eCAL::core utils) + # Subpackages that use cgo set(subpackages "ecal" "ecal/publisher" + "ecal/subscriber" "ecal/logging" ) diff --git a/ecal/subscriber/cgo_wrapping.go b/ecal/subscriber/cgo_wrapping.go new file mode 100644 index 0000000..2cd4aeb --- /dev/null +++ b/ecal/subscriber/cgo_wrapping.go @@ -0,0 +1,19 @@ +// Implementation for cgo preamble functions +package subscriber + +//#include "subscriber.h" +//bool GoSubscriberCreate( +// uintptr_t handle, +// _GoString_ topic, +// _GoString_ name, _GoString_ encoding, +// const char* const descriptor, size_t descriptor_len +// ) { +// return SubscriberCreate( +// handle, +// _GoStringPtr(topic), _GoStringLen(topic), +// _GoStringPtr(name), _GoStringLen(name), +// _GoStringPtr(encoding), _GoStringLen(encoding), +// descriptor, descriptor_len +// ); +//} +import "C" diff --git a/ecal/subscriber/subscriber.cpp b/ecal/subscriber/subscriber.cpp new file mode 100644 index 0000000..0cdaee8 --- /dev/null +++ b/ecal/subscriber/subscriber.cpp @@ -0,0 +1,82 @@ +#include "subscriber.h" + +#include +#include + +#include "internal/handle_map.hpp" + +namespace { +handle_map subscribers; +handle_map messages; +} // namespace + +const void *NewSubscriber() { + const auto [it, added] = subscribers.emplace(); + if (!added) { + return nullptr; + } + return it->second.get(); +} + +bool DestroySubscriber(uintptr_t handle) { return subscribers.erase(handle); } + +bool SubscriberCreate( + uintptr_t handle, + const char *const topic, + size_t topic_len, + const char *const datatype_name, + size_t datatype_name_len, + const char *const datatype_encoding, + size_t datatype_encoding_len, + const char *const datatype_descriptor, + size_t datatype_descriptor_len +) { + auto *subscriber = subscribers.find(handle); + if (subscriber == nullptr) { + return false; + } + return subscriber->Create( + std::string(topic, topic_len), + {std::string(datatype_name, datatype_name_len), + std::string(datatype_encoding, datatype_encoding_len), + std::string(datatype_descriptor, datatype_descriptor_len)} + ); +} + +uintptr_t +SubscriberReceive(uintptr_t subscriber_handle, const char **msg, size_t *len) { + auto *subscriber = subscribers.find(subscriber_handle); + if (subscriber == nullptr) { + *msg = nullptr; + *len = 0; + return 0; + } + + // Receive message into our buffer + // TODO: Replace with callback based method to remove a copy + std::string buffer{}; + const auto received = subscriber->ReceiveBuffer(buffer, nullptr, -1); + if (!received) { + *msg = nullptr; + *len = 0; + return 0; + } + + // Save the message for later processing + auto [it, added] = messages.emplace(std::move(buffer)); + if (!added) { + throw std::runtime_error("Failed to store received message"); + *msg = nullptr; + *len = 0; + return 0; + } + + *msg = it->second->c_str(); + *len = it->second->size(); + + return it->first; +} + +bool DestroyMessage(uintptr_t message_handle) { + return messages.erase(message_handle); +} diff --git a/ecal/subscriber/subscriber.go b/ecal/subscriber/subscriber.go new file mode 100644 index 0000000..29be7a6 --- /dev/null +++ b/ecal/subscriber/subscriber.go @@ -0,0 +1,89 @@ +package subscriber + +// #cgo LDFLAGS: -lecal_core +// #cgo CPPFLAGS: -I${SRCDIR}/../../ +// #include "subscriber.h" +// bool GoSubscriberCreate( +// uintptr_t handle, +// _GoString_ topic, +// _GoString_ name, _GoString_ encoding, +// const char* const descriptor, size_t descriptor_len +// ); +import "C" +import "unsafe" +import ( + "errors" + + "github.com/DownerCase/ecal-go/ecal/msg" +) + +type Subscriber struct { + messages chan []byte + handle C.uintptr_t + stopped bool +} + +type DataType = msg.DataType + +func New() (*Subscriber, error) { + ptr := C.NewSubscriber() + if ptr == nil { + return nil, errors.New("Failed to allocate new subscriber") + } + return &Subscriber{ + handle: C.uintptr_t((uintptr(ptr))), + messages: make(chan []byte), + }, nil +} + +func (p *Subscriber) Delete() { + if !p.stopped { + p.stopped = true + close(p.messages) + } + if !bool(C.DestroySubscriber(p.handle)) { + // "Failed to delete subscriber" + return + } + // Deleted, clear handle + p.handle = 0 +} + +func (p *Subscriber) Create(topic string, datatype DataType) error { + var descriptor_ptr *C.char = nil + if len(datatype.Descriptor) > 0 { + descriptor_ptr = (*C.char)(unsafe.Pointer(&datatype.Descriptor[0])) + } + if !C.GoSubscriberCreate( + p.handle, + topic, + datatype.Name, + datatype.Encoding, + descriptor_ptr, + C.size_t(len(datatype.Descriptor)), + ) { + return errors.New("Failed to Create publisher") + } + return nil +} + +// Receive a new message from eCAL +// Currently performs at least two copies +// - Internal eCAL recieve buffer -> ReceiveBuffer's buffer in C wrapper +// - C wrapper -> C.GoBytes +// TODO: Use a callback based method to copy the data directly from eCAL's +// buffer to a Go []byte result variable +func (p *Subscriber) Receive() []byte { + var msg *C.char + var len C.size_t + // WARNING: Calling through cgo three times in a frequently run function + // is suboptimal + + // Receive message + handle := C.SubscriberReceive(p.handle, &msg, &len) + // Copy to a go []byte + go_msg := C.GoBytes(unsafe.Pointer(msg), C.int(len)) + // Free the original receive buffer + C.DestroyMessage(handle) + return go_msg +} diff --git a/ecal/subscriber/subscriber.h b/ecal/subscriber/subscriber.h new file mode 100644 index 0000000..7527003 --- /dev/null +++ b/ecal/subscriber/subscriber.h @@ -0,0 +1,38 @@ +#ifndef ECAL_GO_SUBSCRIBER_H +#define ECAL_GO_SUBSCRIBER_H + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +const void *NewSubscriber(); +bool DestroySubscriber(uintptr_t handle); + +bool SubscriberCreate( + uintptr_t handle, + const char *const topic, + size_t topic_len, + const char *const datatype_name, + size_t datatype_name_len, + const char *const datatype_encoding, + size_t datatype_encoding_len, + const char *const datatype_descriptor, + size_t datatype_descriptor_len +); + +// Receive a message and return a handle to it as well as the message data +// pointer and length +uintptr_t +SubscriberReceive(uintptr_t subscriber_handle, const char **msg, size_t *len); + +bool DestroyMessage(uintptr_t message_handle); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/ecal/subscriber/subscriber_test.go b/ecal/subscriber/subscriber_test.go new file mode 100644 index 0000000..dac3dda --- /dev/null +++ b/ecal/subscriber/subscriber_test.go @@ -0,0 +1,64 @@ +package subscriber + +import ( + "reflect" + "testing" + "time" + + "github.com/DownerCase/ecal-go/ecal" + "github.com/DownerCase/ecal-go/ecal/publisher" +) + +var TEST_MESSAGE = []byte{4, 15, 80} + +func TestSubscriber(t *testing.T) { + initResult := ecal.Initialize( + ecal.NewConfig(), + "Go eCAL!", + ecal.C_Publisher|ecal.C_Subscriber|ecal.C_Logging, + ) + if initResult != 0 { + t.Fatal("Failed to initialize", initResult) + } + defer ecal.Finalize() // Shutdown eCAL at the end of the program + + pub, err := publisher.New() + if err != nil { + t.Error(err) + } + defer pub.Delete() + + if err := pub.Create("testing_subscriber", DataType{}); err != nil { + t.Error(err) + } + + sub, err := New() + if err != nil { + t.Error(err) + } + defer sub.Delete() + if err := sub.Create("testing_subscriber", DataType{}); err != nil { + t.Error(err) + } + + go sendMessages(pub) + for range 10 { + msg := sub.Receive() + if msg == nil { + t.Error("Nil message received:") + } + if len(msg) != len(TEST_MESSAGE) { + t.Error("Expected message of length", len(TEST_MESSAGE), "Received:", len(msg)) + } + if !reflect.DeepEqual(msg, TEST_MESSAGE) { + t.Error(msg, "!=", TEST_MESSAGE) + } + } +} + +func sendMessages(p *publisher.Publisher) { + for !p.IsStopped() { + p.Messages <- TEST_MESSAGE + time.Sleep(10 * time.Millisecond) + } +} diff --git a/main.go b/main.go index 5380b50..99bf7c7 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "github.com/DownerCase/ecal-go/ecal/logging" "github.com/DownerCase/ecal-go/ecal/protobuf/publisher" string_publisher "github.com/DownerCase/ecal-go/ecal/string/publisher" + "github.com/DownerCase/ecal-go/ecal/subscriber" "github.com/DownerCase/ecal-go/protos" ) @@ -62,6 +63,15 @@ func main() { panic("Failed to Create string publisher") } + sub, _ := subscriber.New() + if sub.Create("string topic", subscriber.DataType{ + Name: "std::string", + Encoding: "base", + }) != nil { + panic("Failed to Create string subscriber") + } + go receiveMessages(sub) + for idx := range 100 { // Check if program has been requested to stop if !ecal.Ok() { @@ -79,7 +89,7 @@ func main() { logging.Error(err) } - if err = string_pub.Send("Sent ", idx, " messages"); err != nil { + if err = string_pub.Send("Message ", idx); err != nil { logging.Error(err) } @@ -87,3 +97,9 @@ func main() { time.Sleep(1 * time.Second) } } + +func receiveMessages(s *subscriber.Subscriber) { + for { + fmt.Println("Received:", string(s.Receive())) + } +}