Skip to content

Commit

Permalink
[Issue 1223] Support ZeroQueueConsumer (apache#1225)
Browse files Browse the repository at this point in the history
Fixes apache#1223

### Motivation

Support ZeroQueueConsumer, refer to Java [ZeroQueueConsumerImpl](https://github.com/apache/pulsar/blob/8c50a6c2e91c81dbf187ce5e66cb39e2758a741e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L42)

### Modifications

- The consumer add a new optional parameter `EnableZeroQueueConsumer`
- Add a new `zeroQueueConsumer`
  • Loading branch information
crossoverJie authored Jun 26, 2024
1 parent a6e28dc commit aa090be
Show file tree
Hide file tree
Showing 5 changed files with 860 additions and 46 deletions.
5 changes: 5 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ type ConsumerOptions struct {
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int

// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.
// Notice: only non-partitioned topic is supported.
// Default is false.
EnableZeroQueueConsumer bool

// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
// by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer
// receive queue can be scaled.
Expand Down
114 changes: 69 additions & 45 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -81,6 +84,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.ReceiverQueueSize = defaultReceiverQueueSize
}

if options.EnableZeroQueueConsumer {
options.ReceiverQueueSize = 0
}

if options.Interceptors == nil {
options.Interceptors = defaultConsumerInterceptors
}
Expand Down Expand Up @@ -236,7 +243,24 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}

func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) {
partitions, err := client.TopicPartitions(topic)
if err != nil {
return nil, err
}

if len(partitions) > 1 && options.EnableZeroQueueConsumer {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer {
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
}

consumer := &consumer{
topic: topic,
Expand All @@ -253,7 +277,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
metrics: client.metrics.GetLeveledMetrics(topic),
}

err := consumer.internalTopicSubscribeToPartitions()
err = consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -343,10 +367,6 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
consumer *partitionConsumer
}

receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
subProperties := c.options.SubscriptionProperties

startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions

Expand All @@ -364,45 +384,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {

go func(idx int, pt string) {
defer wg.Done()

var nackRedeliveryDelay time.Duration
if c.options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = c.options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: c.options.startMessageID,
startMessageIDInclusive: c.options.StartMessageIDInclusive,
subscriptionMode: c.options.SubscriptionMode,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
ackWithResponse: c.options.AckWithResponse,
maxPendingChunkedMessage: c.options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk,
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: c.options.AckGroupingOptions,
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
}
opts := newPartitionConsumerOpts(pt, c.consumerName, idx, c.options)
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
err: err,
Expand Down Expand Up @@ -444,6 +426,48 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
return nil
}

func newPartitionConsumerOpts(topic, consumerName string, idx int, options ConsumerOptions) *partitionConsumerOpts {

var nackRedeliveryDelay time.Duration
if options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = options.NackRedeliveryDelay
}
return &partitionConsumerOpts{
topic: topic,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: options.ReceiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: options.NackBackoffPolicy,
metadata: options.Properties,
subProperties: options.SubscriptionProperties,
replicateSubscriptionState: options.ReplicateSubscriptionState,
startMessageID: options.startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: options.SubscriptionMode,
readCompacted: options.ReadCompacted,
interceptors: options.Interceptors,
maxReconnectToBroker: options.MaxReconnectToBroker,
backoffPolicy: options.BackoffPolicy,
keySharedPolicy: options.KeySharedPolicy,
schema: options.Schema,
decryption: options.Decryption,
ackWithResponse: options.AckWithResponse,
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
consumerEventListener: options.EventListener,
enableBatchIndexAck: options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: options.AckGroupingOptions,
autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize,
}
}

func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
Expand Down
Loading

0 comments on commit aa090be

Please sign in to comment.