Skip to content

Commit

Permalink
chore: added tests for copied functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter committed Jul 3, 2024
1 parent ae63bb4 commit dc22a0f
Show file tree
Hide file tree
Showing 7 changed files with 667 additions and 132 deletions.
146 changes: 17 additions & 129 deletions sync_service/lib/electric/postgres/replication_client.ex
Original file line number Diff line number Diff line change
@@ -1,35 +1,27 @@
defmodule Electric.Postgres.ReplicationClient do
require Logger
alias Electric.Postgres.LogicalReplication.Messages, as: LR
@moduledoc """
A client module for Postgres logical replication
"""
alias Electric.Postgres.ReplicationClient.Collector
alias Electric.Postgres.LogicalReplication.Decoder

alias Electric.Replication.Changes.{
Transaction,
NewRecord,
UpdatedRecord,
DeletedRecord,
TruncatedRelation
}

require Logger
use Postgrex.ReplicationConnection

defmodule State do
alias Electric.Replication.Changes
@enforce_keys [:transaction_received, :publication_name]
defstruct [
:transaction_received,
:publication_name,
relations: %{},
origin: "postgres",
txn: nil,
txn_collector: %Collector{},
step: :disconnected
]

@type t() :: %__MODULE__{
transaction_received: {module(), atom(), [term()]},
txn: nil | Transaction.t(),
publication_name: String.t(),
origin: String.t(),
relations: %{LR.relation_id() => LR.Relation.t()},
txn_collector: Collector.t(),
step: :disconnected | :create_slot | :streaming
}

Expand All @@ -43,10 +35,6 @@ defmodule Electric.Postgres.ReplicationClient do
opts = NimbleOptions.validate!(opts, @opts_schema)
struct!(__MODULE__, opts)
end

@spec add_txn_change(t(), Changes.change()) :: t()
def add_txn_change(%__MODULE__{txn: %Transaction{} = txn} = state, change),
do: %{state | txn: Transaction.prepend_change(txn, change)}
end

def start_link(opts) do
Expand Down Expand Up @@ -84,18 +72,21 @@ defmodule Electric.Postgres.ReplicationClient do
@impl true
@spec handle_data(binary(), State.t()) ::
{:noreply, State.t()} | {:noreply, list(binary()), State.t()}
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
def handle_data(
<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>,
%State{} = state
) do
rest
|> Decoder.decode()
|> handle_message(state)
|> Collector.handle_message(state.txn_collector)
|> case do
%State{} = state ->
{:noreply, state}
%Collector{} = txn_collector ->
{:noreply, %{state | txn_collector: txn_collector}}

{%Transaction{} = txn, %State{} = state} ->
{txn, %Collector{} = txn_collector} ->
{m, f, args} = state.transaction_received
apply(m, f, [txn | args])
{:noreply, state}
{:noreply, %{state | txn_collector: txn_collector}}
end
end

Expand All @@ -111,107 +102,4 @@ defmodule Electric.Postgres.ReplicationClient do

@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch

defp handle_message(%LR.Message{} = msg, state) do
Logger.info("Got a message from PG via logical replication: #{inspect(msg)}")

state
end

defp handle_message(%LR.Begin{} = msg, %State{} = state) do
txn = %Transaction{
xid: msg.xid,
lsn: msg.final_lsn,
changes: [],
commit_timestamp: msg.commit_timestamp
}

%{state | txn: txn}
end

defp handle_message(%LR.Origin{} = msg, state) do
Logger.debug("origin: #{inspect(msg.name)}")
state
end

defp handle_message(%LR.Type{}, state), do: state

defp handle_message(%LR.Relation{} = rel, state) do
if is_map_key(state.relations, rel.id) do
Logger.warning("Schema had changed")
end

Map.update!(state, :relations, &Map.put(&1, rel.id, rel))
end

defp handle_message(%LR.Insert{} = msg, %State{} = state) do
relation = Map.fetch!(state.relations, msg.relation_id)

data = data_tuple_to_map(relation.columns, msg.tuple_data)

new_record = %NewRecord{relation: {relation.namespace, relation.name}, record: data}

State.add_txn_change(state, new_record)
end

defp handle_message(%LR.Update{} = msg, %State{} = state) do
relation = Map.get(state.relations, msg.relation_id)

old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data)
data = data_tuple_to_map(relation.columns, msg.tuple_data)

updated_record =
UpdatedRecord.new(
relation: {relation.namespace, relation.name},
old_record: old_data,
record: data
)

State.add_txn_change(state, updated_record)
end

defp handle_message(%LR.Delete{} = msg, %State{} = state) do
relation = Map.get(state.relations, msg.relation_id)

data =
data_tuple_to_map(
relation.columns,
msg.old_tuple_data || msg.changed_key_tuple_data
)

deleted_record = %DeletedRecord{
relation: {relation.namespace, relation.name},
old_record: data
}

State.add_txn_change(state, deleted_record)
end

defp handle_message(%LR.Truncate{} = msg, state) do
msg.truncated_relations
|> Enum.map(&Map.get(state.relations, &1))
|> Enum.map(&%TruncatedRelation{relation: {&1.namespace, &1.name}})
|> Enum.reduce(state, &State.add_txn_change(&2, &1))
end

# When we have a new event, enqueue it and see if there's any
# pending demand we can meet by dispatching events.

defp handle_message(%LR.Commit{lsn: commit_lsn, end_lsn: end_lsn}, %State{txn: txn} = state)
when not is_nil(txn) and commit_lsn == txn.lsn do
# Metrics.span_event(state.span, :transaction, Transaction.count_operations(txn))

{%Transaction{txn | lsn: end_lsn}, %State{state | txn: nil}}
end

