diff --git a/sync_service/lib/electric/postgres/replication_client.ex b/sync_service/lib/electric/postgres/replication_client.ex
index 51d233c254..decaa03d77 100644
--- a/sync_service/lib/electric/postgres/replication_client.ex
+++ b/sync_service/lib/electric/postgres/replication_client.ex
@@ -1,35 +1,27 @@
 defmodule Electric.Postgres.ReplicationClient do
-  require Logger
-  alias Electric.Postgres.LogicalReplication.Messages, as: LR
+  @moduledoc """
+  A client module for Postgres logical replication
+  """
+  alias Electric.Postgres.ReplicationClient.Collector
   alias Electric.Postgres.LogicalReplication.Decoder
-
-  alias Electric.Replication.Changes.{
-    Transaction,
-    NewRecord,
-    UpdatedRecord,
-    DeletedRecord,
-    TruncatedRelation
-  }
-
+  require Logger
   use Postgrex.ReplicationConnection
 
   defmodule State do
-    alias Electric.Replication.Changes
     @enforce_keys [:transaction_received, :publication_name]
     defstruct [
       :transaction_received,
       :publication_name,
-      relations: %{},
       origin: "postgres",
-      txn: nil,
+      txn_collector: %Collector{},
       step: :disconnected
     ]
 
     @type t() :: %__MODULE__{
             transaction_received: {module(), atom(), [term()]},
-            txn: nil | Transaction.t(),
+            publication_name: String.t(),
             origin: String.t(),
-            relations: %{LR.relation_id() => LR.Relation.t()},
+            txn_collector: Collector.t(),
             step: :disconnected | :create_slot | :streaming
           }
 
@@ -43,10 +35,6 @@ defmodule Electric.Postgres.ReplicationClient do
       opts = NimbleOptions.validate!(opts, @opts_schema)
       struct!(__MODULE__, opts)
     end
-
-    @spec add_txn_change(t(), Changes.change()) :: t()
-    def add_txn_change(%__MODULE__{txn: %Transaction{} = txn} = state, change),
-      do: %{state | txn: Transaction.prepend_change(txn, change)}
   end
 
   def start_link(opts) do
@@ -84,18 +72,21 @@ defmodule Electric.Postgres.ReplicationClient do
   @impl true
   @spec handle_data(binary(), State.t()) ::
           {:noreply, State.t()} | {:noreply, list(binary()), State.t()}
-  def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
+  def handle_data(
+        <<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>,
+        %State{} = state
+      ) do
     rest
     |> Decoder.decode()
-    |> handle_message(state)
+    |> Collector.handle_message(state.txn_collector)
     |> case do
-      %State{} = state ->
-        {:noreply, state}
+      %Collector{} = txn_collector ->
+        {:noreply, %{state | txn_collector: txn_collector}}
 
-      {%Transaction{} = txn, %State{} = state} ->
+      {txn, %Collector{} = txn_collector} ->
         {m, f, args} = state.transaction_received
         apply(m, f, [txn | args])
-        {:noreply, state}
+        {:noreply, %{state | txn_collector: txn_collector}}
     end
   end
 
@@ -111,107 +102,4 @@ defmodule Electric.Postgres.ReplicationClient do
 
   @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
   defp current_time(), do: System.os_time(:microsecond) - @epoch
