From 5f2cb9949dc2df16790995b39a1ae37f3e58a79c Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Wed, 29 Jan 2025 20:37:59 +0300 Subject: [PATCH] fix: invalid json while reading a concurrently written snapshot (#2267) Closes #2259 --- .changeset/modern-toes-drop.md | 5 ++ .../lib/electric/shape_cache/file_storage.ex | 22 +++--- .../shape_cache/file_storage_test.exs | 79 +++++++++++++++++++ 3 files changed, 97 insertions(+), 9 deletions(-) create mode 100644 .changeset/modern-toes-drop.md create mode 100644 packages/sync-service/test/electric/shape_cache/file_storage_test.exs diff --git a/.changeset/modern-toes-drop.md b/.changeset/modern-toes-drop.md new file mode 100644 index 0000000000..8331d28cf6 --- /dev/null +++ b/.changeset/modern-toes-drop.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +fix: ensure correct JSON formatting when reading concurrently written snapshot diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage.ex b/packages/sync-service/lib/electric/shape_cache/file_storage.ex index 5726d70455..81c6eab9eb 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage.ex @@ -317,8 +317,8 @@ defmodule Electric.ShapeCache.FileStorage do File.open!(snapshot_chunk_path(opts, chunk_number), [:write, :raw]) end - defp snapshot_chunk_path(opts, chunk_number) - when is_integer(chunk_number) and chunk_number >= 0 do + def snapshot_chunk_path(opts, chunk_number) + when is_integer(chunk_number) and chunk_number >= 0 do Path.join([opts.snapshot_dir, "snapshot_chunk.#{chunk_number}.jsonl"]) end @@ -378,8 +378,8 @@ defmodule Electric.ShapeCache.FileStorage do # This function raises if the chunk file doesn't exist. 
defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do Stream.resource( - fn -> {open_snapshot_chunk(opts, chunk_number), nil} end, - fn {file, eof_seen} -> + fn -> {open_snapshot_chunk(opts, chunk_number), nil, ""} end, + fn {file, eof_seen, incomplete_line} -> case IO.binread(file, :line) do {:error, reason} -> raise IO.StreamError, reason: reason @@ -388,7 +388,7 @@ defmodule Electric.ShapeCache.FileStorage do cond do is_nil(eof_seen) -> # First time we see eof after any valid lines, we store a timestamp - {[], {file, System.monotonic_time(:millisecond)}} + {[], {file, System.monotonic_time(:millisecond), incomplete_line}} # If it's been 60s without any new lines, and also we've not seen <<4>>, # then likely something is wrong @@ -398,18 +398,22 @@ defmodule Electric.ShapeCache.FileStorage do true -> # Sleep a little and check for new lines Process.sleep(20) - {[], {file, eof_seen}} + {[], {file, eof_seen, incomplete_line}} end # The 4 byte marker (ASCII "end of transmission") indicates the end of the snapshot file. 
<<4::utf8>> -> - {:halt, {file, nil}} + {:halt, {file, nil, ""}} line -> - {[line], {file, nil}} + if binary_slice(line, -1, 1) == "\n" do + {[incomplete_line <> line], {file, nil, ""}} + else + {[], {file, nil, incomplete_line <> line}} + end end end, - fn {file, _} -> File.close(file) end + &File.close(elem(&1, 0)) ) end diff --git a/packages/sync-service/test/electric/shape_cache/file_storage_test.exs b/packages/sync-service/test/electric/shape_cache/file_storage_test.exs new file mode 100644 index 0000000000..5a19a693cb --- /dev/null +++ b/packages/sync-service/test/electric/shape_cache/file_storage_test.exs @@ -0,0 +1,79 @@ +defmodule Electric.ShapeCache.FileStorageTest do + use ExUnit.Case, async: true + import Support.ComponentSetup + alias Electric.Replication.LogOffset + alias Electric.ShapeCache.FileStorage + + @moduletag :tmp_dir + + @shape_handle "the-shape-handle" + + setup :with_stack_id_from_test + + setup %{tmp_dir: tmp_dir, stack_id: stack_id} do + opts = + FileStorage.shared_opts( + db: String.to_atom("shape_mixed_disk_#{stack_id}"), + storage_dir: tmp_dir, + stack_id: stack_id + ) + + shape_opts = FileStorage.for_shape(@shape_handle, opts) + {:ok, pid} = FileStorage.start_link(shape_opts) + {:ok, %{opts: shape_opts, shared_opts: opts, pid: pid, storage: {FileStorage, shape_opts}}} + end + + test "returns complete snapshot when writes are partially complete", %{ + opts: opts + } do + row_count = 10 + + data_stream = + for i <- 1..row_count, into: "" do + Jason.encode!(%{ + offset: "0_0", + value: %{id: "00000000-0000-0000-0000-00000000000#{i}", title: "row#{i}"}, + key: ~S|"public"."the-table"/"00000000-0000-0000-0000-00000000000#{i}"|, + headers: %{operation: "insert"} + }) <> "\n" + end + + FileStorage.mark_snapshot_as_started(opts) + stream = FileStorage.get_log_stream(LogOffset.before_all(), LogOffset.first(), opts) + + read_task = + Task.async(fn -> + log = Enum.to_list(stream) + + assert Enum.count(log) == row_count + + for {item, i} <- 
Enum.with_index(log, 1) do + assert Jason.decode!(item, keys: :atoms).value.title == "row#{i}" + end + end) + + File.open!(FileStorage.snapshot_chunk_path(opts, 0), [:write, :raw], fn file -> + data_stream + |> chunk_string_every(10) + |> Stream.each(fn chunk -> + IO.binwrite(file, chunk) + # Sync-write to file to ensure the concurrent reader sees this "incomplete" line + :file.sync(file) + Process.sleep(1) + end) + |> Stream.run() + + # Write EOF marker + IO.binwrite(file, <<4::utf8>>) + :file.sync(file) + end) + + Task.await(read_task) + end + + defp chunk_string_every(string, every) do + string + |> Stream.unfold(&String.split_at(&1, every)) + |> Stream.take_while(&(&1 != "")) + end +end