diff --git a/bridge.go b/bridge.go index 2759669..864bc8a 100644 --- a/bridge.go +++ b/bridge.go @@ -1,7 +1,6 @@ package vnats import ( - "errors" "fmt" "strings" @@ -67,53 +66,21 @@ func (b *natsBridge) EnsureStreamExists(streamConfig *nats.StreamConfig) error { } func (b *natsBridge) Subscribe(subject, consumerName string, mode SubscriptionMode) (*nats.Subscription, error) { - streamName := strings.Split(subject, ".")[0] - config := &nats.ConsumerConfig{ - Durable: consumerName, - AckPolicy: nats.AckExplicitPolicy, - AckWait: defaultAckWait, - } - - patchConsumerConfig(config, mode) - - if _, err := b.fetchOrAddConsumer(streamName, config); err != nil { - return nil, err - } - - return b.jetStreamContext.PullSubscribe(subject, consumerName, nats.Bind(streamName, consumerName)) -} - -func patchConsumerConfig(config *nats.ConsumerConfig, mode SubscriptionMode) { + var maxAckPending int switch mode { case MultipleSubscribersAllowed: - config.MaxAckPending = natsServer.JsDefaultMaxAckPending + maxAckPending = natsServer.JsDefaultMaxAckPending case SingleSubscriberStrictMessageOrder: - config.MaxAckPending = 1 + maxAckPending = 1 default: - config.MaxAckPending = natsServer.JsDefaultMaxAckPending - } -} - -func (b *natsBridge) fetchOrAddConsumer(streamName string, consumerConfig *nats.ConsumerConfig) (*nats.ConsumerInfo, error) { - ci, err := b.jetStreamContext.ConsumerInfo(streamName, consumerConfig.Durable) - if errors.Is(err, nats.ErrConsumerNotFound) { - b.log("Consumer %s not found, about to add consumer.", consumerConfig.Durable) - if ci, err = b.jetStreamContext.AddConsumer(streamName, consumerConfig); err != nil { - return nil, fmt.Errorf("NATS consumer could not be fetched: %w", err) - } - b.log("Created new NATS consumer %s", consumerConfig.Durable) - return ci, nil - } else if err != nil { - return nil, fmt.Errorf("consumer %s could not be fetched: %w", consumerConfig.Durable, err) - } - - if ci.Config.MaxAckPending != consumerConfig.MaxAckPending { - b.log("Consumer %s SubscriptionMode has changed. Use the existing SubscriptionMode=%v or delete consumer.", - consumerConfig.Durable, SubscriptionMode(ci.Config.MaxAckPending)) - return nil, fmt.Errorf("stream consumer SubscriptionMode %v does not match with consumerConfig", SubscriptionMode(ci.Config.MaxAckPending)) + maxAckPending = natsServer.JsDefaultMaxAckPending } - return ci, nil + return b.jetStreamContext.PullSubscribe(subject, consumerName, + nats.AckExplicit(), + nats.MaxAckPending(maxAckPending), + nats.AckWait(defaultAckWait), + ) } func (b *natsBridge) Servers() []string {