-
-  defp handle_message(%LR.Message{} = msg, state) do
-    Logger.info("Got a message from PG via logical replication: #{inspect(msg)}")
-
-    state
-  end
-
-  defp handle_message(%LR.Begin{} = msg, %State{} = state) do
-    txn = %Transaction{
-      xid: msg.xid,
-      lsn: msg.final_lsn,
-      changes: [],
-      commit_timestamp: msg.commit_timestamp
-    }
-
-    %{state | txn: txn}
-  end
-
-  defp handle_message(%LR.Origin{} = msg, state) do
-    Logger.debug("origin: #{inspect(msg.name)}")
-    state
-  end
-
-  defp handle_message(%LR.Type{}, state), do: state
-
-  defp handle_message(%LR.Relation{} = rel, state) do
-    if is_map_key(state.relations, rel.id) do
-      Logger.warning("Schema had changed")
-    end
-
-    Map.update!(state, :relations, &Map.put(&1, rel.id, rel))
-  end
-
-  defp handle_message(%LR.Insert{} = msg, %State{} = state) do
-    relation = Map.fetch!(state.relations, msg.relation_id)
-
-    data = data_tuple_to_map(relation.columns, msg.tuple_data)
-
-    new_record = %NewRecord{relation: {relation.namespace, relation.name}, record: data}
-
-    State.add_txn_change(state, new_record)
-  end
-
-  defp handle_message(%LR.Update{} = msg, %State{} = state) do
-    relation = Map.get(state.relations, msg.relation_id)
-
-    old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data)
-    data = data_tuple_to_map(relation.columns, msg.tuple_data)
-
-    updated_record =
-      UpdatedRecord.new(
-        relation: {relation.namespace, relation.name},
-        old_record: old_data,
-        record: data
-      )
-
-    State.add_txn_change(state, updated_record)
-  end
-
-  defp handle_message(%LR.Delete{} = msg, %State{} = state) do
-    relation = Map.get(state.relations, msg.relation_id)
-
-    data =
-      data_tuple_to_map(
-        relation.columns,
-        msg.old_tuple_data || msg.changed_key_tuple_data
-      )
-
-    deleted_record = %DeletedRecord{
-      relation: {relation.namespace, relation.name},
-      old_record: data
-    }
-
-    State.add_txn_change(state, deleted_record)
-  end
-
-  defp handle_message(%LR.Truncate{} = msg, state) do
-    msg.truncated_relations
-    |> Enum.map(&Map.get(state.relations, &1))
-    |> Enum.map(&%TruncatedRelation{relation: {&1.namespace, &1.name}})
-    |> Enum.reduce(state, &State.add_txn_change(&2, &1))
-  end
-
-  # When we have a new event, enqueue it and see if there's any
-  # pending demand we can meet by dispatching events.
-
-  defp handle_message(%LR.Commit{lsn: commit_lsn, end_lsn: end_lsn}, %State{txn: txn} = state)
-       when not is_nil(txn) and commit_lsn == txn.lsn do
-    # Metrics.span_event(state.span, :transaction, Transaction.count_operations(txn))
-
-    {%Transaction{txn | lsn: end_lsn}, %State{state | txn: nil}}
-  end
-
-  @spec data_tuple_to_map([LR.Relation.Column.t()], list(String.t())) :: %{
-          String.t() => String.t()
-        }
-  defp data_tuple_to_map(_columns, nil), do: %{}
-
-  defp data_tuple_to_map(columns, tuple_data) do
-    columns
-    |> Enum.zip(tuple_data)
-    |> Map.new(fn {column, data} -> {column.name, data} end)
-  end
 end
