Skip to content

Commit

Permalink
replaced ensure... to explicit constructors of entities
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Oct 6, 2024
1 parent af3be44 commit 865024a
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 119 deletions.
34 changes: 21 additions & 13 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
Expand Down Expand Up @@ -286,35 +285,44 @@ func (c *Client) BulkUpsert(
a := allocator.New()
defer a.Free()

config := c.retryOptions(opts...)
config.RetryOptions = append(config.RetryOptions, retry.WithIdempotent(true))
attempts, config := 0, c.retryOptions(opts...)
config.RetryOptions = append(config.RetryOptions,
retry.WithIdempotent(true),
retry.WithTrace(&trace.Retry{
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
return func(info trace.RetryLoopDoneInfo) {
attempts = info.Attempts
}
},
}),
)

attempts, onDone := 0, trace.TableOnBulkUpsert(config.Trace, &ctx,
onDone := trace.TableOnBulkUpsert(config.Trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"),
)
defer func() {
onDone(finalErr, attempts)
}()

request := Ydb_Table.BulkUpsertRequest{
Table: tableName,
}
finalErr = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&request))
if finalErr != nil {
return finalErr
request, err := data.ToYDB(a, tableName)
if err != nil {
return xerrors.WithStackTrace(err)
}

finalErr = retry.Retry(ctx,
err = retry.Retry(ctx,
func(ctx context.Context) (err error) {
attempts++
_, err = c.client.BulkUpsert(ctx, &request)
_, err = c.client.BulkUpsert(ctx, request)

return err
},
config.RetryOptions...,
)
if err != nil {
return xerrors.WithStackTrace(err)
}

return xerrors.WithStackTrace(finalErr)
return nil
}

func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
Expand Down
175 changes: 69 additions & 106 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,87 +588,71 @@ func WithTrace(t trace.Table) traceOption { //nolint:gocritic
return traceOption{t: &t}
}

type (
BulkUpsertRequest Ydb_Table.BulkUpsertRequest
)

type BulkUpsertData interface {
ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error
ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error)
}

type bulkUpsertRows struct {
Rows value.Value
rows value.Value
}

func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
req.Rows = value.ToYDB(data.Rows, a)

return nil
func (data bulkUpsertRows) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
return &Ydb_Table.BulkUpsertRequest{
Table: tableName,
Rows: value.ToYDB(data.rows, a),
}, nil
}

func BulkUpsertDataRows(rows value.Value) bulkUpsertRows {
return bulkUpsertRows{
Rows: rows,
rows: rows,
}
}

type bulkUpsertCsv struct {
Data []byte
Options []csvFormatOption
data []byte
opts []csvFormatOption
}

type csvFormatOption interface {
ApplyCsvFormatOption(req *BulkUpsertRequest) (err error)
applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) (err error)
}

func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
req.Data = data.Data
func (data bulkUpsertCsv) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
var (
request = &Ydb_Table.BulkUpsertRequest{
Table: tableName,
Data: data.data,
}
dataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{
CsvSettings: &Ydb_Formats.CsvSettings{},
}
)

var err error
for _, opt := range data.Options {
for _, opt := range data.opts {
if opt != nil {
err = opt.ApplyCsvFormatOption(req)
if err != nil {
return err
if err := opt.applyCsvFormatOption(dataFormat); err != nil {
return nil, err
}
}
}

return err
request.DataFormat = dataFormat

return request, nil
}

func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv {
return bulkUpsertCsv{
Data: data,
Options: opts,
}
}

func ensureCsvDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.CsvSettings) {
if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings); ok {
if settings.CsvSettings == nil {
settings.CsvSettings = &Ydb_Formats.CsvSettings{}
}

return settings.CsvSettings
}

req.DataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{
CsvSettings: &Ydb_Formats.CsvSettings{},
data: data,
opts: opts,
}

settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings)
if !ok {
return nil
}

return settings.CsvSettings
}

type csvHeaderOption struct{}