@spec data_tuple_to_map([LR.Relation.Column.t()], list(String.t())) :: %{
String.t() => String.t()
}
defp data_tuple_to_map(_columns, nil), do: %{}

defp data_tuple_to_map(columns, tuple_data) do
columns
|> Enum.zip(tuple_data)
|> Map.new(fn {column, data} -> {column.name, data} end)
end
end
129 changes: 129 additions & 0 deletions sync_service/lib/electric/postgres/replication_client/collector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
defmodule Electric.Postgres.ReplicationClient.Collector do
@moduledoc """
Conversion of incoming Postgres logical replication messages
to internal change representation.
"""

require Logger
alias Electric.Replication.Changes
alias Electric.Postgres.LogicalReplication.Messages, as: LR

alias Electric.Replication.Changes.{
Transaction,
NewRecord,
UpdatedRecord,
DeletedRecord,
TruncatedRelation
}

defstruct transaction: nil, relations: %{}

@type t() :: %__MODULE__{
transaction: nil | Transaction.t(),
relations: %{optional(LR.relation_id()) => LR.Relation.t()}
}

@doc """
Handle incoming logical replication message by either building up a transaction or
returning a complete built up transaction.
"""
@spec handle_message(LR.message(), t()) :: t() | {Transaction.t(), t()}
def handle_message(%LR.Message{} = msg, state) do
Logger.info("Got a message from PG via logical replication: #{inspect(msg)}")

state
end

def handle_message(%LR.Begin{} = msg, %__MODULE__{} = state) do
txn = %Transaction{
xid: msg.xid,
lsn: msg.final_lsn,
changes: [],
commit_timestamp: msg.commit_timestamp
}

%{state | transaction: txn}
end

def handle_message(%LR.Origin{} = _msg, state), do: state
def handle_message(%LR.Type{}, state), do: state

def handle_message(%LR.Relation{id: id} = rel, %__MODULE__{} = state) do
if Map.get(state.relations, id, rel) != rel do
Logger.warning("Schema for the table #{rel.namespace}.#{rel.name} had changed")
end

Map.update!(state, :relations, &Map.put(&1, rel.id, rel))
end

def handle_message(%LR.Insert{} = msg, %__MODULE__{} = state) do
relation = Map.fetch!(state.relations, msg.relation_id)

data = data_tuple_to_map(relation.columns, msg.tuple_data)

%NewRecord{relation: {relation.namespace, relation.name}, record: data}
|> prepend_change(state)
end

def handle_message(%LR.Update{} = msg, %__MODULE__{} = state) do
relation = Map.get(state.relations, msg.relation_id)

old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data)
data = data_tuple_to_map(relation.columns, msg.tuple_data)

UpdatedRecord.new(
relation: {relation.namespace, relation.name},
old_record: old_data,
record: data
)
|> prepend_change(state)
end

def handle_message(%LR.Delete{} = msg, %__MODULE__{} = state) do
relation = Map.get(state.relations, msg.relation_id)

data =
data_tuple_to_map(
relation.columns,
msg.old_tuple_data || msg.changed_key_tuple_data
)

%DeletedRecord{
relation: {relation.namespace, relation.name},
old_record: data
}
|> prepend_change(state)
end

def handle_message(%LR.Truncate{} = msg, state) do
msg.truncated_relations
|> Enum.map(&Map.get(state.relations, &1))
|> Enum.map(&%TruncatedRelation{relation: {&1.namespace, &1.name}})
|> Enum.reduce(state, &prepend_change/2)
end

def handle_message(
%LR.Commit{lsn: commit_lsn, end_lsn: end_lsn},
%__MODULE__{transaction: txn} = state
)
when not is_nil(txn) and commit_lsn == txn.lsn do
{%Transaction{txn | lsn: end_lsn, changes: Enum.reverse(txn.changes)},
%__MODULE__{state | transaction: nil}}
end

@spec data_tuple_to_map([LR.Relation.Column.t()], list(String.t())) :: %{
String.t() => String.t()
}
defp data_tuple_to_map(_columns, nil), do: %{}

defp data_tuple_to_map(columns, tuple_data) do
columns
|> Enum.zip(tuple_data)
|> Map.new(fn {column, data} -> {column.name, data} end)
end

@spec prepend_change(Changes.change(), t()) :: t()
defp prepend_change(change, %__MODULE__{transaction: txn} = state) do
%{state | transaction: Transaction.prepend_change(txn, change)}
end
end
14 changes: 11 additions & 3 deletions sync_service/lib/electric/replication/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Electric.Replication.Changes do
| Changes.UpdatedRecord.t()
| Changes.DeletedRecord.t()

@type change() :: data_change()
@type change() :: data_change() | Changes.TruncatedRelation.t()

defmodule Transaction do
alias Electric.Replication.Changes
Expand All @@ -56,8 +56,14 @@ defmodule Electric.Replication.Changes do
@spec prepend_change(t(), Changes.change()) :: t()
def prepend_change(
%__MODULE__{changes: changes, affected_relations: rels} = txn,
%{relation: rel} = change
) do
%change_mod{relation: rel} = change
)
when change_mod in [
Changes.NewRecord,
Changes.UpdatedRecord,
Changes.DeletedRecord,
Changes.TruncatedRelation
] do
%{
txn
| changes: [change | changes],
Expand Down Expand Up @@ -130,6 +136,8 @@ defmodule Electric.Replication.Changes do

defmodule TruncatedRelation do
defstruct [:relation]

@type t() :: %__MODULE__{relation: Changes.relation()}
end

# FIXME: this assumes PK is literally just "id" column
Expand Down
Loading

0 comments on commit dc22a0f

Please sign in to comment.