Enum is not being properly casted while doing ingestion #434

Open
PiotrSierkin-Ki opened this issue Nov 6, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

@PiotrSierkin-Ki

Issue Description

  • Description of the issue:
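    I would like to be able to ingest data into enum columns. Proper casting appears to be missing: the enum column is read as text, and the upsert into the enum-typed target column then fails.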

  • Sling version (sling --version): Version: 1.2.22

  • Operating System (linux, mac, windows): mac

  • Replication Configuration:

source: LOCAL_DEV_SOUECE
target: LOCAL_DEV_DESTINATION

defaults:
  object: '{stream_schema}.{stream_table}'
  mode: incremental
  primary_key: id

streams:
  public.users:
    mode: incremental
    primary_key: [id]
    sql: select "id", "username", "email", "role"::userroleenum, "created_at", "last_login" from "public"."users"

SOURCE DB

-- Step 1: Create a new enum type for user roles
CREATE TYPE userroleenum AS ENUM ('admin', 'editor', 'viewer');

-- Step 2: Create a new table for users that uses this enum type
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    email VARCHAR(255) NOT NULL UNIQUE,
    role userroleenum NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login TIMESTAMP
);

-- Step 3: Provide some insert statements to populate the user table
INSERT INTO users (username, email, role, last_login)
VALUES 
('admin_user', '[email protected]', 'admin', '2023-01-01 12:00:00'),
('editor_user', '[email protected]', 'editor', '2023-02-01 12:00:00'),
('viewer_user', '[email protected]', 'viewer', '2023-03-01 12:00:00');

DESTINATION DB

-- Step 1: Create a new enum type for user roles
CREATE TYPE userroleenum AS ENUM ('admin', 'editor', 'viewer');

-- Step 2: Create a new table for users that uses this enum type
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    email VARCHAR(255) NOT NULL UNIQUE,
    role userroleenum NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login TIMESTAMP
);

❯ sling run -r bug_sling.yaml --debug
2024-11-06 14:22:02 INF Sling Replication | LOCAL_DEV_SOUECE -> LOCAL_DEV_DESTINATION | public.users
2024-11-06 14:22:02 DBG Sling version: 1.2.22 (darwin arm64)
2024-11-06 14:22:02 DBG type is db-db
2024-11-06 14:22:02 DBG using: {"columns":null,"mode":"incremental","transforms":null}
2024-11-06 14:22:02 DBG using source options: {"empty_as_null":false,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-11-06 14:22:02 DBG using target options: {"datetime_format":"auto","file_max_rows":0,"max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-11-06 14:22:02 DBG opened "postgres" connection (conn-postgres-ugQ)
2024-11-06 14:22:02 DBG opened "postgres" connection (conn-postgres-ohX)
2024-11-06 14:22:02 INF connecting to source database (postgres)
2024-11-06 14:22:02 INF connecting to target database (postgres)
2024-11-06 14:22:02 DBG using text since type 'userroleenum' not mapped for col 'role'
2024-11-06 14:22:02 INF reading from source database
2024-11-06 14:22:02 DBG select "id", "username", "email", "role"::userroleenum, "created_at", "last_login" from "public"."users"
2024-11-06 14:22:02 INF writing to target database [mode: incremental]
2024-11-06 14:22:02 DBG drop table if exists "public"."users_tmp"
2024-11-06 14:22:02 DBG table "public"."users_tmp" dropped
2024-11-06 14:22:02 DBG create table if not exists "public"."users_tmp" ("id" integer,
"username" varchar(65500),
"email" varchar(65500),
"role" text,
"created_at" timestamp,
"last_login" timestamp)
2024-11-06 14:22:02 INF created table "public"."users_tmp"
2024-11-06 14:22:02 INF streaming data
2024-11-06 14:22:02 DBG select count(*) cnt from "public"."users_tmp"
2024-11-06 14:22:02 DBG using text since type 'userroleenum' not mapped for col 'role'
2024-11-06 14:22:02 DBG Performing upsert from temporary table "public"."users_tmp" to target table "public"."users" with primary keys [id]
2024-11-06 14:22:02 DBG using text since type 'userroleenum' not mapped for col 'role'
2024-11-06 14:22:02 DBG inserting username [character varying(65500)] into username [character varying(255)]
2024-11-06 14:22:02 DBG inserting email [character varying(65500)] into email [character varying(255)]
2024-11-06 14:22:02 DBG inserting role [text] into role [userroleenum]
2024-11-06 14:22:02 DBG create temporary table temp3ttRN as
        with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        , updates as (
                update "public"."users" tgt
                set "username" = src."username", "email" = src."email", "role" = src."role", "created_at" = src."created_at", "last_login" = src."last_login"
                from src_table src
                where src."id" = tgt."id"
                returning tgt.*
        )
        select * from updates
2024-11-06 14:22:02 DBG create unique index if not exists temp3ttRN_idx on temp3ttRN ("id")
2024-11-06 14:22:02 DBG with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        insert into "public"."users"
        ("id", "username", "email", "role", "created_at", "last_login")
        select "id", "username", "email", "role", "created_at", "last_login" from src_table src
        where not exists (
                select 1
                from temp3ttRN upd
                where src."id" = upd."id"
        )
2024-11-06 14:22:02 DBG drop table if exists "public"."users_tmp"
2024-11-06 14:22:02 DBG table "public"."users_tmp" dropped
2024-11-06 14:22:02 DBG closed "postgres" connection (conn-postgres-ohX)
2024-11-06 14:22:02 INF execution failed

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create temporary table temp3ttRN as
        with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        , updates as (
                update "public"."users" tgt
                set "username" = src."username", "email" = src."email", "role" = src."role", "created_at" = src."created_at", "last_login" = src."last_login"
                from src_table src
                where src."id" = tgt."id"
                returning tgt.*
        )
        select * from updates
--- transaction.go:135 ExecContext ---
pq: column "role" is of type userroleenum but expression is of type text

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create unique index if not exists temp3ttRN_idx on temp3ttRN ("id")
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        insert into "public"."users"
        ("id", "username", "email", "role", "created_at", "last_login")
        select "id", "username", "email", "role", "created_at", "last_login" from src_table src
        where not exists (
                select 1
                from temp3ttRN upd
                where src."id" = upd."id"
        )
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block

2024-11-06 14:22:03 INF Sling Replication Completed in 0s | LOCAL_DEV_SOUECE -> LOCAL_DEV_DESTINATION | 0 Successes | 1 Failures

fatal:
--- proc.go:271 main ---
--- sling_cli.go:458 main ---
--- sling_cli.go:494 cliInit ---
--- cli.go:286 CliProcess ---
~ failure running replication (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:209 processRun ---

--------------------------- public.users ---------------------------
--- task_run.go:116 func2 ---
~ Could not WriteToDb
--- task_run.go:559 runDbToDb ---
~ Error transferring data from temp to final table
--- task_run_write.go:325 WriteToDb ---
~ Could not perform upsert from temp
--- task_run_write.go:759 transferData ---
~ Could not perform upsert from temp
--- task_run_write.go:803 performUpsert ---
~ could not upsert
--- database.go:2153 Upsert ---
~ Could not upsert
--- transaction.go:479 Upsert ---

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create temporary table temp3ttRN as
        with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        , updates as (
                update "public"."users" tgt
                set "username" = src."username", "email" = src."email", "role" = src."role", "created_at" = src."created_at", "last_login" = src."last_login"
                from src_table src
                where src."id" = tgt."id"
                returning tgt.*
        )
        select * from updates
--- transaction.go:135 ExecContext ---
pq: column "role" is of type userroleenum but expression is of type text

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create unique index if not exists temp3ttRN_idx on temp3ttRN ("id")
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        insert into "public"."users"
        ("id", "username", "email", "role", "created_at", "last_login")
        select "id", "username", "email", "role", "created_at", "last_login" from src_table src
        where not exists (
                select 1
                from temp3ttRN upd
                where src."id" = upd."id"
        )
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block
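
For reference, the first error in the log comes from assigning a text expression to the enum column without a cast. The statements below are a hand-written sketch of the same update and insert with an explicit cast added on "role". They are not what Sling generates, and they assume a staging table shaped like "public"."users_tmp" is still present (Sling drops it at the end of the run). For brevity, the insert checks the target table directly instead of the temporary table of updated ids used in the log.

```sql
-- Sketch only: manual equivalent of the failing upsert, with an explicit cast.
-- Assumes "public"."users_tmp" exists and holds "role" as text.

-- Update rows that already exist in the target.
update "public"."users" tgt
set "username"   = src."username",
    "email"      = src."email",
    "role"       = src."role"::userroleenum,  -- text -> enum needs an explicit cast
    "created_at" = src."created_at",
    "last_login" = src."last_login"
from "public"."users_tmp" src
where src."id" = tgt."id";

-- Insert rows that are not in the target yet.
insert into "public"."users"
    ("id", "username", "email", "role", "created_at", "last_login")
select src."id", src."username", src."email",
       src."role"::userroleenum,
       src."created_at", src."last_login"
from "public"."users_tmp" src
where not exists (
    select 1
    from "public"."users" tgt
    where tgt."id" = src."id"
);
```

PostgreSQL does not cast text to an enum type implicitly in an assignment, which is why the statements in the log are rejected while the cast versions should be accepted, provided every value in "role" is a valid label of userroleenum.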
@PiotrSierkin-Ki
Author

Additionally, using full-refresh mode works, but the enum column is cast to the text type:

source: LOCAL_DEV_SOUECE
target: LOCAL_DEV_DESTINATION

defaults:
  object: '{stream_schema}.{stream_table}'
  mode: incremental
  primary_key: id

streams:
  public.users:
    mode: full-refresh
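
As a possible manual follow-up to the full-refresh case, the text column could be converted back to the enum type on the destination after the load. This is a sketch in plain PostgreSQL, not a Sling feature. It assumes the userroleenum type already exists in the destination database and that every loaded value is one of its labels. If the table is recreated on each run, the statement would likely need to be repeated.

```sql
-- Sketch only: convert the loaded text column back to the enum type.
-- Fails if any value in "role" is not a label of userroleenum.
alter table "public"."users"
    alter column "role" type userroleenum
    using "role"::userroleenum;
```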

@flarco flarco added the enhancement New feature or request label Nov 24, 2024