From a43b49add1140d0a51db6365c2de97cbe29708c0 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Mon, 18 Nov 2024 11:22:00 +0000 Subject: [PATCH] Add timeout parameter to subscribers --- README.md | 21 ++++--- .../publisher/protobuf_publisher_test.go | 13 +---- .../subscriber/protobuf_subscriber.go | 11 +++- .../subscriber/protobuf_subscriber_test.go | 50 +++++++++-------- ecal/publisher/publisher_test.go | 16 ++---- .../string/publisher/string_publisher_test.go | 16 ++---- ecal/string/subscriber/string_subscriber.go | 11 +++- .../subscriber/string_subscriber_test.go | 56 +++++++++++-------- ecal/subscriber/subscriber.go | 10 +++- ecal/subscriber/subscriber_test.go | 55 ++++++++++-------- internal/ecaltest/init.go | 18 ++++++ .../testutil_protobuf_publisher.go | 20 +++++++ .../testutil_string_publisher.go | 20 +++++++ .../testutil_publisher/testutil_publisher.go | 19 +++++++ main.go | 14 +++-- 15 files changed, 231 insertions(+), 119 deletions(-) create mode 100644 internal/ecaltest/init.go create mode 100644 internal/ecaltest/protobuf/testutil_publisher/testutil_protobuf_publisher.go create mode 100644 internal/ecaltest/string/testutil_publisher/testutil_string_publisher.go create mode 100644 internal/ecaltest/testutil_publisher/testutil_publisher.go diff --git a/README.md b/README.md index 5f2469b..1c1c27e 100644 --- a/README.md +++ b/README.md @@ -16,15 +16,22 @@ go run . - eCAL 6 compatible (unreleased) - Pure cgo; no SWIG dependency - Custom C interface implementation +- Direct deserialization from subscriber buffer to Go types Provides Go interfaces for: -- [x] core -- [x] publisher -- [ ] subscriber -- [ ] services -- [ ] logging -- [ ] monitoring -- [ ] registration +- [x] Core +- [ ] Configuration +- [x] Publisher + - [ ] Zero Copy +- [x] Subscriber +- [x] Message Types + - [x] Generic + - [x] String + - [x] Protobuf +- [x] Logging +- [ ] Services +- [ ] Monitoring +- [ ] Registration ## Non-system installations diff --git a/ecal/protobuf/publisher/protobuf_publisher_test.go b/ecal/protobuf/publisher/protobuf_publisher_test.go index 0bf9854..c8f883a 100644 --- a/ecal/protobuf/publisher/protobuf_publisher_test.go +++ b/ecal/protobuf/publisher/protobuf_publisher_test.go @@ -1,23 +1,16 @@ -package publisher +package publisher_test import ( "testing" + "github.com/DownerCase/ecal-go/internal/ecaltest/protobuf/testutil_publisher" "github.com/DownerCase/ecal-go/protos" ) func TestProtobufPublisher(t *testing.T) { - pub, err := New[protos.Person]() - - if err != nil { - t.Error(err) - } + pub := testutil_publisher.NewProtobufPublisher[protos.Person](t, "testing_protobuf_publisher") defer pub.Delete() - if err := pub.Create("testing"); err != nil { - t.Error(err) - } - if pub.Messages == nil { t.Error("Message channel nil") } diff --git a/ecal/protobuf/subscriber/protobuf_subscriber.go b/ecal/protobuf/subscriber/protobuf_subscriber.go index 3650ef4..fea6e9a 100644 --- a/ecal/protobuf/subscriber/protobuf_subscriber.go +++ b/ecal/protobuf/subscriber/protobuf_subscriber.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "reflect" + "time" "unsafe" "github.com/DownerCase/ecal-go/ecal/subscriber" @@ -32,9 +33,15 @@ func New[U any, T Msg[U]]() (*Subscriber[U, T], error) { return psub, err } -func (p *Subscriber[U, T]) Receive() (U, error) { +func (p *Subscriber[U, T]) Receive(timeout time.Duration) (U, error) { var u U - switch msg := (<-p.Messages).(type) { + var msg any + select { + case msg = <-p.Messages: + case <-time.After(timeout): + return u, errors.New("Receive timed out") + } + switch msg := msg.(type) { case U: return msg, nil case error: diff --git a/ecal/protobuf/subscriber/protobuf_subscriber_test.go b/ecal/protobuf/subscriber/protobuf_subscriber_test.go index d4f8469..dbab7c0 100644 --- a/ecal/protobuf/subscriber/protobuf_subscriber_test.go +++ b/ecal/protobuf/subscriber/protobuf_subscriber_test.go @@ -6,42 +6,35 @@ import ( "github.com/DownerCase/ecal-go/ecal" "github.com/DownerCase/ecal-go/ecal/protobuf/publisher" + "github.com/DownerCase/ecal-go/internal/ecaltest" + "github.com/DownerCase/ecal-go/internal/ecaltest/protobuf/testutil_publisher" "github.com/DownerCase/ecal-go/protos" ) -func TestSubscriber(t *testing.T) { - initResult := ecal.Initialize( - ecal.NewConfig(), - "Go eCAL!", - ecal.C_Publisher|ecal.C_Subscriber|ecal.C_Logging, - ) - if initResult != 0 { - t.Fatal("Failed to initialize", initResult) - } - defer ecal.Finalize() // Shutdown eCAL at the end of the program - - pub, err := publisher.New[protos.Person]() +func newSubscriber[U any, T Msg[U]](t *testing.T, topic string) *Subscriber[U, T] { + sub, err := New[U, T]() if err != nil { t.Error(err) } - defer pub.Delete() - - if err := pub.Create("testing_string_subscriber"); err != nil { + if err := sub.Create(topic); err != nil { t.Error(err) } + return sub +} - sub, err := New[protos.Person]() - if err != nil { - t.Error(err) - } +func TestSubscriber(t *testing.T) { + ecaltest.InitEcal(t) + defer ecal.Finalize() // Shutdown eCAL at the end of the program + + pub := testutil_publisher.NewProtobufPublisher[protos.Person](t, "testing_protobuf_subscriber") + defer pub.Delete() + + sub := newSubscriber[protos.Person](t, "testing_protobuf_subscriber") defer sub.Delete() - if err := sub.Create("testing_string_subscriber"); err != nil { - t.Error(err) - } go sendMessages(pub) for range 10 { - msg, err := sub.Receive() + msg, err := sub.Receive(2 * time.Second) if err != nil { t.Error(err) @@ -61,6 +54,17 @@ func TestSubscriber(t *testing.T) { } } +func TestSubscriberTimeout(t *testing.T) { + ecaltest.InitEcal(t) + defer ecal.Finalize() // Shutdown eCAL at the end of the program + sub := newSubscriber[protos.Person](t, "testing_protobuf_subscriber_timeout") + defer sub.Delete() + msg, err := sub.Receive(50 * time.Millisecond) + if err == nil { + t.Error("Expected timeout, received message:", &msg) + } +} + func sendMessages(p *publisher.Publisher[*protos.Person]) { person := &protos.Person{Id: 0, Name: "John", Email: "john@doe.net", Dog: &protos.Dog{Name: "Pluto"}, diff --git a/ecal/publisher/publisher_test.go b/ecal/publisher/publisher_test.go index 1d79489..94672f4 100644 --- a/ecal/publisher/publisher_test.go +++ b/ecal/publisher/publisher_test.go @@ -1,12 +1,15 @@ -package publisher +package publisher_test import ( "testing" + + "github.com/DownerCase/ecal-go/ecal/publisher" + "github.com/DownerCase/ecal-go/internal/ecaltest/testutil_publisher" ) func TestNewPublishers(t *testing.T) { for range 100 { - ptr, err := New() + ptr, err := publisher.New() if err != nil { t.Error(err) } @@ -16,15 +19,8 @@ func TestNewPublishers(t *testing.T) { } func TestPublisher(t *testing.T) { - pub, err := New() - if err != nil { - t.Error(err) - } + pub := testutil_publisher.NewGenericPublisher(t, "testing") defer pub.Delete() - - if err := pub.Create("testing", DataType{}); err != nil { - t.Error(err) - } if pub.Messages == nil { t.Error("Message channel nil") } diff --git a/ecal/string/publisher/string_publisher_test.go b/ecal/string/publisher/string_publisher_test.go index 7776683..546f584 100644 --- a/ecal/string/publisher/string_publisher_test.go +++ b/ecal/string/publisher/string_publisher_test.go @@ -1,21 +1,15 @@ -package publisher +package publisher_test import ( "testing" -) -func TestProtobufPublisher(t *testing.T) { - pub, err := New() + "github.com/DownerCase/ecal-go/internal/ecaltest/string/testutil_publisher" +) - if err != nil { - t.Error(err) - } +func TestStringPublisher(t *testing.T) { + pub := testutilpublisher.NewStringPublisher(t, "test_string_publisher") defer pub.Delete() - if err := pub.Create("testing"); err != nil { - t.Error(err) - } - if pub.Messages == nil { t.Error("Message channel nil") } diff --git a/ecal/string/subscriber/string_subscriber.go b/ecal/string/subscriber/string_subscriber.go index d44be2f..e1ca755 100644 --- a/ecal/string/subscriber/string_subscriber.go +++ b/ecal/string/subscriber/string_subscriber.go @@ -2,6 +2,8 @@ package subscriber import "C" import ( + "errors" + "time" "unsafe" "github.com/DownerCase/ecal-go/ecal/subscriber" @@ -17,8 +19,13 @@ func New() (*Subscriber, error) { return &Subscriber{*sub}, err } -func (p *Subscriber) Receive() string { - return (<-p.Messages).(string) +func (p *Subscriber) Receive(timeout time.Duration) (string, error) { + select { + case msg := <-p.Messages: + return msg.(string), nil + case <-time.After(timeout): + return "", errors.New("Receive timed out") + } } func deserialize(data unsafe.Pointer, len int) any { diff --git a/ecal/string/subscriber/string_subscriber_test.go b/ecal/string/subscriber/string_subscriber_test.go index a718a45..dcb8917 100644 --- a/ecal/string/subscriber/string_subscriber_test.go +++ b/ecal/string/subscriber/string_subscriber_test.go @@ -7,43 +7,42 @@ import ( "github.com/DownerCase/ecal-go/ecal" "github.com/DownerCase/ecal-go/ecal/string/publisher" + "github.com/DownerCase/ecal-go/internal/ecaltest" + "github.com/DownerCase/ecal-go/internal/ecaltest/string/testutil_publisher" ) var TEST_MESSAGE = "Test string" -func TestSubscriber(t *testing.T) { - initResult := ecal.Initialize( - ecal.NewConfig(), - "Go eCAL!", - ecal.C_Publisher|ecal.C_Subscriber|ecal.C_Logging, - ) - if initResult != 0 { - t.Fatal("Failed to initialize", initResult) - } - defer ecal.Finalize() // Shutdown eCAL at the end of the program - - pub, err := publisher.New() +func newSubscriber(t *testing.T, topic string) *Subscriber { + sub, err := New() if err != nil { t.Error(err) } - defer pub.Delete() - - if err := pub.Create("testing_string_subscriber"); err != nil { + if err := sub.Create(topic); err != nil { t.Error(err) } + return sub +} - sub, err := New() - if err != nil { - t.Error(err) - } +// Export for testing +var NewSubscriber = newSubscriber + +func TestSubscriber(t *testing.T) { + ecaltest.InitEcal(t) + defer ecal.Finalize() // Shutdown eCAL at the end of the program + + pub := testutilpublisher.NewStringPublisher(t, "testing_string_subscriber") + defer pub.Delete() + + sub := newSubscriber(t, "testing_string_subscriber") defer sub.Delete() - if err := sub.Create("testing_string_subscriber"); err != nil { - t.Error(err) - } go sendMessages(pub) for range 10 { - msg := sub.Receive() + msg, err := sub.Receive(2 * time.Second) + if err != nil { + t.Error(err) + } if len(msg) != len(TEST_MESSAGE) { t.Error("Expected message of length", len(TEST_MESSAGE), "Received:", len(msg)) } @@ -53,6 +52,17 @@ func TestSubscriber(t *testing.T) { } } +func TestSubscriberTimeout(t *testing.T) { + ecaltest.InitEcal(t) + defer ecal.Finalize() // Shutdown eCAL at the end of the program + sub := newSubscriber(t, "testing_string_subscriber_timeout") + defer sub.Delete() + msg, err := sub.Receive(50 * time.Millisecond) + if err == nil { + t.Error("Expected timeout, received message:", msg) + } +} + func sendMessages(p *publisher.Publisher) { for !p.IsStopped() { p.Messages <- []byte(TEST_MESSAGE) diff --git a/ecal/subscriber/subscriber.go b/ecal/subscriber/subscriber.go index e6995dd..e32f1f2 100644 --- a/ecal/subscriber/subscriber.go +++ b/ecal/subscriber/subscriber.go @@ -13,6 +13,7 @@ import "C" import ( "errors" "runtime/cgo" + "time" "unsafe" "github.com/DownerCase/ecal-go/ecal/msg" @@ -74,8 +75,13 @@ func (p *Subscriber) Create(topic string, datatype DataType) error { } // Receive a new message from the eCAL receive callback -func (p *Subscriber) Receive() []byte { - return (<-p.Messages).([]byte) +func (p *Subscriber) Receive(timeout time.Duration) ([]byte, error) { + select { + case msg := <-p.Messages: + return msg.([]byte), nil + case <-time.After(timeout): + return nil, errors.New("Receive timed out") + } } // Deserialize straight from the eCAL internal buffer to our Go []byte diff --git a/ecal/subscriber/subscriber_test.go b/ecal/subscriber/subscriber_test.go index dac3dda..b550c8b 100644 --- a/ecal/subscriber/subscriber_test.go +++ b/ecal/subscriber/subscriber_test.go @@ -7,43 +7,41 @@ import ( "github.com/DownerCase/ecal-go/ecal" "github.com/DownerCase/ecal-go/ecal/publisher" + "github.com/DownerCase/ecal-go/internal/ecaltest" + "github.com/DownerCase/ecal-go/internal/ecaltest/testutil_publisher" ) var TEST_MESSAGE = []byte{4, 15, 80} -func TestSubscriber(t *testing.T) { - initResult := ecal.Initialize( - ecal.NewConfig(), - "Go eCAL!", - ecal.C_Publisher|ecal.C_Subscriber|ecal.C_Logging, - ) - if initResult != 0 { - t.Fatal("Failed to initialize", initResult) - } - defer ecal.Finalize() // Shutdown eCAL at the end of the program - - pub, err := publisher.New() +func newSubscriber(t *testing.T, topic string) *Subscriber { + sub, err := New() if err != nil { t.Error(err) } - defer pub.Delete() - - if err := pub.Create("testing_subscriber", DataType{}); err != nil { + if err := sub.Create(topic, DataType{}); err != nil { t.Error(err) } + return sub +} - sub, err := New() - if err != nil { - t.Error(err) - } +func TestSubscriber(t *testing.T) { + ecaltest.InitEcal(t) + defer ecal.Finalize() // Shutdown eCAL at the end of the program + + pub := testutil_publisher.NewGenericPublisher(t, "testing_subscriber") + defer pub.Delete() + + sub := newSubscriber(t, "testing_subscriber") defer sub.Delete() - if err := sub.Create("testing_subscriber", DataType{}); err != nil { - t.Error(err) - } go sendMessages(pub) for range 10 { - msg := sub.Receive() + // TODO: Reduce the propagation delay for when the subscriber gets + // connected to the publisher + msg, err := sub.Receive(2 * time.Second) + if err != nil { + t.Error("Received err:", err) + } if msg == nil { t.Error("Nil message received:") } @@ -56,6 +54,17 @@ func TestSubscriber(t *testing.T) { } } +func TestSubscriberTimeout(t *testing.T) { + ecaltest.InitEcal(t) + defer ecal.Finalize() // Shutdown eCAL at the end of the program + sub := newSubscriber(t, "testing_subscriber_timeout") + defer sub.Delete() + msg, err := sub.Receive(50 * time.Millisecond) + if err == nil { + t.Error("Expected timeout, received message:", msg) + } +} + func sendMessages(p *publisher.Publisher) { for !p.IsStopped() { p.Messages <- TEST_MESSAGE diff --git a/internal/ecaltest/init.go b/internal/ecaltest/init.go new file mode 100644 index 0000000..bbf3e1d --- /dev/null +++ b/internal/ecaltest/init.go @@ -0,0 +1,18 @@ +package ecaltest + +import ( + "testing" + + "github.com/DownerCase/ecal-go/ecal" +) + +func InitEcal(t *testing.T) { + initResult := ecal.Initialize( + ecal.NewConfig(), + "Go eCAL!", + ecal.C_Publisher|ecal.C_Subscriber|ecal.C_Logging, + ) + if initResult != 0 { + t.Fatal("Failed to initialize", initResult) + } +} diff --git a/internal/ecaltest/protobuf/testutil_publisher/testutil_protobuf_publisher.go b/internal/ecaltest/protobuf/testutil_publisher/testutil_protobuf_publisher.go new file mode 100644 index 0000000..0f0c0d3 --- /dev/null +++ b/internal/ecaltest/protobuf/testutil_publisher/testutil_protobuf_publisher.go @@ -0,0 +1,20 @@ +package testutil_publisher + +import ( + "testing" + + "github.com/DownerCase/ecal-go/ecal/protobuf/publisher" +) + +func NewProtobufPublisher[U any, T publisher.Msg[U]](t *testing.T, topic string) *publisher.Publisher[T] { + pub, err := publisher.New[U, T]() + + if err != nil { + t.Error(err) + } + + if err := pub.Create(topic); err != nil { + t.Error(err) + } + return pub +} diff --git a/internal/ecaltest/string/testutil_publisher/testutil_string_publisher.go b/internal/ecaltest/string/testutil_publisher/testutil_string_publisher.go new file mode 100644 index 0000000..e7946b8 --- /dev/null +++ b/internal/ecaltest/string/testutil_publisher/testutil_string_publisher.go @@ -0,0 +1,20 @@ +package testutilpublisher + +import ( + "testing" + + "github.com/DownerCase/ecal-go/ecal/string/publisher" +) + +func NewStringPublisher(t *testing.T, topic string) *publisher.Publisher { + pub, err := publisher.New() + + if err != nil { + t.Error(err) + } + + if err := pub.Create(topic); err != nil { + t.Error(err) + } + return pub +} diff --git a/internal/ecaltest/testutil_publisher/testutil_publisher.go b/internal/ecaltest/testutil_publisher/testutil_publisher.go new file mode 100644 index 0000000..102b119 --- /dev/null +++ b/internal/ecaltest/testutil_publisher/testutil_publisher.go @@ -0,0 +1,19 @@ +package testutil_publisher + +import ( + "testing" + + "github.com/DownerCase/ecal-go/ecal/publisher" +) + +func NewGenericPublisher(t *testing.T, topic string) *publisher.Publisher { + pub, err := publisher.New() + if err != nil { + t.Error(err) + } + + if err := pub.Create(topic, publisher.DataType{}); err != nil { + t.Error(err) + } + return pub +} diff --git a/main.go b/main.go index ea8afe9..8705a77 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "github.com/DownerCase/ecal-go/ecal/logging" "github.com/DownerCase/ecal-go/ecal/protobuf/publisher" string_publisher "github.com/DownerCase/ecal-go/ecal/string/publisher" - "github.com/DownerCase/ecal-go/ecal/subscriber" + "github.com/DownerCase/ecal-go/ecal/string/subscriber" "github.com/DownerCase/ecal-go/protos" ) @@ -64,10 +64,7 @@ func main() { } sub, _ := subscriber.New() - if sub.Create("string topic", subscriber.DataType{ - Name: "std::string", - Encoding: "base", - }) != nil { + if sub.Create("string topic") != nil { panic("Failed to Create string subscriber") } go receiveMessages(sub) @@ -100,6 +97,11 @@ func main() { func receiveMessages(s *subscriber.Subscriber) { for { - fmt.Println("Received:", string(s.Receive())) + msg, err := s.Receive(2 * time.Second) + if err == nil { + fmt.Println("Received:", msg) + } else { + fmt.Println(err) + } } }