Skip to content

Commit

Permalink
Merge pull request #10 from fond-of-vertigo/fix/subscriber-consumer-name
Browse files Browse the repository at this point in the history
fix: let pull-subscriber create consumer
  • Loading branch information
adankb authored Jun 27, 2023
2 parents ac0dd9e + ed7058b commit 71530ea
Showing 1 changed file with 9 additions and 42 deletions.
51 changes: 9 additions & 42 deletions bridge.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package vnats

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -67,53 +66,21 @@ func (b *natsBridge) EnsureStreamExists(streamConfig *nats.StreamConfig) error {
}

func (b *natsBridge) Subscribe(subject, consumerName string, mode SubscriptionMode) (*nats.Subscription, error) {
streamName := strings.Split(subject, ".")[0]
config := &nats.ConsumerConfig{
Durable: consumerName,
AckPolicy: nats.AckExplicitPolicy,
AckWait: defaultAckWait,
}

patchConsumerConfig(config, mode)

if _, err := b.fetchOrAddConsumer(streamName, config); err != nil {
return nil, err
}

return b.jetStreamContext.PullSubscribe(subject, consumerName, nats.Bind(streamName, consumerName))
}

func patchConsumerConfig(config *nats.ConsumerConfig, mode SubscriptionMode) {
var maxAckPending int
switch mode {
case MultipleSubscribersAllowed:
config.MaxAckPending = natsServer.JsDefaultMaxAckPending
maxAckPending = natsServer.JsDefaultMaxAckPending
case SingleSubscriberStrictMessageOrder:
config.MaxAckPending = 1
maxAckPending = 1
default:
config.MaxAckPending = natsServer.JsDefaultMaxAckPending
}
}

func (b *natsBridge) fetchOrAddConsumer(streamName string, consumerConfig *nats.ConsumerConfig) (*nats.ConsumerInfo, error) {
ci, err := b.jetStreamContext.ConsumerInfo(streamName, consumerConfig.Durable)
if errors.Is(err, nats.ErrConsumerNotFound) {
b.log("Consumer %s not found, about to add consumer.", consumerConfig.Durable)
if ci, err = b.jetStreamContext.AddConsumer(streamName, consumerConfig); err != nil {
return nil, fmt.Errorf("NATS consumer could not be fetched: %w", err)
}
b.log("Created new NATS consumer %s", consumerConfig.Durable)
return ci, nil
} else if err != nil {
return nil, fmt.Errorf("consumer %s could not be fetched: %w", consumerConfig.Durable, err)
}

if ci.Config.MaxAckPending != consumerConfig.MaxAckPending {
b.log("Consumer %s SubscriptionMode has changed. Use the existing SubscriptionMode=%v or delete consumer.",
consumerConfig.Durable, SubscriptionMode(ci.Config.MaxAckPending))
return nil, fmt.Errorf("stream consumer SubscriptionMode %v does not match with consumerConfig", SubscriptionMode(ci.Config.MaxAckPending))
maxAckPending = natsServer.JsDefaultMaxAckPending
}

return ci, nil
return b.jetStreamContext.PullSubscribe(subject, consumerName,
nats.AckExplicit(),
nats.MaxAckPending(maxAckPending),
nats.AckWait(defaultAckWait),
)
}

func (b *natsBridge) Servers() []string {
Expand Down

0 comments on commit 71530ea

Please sign in to comment.