Skip to content

Commit

Permalink
Reduce boilerplate in replication client tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alco committed Aug 5, 2024
1 parent 80a670f commit 7a9079b
Showing 1 changed file with 16 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ defmodule Electric.Postgres.ReplicationClientTest do

insert_item(conn, "test value")

assert_receive {:from_replication, %Transaction{changes: [change]}}
assert %NewRecord{record: %{"value" => "test value"}} = change
assert %NewRecord{record: %{"value" => "test value"}} = receive_tx_change()
end

test "logs a message when connected & replication has started",
Expand All @@ -62,8 +61,7 @@ defmodule Electric.Postgres.ReplicationClientTest do

insert_item(conn, "test value")

assert_receive {:from_replication, %Transaction{changes: [change]}}
assert %NewRecord{record: %{"value" => "test value"}} = change
assert %NewRecord{record: %{"value" => "test value"}} = receive_tx_change()
end)

log =~ "Started replication from postgres"
Expand Down Expand Up @@ -112,13 +110,10 @@ defmodule Electric.Postgres.ReplicationClientTest do
assert {:ok, pid} = ReplicationClient.start_link(config, replication_opts)

insert_item(conn, "test value")
assert_receive {:from_replication, %Transaction{changes: [change]}}
assert %NewRecord{record: %{"value" => "test value"}} = change
assert %NewRecord{record: %{"value" => "test value"}} = receive_tx_change()

insert_item(conn, "return: not ok")
assert_receive {:from_replication, %Transaction{changes: [change]}}
assert %NewRecord{record: %{"value" => "return: not ok"}} = change

assert %NewRecord{record: %{"value" => "return: not ok"}} = receive_tx_change()

# Verify that raising in the transaction callback crashes the connection process
monitor = Process.monitor(pid)
Expand All @@ -141,11 +136,8 @@ defmodule Electric.Postgres.ReplicationClientTest do
# confirmed one
assert {:ok, _pid} = ReplicationClient.start_link(config, replication_opts)

assert_receive {:from_replication, %Transaction{changes: [change]}}
assert %NewRecord{record: %{"value" => "return: not ok"}} = change

assert_receive {:from_replication, %Transaction{changes: [change]}}
assert %NewRecord{record: %{"value" => ^interrupt_val}} = change
assert %NewRecord{record: %{"value" => "return: not ok"}} = receive_tx_change()
assert %NewRecord{record: %{"value" => ^interrupt_val}} = receive_tx_change()

refute_receive _
end
Expand Down Expand Up @@ -194,8 +186,6 @@ defmodule Electric.Postgres.ReplicationClientTest do
)

# Check that the incoming data is formatted according to Electric.Postgres.display_settings
assert_receive {:from_replication, %Transaction{changes: [change]}}

assert %NewRecord{
record: %{
"date" => "2022-05-17",
Expand All @@ -204,7 +194,7 @@ defmodule Electric.Postgres.ReplicationClientTest do
"bytea" => "\\x0510fa",
"interval" => "P1DT12H59M10S"
}
} = change
} = receive_tx_change()
end
end

Expand Down Expand Up @@ -236,21 +226,17 @@ defmodule Electric.Postgres.ReplicationClientTest do
long_string_2
])

assert_receive {:from_replication, %Transaction{changes: [change]}}

assert %NewRecord{
record: %{"id" => ^id, "val1" => ^long_string_1, "val2" => ^long_string_2},
relation: {"public", "items2"}
} = change
} = receive_tx_change()

Postgrex.query!(conn, "DELETE FROM items2 WHERE id = $1", [bin_uuid])

assert_receive {:from_replication, %Transaction{changes: [change]}}

assert %DeletedRecord{
old_record: %{"id" => ^id, "val1" => ^long_string_1, "val2" => ^long_string_2},
relation: {"public", "items2"}
} = change
} = receive_tx_change()
end

test "detoasts column values in updates", %{db_conn: conn} do
Expand All @@ -264,17 +250,13 @@ defmodule Electric.Postgres.ReplicationClientTest do
long_string_2
])

assert_receive {:from_replication, %Transaction{changes: [change]}}

assert %NewRecord{
record: %{"id" => ^id, "val1" => ^long_string_1, "val2" => ^long_string_2},
relation: {"public", "items2"}
} = change
} = receive_tx_change()

Postgrex.query!(conn, "UPDATE items2 SET num = 11 WHERE id = $1", [bin_uuid])

assert_receive {:from_replication, %Transaction{changes: [change]}}

assert %UpdatedRecord{
record: %{
"id" => ^id,
Expand All @@ -284,7 +266,7 @@ defmodule Electric.Postgres.ReplicationClientTest do
},
changed_columns: changed_columns,
relation: {"public", "items2"}
} = change
} = receive_tx_change()

assert MapSet.new(["num"]) == changed_columns
end
Expand Down Expand Up @@ -396,4 +378,9 @@ defmodule Electric.Postgres.ReplicationClientTest do
{:ok, bin_uuid} = Ecto.UUID.dump(id)
{id, bin_uuid}
end

defp receive_tx_change do
assert_receive {:from_replication, %Transaction{changes: [change]}}
change
end
end

0 comments on commit 7a9079b

Please sign in to comment.