Skip to content

Commit

Permalink
Merge pull request #624 from ripienaar/marker_delete_purge
Browse files Browse the repository at this point in the history
Supports extended msg delete
  • Loading branch information
ripienaar authored Jan 31, 2025
2 parents 4df792d + 4253059 commit 4d953a4
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 24 deletions.
9 changes: 6 additions & 3 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ type JSApiStreamListRequest struct {

// io.nats.jetstream.api.v1.stream_msg_delete_request
type JSApiMsgDeleteRequest struct {
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
NoMarker bool `json:"no_marker,omitempty"`
// Seq is the message sequence to delete
Seq uint64 `json:"seq"`
// NoErase avoids overwriting the message data with random bytes
NoErase bool `json:"no_erase,omitempty"`
// NoMarker avoids placing a delete marker on streams with Markers enabled
NoMarker bool `json:"no_marker,omitempty"`
}

// io.nats.jetstream.api.v1.stream_msg_delete_response
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/common v0.62.0
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
golang.org/x/net v0.34.0
golang.org/x/text v0.21.0
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -34,9 +34,8 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.36.3 // indirect
google.golang.org/protobuf v1.36.4 // indirect
)
14 changes: 4 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254 h1:3slXeEjjTNhhndJ7FEFqSx//GxSGG22C8IjsER7YIM0=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200 h1:JTZQwrehqUHpEO+DRjm0734B4a0porO1Cb5ACT0nSJY=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742 h1:UbqnZ2WOky1HufvUU+jRfrskdBkJlJrOsuyAzvrmMQA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
Expand All @@ -62,12 +58,10 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand All @@ -77,8 +71,8 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
4 changes: 2 additions & 2 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err er
}

// DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error {
func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool, noMarker bool) error {
var resp api.JSApiMsgDeleteResponse
err := m.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, stream), api.JSApiMsgDeleteRequest{Seq: seq, NoErase: noErase}, &resp)
err := m.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, stream), api.JSApiMsgDeleteRequest{Seq: seq, NoErase: noErase, NoMarker: noMarker}, &resp)
if err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,20 +780,33 @@ func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error) {
}

// FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
//
// Deprecated: Use DeleteMessageRequest()
func (s *Stream) FastDeleteMessage(seq uint64) error {
return s.mgr.DeleteStreamMessage(s.Name(), seq, true)
return s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq, NoErase: true})
}

// DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without over writing data
//
// Deprecated: Use DeleteMessageRequest()
func (s *Stream) DeleteMessage(seq uint64) (err error) {
return s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq})
}

// DeleteMessageRequest deletes a specific message from the Stream with a full request
func (s *Stream) DeleteMessageRequest(req api.JSApiMsgDeleteRequest) (err error) {
if req.Seq == 0 {
return fmt.Errorf("sequence number is required")
}

var resp api.JSApiMsgDeleteResponse
err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, s.Name()), api.JSApiMsgDeleteRequest{Seq: seq}, &resp)
err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, s.Name()), req, &resp)
if err != nil {
return err
}

if !resp.Success {
return fmt.Errorf("unknown error while deleting message %d", seq)
return fmt.Errorf("unknown error while deleting message %d", req.Seq)
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions test/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func TestStream_DeleteMessage(t *testing.T) {
_, err = stream.ReadMessage(1)
checkErr(t, err, "load failed")

err = stream.DeleteMessage(1)
err = stream.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: 1})
checkErr(t, err, "delete failed")

msg, err := stream.ReadMessage(1)
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestStreamSealed(t *testing.T) {
t.Fatalf("expected a sealed stream")
}

err = s.DeleteMessage(1)
err = s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: 1})
if !jsm.IsNatsError(err, 10109) {
t.Fatalf("expected err 10109 got %v", err)
}
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func TestStream_DetectGaps(t *testing.T) {
}

for _, seq := range []uint64{1, 3, 10, 11, 12, 20, 21, 22, 2000, 2001, 2002, 2005} {
checkErr(t, s.DeleteMessage(seq), "delete failed")
checkErr(t, s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq}), "delete failed")
}

gaps := [][2]uint64{}
Expand Down

0 comments on commit 4d953a4

Please sign in to comment.