Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query.Executer.QueryResultSet now returns query.ClosableResultSet #1434

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Changed result type of method `query.Executor.QueryResultSet` from `query.ResultSet` to `query.ClosableResultSet`
* Added `table/types.DecimalValueFromString` decimal type constructor

## v3.77.1
Expand Down
250 changes: 130 additions & 120 deletions examples/basic/native/query/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ func read(ctx context.Context, c query.Client, prefix string) error {
return c.Do(ctx,
func(ctx context.Context, s query.Session) (err error) {
result, err := s.Query(ctx, fmt.Sprintf(`
PRAGMA TablePathPrefix("%s");
DECLARE $seriesID AS Uint64;
SELECT
series_id,
title,
release_date
FROM
series
`, prefix),
query.WithTxControl(query.TxControl(query.BeginTx(query.WithOnlineReadOnly()))),
%s
`, "`"+path.Join(prefix, "series")+"`"),
query.WithTxControl(query.TxControl(query.BeginTx(query.WithSnapshotReadOnly()))),
)
if err != nil {
return err
Expand Down Expand Up @@ -76,126 +74,138 @@ func read(ctx context.Context, c query.Client, prefix string) error {
func fillTablesWithData(ctx context.Context, c query.Client, prefix string) error {
series, seasons, episodes := getData()

return c.Do(ctx,
func(ctx context.Context, s query.Session) (err error) {
return s.Exec(ctx,
fmt.Sprintf(`
PRAGMA TablePathPrefix("%s");

DECLARE $seriesData AS List<Struct<
series_id: Bytes,
title: Text,
series_info: Text,
release_date: Date,
comment: Optional<Text>>>;

DECLARE $seasonsData AS List<Struct<
series_id: Bytes,
season_id: Bytes,
title: Text,
first_aired: Date,
last_aired: Date>>;

DECLARE $episodesData AS List<Struct<
series_id: Bytes,
season_id: Bytes,
episode_id: Bytes,
title: Text,
air_date: Date>>;

REPLACE INTO series
SELECT
series_id,
title,
series_info,
release_date,
comment
FROM AS_TABLE($seriesData);

REPLACE INTO seasons
SELECT
series_id,
season_id,
title,
first_aired,
last_aired
FROM AS_TABLE($seasonsData);

REPLACE INTO episodes
SELECT
series_id,
season_id,
episode_id,
title,
air_date
FROM AS_TABLE($episodesData);
`, prefix),
query.WithParameters(ydb.ParamsBuilder().
Param("$seriesData").BeginList().AddItems(series...).EndList().
Param("$seasonsData").BeginList().AddItems(seasons...).EndList().
Param("$episodesData").BeginList().AddItems(episodes...).EndList().
Build(),
),
)
},
err := c.Exec(ctx, fmt.Sprintf(`
DECLARE $seriesData AS List<Struct<
series_id: Bytes,
title: Text,
series_info: Text,
release_date: Date,
comment: Optional<Text>>>;

REPLACE INTO %s
SELECT
series_id,
title,
series_info,
release_date,
comment
FROM AS_TABLE($seriesData);`,
"`"+path.Join(prefix, "series")+"`"),
query.WithParameters(ydb.ParamsBuilder().
Param("$seriesData").
BeginList().AddItems(series...).EndList().
Build(),
),
)
if err != nil {
return err
}

err = c.Exec(ctx, fmt.Sprintf(`
DECLARE $seasonsData AS List<Struct<
series_id: Bytes,
season_id: Bytes,
title: Text,
first_aired: Date,
last_aired: Date>>;

REPLACE INTO %s
SELECT
series_id,
season_id,
title,
first_aired,
last_aired
FROM AS_TABLE($seasonsData);`,
"`"+path.Join(prefix, "seasons")+"`"),
query.WithParameters(ydb.ParamsBuilder().
Param("$seasonsData").
BeginList().AddItems(seasons...).EndList().
Build(),
),
)
if err != nil {
return err
}

err = c.Exec(ctx, fmt.Sprintf(`
DECLARE $episodesData AS List<Struct<
series_id: Bytes,
season_id: Bytes,
episode_id: Bytes,
title: Text,
air_date: Date>>;

REPLACE INTO %s
SELECT
series_id,
season_id,
episode_id,
title,
air_date
FROM AS_TABLE($episodesData);`,
"`"+path.Join(prefix, "episodes")+"`"),
query.WithParameters(ydb.ParamsBuilder().
Param("$episodesData").
BeginList().AddItems(episodes...).EndList().
Build(),
),
)
if err != nil {
return err
}

return nil
}

func createTables(ctx context.Context, c query.Client, prefix string) error {
return c.Do(ctx,
func(ctx context.Context, s query.Session) error {
err := s.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
series_id Bytes,
title Text,
series_info Text,
release_date Date,
comment Text,

PRIMARY KEY(series_id)
)
`, "`"+path.Join(prefix, "series")+"`"),
query.WithTxControl(query.NoTx()),
)
if err != nil {
return err
}

err = s.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
series_id Bytes,
season_id Bytes,
title Text,
first_aired Date,
last_aired Date,

PRIMARY KEY(series_id,season_id)
)
`, "`"+path.Join(prefix, "seasons")+"`"),
query.WithTxControl(query.NoTx()),
)
if err != nil {
return err
}
err := c.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
series_id Bytes,
title Text,
series_info Text,
release_date Date,
comment Text,

PRIMARY KEY(series_id)
)`, "`"+path.Join(prefix, "series")+"`"),
query.WithTxControl(query.NoTx()),
)
if err != nil {
return err
}

