From ea3ad133baea38265c9c1c1fe02c64f55bb8fb60 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Wed, 4 Sep 2024 00:10:49 +0300 Subject: [PATCH] query.Executer.QueryResultSet now returns query.ResultSetWithClose --- CHANGELOG.md | 1 + examples/basic/native/query/series.go | 2 -- internal/query/client.go | 5 +++-- internal/query/execute_query.go | 7 +++++-- internal/query/range_experiment.go | 10 +++++----- internal/query/result/result.go | 4 ++++ internal/query/result_set.go | 12 ++++++++++++ internal/query/session.go | 3 ++- internal/query/transaction.go | 3 ++- query/client.go | 10 +++++----- query/result.go | 13 +++++++------ 11 files changed, 46 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42c78fe55..dcf233e22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Changed result type of method `query.Executor.QueryResultSet` from `query.ResultSet` to `query.ClosableResultSet` * Added `table/types.DecimalValueFromString` decimal type constructor ## v3.77.1 diff --git a/examples/basic/native/query/series.go b/examples/basic/native/query/series.go index fb345b378..cb01d8106 100644 --- a/examples/basic/native/query/series.go +++ b/examples/basic/native/query/series.go @@ -17,8 +17,6 @@ func read(ctx context.Context, c query.Client, prefix string) error { return c.Do(ctx, func(ctx context.Context, s query.Session) (err error) { result, err := s.Query(ctx, fmt.Sprintf(` - PRAGMA TablePathPrefix("%s"); - DECLARE $seriesID AS Uint64; SELECT series_id, title, diff --git a/internal/query/client.go b/internal/query/client.go index 882581211..26cdb8b9d 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -2,6 +2,7 @@ package query import ( "context" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" "time" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" @@ -462,7 +463,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) ( func clientQueryResultSet( ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption, -) (rs query.ResultSet, finalErr error) { +) (rs result.ClosableResultSet, finalErr error) { err := do(ctx, pool, func(ctx context.Context, s *Session) error { _, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...) if err != nil { @@ -486,7 +487,7 @@ func clientQueryResultSet( // QueryResultSet is a helper which read all rows from first result set in result func (c *Client) QueryResultSet( ctx context.Context, q string, opts ...options.Execute, -) (rs query.ResultSet, finalErr error) { +) (rs result.ClosableResultSet, finalErr error) { ctx, cancel := xcontext.WithDone(ctx, c.done) defer cancel() diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 72012f5a0..c931e3482 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -143,7 +143,7 @@ func readAll(ctx context.Context, r *streamResult) error { } } -func readResultSet(ctx context.Context, r *streamResult) (_ *resultSet, finalErr error) { +func readResultSet(ctx context.Context, r *streamResult) (_ *resultSetWithClose, finalErr error) { defer func() { _ = r.Close(ctx) }() @@ -161,7 +161,10 @@ func readResultSet(ctx context.Context, r *streamResult) (_ *resultSet, finalErr return nil, xerrors.WithStackTrace(err) } - return rs, nil + return &resultSetWithClose{ + resultSet: rs, + close: r.Close, + }, nil } func readMaterializedResultSet(ctx context.Context, r *streamResult) (_ *materializedResultSet, finalErr error) { diff --git a/internal/query/range_experiment.go b/internal/query/range_experiment.go index 5c9b5dbab..df75de20b 100644 --- a/internal/query/range_experiment.go +++ b/internal/query/range_experiment.go @@ -4,13 +4,13 @@ import ( "context" "io" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" - "github.com/ydb-platform/ydb-go-sdk/v3/query" ) -func rangeResultSets(ctx context.Context, r query.Result) xiter.Seq2[query.ResultSet, error] { - return func(yield func(query.ResultSet, error) bool) { +func rangeResultSets(ctx context.Context, r result.Result) xiter.Seq2[result.Set, error] { + return func(yield func(result.Set, error) bool) { for { rs, err := r.NextResultSet(ctx) if err != nil { @@ -26,8 +26,8 @@ func rangeResultSets(ctx context.Context, r query.Result) xiter.Seq2[query.Resul } } -func rangeRows(ctx context.Context, rs query.ResultSet) xiter.Seq2[query.Row, error] { - return func(yield func(query.Row, error) bool) { +func rangeRows(ctx context.Context, rs result.Set) xiter.Seq2[result.Set, error] { + return func(yield func(result.Row, error) bool) { for { rs, err := rs.NextRow(ctx) if err != nil { diff --git a/internal/query/result/result.go b/internal/query/result/result.go index f53f7628f..3d6a8f95b 100644 --- a/internal/query/result/result.go +++ b/internal/query/result/result.go @@ -29,6 +29,10 @@ type ( // Rows is experimental API for range iterators available with Go version 1.23+ Rows(ctx context.Context) xiter.Seq2[Row, error] } + ClosableResultSet interface { + Set + closer.Closer + } Row interface { Scan(dst ...interface{}) error ScanNamed(dst ...scanner.NamedDestination) error diff --git a/internal/query/result_set.go b/internal/query/result_set.go index cbf7ef132..8d2f2ea01 100644 --- a/internal/query/result_set.go +++ b/internal/query/result_set.go @@ -35,8 +35,20 @@ type ( rowIndex int done chan struct{} } + resultSetWithClose struct { + *resultSet + close func(ctx context.Context) error + } ) +func (*materializedResultSet) Close(context.Context) error { + return nil +} + +func (rs *resultSetWithClose) Close(ctx context.Context) error { + return rs.close(ctx) +} + func (rs *materializedResultSet) Rows(ctx context.Context) xiter.Seq2[query.Row, error] { return rangeRows(ctx, rs) } diff --git a/internal/query/session.go b/internal/query/session.go index 1892019b3..a7538d5a9 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -2,6 +2,7 @@ package query import ( "context" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" "sync/atomic" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" @@ -32,7 +33,7 @@ type Session struct { func (s *Session) QueryResultSet( ctx context.Context, q string, opts ...options.Execute, -) (rs query.ResultSet, finalErr error) { +) (rs result.ClosableResultSet, finalErr error) { onDone := trace.QueryOnSessionQueryResultSet(s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).QueryResultSet"), s, q) defer func() { diff --git a/internal/query/transaction.go b/internal/query/transaction.go index c9a87aafb..176a7bbb8 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -3,6 +3,7 @@ package query import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" @@ -77,7 +78,7 @@ func (tx *Transaction) UnLazy(ctx context.Context) (err error) { func (tx *Transaction) QueryResultSet( ctx context.Context, q string, opts ...options.Execute, -) (rs query.ResultSet, finalErr error) { +) (rs result.ClosableResultSet, finalErr error) { onDone := trace.QueryOnTxQueryResultSet(tx.s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).QueryResultSet"), tx, q) defer func() { diff --git a/query/client.go b/query/client.go index 99317b6ee..3b2549eaa 100644 --- a/query/client.go +++ b/query/client.go @@ -25,19 +25,19 @@ type ( // // Exec used by default: // - DefaultTxControl - Query(ctx context.Context, query string, opts ...options.Execute) (r Result, err error) + Query(ctx context.Context, query string, opts ...options.Execute) (Result, error) // QueryResultSet execute query and take the exactly single materialized result set from result // // Exec used by default: // - DefaultTxControl - QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (rs ResultSet, err error) + QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (resultSetWithClose, error) // QueryRow execute query and take the exactly single row from exactly single result set from result // // Exec used by default: // - DefaultTxControl - QueryRow(ctx context.Context, query string, opts ...options.Execute) (row Row, err error) + QueryRow(ctx context.Context, query string, opts ...options.Execute) (Row, error) } // Client defines API of query client // @@ -80,14 +80,14 @@ type ( // // Exec used by default: // - DefaultTxControl - Query(ctx context.Context, query string, opts ...options.Execute) (r Result, err error) + Query(ctx context.Context, query string, opts ...options.Execute) (Result, error) // QueryResultSet is a helper which read all rows from first result set in result // // Warning: the large result set from query will be materialized and can happened to "OOM killed" problem // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental - QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (ResultSet, error) + QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (resultSetWithClose, error) // QueryRow is a helper which read only one row from first result set in result // diff --git a/query/result.go b/query/result.go index 30b4809dd..8a4869ff4 100644 --- a/query/result.go +++ b/query/result.go @@ -7,12 +7,13 @@ import ( ) type ( - Result = result.Result - ResultSet = result.Set - Row = result.Row - Type = types.Type - NamedDestination = scanner.NamedDestination - ScanStructOption = scanner.ScanStructOption + Result = result.Result + ResultSet = result.Set + resultSetWithClose = result.ClosableResultSet + Row = result.Row + Type = types.Type + NamedDestination = scanner.NamedDestination + ScanStructOption = scanner.ScanStructOption ) func Named(columnName string, destinationValueReference interface{}) (dst NamedDestination) {