fix: make sure that compacted lines don't reference any txids (#2281)
icehaunter authored Feb 3, 2025
1 parent 4e9b9be commit 802680f
Showing 7 changed files with 138 additions and 32 deletions.
5 changes: 5 additions & 0 deletions .changeset/small-pianos-explain.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

feat: make sure lines that underwent compaction don't reference any transaction id
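
Concretely, the first compaction pass drops the "txid" key from the "headers" object of every stored JSON line, so compacted lines no longer reference the transaction ids of the original operations. A minimal sketch of that rewrite, mirroring the process_json/1 helper added further down in this diff; the literal JSON follows the shape used in the new test and is illustrative only:

line = ~S|{"value":"v1","headers":{"operation":"insert","txid":123}}|

stripped =
  line
  |> Jason.decode!()
  |> update_in(["headers"], &Map.drop(&1 || %{}, ["txid"]))
  |> Jason.encode!()

# => {"headers":{"operation":"insert"},"value":"v1"}  (txid removed; key order may differ)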
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/replication/log_offset.ex
@@ -60,6 +60,8 @@ defmodule Electric.Replication.LogOffset do
new(tx_offset, op_offset)
end

def new(%__MODULE__{} = offset), do: offset

@doc """
Compare two log offsets
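The new clause above makes LogOffset.new/1 a pass-through for values that are already %LogOffset{} structs, so callers can normalise an offset without first checking its shape. A hedged usage sketch, assuming the tuple-accepting clause that the test helpers added below rely on:

alias Electric.Replication.LogOffset

offset = LogOffset.new({1, 1})
# Passing an existing struct now returns it unchanged
offset == LogOffset.new(offset)
# => true
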
@@ -31,7 +31,7 @@ defmodule Electric.ShapeCache.FileStorage.ChunkIndex do
stream,
# agg is {file, write_position, byte_count, last_seen_offset}
fn -> {File.open!(path, [:write, :raw]), 0, 0, nil} end,
fn {offset, _, _, _, json_size, _} = line,
fn {offset, _, _, _, _, json_size, _} = line,
{file, write_position, byte_count, last_seen_offset} ->
# Start the chunk if there's no last offset
if is_nil(last_seen_offset),
@@ -25,7 +25,7 @@ defmodule Electric.ShapeCache.FileStorage.KeyIndex do
# We're using delayed writes to avoid interfering with writing the log. Write size here is 64KB or 1s delay
# It's used here because we're writing a line per log line, so this reduces disk contention
fn -> {File.open!(path, [:write, :raw, {:delayed_write, 64 * 1024, 1000}]), 0} end,
fn {log_offset, key_size, key, op_type, json_size, _} = line, {file, write_position} ->
fn {log_offset, key_size, key, op_type, _, json_size, _} = line, {file, write_position} ->
IO.binwrite(
file,
<<key_size::32, key::binary, LogFile.offset(log_offset)::binary, op_type::8,
@@ -7,8 +7,8 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
alias Electric.Replication.LogOffset
alias Electric.ShapeCache.LogChunker

# 16 bytes offset + 4 bytes key size + 1 byte op type + 8 bytes json size = 29 bytes
@line_overhead 16 + 4 + 1 + 8
# 16 bytes offset + 4 bytes key size + 1 byte op type + 1 byte processed flag + 8 bytes json size = 30 bytes
@line_overhead 16 + 4 + 1 + 1 + 8

@type operation_type() :: :insert | :update | :delete
@type op_type() :: ?u | ?i | ?d
@@ -45,6 +45,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
<<tx_offset::64, op_offset::64,
key_size::32, key::binary-size(key_size),
op_type::binary-size(1),
processed_flag::8,
json_size::64, json::binary-size(json_size)>>
"""
@spec write_log_file(
@@ -62,10 +63,11 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
|> ChunkIndex.write_from_stream(log_file_path <> ".chunk_index", chunk_size)
|> KeyIndex.write_from_stream(log_file_path <> ".key_index")
|> Stream.map(fn
{log_offset, key_size, key, op_type, json_size, json} ->
# avoid constructing a binary that includes the json
{log_offset, key_size, key, op_type, flag, json_size, json} ->
# Add processed flag (0 for unprocessed) to header
[
<<offset(log_offset)::binary, key_size::32, key::binary, op_type::8, json_size::64>>,
<<offset(log_offset)::binary, key_size::32, key::binary, op_type::8, flag::8,
json_size::64>>,
json
]
end)
@@ -95,7 +97,20 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
{[], file}

{_, :keep}, file ->
{[read_line(file)], file}
case read_line(file) do
{offset, key_size, key, op_type, 0, _json_size, json} ->
# First compaction - process JSON and mark as processed
processed_json = process_json(json)

new_line =
{offset, key_size, key, op_type, 1, byte_size(processed_json), processed_json}

{[new_line], file}

line ->
# Already processed or not insert/delete - keep as-is
{[line], file}
end

{_, {:compact, offsets}}, file ->
{[compact_log_file_lines(file, offsets, merge_updates_fun)], file}
@@ -108,9 +123,9 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
defp read_line(file) do
with <<tx_offset::64, op_offset::64, key_size::32>> <- IO.binread(file, 20),
<<key::binary-size(key_size)>> <- IO.binread(file, key_size),
<<op_type::8, json_size::64>> <- IO.binread(file, 9),
<<op_type::8, processed_flag::8, json_size::64>> <- IO.binread(file, 10),
<<json::binary-size(json_size)>> <- IO.binread(file, json_size) do
{{tx_offset, op_offset}, key_size, key, op_type, json_size, json}
{{tx_offset, op_offset}, key_size, key, op_type, processed_flag, json_size, json}
end
end

Expand All @@ -122,7 +137,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
when elem: var
defp compact_log_file_lines(file, file_offsets, merge_updates_fun) do
# The line to be replaced with compaction will keep its offset & key
{offset, key_size, key, op_type, _, _} = read_line(file)
{offset, key_size, key, op_type, _, _, _} = read_line(file)

# Save position
{:ok, current_position} = :file.position(file, :cur)
@@ -146,7 +161,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
# Restore position to continue reading in the outer loop
{:ok, _} = :file.position(file, {:bof, current_position})

{offset, key_size, key, op_type, byte_size(merged_json), merged_json}
{offset, key_size, key, op_type, 1, byte_size(merged_json), merged_json}
end

@doc """
@@ -156,9 +171,9 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
def normalize_log_stream(stream) do
Stream.map(stream, fn
{log_offset, key, op_type, json} ->
{log_offset, byte_size(key), key, get_op_type(op_type), byte_size(json), json}
{log_offset, byte_size(key), key, get_op_type(op_type), 0, byte_size(json), json}

{_, _, _, _, _, _} = formed_line ->
{_, _, _, _, _, _, _} = formed_line ->
formed_line
end)
end
@@ -193,7 +208,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
@spec expected_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer()
def expected_position(
current_position,
{_log_offset, key_size, _key, _op_type, json_size, _json}
{_log_offset, key_size, _key, _op_type, _processed_flag, json_size, _json}
) do
current_position + key_size + json_size + @line_overhead
end
@@ -204,7 +219,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
Used by other modules that know the log file structure.
"""
@spec expected_json_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer()
def expected_json_position(current_position, {_, key_size, _, _, _, _}) do
def expected_json_position(current_position, {_, key_size, _, _, _, _, _}) do
current_position + key_size + @line_overhead
end

@@ -245,8 +260,8 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
defp extract_jsons_from_binary(<<>>, _, acc), do: Enum.reverse(acc)

defp extract_jsons_from_binary(
<<tx_offset1::64, op_offset1::64, key_size::32, _::binary-size(key_size),
_::binary-size(1), json_size::64, _::binary-size(json_size), rest::binary>>,
<<tx_offset1::64, op_offset1::64, key_size::32, _::binary-size(key_size), _::8, _flag::8,
json_size::64, _::binary-size(json_size), rest::binary>>,
%LogOffset{
tx_offset: tx_offset2,
op_offset: op_offset2
@@ -257,7 +272,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
do: extract_jsons_from_binary(rest, log_offset, acc)

defp extract_jsons_from_binary(
<<_::128, key_size::32, _::binary-size(key_size), _::binary-size(1), json_size::64,
<<_::128, key_size::32, _::binary-size(key_size), _::8, _flag::8, json_size::64,
json::binary-size(json_size), rest::binary>>,
log_offset,
acc
@@ -274,4 +289,11 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do
do: <<tx_offset::64, op_offset::64>>

def offset({tx_offset, op_offset}), do: <<tx_offset::64, op_offset::64>>

defp process_json(json) do
json
|> Jason.decode!()
|> update_in(["headers"], &Map.drop(&1 || %{}, ["txid"]))
|> Jason.encode!()
end
end
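
For reference, each log line now carries 30 bytes of fixed overhead (16-byte offset, 4-byte key size, 1-byte op type, the new 1-byte processed flag, 8-byte JSON size) plus the key and JSON payloads. A hedged sketch of building one line in that layout and parsing the header back out, mirroring the binary format documented above and read_line/1; the values are illustrative only:

tx_offset = 1
op_offset = 1
key = "key1"
json = ~S|{"value":"v1","headers":{"operation":"insert"}}|

# 16B offset + 4B key size + key + 1B op type + 1B processed flag (0 = unprocessed) + 8B json size + json
line =
  <<tx_offset::64, op_offset::64, byte_size(key)::32, key::binary, ?i::8, 0::8,
    byte_size(json)::64, json::binary>>

# Parse the fixed header and payloads back out of the binary
<<^tx_offset::64, ^op_offset::64, key_size::32, ^key::binary-size(key_size), op_type::8,
  processed_flag::8, json_size::64, ^json::binary-size(json_size)>> = line

{op_type, processed_flag, json_size}
# => {?i, 0, byte_size(json)}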
@@ -1,36 +1,73 @@
defmodule Electric.ShapeCache.FileStorage.CompactionTest do
use ExUnit.Case, async: true
alias Support.TestUtils
import Support.TestUtils, only: [ins: 1, del: 1, upd: 1]
alias Electric.ShapeCache.FileStorage.Compaction
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.Replication.LogOffset

@moduletag :tmp_dir

describe "compact_in_place/2" do
test "compacts a log file", %{tmp_dir: tmp_dir} do
log_file_path = Path.join(tmp_dir, "log_file")
test "removes txid headers during first compaction only", %{tmp_dir: tmp_dir} do
log_file_path = Path.join(tmp_dir, "txid_test_log")

# Write initial log file with supporting files
log_stream = [
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|"value1"|},
{%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, ~S|"value2"|},
{%LogOffset{tx_offset: 3, op_offset: 1}, "key2", :insert, ~S|"value3"|},
{%LogOffset{tx_offset: 4, op_offset: 1}, "key1", :update, ~S|"value new 1"|},
{%LogOffset{tx_offset: 5, op_offset: 1}, "key1", :update, ~S|"value new 2"|},
{%LogOffset{tx_offset: 6, op_offset: 1}, "key1", :update, ~S|"value new 3"|},
{%LogOffset{tx_offset: 7, op_offset: 1}, "key1", :update, ~S|"value new 4"|},
{%LogOffset{tx_offset: 8, op_offset: 1}, "key1", :update, ~S|"value new 5"|},
{%LogOffset{tx_offset: 9, op_offset: 1}, "key2", :delete, ~S|"value"|}
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert,
~S|{"value":"v1","headers":{"operation": "insert", "txid":123}}|},
{%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update,
~S|{"value":"v2","headers":{"operation": "update", "txid":124}}|},
{%LogOffset{tx_offset: 3, op_offset: 1}, "key2", :delete,
~S|{"value":"v3","headers":{"operation": "delete", "txid":125}}|}
]

paths = LogFile.write_log_file(log_stream, log_file_path)

# First compaction
paths = Compaction.compact_in_place(paths, 1_000_000)

# Verify first compaction removed txid from insert/delete but left update untouched
entries = LogFile.read_chunk(paths, LogOffset.first()) |> Enum.to_list()

assert [
%{"operation" => "insert"},
%{"operation" => "update"},
%{"operation" => "delete"}
] == Enum.map(entries, &(Jason.decode!(&1) |> Map.get("headers")))

# Second compaction
paths = Compaction.compact_in_place(paths, 1_000_000)

# Verify txid remains removed and JSON stays the same
reentries = LogFile.read_chunk(paths, LogOffset.first()) |> Enum.to_list()
assert entries == reentries
end

test "compacts a log file", %{tmp_dir: tmp_dir} do
log_file_path = Path.join(tmp_dir, "log_file")

log_stream =
[
ins(offset: {1, 1}, rec: [id: "key1", value: "value1"]),
upd(offset: {2, 1}, rec: [id: "key1", value: {"value1", "value2"}]),
ins(offset: {3, 1}, rec: [id: "key2", value: "value3"]),
upd(offset: {4, 1}, rec: [id: "key1", value: {"value2", "value new 1"}]),
upd(offset: {5, 1}, rec: [id: "key1", value: {"value new 1", "value new 2"}]),
upd(offset: {6, 1}, rec: [id: "key1", value: {"value new 2", "value new 3"}]),
upd(offset: {7, 1}, rec: [id: "key1", value: {"value new 3", "value new 4"}]),
upd(offset: {8, 1}, rec: [id: "key1", value: {"value new 4", "value new 5"}]),
del(offset: {9, 1}, rec: [id: "key2", value: "value"])
]
|> TestUtils.changes_to_log_items()

paths = LogFile.write_log_file(log_stream, log_file_path)

assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
|> Enum.to_list()
|> length == 9

assert {log_file_path, chunk_index_path, key_index_path} =
Compaction.compact_in_place(paths, 1_000_000, &(&1 <> &2))
Compaction.compact_in_place(paths, 1_000_000)

assert File.exists?(log_file_path)
assert File.exists?(chunk_index_path)
40 changes: 40 additions & 0 deletions packages/sync-service/test/support/test_utils.ex
@@ -1,4 +1,5 @@
defmodule Support.TestUtils do
alias Electric.Replication.LogOffset
alias Electric.ShapeCache.LogChunker
alias Electric.LogItems
alias Electric.Replication.Changes
@@ -38,4 +39,43 @@ defmodule Support.TestUtils do
def full_test_name(ctx) do
"#{ctx.module} #{ctx.test}"
end

def ins(opts) do
offset = Keyword.fetch!(opts, :offset) |> LogOffset.new()
relation = Keyword.get(opts, :relation, {"public", "test_table"})
record = Keyword.fetch!(opts, :rec) |> Map.new(fn {k, v} -> {to_string(k), to_string(v)} end)
%Changes.NewRecord{relation: relation, record: record, log_offset: offset}
end

def del(opts) do
offset = Keyword.fetch!(opts, :offset) |> LogOffset.new()
relation = Keyword.get(opts, :relation, {"public", "test_table"})

old_record =
Keyword.fetch!(opts, :rec) |> Map.new(fn {k, v} -> {to_string(k), to_string(v)} end)

%Changes.DeletedRecord{relation: relation, old_record: old_record, log_offset: offset}
end

def upd(opts) do
offset = Keyword.fetch!(opts, :offset) |> LogOffset.new()
relation = Keyword.get(opts, :relation, {"public", "test_table"})

{old, new} =
Enum.reduce(Keyword.fetch!(opts, :rec), {%{}, %{}}, fn
{k, {old, new}}, {old_acc, new_acc} ->
{Map.put(old_acc, to_string(k), to_string(old)),
Map.put(new_acc, to_string(k), to_string(new))}

{k, v}, {old, new} ->
{Map.put(old, to_string(k), to_string(v)), Map.put(new, to_string(k), to_string(v))}
end)

Changes.UpdatedRecord.new(
relation: relation,
old_record: old,
record: new,
log_offset: offset
)
end
end
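
A brief usage sketch of the new ins/1, upd/1, and del/1 helpers, matching how the compaction test above builds its log stream; the record contents are illustrative only:

import Support.TestUtils, only: [ins: 1, upd: 1, del: 1]

log_stream =
  [
    ins(offset: {1, 1}, rec: [id: "key1", value: "value1"]),
    upd(offset: {2, 1}, rec: [id: "key1", value: {"value1", "value2"}]),
    del(offset: {3, 1}, rec: [id: "key1", value: "value2"])
  ]
  |> Support.TestUtils.changes_to_log_items()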
