Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports extended msg delete #624

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ type JSApiStreamListRequest struct {

// io.nats.jetstream.api.v1.stream_msg_delete_request
type JSApiMsgDeleteRequest struct {
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
NoMarker bool `json:"no_marker,omitempty"`
// Seq is the message sequence to delete
Seq uint64 `json:"seq"`
// NoErase avoids overwriting the message data with random bytes
NoErase bool `json:"no_erase,omitempty"`
// NoMarker avoids placing a delete marker on streams with Markers enabled
NoMarker bool `json:"no_marker,omitempty"`
}

// io.nats.jetstream.api.v1.stream_msg_delete_response
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/common v0.62.0
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
golang.org/x/net v0.34.0
golang.org/x/text v0.21.0
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -34,9 +34,8 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.36.3 // indirect
google.golang.org/protobuf v1.36.4 // indirect
)
14 changes: 4 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254 h1:3slXeEjjTNhhndJ7FEFqSx//GxSGG22C8IjsER7YIM0=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200 h1:JTZQwrehqUHpEO+DRjm0734B4a0porO1Cb5ACT0nSJY=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742 h1:UbqnZ2WOky1HufvUU+jRfrskdBkJlJrOsuyAzvrmMQA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
Expand All @@ -62,12 +58,10 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand All @@ -77,8 +71,8 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
4 changes: 2 additions & 2 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err er
}

// DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error {
func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool, noMarker bool) error {
var resp api.JSApiMsgDeleteResponse
err := m.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, stream), api.JSApiMsgDeleteRequest{Seq: seq, NoErase: noErase}, &resp)
err := m.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, stream), api.JSApiMsgDeleteRequest{Seq: seq, NoErase: noErase, NoMarker: noMarker}, &resp)
if err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,20 +780,33 @@ func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error) {
}

// FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
//
// Deprecated: Use DeleteMessageRequest()
func (s *Stream) FastDeleteMessage(seq uint64) error {
return s.mgr.DeleteStreamMessage(s.Name(), seq, true)
return s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq, NoErase: true})
}

// DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without over writing data
//
// Deprecated: Use DeleteMessageRequest()
func (s *Stream) DeleteMessage(seq uint64) (err error) {
return s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq})
}

// DeleteMessageRequest deletes a specific message from the Stream with a full request
func (s *Stream) DeleteMessageRequest(req api.JSApiMsgDeleteRequest) (err error) {
if req.Seq == 0 {
return fmt.Errorf("sequence number is required")
}

var resp api.JSApiMsgDeleteResponse
err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, s.Name()), api.JSApiMsgDeleteRequest{Seq: seq}, &resp)
err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, s.Name()), req, &resp)
if err != nil {
return err
}

if !resp.Success {
return fmt.Errorf("unknown error while deleting message %d", seq)
return fmt.Errorf("unknown error while deleting message %d", req.Seq)
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions test/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func TestStream_DeleteMessage(t *testing.T) {
_, err = stream.ReadMessage(1)
checkErr(t, err, "load failed")

err = stream.DeleteMessage(1)
err = stream.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: 1})
checkErr(t, err, "delete failed")

msg, err := stream.ReadMessage(1)
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestStreamSealed(t *testing.T) {
t.Fatalf("expected a sealed stream")
}

err = s.DeleteMessage(1)
err = s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: 1})
if !jsm.IsNatsError(err, 10109) {
t.Fatalf("expected err 10109 got %v", err)
}
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func TestStream_DetectGaps(t *testing.T) {
}

for _, seq := range []uint64{1, 3, 10, 11, 12, 20, 21, 22, 2000, 2001, 2002, 2005} {
checkErr(t, s.DeleteMessage(seq), "delete failed")
checkErr(t, s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq}), "delete failed")
}

gaps := [][2]uint64{}
Expand Down