diff --git a/sync_service/lib/electric/postgres/replication_client/collector.ex b/sync_service/lib/electric/postgres/replication_client/collector.ex
new file mode 100644
index 0000000000..4c1c3d9cb4
--- /dev/null
+++ b/sync_service/lib/electric/postgres/replication_client/collector.ex
@@ -0,0 +1,129 @@
+defmodule Electric.Postgres.ReplicationClient.Collector do
+  @moduledoc """
+  Conversion of incoming Postgres logical replication messages
+  to internal change representation.
+  """
+
+  require Logger
+  alias Electric.Replication.Changes
+  alias Electric.Postgres.LogicalReplication.Messages, as: LR
+
+  alias Electric.Replication.Changes.{
+    Transaction,
+    NewRecord,
+    UpdatedRecord,
+    DeletedRecord,
+    TruncatedRelation
+  }
+
+  defstruct transaction: nil, relations: %{}
+
+  @type t() :: %__MODULE__{
+          transaction: nil | Transaction.t(),
+          relations: %{optional(LR.relation_id()) => LR.Relation.t()}
+        }
+
+  @doc """
+  Handle an incoming logical replication message by either building up a transaction or
+  returning a completed transaction.
+  """
+  @spec handle_message(LR.message(), t()) :: t() | {Transaction.t(), t()}
+  def handle_message(%LR.Message{} = msg, state) do
+    Logger.info("Got a message from PG via logical replication: #{inspect(msg)}")
+
+    state
+  end
+
+  def handle_message(%LR.Begin{} = msg, %__MODULE__{} = state) do
+    txn = %Transaction{
+      xid: msg.xid,
+      lsn: msg.final_lsn,
+      changes: [],
+      commit_timestamp: msg.commit_timestamp
+    }
+
+    %{state | transaction: txn}
+  end
+
+  def handle_message(%LR.Origin{} = _msg, state), do: state
+  def handle_message(%LR.Type{}, state), do: state
+
+  def handle_message(%LR.Relation{id: id} = rel, %__MODULE__{} = state) do
+    if Map.get(state.relations, id, rel) != rel do
+      Logger.warning("Schema for the table #{rel.namespace}.#{rel.name} had changed")
+    end
+
+    Map.update!(state, :relations, &Map.put(&1, rel.id, rel))
+  end
+
+  def handle_message(%LR.Insert{} = msg, %__MODULE__{} = state) do
+    relation = Map.fetch!(state.relations, msg.relation_id)
+
+    data = data_tuple_to_map(relation.columns, msg.tuple_data)
+
+    %NewRecord{relation: {relation.namespace, relation.name}, record: data}
+    |> prepend_change(state)
+  end
+
+  def handle_message(%LR.Update{} = msg, %__MODULE__{} = state) do
+    relation = Map.get(state.relations, msg.relation_id)
+
+    old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data)
+    data = data_tuple_to_map(relation.columns, msg.tuple_data)
+
+    UpdatedRecord.new(
+      relation: {relation.namespace, relation.name},
+      old_record: old_data,
+      record: data
+    )
+    |> prepend_change(state)
+  end
+
+  def handle_message(%LR.Delete{} = msg, %__MODULE__{} = state) do
+    relation = Map.get(state.relations, msg.relation_id)
+
+    data =
+      data_tuple_to_map(
+        relation.columns,
+        msg.old_tuple_data || msg.changed_key_tuple_data
+      )
+
+    %DeletedRecord{
+      relation: {relation.namespace, relation.name},
+      old_record: data
+    }
+    |> prepend_change(state)
+  end
+
+  def handle_message(%LR.Truncate{} = msg, state) do
+    msg.truncated_relations
+    |> Enum.map(&Map.get(state.relations, &1))
+    |> Enum.map(&%TruncatedRelation{relation: {&1.namespace, &1.name}})
+    |> Enum.reduce(state, &prepend_change/2)
+  end
+
+  def handle_message(
+        %LR.Commit{lsn: commit_lsn, end_lsn: end_lsn},
+        %__MODULE__{transaction: txn} = state
+      )
+      when not is_nil(txn) and commit_lsn == txn.lsn do
+    {%Transaction{txn | lsn: end_lsn, changes: Enum.reverse(txn.changes)},
+     %__MODULE__{state | transaction: nil}}
+  end
+
+  @spec data_tuple_to_map([LR.Relation.Column.t()], list(String.t())) :: %{
+          String.t() => String.t()
+        }
+  defp data_tuple_to_map(_columns, nil), do: %{}
+
+  defp data_tuple_to_map(columns, tuple_data) do
+    columns
+    |> Enum.zip(tuple_data)
+    |> Map.new(fn {column, data} -> {column.name, data} end)
+  end
+
+  @spec prepend_change(Changes.change(), t()) :: t()
+  defp prepend_change(change, %__MODULE__{transaction: txn} = state) do
+    %{state | transaction: Transaction.prepend_change(txn, change)}
+  end
+end
diff --git a/sync_service/lib/electric/replication/changes.ex b/sync_service/lib/electric/replication/changes.ex
index c42076a209..d59eb1c30b 100644
--- a/sync_service/lib/electric/replication/changes.ex
+++ b/sync_service/lib/electric/replication/changes.ex
@@ -32,7 +32,7 @@ defmodule Electric.Replication.Changes do
           | Changes.UpdatedRecord.t()
           | Changes.DeletedRecord.t()
 
