Skip to content

Commit

Permalink
Add parameter to API to send entire row on update (#1807)
Browse files Browse the repository at this point in the history
With `replica=full` update messages include the full row, not just
the changed columns.

Phoenix.LiveView Stream integration requires the full row to perform an
update, not just the changed values. This adds a parameter to the shape
definition that turns off the column diffing for updates.

Since we were just dropping unchanged values this is a minor change to
the wiring. Most of the work here is adding the configuration.
  • Loading branch information
magnetised authored Nov 5, 2024
1 parent 4d872b6 commit aed079f
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 28 deletions.
6 changes: 6 additions & 0 deletions .changeset/sharp-moons-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Add `replica` parameter to change the behaviour for updates to include the full row, not just the modified columns
30 changes: 20 additions & 10 deletions packages/sync-service/lib/electric/log_items.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ defmodule Electric.LogItems do
@spec from_change(
Changes.data_change(),
txid :: non_neg_integer() | nil,
pk_cols :: [String.t()]
pk_cols :: [String.t()],
replica :: Shape.replica()
) :: [log_item(), ...]
def from_change(%Changes.NewRecord{} = change, txid, _) do
def from_change(%Changes.NewRecord{} = change, txid, _, _replica) do
[
%{
key: change.key,
Expand All @@ -34,34 +35,34 @@ defmodule Electric.LogItems do
]
end

def from_change(%Changes.DeletedRecord{} = change, txid, pk_cols) do
def from_change(%Changes.DeletedRecord{} = change, txid, pk_cols, replica) do
[
%{
key: change.key,
value: take_pks_or_all(change.old_record, pk_cols),
value: take_pks_or_all(change.old_record, pk_cols, replica),
headers: %{operation: :delete, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

# `old_key` is nil when it's unchanged. This is not possible when there is no PK defined.
def from_change(%Changes.UpdatedRecord{old_key: nil} = change, txid, pk_cols) do
def from_change(%Changes.UpdatedRecord{old_key: nil} = change, txid, pk_cols, replica) do
[
%{
key: change.key,
value: Map.take(change.record, Enum.concat(pk_cols, change.changed_columns)),
value: update_values(change, pk_cols, replica),
headers: %{operation: :update, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

def from_change(%Changes.UpdatedRecord{} = change, txid, pk_cols) do
def from_change(%Changes.UpdatedRecord{} = change, txid, pk_cols, replica) do
[
%{
key: change.old_key,
value: take_pks_or_all(change.old_record, pk_cols),
value: take_pks_or_all(change.old_record, pk_cols, replica),
headers: %{
operation: :delete,
txid: txid,
Expand All @@ -84,8 +85,17 @@ defmodule Electric.LogItems do
]
end

defp take_pks_or_all(record, []), do: record
defp take_pks_or_all(record, pks), do: Map.take(record, pks)
defp take_pks_or_all(record, _pks, :full), do: record
defp take_pks_or_all(record, [], :default), do: record
defp take_pks_or_all(record, pks, :default), do: Map.take(record, pks)

defp update_values(%{record: record, changed_columns: changed_columns}, pk_cols, :default) do
Map.take(record, Enum.concat(pk_cols, changed_columns))
end

defp update_values(%{record: record}, _pk_cols, :full) do
record
end

@spec from_snapshot_row_stream(
row_stream :: Enumerable.t(list()),
Expand Down
8 changes: 7 additions & 1 deletion packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ defmodule Electric.Plug.ServeShapePlug do
field(:where, :string)
field(:columns, :string)
field(:shape_definition, :string)
field(:replica, Ecto.Enum, values: [:default, :full], default: :default)
end

def validate(params, opts) do
Expand Down Expand Up @@ -156,8 +157,12 @@ defmodule Electric.Plug.ServeShapePlug do
table = fetch_change!(changeset, :table)
where = fetch_field!(changeset, :where)
columns = get_change(changeset, :columns, nil)
replica = fetch_field!(changeset, :replica)

case Shapes.Shape.new(table, opts ++ [where: where, columns: columns]) do
case Shapes.Shape.new(
table,
opts ++ [where: where, columns: columns, replica: replica]
) do
{:ok, result} ->
put_change(changeset, :shape_definition, result)

Expand Down Expand Up @@ -615,6 +620,7 @@ defmodule Electric.Plug.ServeShapePlug do
"shape.where" => assigns[:where],
"shape.root_table" => assigns[:table],
"shape.definition" => assigns[:shape_definition],
"shape.replica" => assigns[:replica],
"shape_req.is_live" => assigns[:live],
"shape_req.offset" => assigns[:offset],
"shape_req.is_shape_rotated" => assigns[:ot_is_shape_rotated] || false,
Expand Down
4 changes: 3 additions & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ defmodule Electric.Shapes.Consumer do
) do
{log_items, new_log_state} =
changes
|> Stream.flat_map(&LogItems.from_change(&1, xid, Shape.pk(shape, &1.relation)))
|> Stream.flat_map(
&LogItems.from_change(&1, xid, Shape.pk(shape, &1.relation), shape.replica)
)
|> Enum.flat_map_reduce(log_state, fn log_item,
%{current_chunk_byte_size: chunk_size} = state ->
json_log_item = Jason.encode!(log_item)
Expand Down
35 changes: 28 additions & 7 deletions packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ defmodule Electric.Shapes.Shape do
alias Electric.Replication.Changes

@enforce_keys [:root_table, :root_table_id]
defstruct [:root_table, :root_table_id, :table_info, :where, :selected_columns]

defstruct [
:root_table,
:root_table_id,
:table_info,
:where,
:selected_columns,
replica: :default
]

@type replica() :: :full | :default
@type table_info() :: %{
columns: [Inspector.column_info(), ...],
pk: [String.t(), ...]
Expand All @@ -22,7 +30,8 @@ defmodule Electric.Shapes.Shape do
Electric.relation() => table_info()
},
where: Electric.Replication.Eval.Expr.t() | nil,
selected_columns: [String.t(), ...] | nil
selected_columns: [String.t(), ...] | nil,
replica: replica()
}

@type table_with_where_clause() :: {Electric.relation(), String.t() | nil}
Expand Down Expand Up @@ -60,15 +69,18 @@ defmodule Electric.Shapes.Shape do
@shape_schema NimbleOptions.new!(
where: [type: {:or, [:string, nil]}],
columns: [type: {:or, [{:list, :string}, nil]}],
replica: [
type: {:custom, __MODULE__, :verify_replica, []},
default: :default
],
inspector: [
type: :mod_arg,
default: {Electric.Postgres.Inspector, Electric.DbPool}
]
)
def new(table, opts) do
opts = NimbleOptions.validate!(opts, @shape_schema)

with inspector <- Access.fetch!(opts, :inspector),
with {:ok, opts} <- NimbleOptions.validate(opts, @shape_schema),
inspector <- Access.fetch!(opts, :inspector),
{:ok, %{relation: table, relation_id: relation_id}} <- validate_table(table, inspector),
{:ok, column_info, pk_cols} <- load_column_info(table, inspector),
{:ok, selected_columns} <-
Expand All @@ -81,7 +93,8 @@ defmodule Electric.Shapes.Shape do
root_table_id: relation_id,
table_info: %{table => %{pk: pk_cols, columns: column_info}},
where: where,
selected_columns: selected_columns
selected_columns: selected_columns,
replica: Access.get(opts, :replica, :default)
}}
end
end
Expand Down Expand Up @@ -165,6 +178,14 @@ defmodule Electric.Shapes.Shape do
end
end

def verify_replica(mode) when mode in [:full, "full"], do: {:ok, :full}
def verify_replica(mode) when mode in [:default, "default"], do: {:ok, :default}

def verify_replica(invalid),
do:
{:error,
"Invalid value for replica: #{inspect(invalid)}. Expecting one of `full` or `default`"}

@doc """
List tables that are a part of this shape.
"""
Expand Down
55 changes: 48 additions & 7 deletions packages/sync-service/test/electric/log_item_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Electric.LogItemsTest do
relation: {"public", "test"}
}

assert LogItems.from_change(record, 1, ["pk"]) ==
assert LogItems.from_change(record, 1, ["pk"], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand All @@ -24,7 +24,7 @@ defmodule Electric.LogItemsTest do
]

# And with empty PK
assert LogItems.from_change(record, 1, []) ==
assert LogItems.from_change(record, 1, [], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand All @@ -43,7 +43,7 @@ defmodule Electric.LogItemsTest do
relation: {"public", "test"}
}

assert LogItems.from_change(record, 1, ["pk"]) ==
assert LogItems.from_change(record, 1, ["pk"], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand All @@ -62,7 +62,7 @@ defmodule Electric.LogItemsTest do
relation: {"public", "test"}
}

assert LogItems.from_change(record, 1, []) ==
assert LogItems.from_change(record, 1, [], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand All @@ -83,7 +83,7 @@ defmodule Electric.LogItemsTest do
relation: {"public", "test"}
})

assert LogItems.from_change(record, 1, ["pk"]) ==
assert LogItems.from_change(record, 1, ["pk"], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand All @@ -94,6 +94,47 @@ defmodule Electric.LogItemsTest do
]
end

test "when replica=full sends entire row for updates" do
record =
Changes.UpdatedRecord.new(%{
key: "my_key",
old_record: %{"pk" => "10", "hello" => "world", "test" => "me"},
record: %{"pk" => "10", "hello" => "world", "test" => "new"},
log_offset: LogOffset.first(),
relation: {"public", "test"}
})

assert LogItems.from_change(record, 1, ["pk"], :full) ==
[
%{
offset: LogOffset.first(),
value: %{"pk" => "10", "hello" => "world", "test" => "new"},
key: "my_key",
headers: %{relation: ["public", "test"], operation: :update, txid: 1}
}
]
end

test "when replica=full sends entire row for deletes" do
record =
%Changes.DeletedRecord{
key: "my_key",
old_record: %{"pk" => "10", "hello" => "world", "test" => "me"},
log_offset: LogOffset.first(),
relation: {"public", "test"}
}

assert LogItems.from_change(record, 1, ["pk"], :full) ==
[
%{
offset: LogOffset.first(),
value: %{"pk" => "10", "hello" => "world", "test" => "me"},
key: "my_key",
headers: %{relation: ["public", "test"], operation: :delete, txid: 1}
}
]
end

test "splits up the `UpdatedRecord` if a key was changed, adding a reference to both" do
record =
Changes.UpdatedRecord.new(%{
Expand All @@ -105,7 +146,7 @@ defmodule Electric.LogItemsTest do
relation: {"public", "test"}
})

assert LogItems.from_change(record, 1, ["pk"]) ==
assert LogItems.from_change(record, 1, ["pk"], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand Down Expand Up @@ -143,7 +184,7 @@ defmodule Electric.LogItemsTest do
relation: {"public", "test"}
})

assert LogItems.from_change(record, 1, []) ==
assert LogItems.from_change(record, 1, [], :default) ==
[
%{
offset: LogOffset.new(0, 0),
Expand Down
48 changes: 48 additions & 0 deletions packages/sync-service/test/electric/plug/serve_shape_plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -698,5 +698,53 @@ defmodule Electric.Plug.ServeShapePlugTest do
"columns" => ["The following columns could not be found: invalid"]
}
end

test "honours replica query param for shape", %{tenant_id: tenant_id} = ctx do
test_shape_handle = "test-shape-without-deltas"
next_offset = LogOffset.increment(@first_offset)

Mock.ShapeCache
|> expect(:get_or_create_shape_handle, fn %{root_table: {"public", "users"}, replica: :full},
_opts ->
{test_shape_handle, @test_offset}
end)
|> stub(:has_shape?, fn ^test_shape_handle, _opts -> true end)
|> expect(:await_snapshot_start, fn ^test_shape_handle, _ -> :started end)

Mock.Storage
|> stub(:for_shape, fn ^test_shape_handle, ^tenant_id, _opts -> @test_opts end)
|> expect(:get_chunk_end_log_offset, fn @before_all_offset, _ ->
next_offset
end)
|> expect(:get_snapshot, fn @test_opts ->
{@first_offset, [Jason.encode!(%{key: "snapshot"})]}
end)
|> expect(:get_log_stream, fn @first_offset, _, @test_opts ->
[Jason.encode!(%{key: "log", value: "foo", headers: %{}, offset: next_offset})]
end)

conn =
ctx
|> conn(:get, %{"table" => "public.users"}, "?offset=-1&replica=full")
|> ServeShapePlug.call([])

assert conn.status == 200

assert Jason.decode!(conn.resp_body) == [
%{"key" => "snapshot"},
%{
"key" => "log",
"value" => "foo",
"headers" => %{},
"offset" => "#{next_offset}"
}
]

assert Plug.Conn.get_resp_header(conn, "etag") == [
"#{test_shape_handle}:-1:#{next_offset}"
]

assert Plug.Conn.get_resp_header(conn, "electric-handle") == [test_shape_handle]
end
end
end
Loading

0 comments on commit aed079f

Please sign in to comment.