Skip to content

Commit

Permalink
fix: invalid json while reading a concurrently written snapshot (#2267)
Browse files Browse the repository at this point in the history
Closes #2259
  • Loading branch information
icehaunter authored Jan 29, 2025
1 parent b801999 commit 5f2cb99
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changeset/modern-toes-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

fix: ensure correct JSON formatting when reading a concurrently written snapshot
22 changes: 13 additions & 9 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ defmodule Electric.ShapeCache.FileStorage do
File.open!(snapshot_chunk_path(opts, chunk_number), [:write, :raw])
end

defp snapshot_chunk_path(opts, chunk_number)
when is_integer(chunk_number) and chunk_number >= 0 do
# Builds the on-disk path for the JSONL file holding snapshot chunk
# `chunk_number` (a non-negative integer) under `opts.snapshot_dir`.
def snapshot_chunk_path(opts, chunk_number)
    when is_integer(chunk_number) and chunk_number >= 0 do
  Path.join(opts.snapshot_dir, "snapshot_chunk.#{chunk_number}.jsonl")
end

Expand Down Expand Up @@ -378,8 +378,8 @@ defmodule Electric.ShapeCache.FileStorage do
# This function raises if the chunk file doesn't exist.
defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do
Stream.resource(
fn -> {open_snapshot_chunk(opts, chunk_number), nil} end,
fn {file, eof_seen} ->
fn -> {open_snapshot_chunk(opts, chunk_number), nil, ""} end,
fn {file, eof_seen, incomplete_line} ->
case IO.binread(file, :line) do
{:error, reason} ->
raise IO.StreamError, reason: reason
Expand All @@ -388,7 +388,7 @@ defmodule Electric.ShapeCache.FileStorage do
cond do
is_nil(eof_seen) ->
# First time we see eof after any valid lines, we store a timestamp
{[], {file, System.monotonic_time(:millisecond)}}
{[], {file, System.monotonic_time(:millisecond), incomplete_line}}

# If it's been 60s without any new lines, and also we've not seen <<4>>,
# then likely something is wrong
Expand All @@ -398,18 +398,22 @@ defmodule Electric.ShapeCache.FileStorage do
true ->
# Sleep a little and check for new lines
Process.sleep(20)
{[], {file, eof_seen}}
{[], {file, eof_seen, incomplete_line}}
end

# The 4 byte marker (ASCII "end of transmission") indicates the end of the snapshot file.
<<4::utf8>> ->
{:halt, {file, nil}}
{:halt, {file, nil, ""}}

line ->
{[line], {file, nil}}
if binary_slice(line, -1, 1) == "\n" do
{[incomplete_line <> line], {file, nil, ""}}
else
{[], {file, nil, incomplete_line <> line}}
end
end
end,
fn {file, _} -> File.close(file) end
&File.close(elem(&1, 0))
)
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule Electric.ShapeCache.FileStorageTest do
  # Exercises FileStorage's snapshot reading while a writer is still
  # appending: the reader must never surface a partially written JSON line.
  use ExUnit.Case, async: true
  import Support.ComponentSetup
  alias Electric.Replication.LogOffset
  alias Electric.ShapeCache.FileStorage

  @moduletag :tmp_dir

  @shape_handle "the-shape-handle"

  setup :with_stack_id_from_test

  setup %{tmp_dir: tmp_dir, stack_id: stack_id} do
    opts =
      FileStorage.shared_opts(
        db: String.to_atom("shape_mixed_disk_#{stack_id}"),
        storage_dir: tmp_dir,
        stack_id: stack_id
      )

    shape_opts = FileStorage.for_shape(@shape_handle, opts)
    {:ok, pid} = FileStorage.start_link(shape_opts)
    {:ok, %{opts: shape_opts, shared_opts: opts, pid: pid, storage: {FileStorage, shape_opts}}}
  end

  test "returns complete snapshot when writes are partially complete", %{
    opts: opts
  } do
    row_count = 10

    data_stream =
      for i <- 1..row_count, into: "" do
        Jason.encode!(%{
          offset: "0_0",
          value: %{id: "00000000-0000-0000-0000-00000000000#{i}", title: "row#{i}"},
          # ~s (not ~S) so the row index interpolates, keeping keys unique
          # per row, matching the interpolated `id`/`title` fields above.
          key: ~s|"public"."the-table"/"00000000-0000-0000-0000-00000000000#{i}"|,
          headers: %{operation: "insert"}
        }) <> "\n"
      end

    FileStorage.mark_snapshot_as_started(opts)
    stream = FileStorage.get_log_stream(LogOffset.before_all(), LogOffset.first(), opts)

    # Start the reader before any data is on disk; it must block/poll until
    # complete lines (and finally the EOF marker) appear.
    read_task =
      Task.async(fn ->
        log = Enum.to_list(stream)

        assert Enum.count(log) == row_count

        for {item, i} <- Enum.with_index(log, 1) do
          # Each streamed item must be a complete, parseable JSON document —
          # a partially written line would fail to decode here.
          assert Jason.decode!(item, keys: :atoms).value.title == "row#{i}"
        end
      end)

    # Write the snapshot in 10-byte slices so most writes end mid-line,
    # forcing the concurrent reader to observe incomplete lines.
    File.open!(FileStorage.snapshot_chunk_path(opts, 0), [:write, :raw], fn file ->
      data_stream
      |> chunk_string_every(10)
      |> Stream.each(fn chunk ->
        IO.binwrite(file, chunk)
        # Sync-write to file to ensure the concurrent reader sees this "incomplete" line
        :file.sync(file)
        Process.sleep(1)
      end)
      |> Stream.run()

      # Write EOF marker (ASCII "end of transmission") terminating the snapshot.
      IO.binwrite(file, <<4::utf8>>)
      :file.sync(file)
    end)

    Task.await(read_task)
  end

  # Splits `string` into a lazy stream of consecutive `every`-character chunks
  # (last chunk may be shorter); terminates once the remainder is empty.
  defp chunk_string_every(string, every) do
    string
    |> Stream.unfold(&String.split_at(&1, every))
    |> Stream.take_while(&(&1 != ""))
  end
end

0 comments on commit 5f2cb99

Please sign in to comment.