From 2faee6c9929a133f83aabfb36b2b5469b7ade94f Mon Sep 17 00:00:00 2001 From: ming luo Date: Fri, 28 Oct 2022 16:14:20 -0400 Subject: [PATCH 1/2] set minPartitionsAutoDiscoveryInterval to prevent partition metadata lookup overwhelms broker --- pulsar/consumer_test.go | 2 +- pulsar/producer_impl.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f574378374..b25166ca6b 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1703,7 +1703,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.NoError(t, err) return i }, - PartitionsAutoDiscoveryInterval: 100 * time.Millisecond, + PartitionsAutoDiscoveryInterval: 1000 * time.Millisecond, }) assert.Nil(t, err) defer producer.Close() diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597d0..037465e88d 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -45,6 +45,9 @@ const ( // defaultPartitionsAutoDiscoveryInterval init default time interval for partitions auto discovery defaultPartitionsAutoDiscoveryInterval = 1 * time.Minute + + // minPartitionAutoDiscoveryInterval is the minimum time interval for partition auto discovery + minPartitionAutoDiscoveryInterval = 1 * time.Second ) type producer struct { @@ -90,7 +93,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { if options.BatchingMaxPublishDelay <= 0 { options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay } - if options.PartitionsAutoDiscoveryInterval <= 0 { + if options.PartitionsAutoDiscoveryInterval <= minPartitionAutoDiscoveryInterval { options.PartitionsAutoDiscoveryInterval = defaultPartitionsAutoDiscoveryInterval } From f45212f0285937876a917f88085e089f14faabab Mon Sep 17 00:00:00 2001 From: ming luo Date: Mon, 31 Oct 2022 09:33:42 -0400 Subject: [PATCH 2/2] error out when PartitionsAutoDiscoveryInterval is misconfigured --- pulsar/consumer_partition.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index ebaa48b989..be403ff071 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -416,7 +416,10 @@ func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error { pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } - return ackReq.err + ackReq.RLock() + err := ackReq.err + ackReq.RUnlock() + return err } func (pc *partitionConsumer) AckID(msgID MessageID) error { @@ -447,7 +450,10 @@ func (pc *partitionConsumer) AckID(msgID MessageID) error { pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } - return ackReq.err + ackReq.RLock() + err := ackReq.err + ackReq.RUnlock() + return err } func (pc *partitionConsumer) NackID(msgID MessageID) { @@ -672,7 +678,9 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck) if err != nil { pc.log.WithError(err).Error("Ack with response error") + req.Lock() req.err = err + req.Unlock() } return } @@ -680,7 +688,9 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck) if err != nil { pc.log.Error("Connection was closed when request ack cmd") + req.Lock() req.err = err + req.Unlock() } } @@ -1154,6 +1164,7 @@ type ackRequest struct { doneCh chan struct{} msgID trackingMessageID err error + sync.RWMutex } type unsubscribeRequest struct {