Skip to content

Commit

Permalink
query.Executer.QueryResultSet now returns query.ResultSetWithClose
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Sep 4, 2024
1 parent 8492468 commit ea3ad13
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Changed result type of method `query.Executor.QueryResultSet` from `query.ResultSet` to `query.ClosableResultSet`
* Added `table/types.DecimalValueFromString` decimal type constructor

## v3.77.1
Expand Down
2 changes: 0 additions & 2 deletions examples/basic/native/query/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ func read(ctx context.Context, c query.Client, prefix string) error {
return c.Do(ctx,
func(ctx context.Context, s query.Session) (err error) {
result, err := s.Query(ctx, fmt.Sprintf(`
PRAGMA TablePathPrefix("%s");
DECLARE $seriesID AS Uint64;
SELECT
series_id,
title,
Expand Down
5 changes: 3 additions & 2 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"time"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
Expand Down Expand Up @@ -462,7 +463,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) (

func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs query.ResultSet, finalErr error) {
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
if err != nil {
Expand All @@ -486,7 +487,7 @@ func clientQueryResultSet(
// QueryResultSet is a helper which read all rows from first result set in result
func (c *Client) QueryResultSet(
ctx context.Context, q string, opts ...options.Execute,
) (rs query.ResultSet, finalErr error) {
) (rs result.ClosableResultSet, finalErr error) {
ctx, cancel := xcontext.WithDone(ctx, c.done)
defer cancel()

Expand Down
7 changes: 5 additions & 2 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func readAll(ctx context.Context, r *streamResult) error {
}
}

func readResultSet(ctx context.Context, r *streamResult) (_ *resultSet, finalErr error) {
func readResultSet(ctx context.Context, r *streamResult) (_ *resultSetWithClose, finalErr error) {
defer func() {
_ = r.Close(ctx)
}()
Expand All @@ -161,7 +161,10 @@ func readResultSet(ctx context.Context, r *streamResult) (_ *resultSet, finalErr
return nil, xerrors.WithStackTrace(err)
}

return rs, nil
return &resultSetWithClose{
resultSet: rs,
close: r.Close,
}, nil
}

func readMaterializedResultSet(ctx context.Context, r *streamResult) (_ *materializedResultSet, finalErr error) {
Expand Down
10 changes: 5 additions & 5 deletions internal/query/range_experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"io"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
)

func rangeResultSets(ctx context.Context, r query.Result) xiter.Seq2[query.ResultSet, error] {
return func(yield func(query.ResultSet, error) bool) {
func rangeResultSets(ctx context.Context, r result.Result) xiter.Seq2[result.Set, error] {
return func(yield func(result.Set, error) bool) {
for {
rs, err := r.NextResultSet(ctx)
if err != nil {
Expand All @@ -26,8 +26,8 @@ func rangeResultSets(ctx context.Context, r query.Result) xiter.Seq2[query.Resul
}
}

func rangeRows(ctx context.Context, rs query.ResultSet) xiter.Seq2[query.Row, error] {
return func(yield func(query.Row, error) bool) {
func rangeRows(ctx context.Context, rs result.Set) xiter.Seq2[result.Set, error] {
return func(yield func(result.Row, error) bool) {
for {
rs, err := rs.NextRow(ctx)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/query/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type (
// Rows is experimental API for range iterators available with Go version 1.23+
Rows(ctx context.Context) xiter.Seq2[Row, error]
}
ClosableResultSet interface {
Set
closer.Closer
}
Row interface {
Scan(dst ...interface{}) error
ScanNamed(dst ...scanner.NamedDestination) error
Expand Down
12 changes: 12 additions & 0 deletions internal/query/result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@ type (
rowIndex int
done chan struct{}
}
resultSetWithClose struct {
*resultSet
close func(ctx context.Context) error
}
)

func (*materializedResultSet) Close(context.Context) error {
return nil
}

func (rs *resultSetWithClose) Close(ctx context.Context) error {
return rs.close(ctx)
}

func (rs *materializedResultSet) Rows(ctx context.Context) xiter.Seq2[query.Row, error] {
return rangeRows(ctx, rs)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"sync/atomic"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
Expand Down Expand Up @@ -32,7 +33,7 @@ type Session struct {

func (s *Session) QueryResultSet(
ctx context.Context, q string, opts ...options.Execute,
) (rs query.ResultSet, finalErr error) {
) (rs result.ClosableResultSet, finalErr error) {
onDone := trace.QueryOnSessionQueryResultSet(s.cfg.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).QueryResultSet"), s, q)
defer func() {
Expand Down
3 changes: 2 additions & 1 deletion internal/query/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (tx *Transaction) UnLazy(ctx context.Context) (err error) {

func (tx *Transaction) QueryResultSet(
ctx context.Context, q string, opts ...options.Execute,
) (rs query.ResultSet, finalErr error) {
) (rs result.ClosableResultSet, finalErr error) {
onDone := trace.QueryOnTxQueryResultSet(tx.s.cfg.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).QueryResultSet"), tx, q)
defer func() {
Expand Down
10 changes: 5 additions & 5 deletions query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ type (
//
// Exec used by default:
// - DefaultTxControl
Query(ctx context.Context, query string, opts ...options.Execute) (r Result, err error)
Query(ctx context.Context, query string, opts ...options.Execute) (Result, error)

// QueryResultSet execute query and take the exactly single materialized result set from result
//
// Exec used by default:
// - DefaultTxControl
QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (rs ResultSet, err error)
QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (resultSetWithClose, error)

// QueryRow execute query and take the exactly single row from exactly single result set from result
//
// Exec used by default:
// - DefaultTxControl
QueryRow(ctx context.Context, query string, opts ...options.Execute) (row Row, err error)
QueryRow(ctx context.Context, query string, opts ...options.Execute) (Row, error)
}
// Client defines API of query client
//
Expand Down Expand Up @@ -80,14 +80,14 @@ type (
//
// Exec used by default:
// - DefaultTxControl
Query(ctx context.Context, query string, opts ...options.Execute) (r Result, err error)
Query(ctx context.Context, query string, opts ...options.Execute) (Result, error)

// QueryResultSet is a helper which read all rows from first result set in result
//
// Warning: the large result set from query will be materialized and can happened to "OOM killed" problem
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (ResultSet, error)
QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (resultSetWithClose, error)

// QueryRow is a helper which read only one row from first result set in result
//
Expand Down
13 changes: 7 additions & 6 deletions query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
)

type (
Result = result.Result
ResultSet = result.Set
Row = result.Row
Type = types.Type
NamedDestination = scanner.NamedDestination
ScanStructOption = scanner.ScanStructOption
Result = result.Result
ResultSet = result.Set
resultSetWithClose = result.ClosableResultSet
Row = result.Row
Type = types.Type
NamedDestination = scanner.NamedDestination
ScanStructOption = scanner.ScanStructOption
)

func Named(columnName string, destinationValueReference interface{}) (dst NamedDestination) {
Expand Down

0 comments on commit ea3ad13

Please sign in to comment.