Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Basic subscriber #8

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ project(ecal-go LANGUAGES C CXX)

find_package(eCAL CONFIG REQUIRED)

add_library(utils INTERFACE)
target_include_directories(utils INTERFACE .)

add_library(ecal_go_core)
target_sources(ecal_go_core PRIVATE
./ecal/core.h
Expand All @@ -15,12 +18,20 @@ target_sources(ecal_go_publisher PRIVATE
./ecal/publisher/publisher.h
./ecal/publisher/publisher.cpp
)
target_link_libraries(ecal_go_publisher PRIVATE eCAL::core)
target_link_libraries(ecal_go_publisher PRIVATE eCAL::core utils)

add_library(ecal_go_subscriber)
target_sources(ecal_go_subscriber PRIVATE
./ecal/subscriber/subscriber.h
./ecal/subscriber/subscriber.cpp
)
target_link_libraries(ecal_go_subscriber PRIVATE eCAL::core utils)

# Subpackages that use cgo
set(subpackages
"ecal"
"ecal/publisher"
"ecal/subscriber"
"ecal/logging"
)

Expand Down
8 changes: 8 additions & 0 deletions ecal/msg/datatype.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package msg

type DataType struct {
Name string
Encoding string
Descriptor []byte
}

20 changes: 20 additions & 0 deletions ecal/publisher/cgo_wrapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Implementation for cgo preamble functions
package publisher

// #include "publisher.h"
//bool GoPublisherCreate(
// uintptr_t handle,
// _GoString_ topic,
// _GoString_ name, _GoString_ encoding,
// const char* const descriptor, size_t descriptor_len
//) {
// return PublisherCreate(
// handle,
// _GoStringPtr(topic), _GoStringLen(topic),
// _GoStringPtr(name), _GoStringLen(name),
// _GoStringPtr(encoding), _GoStringLen(encoding),
// descriptor, descriptor_len
// );
//}
import "C"

33 changes: 9 additions & 24 deletions ecal/publisher/publisher.cpp
Original file line number Diff line number Diff line change
@@ -1,38 +1,23 @@
#include "publisher.h"

#include <memory>
#include <unordered_map>

#include <ecal/ecal_publisher.h>

namespace {
std::unordered_map<uintptr_t, std::unique_ptr<eCAL::CPublisher>> publishers;

eCAL::CPublisher *const getPublisher(uintptr_t handle) {
const auto publisher = publishers.find(handle);
if (publisher == publishers.end()) {
return nullptr;
}
return publisher->second.get();
}
#include "internal/handle_map.hpp"

namespace {
handle_map<eCAL::CPublisher> publishers;
} // namespace

const void *NewPublisher() {
auto publisher = std::make_unique<eCAL::CPublisher>();
const auto handle = publisher.get();
const auto [new_pub, added] = publishers.emplace(
reinterpret_cast<uintptr_t>(handle),
std::move(publisher)
);
if (!added) {
const auto [it, added] = publishers.emplace();
if(!added) {
return nullptr;
}
return handle;
return it->second.get();
}

bool DestroyPublisher(uintptr_t handle) {
return publishers.erase(handle) == 1;
return publishers.erase(handle);
}

bool PublisherCreate(
Expand All @@ -46,7 +31,7 @@ bool PublisherCreate(
const char *const datatype_descriptor,
size_t datatype_descriptor_len
) {
auto *publisher = getPublisher(handle);
auto *publisher = publishers.find(handle);
if (publisher == nullptr) {
return false;
}
Expand All @@ -59,7 +44,7 @@ bool PublisherCreate(
}

void PublisherSend(uintptr_t handle, void *buf, size_t len) {
auto *publisher = getPublisher(handle);
auto *publisher = publishers.find(handle);
if (publisher == nullptr) {
return;
}
Expand Down
41 changes: 16 additions & 25 deletions ecal/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,30 @@
package publisher

// #cgo LDFLAGS: -lecal_core
// #include "publisher.h"
// bool GoPublisherCreate(
// uintptr_t handle,
// _GoString_ topic,
// _GoString_ name,
// _GoString_ encoding,
// const char* const descriptor,
// size_t descriptor_len
// ) {
// return PublisherCreate(
// handle,
// _GoStringPtr(topic), _GoStringLen(topic),
// _GoStringPtr(name), _GoStringLen(name),
// _GoStringPtr(encoding), _GoStringLen(encoding),
// descriptor, descriptor_len
// );
// }
// #cgo CPPFLAGS: -I${SRCDIR}/../../
//#include "publisher.h"
//bool GoPublisherCreate(
// uintptr_t handle,
// _GoString_ topic,
// _GoString_ name, _GoString_ encoding,
// const char* const descriptor, size_t descriptor_len
//);
import "C"
import "unsafe"
import (
"errors"
"fmt"

"github.com/DownerCase/ecal-go/ecal/msg"
)

type DataType = msg.DataType

type Publisher struct {
Messages chan []byte
handle C.uintptr_t
stopped bool
}

type DataType struct {
Name string
Encoding string
Descriptor []byte
}

func New() (*Publisher, error) {
ptr := C.NewPublisher()
if ptr == nil {
Expand All @@ -49,7 +37,6 @@ func New() (*Publisher, error) {
}

func (p *Publisher) Delete() {
fmt.Println("Deleting publisher")
if !p.stopped {
p.stopped = true
close(p.Messages)
Expand Down Expand Up @@ -81,6 +68,10 @@ func (p *Publisher) Create(topic string, datatype DataType) error {
return nil
}

func (p *Publisher) IsStopped() bool {
return p.stopped
}

func (p *Publisher) sendMessages() {
for msg := range p.Messages {
C.PublisherSend(p.handle, unsafe.Pointer(&msg[0]), C.size_t(len(msg)))
Expand Down
19 changes: 19 additions & 0 deletions ecal/subscriber/cgo_wrapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Implementation for cgo preamble functions
package subscriber

//#include "subscriber.h"
//bool GoSubscriberCreate(
// uintptr_t handle,
// _GoString_ topic,
// _GoString_ name, _GoString_ encoding,
// const char* const descriptor, size_t descriptor_len
// ) {
// return SubscriberCreate(
// handle,
// _GoStringPtr(topic), _GoStringLen(topic),
// _GoStringPtr(name), _GoStringLen(name),
// _GoStringPtr(encoding), _GoStringLen(encoding),
// descriptor, descriptor_len
// );
//}
import "C"
82 changes: 82 additions & 0 deletions ecal/subscriber/subscriber.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "subscriber.h"

#include <ecal/ecal_subscriber.h>
#include <stdexcept>

#include "internal/handle_map.hpp"

namespace {
handle_map<eCAL::CSubscriber> subscribers;
handle_map<std::string> messages;
} // namespace

const void *NewSubscriber() {
const auto [it, added] = subscribers.emplace();
if (!added) {
return nullptr;
}
return it->second.get();
}

bool DestroySubscriber(uintptr_t handle) { return subscribers.erase(handle); }

bool SubscriberCreate(
uintptr_t handle,
const char *const topic,
size_t topic_len,
const char *const datatype_name,
size_t datatype_name_len,
const char *const datatype_encoding,
size_t datatype_encoding_len,
const char *const datatype_descriptor,
size_t datatype_descriptor_len
) {
auto *subscriber = subscribers.find(handle);
if (subscriber == nullptr) {
return false;
}
return subscriber->Create(
std::string(topic, topic_len),
{std::string(datatype_name, datatype_name_len),
std::string(datatype_encoding, datatype_encoding_len),
std::string(datatype_descriptor, datatype_descriptor_len)}
);
}

uintptr_t
SubscriberReceive(uintptr_t subscriber_handle, const char **msg, size_t *len) {
auto *subscriber = subscribers.find(subscriber_handle);
if (subscriber == nullptr) {
*msg = nullptr;
*len = 0;
return 0;
}

// Receive message into our buffer
// TODO: Replace with callback based method to remove a copy
std::string buffer{};
const auto received = subscriber->ReceiveBuffer(buffer, nullptr, -1);
if (!received) {
*msg = nullptr;
*len = 0;
return 0;
}

// Save the message for later processing
auto [it, added] = messages.emplace(std::move(buffer));
if (!added) {
throw std::runtime_error("Failed to store received message");
*msg = nullptr;
*len = 0;
return 0;
}

*msg = it->second->c_str();
*len = it->second->size();

return it->first;
}

bool DestroyMessage(uintptr_t message_handle) {
return messages.erase(message_handle);
}
89 changes: 89 additions & 0 deletions ecal/subscriber/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package subscriber

// #cgo LDFLAGS: -lecal_core
// #cgo CPPFLAGS: -I${SRCDIR}/../../
// #include "subscriber.h"
// bool GoSubscriberCreate(
// uintptr_t handle,
// _GoString_ topic,
// _GoString_ name, _GoString_ encoding,
// const char* const descriptor, size_t descriptor_len
// );
import "C"
import "unsafe"
import (
"errors"

"github.com/DownerCase/ecal-go/ecal/msg"
)

type Subscriber struct {
messages chan []byte
handle C.uintptr_t
stopped bool
}

type DataType = msg.DataType

func New() (*Subscriber, error) {
ptr := C.NewSubscriber()
if ptr == nil {
return nil, errors.New("Failed to allocate new subscriber")
}
return &Subscriber{
handle: C.uintptr_t((uintptr(ptr))),
messages: make(chan []byte),
}, nil
}

func (p *Subscriber) Delete() {
if !p.stopped {
p.stopped = true
close(p.messages)
}
if !bool(C.DestroySubscriber(p.handle)) {
// "Failed to delete subscriber"
return
}
// Deleted, clear handle
p.handle = 0
}

func (p *Subscriber) Create(topic string, datatype DataType) error {
var descriptor_ptr *C.char = nil
if len(datatype.Descriptor) > 0 {
descriptor_ptr = (*C.char)(unsafe.Pointer(&datatype.Descriptor[0]))
}
if !C.GoSubscriberCreate(
p.handle,
topic,
datatype.Name,
datatype.Encoding,
descriptor_ptr,
C.size_t(len(datatype.Descriptor)),
) {
return errors.New("Failed to Create publisher")
}
return nil
}

// Receive a new message from eCAL
// Currently performs at least two copies
// - Internal eCAL recieve buffer -> ReceiveBuffer's buffer in C wrapper
// - C wrapper -> C.GoBytes
// TODO: Use a callback based method to copy the data directly from eCAL's
// buffer to a Go []byte result variable
func (p *Subscriber) Receive() []byte {
var msg *C.char
var len C.size_t
// WARNING: Calling through cgo three times in a frequently run function
// is suboptimal

// Receive message
handle := C.SubscriberReceive(p.handle, &msg, &len)
// Copy to a go []byte
go_msg := C.GoBytes(unsafe.Pointer(msg), C.int(len))
// Free the original receive buffer
C.DestroyMessage(handle)
return go_msg
}
Loading