Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for Per Message TTL #609

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
JSMetaCurrentServerLevel = "_nats.level"
JSMetaCurrentServerVersion = "_nats.ver"
JsMetaRequiredServerLevel = "_nats.req.level"
JSMessageTTL = "Nats-TTL"
)

// Responses to requests sent to a server from a client.
Expand Down
5 changes: 5 additions & 0 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,11 @@ type StreamConfig struct {
FirstSeq uint64 `json:"first_seq,omitempty" yaml:"first_seq"`
// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata"`
// AllowMsgTTL allows header initiated per-message TTLs. If disabled,
// then the `NATS-TTL` header will be ignored.
AllowMsgTTL bool `json:"allow_msg_ttl,omitempty" yaml:"allow_msg_ttl"`
// LimitsTTL activates writing of messages when limits are applied with a specific TTL.
LimitsTTL time.Duration `json:"limits_ttl,omitempty" yaml:"limits_ttl"`
// The following defaults will apply to consumers when created against
// this stream, unless overridden manually. They also represent the maximum values that
// these properties may have
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/jedib0t/go-pretty/v6 v6.6.5
github.com/klauspost/compress v1.17.11
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250107191420-519943fc978a
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26
github.com/nats-io/nats.go v1.38.0
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -31,8 +33,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250107191420-519943fc978a h1:Z8XlUCYnAqGgw4ijut3jL1sN9vZGZVI3qxZ7jlX4j/g=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250107191420-519943fc978a/go.mod h1:skFpICXskKQmrPs+EqjiIdeBTq7FBYWfYAxyCdPdP+4=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26 h1:mN0eraizaHih90T32ItWe58+l31eVjoD6JOu99WqdNc=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26/go.mod h1:nXRZ6eQo2lmNpZLVNIMDNwKM7FgbHgPJ1pIRcPOpVuk=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
Expand Down
9 changes: 9 additions & 0 deletions schema_source/jetstream/api/v1/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,15 @@
"consumer_limits": {
"description": "Limits of certain values that consumers can set, defaults for those who don't set these settings",
"$ref": "#/definitions/stream_consumer_limits"
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$ref": "#/definitions/golang_duration_nanos"
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_list_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_restore_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_snapshot_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_update_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_update_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,21 @@ func FirstSequence(seq uint64) StreamOption {
}
}

func AllowMsgTTL() StreamOption {
return func(o *api.StreamConfig) error {
o.AllowMsgTTL = true
return nil
}
}

func LimitsAppliedTTL(ttl time.Duration) StreamOption {
return func(o *api.StreamConfig) error {
o.AllowMsgTTL = true
o.LimitsTTL = ttl
return nil
}
}

func SubjectTransform(subjectTransform *api.SubjectTransformConfig) StreamOption {
return func(o *api.StreamConfig) error {
o.SubjectTransform = subjectTransform
Expand Down Expand Up @@ -1116,4 +1131,6 @@ func (s *Stream) IsRepublishing() bool { return s.Republish(
func (s *Stream) Metadata() map[string]string { return s.cfg.Metadata }
func (s *Stream) Compression() api.Compression { return s.cfg.Compression }
func (s *Stream) FirstSequence() uint64 { return s.cfg.FirstSeq }
func (s *Stream) AllowMsgTTL() bool { return s.cfg.AllowMsgTTL }
func (s *Stream) LimitsTTL() time.Duration { return s.cfg.LimitsTTL }
func (s *Stream) ConsumerLimits() api.StreamConsumerLimits { return s.cfg.ConsumerLimits }
Loading