func (opt *csvHeaderOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
ensureCsvDataFormatSettings(req).Header = true
func (opt *csvHeaderOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
dataFormat.CsvSettings.Header = true

return nil
}
Expand All @@ -678,114 +662,93 @@ func WithCsvHeader() csvFormatOption {
return &csvHeaderOption{}
}

type csvNullValueOption struct {
Value []byte
}
type csvNullValueOption []byte

func (opt *csvNullValueOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
ensureCsvDataFormatSettings(req).NullValue = opt.Value
func (nullValue csvNullValueOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
dataFormat.CsvSettings.NullValue = nullValue

return nil
}

// String value that would be interpreted as NULL.
func WithCsvNullValue(value []byte) csvFormatOption {
return &csvNullValueOption{value}
return csvNullValueOption(value)
}

type csvDelimiterOption struct {
Value []byte
}
type csvDelimiterOption []byte

func (opt *csvDelimiterOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
ensureCsvDataFormatSettings(req).Delimiter = opt.Value
func (delimeter csvDelimiterOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
dataFormat.CsvSettings.Delimiter = delimeter

return nil
}

// Fields delimiter in CSV file. It's "," if not set.
func WithCsvDelimiter(value []byte) csvFormatOption {
return &csvDelimiterOption{value}
return csvDelimiterOption(value)
}

type csvSkipRowsOption struct {
Count uint32
}
type csvSkipRowsOption uint32

func (opt *csvSkipRowsOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
ensureCsvDataFormatSettings(req).SkipRows = opt.Count
func (skipRows csvSkipRowsOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
dataFormat.CsvSettings.SkipRows = uint32(skipRows)

return nil
}

// Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file.
func WithCsvSkipRows(count uint32) csvFormatOption {
return &csvSkipRowsOption{count}
func WithCsvSkipRows(skipRows uint32) csvFormatOption {
return csvSkipRowsOption(skipRows)
}

type bulkUpsertArrow struct {
Data []byte
Options []arrowFormatOption
data []byte
opts []arrowFormatOption
}

type arrowFormatOption interface {
ApplyArrowFormatOption(req *BulkUpsertRequest) (err error)
applyArrowFormatOption(req *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) (err error)
}

func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
req.Data = data.Data
func (data bulkUpsertArrow) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
var (
request = &Ydb_Table.BulkUpsertRequest{
Table: tableName,
Data: data.data,
}
dataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{},
}
)

var err error
for _, opt := range data.Options {
for _, opt := range data.opts {
if opt != nil {
err = opt.ApplyArrowFormatOption(req)
if err != nil {
return err
if err := opt.applyArrowFormatOption(dataFormat); err != nil {
return nil, err
}
}
}

return err
request.DataFormat = dataFormat

return request, nil
}

func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow {
return bulkUpsertArrow{
Data: data,
Options: opts,
data: data,
opts: opts,
}
}

func ensureArrowDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.ArrowBatchSettings) {
if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings); ok {
if settings.ArrowBatchSettings == nil {
settings.ArrowBatchSettings = &Ydb_Formats.ArrowBatchSettings{}
}

return settings.ArrowBatchSettings
}

req.DataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{},
}

settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings)
if !ok {
return nil
}

return settings.ArrowBatchSettings
}

type arrowSchemaOption struct {
Schema []byte
}
type arrowSchemaOption []byte

func (opt *arrowSchemaOption) ApplyArrowFormatOption(req *BulkUpsertRequest) error {
ensureArrowDataFormatSettings(req).Schema = opt.Schema
func (schema arrowSchemaOption) applyArrowFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) error {

Check failure on line 746 in table/table.go

View workflow job for this annotation

GitHub Actions / golangci-lint

the line is 122 characters long, which exceeds the maximum of 120 characters. (lll)
dataFormat.ArrowBatchSettings.Schema = schema

return nil
}

func WithArrowSchema(schema []byte) arrowFormatOption {
return &arrowSchemaOption{schema}
return arrowSchemaOption(schema)
}

0 comments on commit 865024a

Please sign in to comment.