diff --git a/.changeset/small-pianos-explain.md b/.changeset/small-pianos-explain.md new file mode 100644 index 0000000000..333f363704 --- /dev/null +++ b/.changeset/small-pianos-explain.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +feat: make sure lines that underwent compaction don't reference any transaction id diff --git a/packages/sync-service/lib/electric/replication/log_offset.ex b/packages/sync-service/lib/electric/replication/log_offset.ex index 6982a62145..6217f61313 100644 --- a/packages/sync-service/lib/electric/replication/log_offset.ex +++ b/packages/sync-service/lib/electric/replication/log_offset.ex @@ -60,6 +60,8 @@ defmodule Electric.Replication.LogOffset do new(tx_offset, op_offset) end + def new(%__MODULE__{} = offset), do: offset + @doc """ Compare two log offsets diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex index f0a5f9bad5..6b7a1a21e4 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex @@ -31,7 +31,7 @@ defmodule Electric.ShapeCache.FileStorage.ChunkIndex do stream, # agg is {file, write_position, byte_count, last_seen_offset} fn -> {File.open!(path, [:write, :raw]), 0, 0, nil} end, - fn {offset, _, _, _, json_size, _} = line, + fn {offset, _, _, _, _, json_size, _} = line, {file, write_position, byte_count, last_seen_offset} -> # Start the chunk if there's no last offset if is_nil(last_seen_offset), diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex index 5ac36d3a15..df2b9cab60 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex @@ -25,7 +25,7 @@ defmodule Electric.ShapeCache.FileStorage.KeyIndex do # We're using delayed writes to avoid interfering with writing the log. 
Write size here is 64KB or 1s delay # It's used here because we're writing a line per log line, so this reduces disk contention fn -> {File.open!(path, [:write, :raw, {:delayed_write, 64 * 1024, 1000}]), 0} end, - fn {log_offset, key_size, key, op_type, json_size, _} = line, {file, write_position} -> + fn {log_offset, key_size, key, op_type, _, json_size, _} = line, {file, write_position} -> IO.binwrite( file, <> """ @spec write_log_file( @@ -62,10 +63,11 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do |> ChunkIndex.write_from_stream(log_file_path <> ".chunk_index", chunk_size) |> KeyIndex.write_from_stream(log_file_path <> ".key_index") |> Stream.map(fn - {log_offset, key_size, key, op_type, json_size, json} -> - # avoid constructing a binary that includes the json + {log_offset, key_size, key, op_type, flag, json_size, json} -> + # Add processed flag (0 for unprocessed) to header [ - <>, + <>, json ] end) @@ -95,7 +97,20 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do {[], file} {_, :keep}, file -> - {[read_line(file)], file} + case read_line(file) do + {offset, key_size, key, op_type, 0, _json_size, json} -> + # First compaction - process JSON and mark as processed + processed_json = process_json(json) + + new_line = + {offset, key_size, key, op_type, 1, byte_size(processed_json), processed_json} + + {[new_line], file} + + line -> + # Already processed or not insert/delete - keep as-is + {[line], file} + end {_, {:compact, offsets}}, file -> {[compact_log_file_lines(file, offsets, merge_updates_fun)], file} @@ -108,9 +123,9 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do defp read_line(file) do with <> <- IO.binread(file, 20), <> <- IO.binread(file, key_size), - <> <- IO.binread(file, 9), + <> <- IO.binread(file, 10), <> <- IO.binread(file, json_size) do - {{tx_offset, op_offset}, key_size, key, op_type, json_size, json} + {{tx_offset, op_offset}, key_size, key, op_type, processed_flag, json_size, json} end end @@ -122,7 +137,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do when elem: var defp compact_log_file_lines(file, file_offsets, merge_updates_fun) do # The line to be replaced with compaction will keep it's offset & key - {offset, key_size, key, op_type, _, _} = read_line(file) + {offset, key_size, key, op_type, _, _, _} = read_line(file) # Save position {:ok, current_position} = :file.position(file, :cur) @@ -146,7 +161,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do # Restore position to continue reading in the outer loop {:ok, _} = :file.position(file, {:bof, current_position}) - {offset, key_size, key, op_type, byte_size(merged_json), merged_json} + {offset, key_size, key, op_type, 1, byte_size(merged_json), merged_json} end @doc """ @@ -156,9 +171,9 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do def normalize_log_stream(stream) do Stream.map(stream, fn {log_offset, key, op_type, json} -> - {log_offset, byte_size(key), key, get_op_type(op_type), byte_size(json), json} + {log_offset, byte_size(key), key, get_op_type(op_type), 0, byte_size(json), json} - {_, _, _, _, _, _} = formed_line -> + {_, _, _, _, _, _, _} = formed_line -> formed_line end) end @@ -193,7 +208,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do @spec expected_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer() def expected_position( current_position, - {_log_offset, key_size, _key, _op_type, json_size, _json} + {_log_offset, key_size, _key, _op_type, _processed_flag, json_size, _json} ) do current_position + key_size + json_size + 
@line_overhead end @@ -204,7 +219,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do Used by other modules that know the log file structure. """ @spec expected_json_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer() - def expected_json_position(current_position, {_, key_size, _, _, _, _}) do + def expected_json_position(current_position, {_, key_size, _, _, _, _, _}) do current_position + key_size + @line_overhead end @@ -245,8 +260,8 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do defp extract_jsons_from_binary(<<>>, _, acc), do: Enum.reverse(acc) defp extract_jsons_from_binary( - <>, + <>, %LogOffset{ tx_offset: tx_offset2, op_offset: op_offset2 @@ -257,7 +272,7 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do do: extract_jsons_from_binary(rest, log_offset, acc) defp extract_jsons_from_binary( - <<_::128, key_size::32, _::binary-size(key_size), _::binary-size(1), json_size::64, + <<_::128, key_size::32, _::binary-size(key_size), _::8, _flag::8, json_size::64, json::binary-size(json_size), rest::binary>>, log_offset, acc @@ -274,4 +289,11 @@ defmodule Electric.ShapeCache.FileStorage.LogFile do do: <> def offset({tx_offset, op_offset}), do: <> + + defp process_json(json) do + json + |> Jason.decode!() + |> update_in(["headers"], &Map.drop(&1 || %{}, ["txid"])) + |> Jason.encode!() + end end diff --git a/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs b/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs index 3e6e4968be..515239ccb5 100644 --- a/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs +++ b/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs @@ -1,5 +1,7 @@ defmodule Electric.ShapeCache.FileStorage.CompactionTest do use ExUnit.Case, async: true + alias Support.TestUtils + import Support.TestUtils, only: [ins: 1, del: 1, upd: 1] alias Electric.ShapeCache.FileStorage.Compaction alias Electric.ShapeCache.FileStorage.LogFile alias Electric.Replication.LogOffset @@ -7,30 +9,65 @@ defmodule Electric.ShapeCache.FileStorage.CompactionTest do @moduletag :tmp_dir describe "compact_in_place/2" do - test "compacts a log file", %{tmp_dir: tmp_dir} do - log_file_path = Path.join(tmp_dir, "log_file") + test "removes txid headers during first compaction only", %{tmp_dir: tmp_dir} do + log_file_path = Path.join(tmp_dir, "txid_test_log") - # Write initial log file with supporting files log_stream = [ - {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|"value1"|}, - {%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, ~S|"value2"|}, - {%LogOffset{tx_offset: 3, op_offset: 1}, "key2", :insert, ~S|"value3"|}, - {%LogOffset{tx_offset: 4, op_offset: 1}, "key1", :update, ~S|"value new 1"|}, - {%LogOffset{tx_offset: 5, op_offset: 1}, "key1", :update, ~S|"value new 2"|}, - {%LogOffset{tx_offset: 6, op_offset: 1}, "key1", :update, ~S|"value new 3"|}, - {%LogOffset{tx_offset: 7, op_offset: 1}, "key1", :update, ~S|"value new 4"|}, - {%LogOffset{tx_offset: 8, op_offset: 1}, "key1", :update, ~S|"value new 5"|}, - {%LogOffset{tx_offset: 9, op_offset: 1}, "key2", :delete, ~S|"value"|} + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, + ~S|{"value":"v1","headers":{"operation": "insert", "txid":123}}|}, + {%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, + ~S|{"value":"v2","headers":{"operation": "update", "txid":124}}|}, + {%LogOffset{tx_offset: 3, op_offset: 1}, "key2", :delete, + 
~S|{"value":"v3","headers":{"operation": "delete", "txid":125}}|} ] paths = LogFile.write_log_file(log_stream, log_file_path) + # First compaction + paths = Compaction.compact_in_place(paths, 1_000_000) + + # Verify first compaction removed txid from insert/delete but left update untouched + entries = LogFile.read_chunk(paths, LogOffset.first()) |> Enum.to_list() + + assert [ + %{"operation" => "insert"}, + %{"operation" => "update"}, + %{"operation" => "delete"} + ] == Enum.map(entries, &(Jason.decode!(&1) |> Map.get("headers"))) + + # Second compaction + paths = Compaction.compact_in_place(paths, 1_000_000) + + # Verify txid remains removed and JSON stays the same + reentries = LogFile.read_chunk(paths, LogOffset.first()) |> Enum.to_list() + assert entries == reentries + end + + test "compacts a log file", %{tmp_dir: tmp_dir} do + log_file_path = Path.join(tmp_dir, "log_file") + + log_stream = + [ + ins(offset: {1, 1}, rec: [id: "key1", value: "value1"]), + upd(offset: {2, 1}, rec: [id: "key1", value: {"value1", "value2"}]), + ins(offset: {3, 1}, rec: [id: "key2", value: "value3"]), + upd(offset: {4, 1}, rec: [id: "key1", value: {"value2", "value new 1"}]), + upd(offset: {5, 1}, rec: [id: "key1", value: {"value new 1", "value new 2"}]), + upd(offset: {6, 1}, rec: [id: "key1", value: {"value new 2", "value new 3"}]), + upd(offset: {7, 1}, rec: [id: "key1", value: {"value new 3", "value new 4"}]), + upd(offset: {8, 1}, rec: [id: "key1", value: {"value new 4", "value new 5"}]), + del(offset: {9, 1}, rec: [id: "key2", value: "value"]) + ] + |> TestUtils.changes_to_log_items() + + paths = LogFile.write_log_file(log_stream, log_file_path) + assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0}) |> Enum.to_list() |> length == 9 assert {log_file_path, chunk_index_path, key_index_path} = - Compaction.compact_in_place(paths, 1_000_000, &(&1 <> &2)) + Compaction.compact_in_place(paths, 1_000_000) assert File.exists?(log_file_path) assert File.exists?(chunk_index_path) diff --git a/packages/sync-service/test/support/test_utils.ex b/packages/sync-service/test/support/test_utils.ex index fdb3a2808e..5eb918a093 100644 --- a/packages/sync-service/test/support/test_utils.ex +++ b/packages/sync-service/test/support/test_utils.ex @@ -1,4 +1,5 @@ defmodule Support.TestUtils do + alias Electric.Replication.LogOffset alias Electric.ShapeCache.LogChunker alias Electric.LogItems alias Electric.Replication.Changes @@ -38,4 +39,43 @@ defmodule Support.TestUtils do def full_test_name(ctx) do "#{ctx.module} #{ctx.test}" end + + def ins(opts) do + offset = Keyword.fetch!(opts, :offset) |> LogOffset.new() + relation = Keyword.get(opts, :relation, {"public", "test_table"}) + record = Keyword.fetch!(opts, :rec) |> Map.new(fn {k, v} -> {to_string(k), to_string(v)} end) + %Changes.NewRecord{relation: relation, record: record, log_offset: offset} + end + + def del(opts) do + offset = Keyword.fetch!(opts, :offset) |> LogOffset.new() + relation = Keyword.get(opts, :relation, {"public", "test_table"}) + + old_record = + Keyword.fetch!(opts, :rec) |> Map.new(fn {k, v} -> {to_string(k), to_string(v)} end) + + %Changes.DeletedRecord{relation: relation, old_record: old_record, log_offset: offset} + end + + def upd(opts) do + offset = Keyword.fetch!(opts, :offset) |> LogOffset.new() + relation = Keyword.get(opts, :relation, {"public", "test_table"}) + + {old, new} = + Enum.reduce(Keyword.fetch!(opts, :rec), {%{}, %{}}, fn + {k, {old, new}}, {old_acc, new_acc} -> + {Map.put(old_acc, to_string(k), to_string(old)), 
+ Map.put(new_acc, to_string(k), to_string(new))} + + {k, v}, {old, new} -> + {Map.put(old, to_string(k), to_string(v)), Map.put(new, to_string(k), to_string(v))} + end) + + Changes.UpdatedRecord.new( + relation: relation, + old_record: old, + record: new, + log_offset: offset + ) + end end
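
Notes on the on-disk layout after this patch. Each log line now carries a one-byte processed flag between the op type and the JSON size, which is why read_line/1 reads a 10-byte second header instead of 9 and why every line tuple grows from 6 to 7 elements. Below is a minimal sketch of that layout as implied by read_line/1 and the new extract_jsons_from_binary/3 pattern; the module and function names here (LogLineFormatSketch, encode/5, decode/1) and the literal op byte are illustrative, not part of the patch.

defmodule LogLineFormatSketch do
  # 20-byte header: tx_offset (8) + op_offset (8) + key size (4), then the key,
  # then a 10-byte header: op type (1) + processed flag (1) + JSON size (8),
  # then the JSON payload. The fixed per-line overhead implied by this layout is 30 bytes.
  def encode({tx_offset, op_offset}, key, op_type, processed_flag, json) do
    [
      <<tx_offset::64, op_offset::64, byte_size(key)::32>>,
      key,
      <<op_type::8, processed_flag::8, byte_size(json)::64>>,
      json
    ]
  end

  # Mirrors the shape of the 7-element tuples used throughout the patch:
  # {offset, key_size, key, op_type, processed_flag, json_size, json}.
  def decode(
        <<tx_offset::64, op_offset::64, key_size::32, key::binary-size(key_size), op_type::8,
          processed_flag::8, json_size::64, json::binary-size(json_size), rest::binary>>
      ) do
    {{{tx_offset, op_offset}, key_size, key, op_type, processed_flag, json_size, json}, rest}
  end
end

# Round trip, with ?i standing in for whatever byte get_op_type/1 assigns to inserts:
iodata = LogLineFormatSketch.encode({1, 1}, "key1", ?i, 0, ~S|{"value":"v1"}|)
{line, ""} = iodata |> IO.iodata_to_binary() |> LogLineFormatSketch.decode()
# line is the 7-element tuple with processed_flag == 0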
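
The processed flag is what makes the txid scrubbing a one-time operation: when compaction streams a kept line whose flag is 0, it decodes the JSON, drops "txid" from the headers (process_json/1 above) and rewrites the line with the flag set to 1 and the new JSON size; lines whose flag is already 1, and the merged update lines produced by compact_log_file_lines/3 (which are written with flag 1), pass through untouched on later compactions. A standalone restatement of that rule, assuming line tuples with the shape shown above; CompactionFlagSketch and maybe_process/1 are illustrative names.

defmodule CompactionFlagSketch do
  # Flag 0: first pass through compaction — strip "txid" from the headers,
  # exactly as process_json/1 does, and mark the line as processed.
  def maybe_process({offset, key_size, key, op_type, 0, _json_size, json}) do
    processed_json =
      json
      |> Jason.decode!()
      |> update_in(["headers"], &Map.drop(&1 || %{}, ["txid"]))
      |> Jason.encode!()

    {offset, key_size, key, op_type, 1, byte_size(processed_json), processed_json}
  end

  # Flag already 1: processed on an earlier compaction — leave the JSON alone,
  # which is what keeps repeated compactions idempotent (the second-compaction
  # assertion in the new test relies on this).
  def maybe_process(line), do: line
end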
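
On the test side, the new Support.TestUtils helpers let the compaction tests build real change structs instead of hand-rolled tuples: ins/1 and del/1 stringify a :rec keyword list into the record of a NewRecord / DeletedRecord, while upd/1 accepts {old, new} tuples for changed columns and plain values for unchanged ones. The added LogOffset.new(%LogOffset{} = offset) clause makes the constructor a no-op on an existing struct, so :offset can be either a {tx, op} tuple or a ready-made LogOffset. For example (only the fields set by the helper are shown; other struct defaults are omitted):

ins(offset: {1, 1}, rec: [id: "key1", value: "value1"])
#=> %Electric.Replication.Changes.NewRecord{
#     relation: {"public", "test_table"},
#     record: %{"id" => "key1", "value" => "value1"},
#     log_offset: %Electric.Replication.LogOffset{tx_offset: 1, op_offset: 1}
#   }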