From 425305908988e1dd857a857aa70c2278f376a26b Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 31 Jan 2025 12:24:15 +0100 Subject: [PATCH] Supports extended msg delete Signed-off-by: R.I.Pienaar --- api/streams.go | 9 ++++++--- go.mod | 5 ++--- go.sum | 14 ++++---------- manager.go | 4 ++-- streams.go | 19 ++++++++++++++++--- test/streams_test.go | 6 +++--- 6 files changed, 33 insertions(+), 24 deletions(-) diff --git a/api/streams.go b/api/streams.go index e34cfff..91af786 100644 --- a/api/streams.go +++ b/api/streams.go @@ -117,9 +117,12 @@ type JSApiStreamListRequest struct { // io.nats.jetstream.api.v1.stream_msg_delete_request type JSApiMsgDeleteRequest struct { - Seq uint64 `json:"seq"` - NoErase bool `json:"no_erase,omitempty"` - NoMarker bool `json:"no_marker,omitempty"` + // Seq is the message sequence to delete + Seq uint64 `json:"seq"` + // NoErase avoids overwriting the message data with random bytes + NoErase bool `json:"no_erase,omitempty"` + // NoMarker avoids placing a delete marker on streams with Markers enabled + NoMarker bool `json:"no_marker,omitempty"` } // io.nats.jetstream.api.v1.stream_msg_delete_response diff --git a/go.mod b/go.mod index a2e30a5..d8d4ef6 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/common v0.62.0 - golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c golang.org/x/net v0.34.0 golang.org/x/text v0.21.0 gopkg.in/yaml.v3 v3.0.1 @@ -34,9 +34,8 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - go.uber.org/automaxprocs v1.6.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/time v0.9.0 // indirect - google.golang.org/protobuf v1.36.3 // indirect + google.golang.org/protobuf v1.36.4 // indirect ) diff --git a/go.sum b/go.sum index d78803f..f53c8c2 100644 --- a/go.sum +++ b/go.sum @@ -33,10 +33,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254 h1:3slXeEjjTNhhndJ7FEFqSx//GxSGG22C8IjsER7YIM0= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200 h1:JTZQwrehqUHpEO+DRjm0734B4a0porO1Cb5ACT0nSJY= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg= github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742 h1:UbqnZ2WOky1HufvUU+jRfrskdBkJlJrOsuyAzvrmMQA= github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= @@ -62,12 +58,10 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= -go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= -golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA= -golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -77,8 +71,8 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/manager.go b/manager.go index 5098ba3..86f1999 100644 --- a/manager.go +++ b/manager.go @@ -226,9 +226,9 @@ func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err er } // DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete -func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error { +func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool, noMarker bool) error { var resp api.JSApiMsgDeleteResponse - err := m.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, stream), api.JSApiMsgDeleteRequest{Seq: seq, NoErase: noErase}, &resp) + err := m.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, stream), api.JSApiMsgDeleteRequest{Seq: seq, NoErase: noErase, NoMarker: noMarker}, &resp) if err != nil { return err } diff --git a/streams.go b/streams.go index f6872a3..4459cfa 100644 --- a/streams.go +++ b/streams.go @@ -780,20 +780,33 @@ func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error) { } // FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete +// +// Deprecated: Use DeleteMessageRequest() func (s *Stream) FastDeleteMessage(seq uint64) error { - return s.mgr.DeleteStreamMessage(s.Name(), seq, true) + return s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq, NoErase: true}) } // DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without over writing data +// +// Deprecated: Use DeleteMessageRequest() func (s *Stream) DeleteMessage(seq uint64) (err error) { + return s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq}) +} + +// DeleteMessageRequest deletes a specific message from the Stream with a full request +func (s *Stream) DeleteMessageRequest(req api.JSApiMsgDeleteRequest) (err error) { + if req.Seq == 0 { + return fmt.Errorf("sequence number is required") + } + var resp api.JSApiMsgDeleteResponse - err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, s.Name()), api.JSApiMsgDeleteRequest{Seq: seq}, &resp) + err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiMsgDeleteT, s.Name()), req, &resp) if err != nil { return err } if !resp.Success { - return fmt.Errorf("unknown error while deleting message %d", seq) + return fmt.Errorf("unknown error while deleting message %d", req.Seq) } return nil diff --git a/test/streams_test.go b/test/streams_test.go index 86ec11d..2565372 100644 --- a/test/streams_test.go +++ b/test/streams_test.go @@ -630,7 +630,7 @@ func TestStream_DeleteMessage(t *testing.T) { _, err = stream.ReadMessage(1) checkErr(t, err, "load failed") - err = stream.DeleteMessage(1) + err = stream.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: 1}) checkErr(t, err, "delete failed") msg, err := stream.ReadMessage(1) @@ -1060,7 +1060,7 @@ func TestStreamSealed(t *testing.T) { t.Fatalf("expected a sealed stream") } - err = s.DeleteMessage(1) + err = s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: 1}) if !jsm.IsNatsError(err, 10109) { t.Fatalf("expected err 10109 got %v", err) } @@ -1157,7 +1157,7 @@ func TestStream_DetectGaps(t *testing.T) { } for _, seq := range []uint64{1, 3, 10, 11, 12, 20, 21, 22, 2000, 2001, 2002, 2005} { - checkErr(t, s.DeleteMessage(seq), "delete failed") + checkErr(t, s.DeleteMessageRequest(api.JSApiMsgDeleteRequest{Seq: seq}), "delete failed") } gaps := [][2]uint64{}