From 36682fa1e4d820a2fab759a666a16012c8a185ab Mon Sep 17 00:00:00 2001 From: Florian Meiberg Date: Thu, 21 Apr 2022 13:29:14 +0200 Subject: [PATCH] enhanced example --- README.md | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 2e961bf..db92991 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,8 @@ For using vnats you will need to ### Publisher -The publisher sends a slice of bytes `[]byte` to a subject. If a struct or different type should be sent, -the user has to (un-)marshal the payload. +The publisher sends a slice of bytes `[]byte` to a subject. If a struct or different type should be sent, the user has +to (un-)marshal the payload. #### Example @@ -46,6 +46,12 @@ func main() { if err != nil { log.Errorf(err.Error()) } + // Close NATS connection deferred + defer func(conn vnats.Connection) { + if err := conn.Close(); err != nil { + log.Errorf("NATS connection could not be closed: %v", err) + } + }(conn) // Create publisher bound to stream `PRODUCTS` pub, err := conn.NewPublisher(vnats.NewPublisherArgs{StreamName: "PRODUCTS"}) @@ -58,13 +64,13 @@ func main() { Price: "12,34", LastUpdated: time.Now(), } - + // Since vnats needs a slice of bytes, the products is converted via the json marshaller productToBytes, err := json.Marshal(p) if err != nil { - panic(err) + panic(err) } - + // Publish message to stream `PRODUCTS.PRICES` with a context bound, unique message ID // msgID is used for deduplication msgID := fmt.Sprintf("%s-%s", p.Name, p.LastUpdated) @@ -75,11 +81,6 @@ func main() { }); err != nil { log.Errorf("Could not publish %v: %v", p, err) } - - // Close NATS connection - if err := conn.Close(); err != nil { - log.Errorf("NATS connection could not be closed: %v", err) - } } ``` @@ -88,8 +89,8 @@ func main() { ### Subscriber We use a pull-based subscriber by default, which scales horizontally. The subscriber is asynchronous and pulls -continuously for new messages. A message handler is needed to process each message. The message will be passed as -a slice of bytes `[]byte`. +continuously for new messages. A message handler is needed to process each message. The message will be passed as a +slice of bytes `[]byte`. **Important**: The `MsgHandler` **MUST** finish its task under 30 seconds. Longer tasks must be only triggered and executed asynchronously. @@ -125,6 +126,13 @@ func main() { if err != nil { log.Errorf(err.Error()) } + + // Unsubscribe to all open subscriptions and close NATS connection deferred + defer func(conn vnats.Connection) { + if err := conn.Close(); err != nil { + log.Errorf("NATS connection could not be closed: %v", err) + } + }(conn) // Create Pull-Subscriber bound to consumer `EXAMPLE_CONSUMER` // and the subject `PRODUCTS.PRICES` @@ -143,19 +151,13 @@ func main() { // Wait for stop signal (e.g. ctrl-C) waitForStopSignal() - - // Unsubscribe to all open subscriptions and close NATS connection - if err := conn.Close(); err != nil { - log.Errorf("NATS connection could not be closed: %v", err) - } - } // MsgHandler returns the data in a slice of bytes inside the InMsg struct. func msgHandler(msg vnats.InMsg) error { var p Product - if err := json.Unmarshal(msg.Data(), &p); err != nil { - return err + if err := json.Unmarshal(msg.Data(), &p); err != nil { + return err } log.Debugf("Received product: %v", p) return nil