-  @type change() :: data_change()
+  @type change() :: data_change() | Changes.TruncatedRelation.t()
 
   defmodule Transaction do
     alias Electric.Replication.Changes
@@ -56,8 +56,14 @@
     @spec prepend_change(t(), Changes.change()) :: t()
     def prepend_change(
           %__MODULE__{changes: changes, affected_relations: rels} = txn,
-          %{relation: rel} = change
-        ) do
+          %change_mod{relation: rel} = change
+        )
+        when change_mod in [
+               Changes.NewRecord,
+               Changes.UpdatedRecord,
+               Changes.DeletedRecord,
+               Changes.TruncatedRelation
+             ] do
       %{
         txn
         | changes: [change | changes],
@@ -130,6 +136,8 @@ defmodule Electric.Replication.Changes do
 
   defmodule TruncatedRelation do
     defstruct [:relation]
+
+    @type t() :: %__MODULE__{relation: Changes.relation()}
   end
 
   # FIXME: this assumes PK is literally just "id" column
diff --git a/sync_service/test/electric/postgres/decoder_test.exs b/sync_service/test/electric/postgres/decoder_test.exs
new file mode 100644
index 0000000000..aff90be49e
--- /dev/null
+++ b/sync_service/test/electric/postgres/decoder_test.exs
@@ -0,0 +1,246 @@
+defmodule Electric.Postgres.LogicalReplication.DecoderTest do
+  use ExUnit.Case, async: true
+  doctest Electric.Postgres.LogicalReplication.Decoder, import: true
+  import Electric.Postgres.LogicalReplication.Decoder
+
+  alias Electric.Postgres.LogicalReplication.Messages.{
+    Begin,
+    Commit,
+    Origin,
+    Relation,
+    Relation.Column,
+    Insert,
+    Update,
+    Delete,
+    Truncate,
+    Type
+  }
+
+  alias Electric.Postgres.Lsn
+
+  def lsn({segment, offset}), do: %Lsn{segment: segment, offset: offset}
+
+  test "decodes begin messages" do
+    {:ok, expected_dt, 0} = DateTime.from_iso8601("2019-07-18T17:02:35.726322Z")
+
+    assert decode(
+             <<66, 0, 0, 0, 2, 167, 244, 168, 128, 0, 2, 48, 246, 88, 88, 213, 242, 0, 0, 2, 107>>
+           ) == %Begin{
+             commit_timestamp: expected_dt,
+             final_lsn: lsn({2, 2_817_828_992}),
+             xid: 619
+           }
+  end
+
+  test "decodes commit messages" do
+    {:ok, expected_dt, 0} = DateTime.from_iso8601("2019-07-18T17:02:35.726322Z")
+
+    assert decode(
+             <<67, 0, 0, 0, 0, 2, 167, 244, 168, 128, 0, 0, 0, 2, 167, 244, 168, 176, 0, 2, 48,
+               246, 88, 88, 213, 242>>
+           ) == %Commit{
+             flags: [],
+             lsn: lsn({2, 2_817_828_992}),
+             end_lsn: lsn({2, 2_817_829_040}),
+             commit_timestamp: expected_dt
+           }
+  end
+
+  test "decodes origin messages" do
+    assert decode(<<79, 0, 0, 0, 2, 167, 244, 168, 128, "Elmer Fud", 0>>) ==
+             %Origin{
+               origin_commit_lsn: %Lsn{segment: 2, offset: 2_817_828_992},
+               name: "Elmer Fud"
+             }
+  end
+
+  test "decodes relation messages" do
+    assert decode(
+             <<82, 0, 0, 96, 0, 112, 117, 98, 108, 105, 99, 0, 102, 111, 111, 0, 100, 0, 2, 0, 98,
+               97, 114, 0, 0, 0, 0, 25, 255, 255, 255, 255, 1, 105, 100, 0, 0, 0, 0, 23, 255, 255,
+               255, 255>>
+           ) == %Relation{
+             id: 24576,
+             namespace: "public",
+             name: "foo",
+             replica_identity: :default,
+             columns: [
+               %Column{
+                 flags: [],
+                 name: "bar",
+                 # :text
+                 type_oid: 25,
+                 type_modifier: -1
+               },
+               %Column{
+                 flags: [:key],
+                 name: "id",
+                 # :int4
+                 type_oid: 23,
+                 type_modifier: -1
+               }
+             ]
+           }
+  end
+
+  test "decodes relation messages with array types" do
+    assert decode(
+             <<82, 0, 0, 64, 18, 112, 117, 98, 108, 105, 99, 0, 99, 111, 109, 112, 108, 101, 120,
+               0, 102, 0, 3, 1, 105, 100, 0, 0, 0, 11, 134, 255, 255, 255, 255, 1, 110, 117, 109,
+               98, 101, 114, 115, 0, 0, 0, 3, 239, 255, 255, 255, 255, 1, 116, 101, 120, 116, 95,
+               109, 97, 116, 114, 105, 120, 0, 0, 0, 3, 241, 255, 255, 255, 255>>
+           ) == %Relation{
+             id: 16402,
+             namespace: "public",
+             name: "complex",
+             replica_identity: :all_columns,
+             columns: [
+               %Column{
+                 flags: [:key],
+                 name: "id",
+                 # :uuid
+                 type_oid: 2950,
+                 type_modifier: -1
+               },
+               %Column{
+                 flags: [:key],
+                 name: "numbers",
+                 # {:array, :int4}
+                 type_oid: 1007,
+                 type_modifier: -1
+               },
+               %Column{
+                 flags: [:key],
+                 name: "text_matrix",
+                 # {:array, :text}
+                 type_oid: 1009,
+                 type_modifier: -1
+               }
+             ]
+           }
+  end
+
+  test "decodes type messages" do
+    assert decode(
+             <<89, 0, 0, 128, 52, 112, 117, 98, 108, 105, 99, 0, 101, 120, 97, 109, 112, 108, 101,
+               95, 116, 121, 112, 101, 0>>
+           ) ==
+             %Type{
+               id: 32820,
+               namespace: "public",
+               name: "example_type"
+             }
+  end
+
+  describe "truncate messages" do
+    test "decodes messages" do
+      assert decode(<<84, 0, 0, 0, 1, 0, 0, 0, 96, 0>>) ==
+               %Truncate{
+                 number_of_relations: 1,
+                 options: [],
+                 truncated_relations: [24576]
+               }
+    end
+
+    test "decodes messages with cascade option" do
+      assert decode(<<84, 0, 0, 0, 1, 1, 0, 0, 96, 0>>) ==
+               %Truncate{
+                 number_of_relations: 1,
+                 options: [:cascade],
+                 truncated_relations: [24576]
+               }
+    end
+
+    test "decodes messages with restart identity option" do
+      assert decode(<<84, 0, 0, 0, 1, 2, 0, 0, 96, 0>>) ==
+               %Truncate{
+                 number_of_relations: 1,
+                 options: [:restart_identity],
+                 truncated_relations: [24576]
+               }
+    end
+  end
+
+  describe "data message (TupleData) decoder" do
+    test "decodes insert messages" do
+      assert decode(
+               <<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54,
+                 48>>
+             ) == %Insert{
+               relation_id: 24576,
+               tuple_data: ["baz", "560"]
+             }
+    end
+
+    test "decodes insert messages with null values" do
+      assert decode(<<73, 0, 0, 96, 0, 78, 0, 2, 110, 116, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{
+               relation_id: 24576,
+               tuple_data: [nil, "560"]
+             }
+    end
+
+    test "decodes insert messages with unchanged toasted values" do
+      assert decode(<<73, 0, 0, 96, 0, 78, 0, 2, 117, 116, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{
+               relation_id: 24576,
+               tuple_data: [:unchanged_toast, "560"]
+             }
+    end
+
+    test "decodes update messages with default replica identity setting" do
+      assert decode(
+               <<85, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101,
+                 116, 0, 0, 0, 3, 53, 54, 48>>
+             ) == %Update{
+               relation_id: 24576,
+               changed_key_tuple_data: nil,
+               old_tuple_data: nil,
+               tuple_data: ["example", "560"]
+             }
+    end
+
+    test "decodes update messages with FULL replica identity setting" do
+      assert decode(
+               <<85, 0, 0, 96, 0, 79, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54,
+                 48, 78, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 116, 0, 0, 0, 3,
+                 53, 54, 48>>
+             ) == %Update{
+               relation_id: 24576,
+               changed_key_tuple_data: nil,
+               old_tuple_data: ["baz", "560"],
+               tuple_data: ["example", "560"]
+             }
+    end
+
+    test "decodes update messages with USING INDEX replica identity setting" do
+      assert decode(
+               <<85, 0, 0, 96, 0, 75, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 110, 78, 0, 2, 116, 0,
+                 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54, 48>>
+             ) == %Update{
+               relation_id: 24576,
+               changed_key_tuple_data: ["baz", nil],
+               old_tuple_data: nil,
+               tuple_data: ["example", "560"]
+             }
+    end
+
+    test "decodes DELETE messages with USING INDEX replica identity setting" do
+      assert decode(
+               <<68, 0, 0, 96, 0, 75, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101,
+                 110>>
+             ) == %Delete{
+               relation_id: 24576,
+               changed_key_tuple_data: ["example", nil]
+             }
+    end
+
+    test "decodes DELETE messages with FULL replica identity setting" do
+      assert decode(
+               <<68, 0, 0, 96, 0, 79, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54,
+                 48>>
+             ) == %Delete{
+               relation_id: 24576,
+               old_tuple_data: ["baz", "560"]
+             }
+    end
+  end
+end
diff --git a/sync_service/test/electric/postgres/lsn_test.exs b/sync_service/test/electric/postgres/lsn_test.exs
new file mode 100644
index 0000000000..127143abc2
--- /dev/null
+++ b/sync_service/test/electric/postgres/lsn_test.exs
@@ -0,0 +1,5 @@
+defmodule Electric.Postgres.LsnTest do
+  use ExUnit.Case, async: true
+
+  doctest Electric.Postgres.Lsn, import: true
+end
diff --git a/sync_service/test/electric/postgres/replication_client/collector_test.exs b/sync_service/test/electric/postgres/replication_client/collector_test.exs
new file mode 100644
index 0000000000..ea90c6751a
--- /dev/null
+++ b/sync_service/test/electric/postgres/replication_client/collector_test.exs
@@ -0,0 +1,215 @@
+defmodule Electric.Postgres.ReplicationClient.CollectorTest do
+  use ExUnit.Case
+  alias Electric.Postgres.ReplicationClient.Collector
+  alias Electric.Postgres.LogicalReplication.Messages, as: LR
+
+  alias Electric.Replication.Changes.{
+    Transaction,
+    NewRecord,
+    UpdatedRecord,
+    DeletedRecord,
+    TruncatedRelation
+  }
+
+  @relation %LR.Relation{
+    id: 1,
+    namespace: "public",
+    name: "users",
+    replica_identity: :default,
+    columns: [%LR.Relation.Column{name: "id", flags: [:key], type_oid: 23, type_modifier: -1}]
+  }
+
+  setup do
+    collector = %Collector{}
+    collector = Collector.handle_message(@relation, collector)
+    {:ok, collector: collector}
+  end
+
+  test "collector correctly starts a transaction when seeing a 'Begin' message", %{
+    collector: collector
+  } do
+    begin_msg = %LR.Begin{
+      final_lsn: 123,
+      commit_timestamp: DateTime.utc_now(),
+      xid: 456
+    }
+
+    updated_collector = Collector.handle_message(begin_msg, collector)
+
+    assert %Collector{transaction: %Transaction{xid: 456, lsn: 123}} = updated_collector
+  end
+
+  test "collector stores received relation message", %{collector: collector} do
+    new_relation = %LR.Relation{
+      id: 2,
+      namespace: "public",
+      name: "posts",
+      replica_identity: :default,
+      columns: [%LR.Relation.Column{name: "id", flags: [:key], type_oid: 23, type_modifier: -1}]
+    }
+
+    updated_collector = Collector.handle_message(new_relation, collector)
+
+    assert %Collector{relations: %{1 => @relation, 2 => ^new_relation}} = updated_collector
+  end
+
+  test "collector logs a warning when receiving a new relation message that doesn't match the previous one",
+       %{collector: collector} do
+    new_relation = %{
+      @relation
+      | columns: [%LR.Relation.Column{name: "id", flags: [:key], type_oid: 20, type_modifier: -1}]
+    }
+
+    log =
+      ExUnit.CaptureLog.capture_log(fn ->
+        Collector.handle_message(new_relation, collector)
+      end)
+
+    assert log =~ "Schema for the table public.users had changed"
+  end
+
+  test "collector stores received insert when the relation is known", %{collector: collector} do
+    collector =
+      Collector.handle_message(
+        %LR.Begin{final_lsn: 123, commit_timestamp: DateTime.utc_now(), xid: 456},
+        collector
+      )
+
+    insert_msg = %LR.Insert{
+      relation_id: 1,
+      tuple_data: ["123"]
+    }
+
+    updated_collector = Collector.handle_message(insert_msg, collector)
+
+    assert %Collector{
+             transaction: %Transaction{
+               changes: [%NewRecord{relation: {"public", "users"}, record: %{"id" => "123"}}]
+             }
+           } = updated_collector
+  end
+
+  test "collector stores received update when the relation is known", %{collector: collector} do
+    collector =
+      Collector.handle_message(
+        %LR.Begin{final_lsn: 123, commit_timestamp: DateTime.utc_now(), xid: 456},
+        collector
+      )
+
+    update_msg = %LR.Update{
+      relation_id: 1,
+      old_tuple_data: ["123"],
+      tuple_data: ["124"]
+    }
+
+    updated_collector = Collector.handle_message(update_msg, collector)
+
+    assert %Collector{
+             transaction: %Transaction{
+               changes: [
+                 %UpdatedRecord{
+                   relation: {"public", "users"},
+                   old_record: %{"id" => "123"},
+                   record: %{"id" => "124"}
+                 }
+               ]
+             }
+           } = updated_collector
+  end
+
+  test "collector stores received delete when the relation is known", %{collector: collector} do
+    collector =
+      Collector.handle_message(
+        %LR.Begin{final_lsn: 123, commit_timestamp: DateTime.utc_now(), xid: 456},
+        collector
+      )
+
+    delete_msg = %LR.Delete{
+      relation_id: 1,
+      old_tuple_data: ["123"]
+    }
+
+    updated_collector = Collector.handle_message(delete_msg, collector)
+
+    assert %Collector{
+             transaction: %Transaction{
+               changes: [
+                 %DeletedRecord{relation: {"public", "users"}, old_record: %{"id" => "123"}}
+               ]
+             }
+           } = updated_collector
+  end
+
+  test "collector stores received truncate when the relation is known", %{collector: collector} do
+    collector =
+      Collector.handle_message(
+        %LR.Begin{final_lsn: 123, commit_timestamp: DateTime.utc_now(), xid: 456},
+        collector
+      )
+
+    truncate_msg = %LR.Truncate{
+      number_of_relations: 1,
+      options: [],
+      truncated_relations: [1]
+    }
+
+    updated_collector = Collector.handle_message(truncate_msg, collector)
+
+    assert %Collector{
+             transaction: %Transaction{
+               changes: [%TruncatedRelation{relation: {"public", "users"}}]
+             }
+           } = updated_collector
+  end
+
+  test "collector emits a complete transaction when seeing a 'Commit' message", %{
+    collector: collector
+  } do
+    collector =
+      Collector.handle_message(
+        %LR.Begin{final_lsn: 123, commit_timestamp: DateTime.utc_now(), xid: 456},
+        collector
+      )
+
+    commit_msg = %LR.Commit{
+      lsn: 123,
+      end_lsn: 456
+    }
+
+    {completed_txn, updated_collector} = Collector.handle_message(commit_msg, collector)
+
+    assert %Transaction{xid: 456, lsn: 456} = completed_txn
+    assert %Collector{transaction: nil} = updated_collector
+  end
+
+  test "Multiple collected operations are stored in the correct order within the transaction when it's emitted",
+       %{collector: collector} do
+    collector =
+      Collector.handle_message(
+        %LR.Begin{final_lsn: 123, commit_timestamp: DateTime.utc_now(), xid: 456},
+        collector
+      )
+
+    insert_msg = %LR.Insert{relation_id: 1, tuple_data: ["123"]}
+    update_msg = %LR.Update{relation_id: 1, old_tuple_data: ["123"], tuple_data: ["124"]}
+    delete_msg = %LR.Delete{relation_id: 1, old_tuple_data: ["124"]}
+
+    collector = Collector.handle_message(insert_msg, collector)
+    collector = Collector.handle_message(update_msg, collector)
+    collector = Collector.handle_message(delete_msg, collector)
+
+    commit_msg = %LR.Commit{lsn: 123, end_lsn: 456}
+
+    {completed_txn, _updated_collector} = Collector.handle_message(commit_msg, collector)
+
+    assert [
+             %NewRecord{relation: {"public", "users"}, record: %{"id" => "123"}},
+             %UpdatedRecord{
+               relation: {"public", "users"},
+               old_record: %{"id" => "123"},
+               record: %{"id" => "124"}
+             },
+             %DeletedRecord{relation: {"public", "users"}, old_record: %{"id" => "124"}}
+           ] = completed_txn.changes
+  end
+end
diff --git a/sync_service/test/electric/replication/changes_test.exs b/sync_service/test/electric/replication/changes_test.exs
new file mode 100644
index 0000000000..54634039b6
--- /dev/null
+++ b/sync_service/test/electric/replication/changes_test.exs
@@ -0,0 +1,44 @@
+defmodule Electric.Replication.ChangesTest do
+  use ExUnit.Case, async: true
+
+  alias Electric.Replication.Changes.UpdatedRecord
+
+  describe "UpdatedRecord.changed_columns" do
+    test "is empty when old_record is nil" do
+      changed_columns = MapSet.new([])
+
+      assert %UpdatedRecord{changed_columns: ^changed_columns} =
+               UpdatedRecord.new(old_record: nil, record: %{"this" => "that"})
+    end
+
+    test "captures column if new value != old value" do
+      changed_columns = MapSet.new(["first"])
+
+      assert %UpdatedRecord{changed_columns: ^changed_columns} =
+               UpdatedRecord.new(
+                 old_record: %{"first" => "first value", "second" => "second value"},
+                 record: %{"first" => "updated first value", "second" => "second value"}
+               )
+    end
+
+    test "captures column if old record does not have column value" do
+      changed_columns = MapSet.new(["first", "second"])
+
+      assert %UpdatedRecord{changed_columns: ^changed_columns} =
+               UpdatedRecord.new(
+                 old_record: %{"first" => "first value"},
+                 record: %{"first" => "updated first value", "second" => "second value"}
+               )
+    end
+
+    test "ignores column if new does not have value" do
+      changed_columns = MapSet.new(["second"])
+
+      assert %UpdatedRecord{changed_columns: ^changed_columns} =
+               UpdatedRecord.new(
+                 old_record: %{"first" => "first value", "second" => "second value"},
+                 record: %{"second" => "second updated value"}
+               )
+    end
+  end
+end