err = s.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
series_id Bytes,
season_id Bytes,
episode_id Bytes,
title Text,
air_date Date,

PRIMARY KEY(series_id,season_id,episode_id)
)
`, "`"+path.Join(prefix, "episodes")+"`"),
query.WithTxControl(query.NoTx()),
)
if err != nil {
return err
}
err = c.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
series_id Bytes,
season_id Bytes,
title Text,
first_aired Date,
last_aired Date,

PRIMARY KEY(series_id,season_id)
)`, "`"+path.Join(prefix, "seasons")+"`"),
query.WithTxControl(query.NoTx()),
)
if err != nil {
return err
}

return nil
},
err = c.Exec(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
series_id Bytes,
season_id Bytes,
episode_id Bytes,
title Text,
air_date Date,

PRIMARY KEY(series_id,season_id,episode_id)
)`, "`"+path.Join(prefix, "episodes")+"`"),
query.WithTxControl(query.NoTx()),
)
if err != nil {
return err
}

return nil
}
7 changes: 4 additions & 3 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
Expand Down Expand Up @@ -185,7 +186,7 @@ func (c *Client) ExecuteScript(
return op, nil
}

func (p *poolStub) Close(ctx context.Context) error {
func (p *poolStub) Close(context.Context) error {
return nil
}

Expand Down Expand Up @@ -462,7 +463,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) (

func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs query.ResultSet, finalErr error) {
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
if err != nil {
Expand All @@ -486,7 +487,7 @@ func clientQueryResultSet(
// QueryResultSet is a helper which read all rows from first result set in result
func (c *Client) QueryResultSet(
ctx context.Context, q string, opts ...options.Execute,
) (rs query.ResultSet, finalErr error) {
) (rs result.ClosableResultSet, finalErr error) {
ctx, cancel := xcontext.WithDone(ctx, c.done)
defer cancel()

Expand Down
11 changes: 5 additions & 6 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,7 @@ func readAll(ctx context.Context, r *streamResult) error {
}
}

func readResultSet(ctx context.Context, r *streamResult) (_ *resultSet, finalErr error) {
defer func() {
_ = r.Close(ctx)
}()

func readResultSet(ctx context.Context, r *streamResult) (_ *resultSetWithClose, finalErr error) {
rs, err := r.nextResultSet(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand All @@ -161,7 +157,10 @@ func readResultSet(ctx context.Context, r *streamResult) (_ *resultSet, finalErr
return nil, xerrors.WithStackTrace(err)
}

return rs, nil
return &resultSetWithClose{
resultSet: rs,
close: r.Close,
}, nil
}

func readMaterializedResultSet(ctx context.Context, r *streamResult) (_ *materializedResultSet, finalErr error) {
Expand Down
44 changes: 0 additions & 44 deletions internal/query/range_experiment.go

This file was deleted.

Loading
Loading