Fix dates in Parquet, handle bpchar and convert decimal PG types
exAspArk committed Nov 5, 2024
1 parent 27e2785 commit 92ca814
Showing 2 changed files with 60 additions and 11 deletions.
16 changes: 13 additions & 3 deletions src/pg_schema_column.go
@@ -1,6 +1,7 @@
package main

import (
    "fmt"
    "strconv"
    "strings"
    "time"
@@ -102,6 +103,11 @@ func (pgSchemaColumn *PgSchemaColumn) FormatParquetValue(value string) *string {
            timestamp := strconv.FormatInt(parsedTime.UnixMilli(), 10)
            return &timestamp
        }
    case "date":
        parsedTime, err := time.Parse("2006-01-02", value)
        PanicIfError(err)
        date := fmt.Sprintf("%d", parsedTime.Unix()/86400)
        return &date
    default:
        if strings.HasPrefix(pgSchemaColumn.UdtName, "_") {
            switch strings.TrimLeft(pgSchemaColumn.UdtName, "_") {
@@ -141,7 +147,7 @@ func (pgSchemaColumn PgSchemaColumn) toIcebergSchemaField() IcebergSchemaField {
    }

    switch pgSchemaColumn.UdtName {
    case "varchar", "char", "text", "jsonb", "json", "uuid":
    case "varchar", "char", "text", "jsonb", "json", "uuid", "bpchar":
        icebergSchemaField.Type = "string"
    case "int2", "int4", "int8":
        icebergSchemaField.Type = "int"
@@ -187,7 +193,7 @@ func (pgSchemaColumn *PgSchemaColumn) toParquetSchemaField() ParquetSchemaField
    }

    switch pgSchemaColumn.UdtName {
    case "varchar", "char", "text", "bytea", "jsonb", "json":
    case "varchar", "char", "text", "bytea", "jsonb", "json", "bpchar":
        parquetSchemaField.Type = "BYTE_ARRAY"
        parquetSchemaField.ConvertedType = "UTF8"
    case "int2", "int4", "int8":
@@ -196,10 +202,14 @@
        parquetSchemaField.Type = "FLOAT"
    case "numeric":
        parquetSchemaField.Type = "FIXED_LEN_BYTE_ARRAY"
        parquetSchemaField.Length = pgSchemaColumn.NumericPrecision + "." + pgSchemaColumn.NumericScale
        parquetSchemaField.ConvertedType = "DECIMAL"
        parquetSchemaField.Scale = pgSchemaColumn.NumericScale
        parquetSchemaField.Precision = pgSchemaColumn.NumericPrecision
        scale, err := strconv.Atoi(pgSchemaColumn.NumericScale)
        PanicIfError(err)
        precision, err := strconv.Atoi(pgSchemaColumn.NumericPrecision)
        PanicIfError(err)
        parquetSchemaField.Length = strconv.Itoa(scale + precision)
    case "bool":
        parquetSchemaField.Type = "BOOLEAN"
    case "uuid":
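Note on the "date" case above: Parquet's DATE logical type stores a value as the number of days since the Unix epoch (1970-01-01), which is what parsedTime.Unix()/86400 yields for a date parsed in UTC. A minimal standalone sketch of the same arithmetic; the program below is illustrative only and not part of the commit:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Parse a Postgres-formatted date and convert it to days since 1970-01-01,
    // mirroring the "date" case added to FormatParquetValue.
    parsedTime, err := time.Parse("2006-01-02", "2024-11-05")
    if err != nil {
        panic(err)
    }
    days := parsedTime.Unix() / 86400
    fmt.Println(days) // prints 20032
}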
55 changes: 47 additions & 8 deletions src/proxy.go
@@ -4,9 +4,13 @@ import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "strconv"
    "time"

    "github.com/jackc/pgx/v5/pgproto3"
    "github.com/jackc/pgx/v5/pgtype"
    duckDB "github.com/marcboeker/go-duckdb"
    pgQuery "github.com/pganalyze/pg_query_go/v5"
)

@@ -106,23 +110,58 @@ func (proxy *Proxy) generateRowDescription(cols []*sql.ColumnType) *pgproto3.Row
}

func (proxy *Proxy) generateDataRow(rows *sql.Rows, cols []*sql.ColumnType) (*pgproto3.DataRow, error) {
    values := make([][]byte, len(cols))
    valuePtrs := make([]interface{}, len(cols))

    for i := range values {
        valuePtrs[i] = &values[i]
    for i, col := range cols {
        switch col.ScanType().Name() {
        case "int32":
            var value int32
            valuePtrs[i] = &value
        case "int64":
            var value int64
            valuePtrs[i] = &value
        case "string":
            var value string
            valuePtrs[i] = &value
        case "Time":
            var value time.Time
            valuePtrs[i] = &value
        case "Decimal":
            var value duckDB.Decimal
            valuePtrs[i] = &value
        default:
            panic("Unsupported type")
        }
    }

    err := rows.Scan(valuePtrs...)
    if err != nil {
        return nil, err
    }

    dataRow := pgproto3.DataRow{Values: values}
    // Convert values to text format
    for i := range values {
        dataRow.Values[i] = []byte(string(values[i]))
    var values [][]byte
    for i, valuePtr := range valuePtrs {
        switch value := valuePtr.(type) {
        case *int32:
            values = append(values, []byte(strconv.Itoa(int(*value))))
        case *int64:
            values = append(values, []byte(strconv.Itoa(int(*value))))
        case *string:
            values = append(values, []byte(*value))
        case *time.Time:
            switch cols[i].DatabaseTypeName() {
            case "DATE":
                values = append(values, []byte(value.Format("2006-01-02")))
            default:
                panic("Unsupported type")
            }
        case *duckDB.Decimal:
            float64Value := (*value).Float64()
            values = append(values, []byte(fmt.Sprintf("%v", float64Value)))
        default:
            panic("Unsupported type")
        }
    }
    dataRow := pgproto3.DataRow{Values: values}

    return &dataRow, nil
}
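For context on the proxy.go change: generateDataRow now picks a concrete scan destination per column from ScanType() instead of scanning everything into raw []byte, then renders each value in the text format carried by pgproto3.DataRow. A rough sketch of the same pattern, reduced to a few types; rowToText is a hypothetical helper for illustration, not a function from this repository:

package example

import (
    "database/sql"
    "fmt"
    "strconv"
    "time"
)

// rowToText scans the current row into typed destinations chosen from each
// column's ScanType, then converts every value to its text representation.
// Only int64, string, and time.Time columns are handled in this sketch.
func rowToText(rows *sql.Rows, cols []*sql.ColumnType) ([][]byte, error) {
    dests := make([]interface{}, len(cols))
    for i, col := range cols {
        switch col.ScanType().Name() {
        case "int64":
            dests[i] = new(int64)
        case "string":
            dests[i] = new(string)
        case "Time":
            dests[i] = new(time.Time)
        default:
            return nil, fmt.Errorf("unsupported scan type %q", col.ScanType().Name())
        }
    }
    if err := rows.Scan(dests...); err != nil {
        return nil, err
    }

    values := make([][]byte, len(cols))
    for i, dest := range dests {
        switch v := dest.(type) {
        case *int64:
            values[i] = []byte(strconv.FormatInt(*v, 10))
        case *string:
            values[i] = []byte(*v)
        case *time.Time:
            values[i] = []byte(v.Format("2006-01-02")) // text format for DATE columns
        }
    }
    return values, nil
}

Returning an error for unknown types is one possible variation; the committed code panics on unsupported types instead.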
