Skip to content

Commit

Permalink
Merge pull request #301 from taosdata/enh/xftan/TD-32013
Browse files Browse the repository at this point in the history
enh: tmq add config `session.timeout.ms` and `max.poll.interval.ms`
  • Loading branch information
sheyanjie-qq authored Sep 30, 2024
2 parents 21d50e3 + 5ad9ee5 commit a0b2089
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
10 changes: 10 additions & 0 deletions ws/tmq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type config struct {
AutoReconnect bool
ReconnectIntervalMs int
ReconnectRetryCount int
SessionTimeoutMS string
MaxPollIntervalMS string
}

func newConfig(url string, chanLength uint) *config {
Expand Down Expand Up @@ -99,3 +101,11 @@ func (c *config) setReconnectIntervalMs(reconnectIntervalMs int) {
func (c *config) setReconnectRetryCount(reconnectRetryCount int) {
c.ReconnectRetryCount = reconnectRetryCount
}

func (c *config) setSessionTimeoutMS(sessionTimeoutMS string) {
c.SessionTimeoutMS = sessionTimeoutMS
}

func (c *config) setMaxPollIntervalMS(maxPollIntervalMS string) {
c.MaxPollIntervalMS = maxPollIntervalMS
}
34 changes: 24 additions & 10 deletions ws/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Consumer struct {
offsetRest string
snapshotEnable string
withTableName string
sessionTimeoutMS string
maxPollIntervalMS string
closeOnce sync.Once
closeChan chan struct{}
topics []string
Expand Down Expand Up @@ -243,6 +245,14 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) {
if err != nil {
return nil, err
}
sessionTimeoutMS, err := m.Get("session.timeout.ms", "")
if err != nil {
return nil, err
}
maxPollIntervalMS, err := m.Get("max.poll.interval.ms", "")
if err != nil {
return nil, err
}
config := newConfig(url.(string), chanLen.(uint))
err = config.setMessageTimeout(messageTimeout.(time.Duration))
if err != nil {
Expand All @@ -265,6 +275,8 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) {
config.setAutoReconnect(autoReconnect.(bool))
config.setReconnectIntervalMs(reconnectIntervalMs.(int))
config.setReconnectRetryCount(reconnectRetryCount.(int))
config.setSessionTimeoutMS(sessionTimeoutMS.(string))
config.setMaxPollIntervalMS(maxPollIntervalMS.(string))
return config, nil
}

Expand Down Expand Up @@ -417,16 +429,18 @@ func (c *Consumer) doSubscribe(topics []string, reconnect bool) error {
}
reqID := c.generateReqID()
req := &SubscribeReq{
ReqID: reqID,
User: c.user,
Password: c.password,
GroupID: c.groupID,
ClientID: c.clientID,
OffsetRest: c.offsetRest,
Topics: topics,
AutoCommit: "false",
SnapshotEnable: c.snapshotEnable,
WithTableName: c.withTableName,
ReqID: reqID,
User: c.user,
Password: c.password,
GroupID: c.groupID,
ClientID: c.clientID,
OffsetRest: c.offsetRest,
Topics: topics,
AutoCommit: "false",
SnapshotEnable: c.snapshotEnable,
WithTableName: c.withTableName,
SessionTimeoutMS: c.sessionTimeoutMS,
MaxPollIntervalMS: c.maxPollIntervalMS,
}
args, err := client.JsonI.Marshal(req)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions ws/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func TestConsumer(t *testing.T) {
"enable.auto.commit": "true",
"auto.commit.interval.ms": "5000",
"msg.with.table.name": "true",
"session.timeout.ms": "12000",
"max.poll.interval.ms": "300000",
})
if err != nil {
t.Error(err)
Expand Down
2 changes: 2 additions & 0 deletions ws/tmq/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type SubscribeReq struct {
AutoCommitIntervalMS string `json:"auto_commit_interval_ms"`
SnapshotEnable string `json:"snapshot_enable"`
WithTableName string `json:"with_table_name"`
SessionTimeoutMS string `json:"session_timeout_ms"`
MaxPollIntervalMS string `json:"max_poll_interval_ms"`
}

type SubscribeResp struct {
Expand Down

0 comments on commit a0b2089

Please sign in to comment.