From 5ad9ee5f2df6f81e9043e29294a6f6822cba2c66 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Sun, 29 Sep 2024 14:40:21 +0800 Subject: [PATCH] enh: tmq add config `session.timeout.ms` and `max.poll.interval.ms` --- ws/tmq/config.go | 10 ++++++++++ ws/tmq/consumer.go | 34 ++++++++++++++++++++++++---------- ws/tmq/consumer_test.go | 2 ++ ws/tmq/proto.go | 2 ++ 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/ws/tmq/config.go b/ws/tmq/config.go index e119dcf..1f90ee7 100644 --- a/ws/tmq/config.go +++ b/ws/tmq/config.go @@ -23,6 +23,8 @@ type config struct { AutoReconnect bool ReconnectIntervalMs int ReconnectRetryCount int + SessionTimeoutMS string + MaxPollIntervalMS string } func newConfig(url string, chanLength uint) *config { @@ -99,3 +101,11 @@ func (c *config) setReconnectIntervalMs(reconnectIntervalMs int) { func (c *config) setReconnectRetryCount(reconnectRetryCount int) { c.ReconnectRetryCount = reconnectRetryCount } + +func (c *config) setSessionTimeoutMS(sessionTimeoutMS string) { + c.SessionTimeoutMS = sessionTimeoutMS +} + +func (c *config) setMaxPollIntervalMS(maxPollIntervalMS string) { + c.MaxPollIntervalMS = maxPollIntervalMS +} diff --git a/ws/tmq/consumer.go b/ws/tmq/consumer.go index a6edbac..b7fbe4f 100644 --- a/ws/tmq/consumer.go +++ b/ws/tmq/consumer.go @@ -42,6 +42,8 @@ type Consumer struct { offsetRest string snapshotEnable string withTableName string + sessionTimeoutMS string + maxPollIntervalMS string closeOnce sync.Once closeChan chan struct{} topics []string @@ -243,6 +245,14 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) { if err != nil { return nil, err } + sessionTimeoutMS, err := m.Get("session.timeout.ms", "") + if err != nil { + return nil, err + } + maxPollIntervalMS, err := m.Get("max.poll.interval.ms", "") + if err != nil { + return nil, err + } config := newConfig(url.(string), chanLen.(uint)) err = config.setMessageTimeout(messageTimeout.(time.Duration)) if err != nil { @@ -265,6 +275,8 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) { config.setAutoReconnect(autoReconnect.(bool)) config.setReconnectIntervalMs(reconnectIntervalMs.(int)) config.setReconnectRetryCount(reconnectRetryCount.(int)) + config.setSessionTimeoutMS(sessionTimeoutMS.(string)) + config.setMaxPollIntervalMS(maxPollIntervalMS.(string)) return config, nil } @@ -417,16 +429,18 @@ func (c *Consumer) doSubscribe(topics []string, reconnect bool) error { } reqID := c.generateReqID() req := &SubscribeReq{ - ReqID: reqID, - User: c.user, - Password: c.password, - GroupID: c.groupID, - ClientID: c.clientID, - OffsetRest: c.offsetRest, - Topics: topics, - AutoCommit: "false", - SnapshotEnable: c.snapshotEnable, - WithTableName: c.withTableName, + ReqID: reqID, + User: c.user, + Password: c.password, + GroupID: c.groupID, + ClientID: c.clientID, + OffsetRest: c.offsetRest, + Topics: topics, + AutoCommit: "false", + SnapshotEnable: c.snapshotEnable, + WithTableName: c.withTableName, + SessionTimeoutMS: c.sessionTimeoutMS, + MaxPollIntervalMS: c.maxPollIntervalMS, } args, err := client.JsonI.Marshal(req) if err != nil { diff --git a/ws/tmq/consumer_test.go b/ws/tmq/consumer_test.go index 37dd34b..4ab7a98 100644 --- a/ws/tmq/consumer_test.go +++ b/ws/tmq/consumer_test.go @@ -141,6 +141,8 @@ func TestConsumer(t *testing.T) { "enable.auto.commit": "true", "auto.commit.interval.ms": "5000", "msg.with.table.name": "true", + "session.timeout.ms": "12000", + "max.poll.interval.ms": "300000", }) if err != nil { t.Error(err) diff --git a/ws/tmq/proto.go b/ws/tmq/proto.go index 3a17c8b..ce9d501 100644 --- a/ws/tmq/proto.go +++ b/ws/tmq/proto.go @@ -19,6 +19,8 @@ type SubscribeReq struct { AutoCommitIntervalMS string `json:"auto_commit_interval_ms"` SnapshotEnable string `json:"snapshot_enable"` WithTableName string `json:"with_table_name"` + SessionTimeoutMS string `json:"session_timeout_ms"` + MaxPollIntervalMS string `json:"max_poll_interval_ms"` } type SubscribeResp struct {