diff --git a/.changeset/modern-toes-drop.md b/.changeset/modern-toes-drop.md
new file mode 100644
index 0000000000..8331d28cf6
--- /dev/null
+++ b/.changeset/modern-toes-drop.md
@@ -0,0 +1,5 @@
+---
+"@core/sync-service": patch
+---
+
+fix: ensure correct JSON formatting when reading a concurrently written snapshot
diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage.ex b/packages/sync-service/lib/electric/shape_cache/file_storage.ex
index 5726d70455..81c6eab9eb 100644
--- a/packages/sync-service/lib/electric/shape_cache/file_storage.ex
+++ b/packages/sync-service/lib/electric/shape_cache/file_storage.ex
@@ -317,8 +317,9 @@ defmodule Electric.ShapeCache.FileStorage do
     File.open!(snapshot_chunk_path(opts, chunk_number), [:write, :raw])
   end
 
-  defp snapshot_chunk_path(opts, chunk_number)
-       when is_integer(chunk_number) and chunk_number >= 0 do
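+  # Public (rather than defp) so tests can write snapshot chunk files directly.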
+  def snapshot_chunk_path(opts, chunk_number)
+      when is_integer(chunk_number) and chunk_number >= 0 do
     Path.join([opts.snapshot_dir, "snapshot_chunk.#{chunk_number}.jsonl"])
   end
 
@@ -378,8 +378,9 @@ defmodule Electric.ShapeCache.FileStorage do
   # This function raises if the chunk file doesn't exist.
   defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do
     Stream.resource(
-      fn -> {open_snapshot_chunk(opts, chunk_number), nil} end,
-      fn {file, eof_seen} ->
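+      # Resource accumulator: {file, eof_seen_timestamp, incomplete_line_buffer}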
+      fn -> {open_snapshot_chunk(opts, chunk_number), nil, ""} end,
+      fn {file, eof_seen, incomplete_line} ->
         case IO.binread(file, :line) do
           {:error, reason} ->
             raise IO.StreamError, reason: reason
@@ -388,7 +388,7 @@ defmodule Electric.ShapeCache.FileStorage do
             cond do
               is_nil(eof_seen) ->
                 # First time we see eof after any valid lines, we store a timestamp
-                {[], {file, System.monotonic_time(:millisecond)}}
+                {[], {file, System.monotonic_time(:millisecond), incomplete_line}}
 
               # If it's been 60s without any new lines, and also we've not seen <<4>>,
               # then likely something is wrong
@@ -398,18 +398,24 @@ defmodule Electric.ShapeCache.FileStorage do
               true ->
                 # Sleep a little and check for new lines
                 Process.sleep(20)
-                {[], {file, eof_seen}}
+                {[], {file, eof_seen, incomplete_line}}
             end
 
           # The 4 byte marker (ASCII "end of transmission") indicates the end of the snapshot file.
           <<4::utf8>> ->
-            {:halt, {file, nil}}
+            {:halt, {file, nil, ""}}
 
           line ->
-            {[line], {file, nil}}
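+            # A concurrent writer may not have flushed a trailing newline yet,
+            # so buffer partial reads and emit only complete lines downstream.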
+            if binary_slice(line, -1, 1) == "\n" do
+              {[incomplete_line <> line], {file, nil, ""}}
+            else
+              {[], {file, nil, incomplete_line <> line}}
+            end
         end
       end,
-      fn {file, _} -> File.close(file) end
+      &File.close(elem(&1, 0))
     )
   end
 
diff --git a/packages/sync-service/test/electric/shape_cache/file_storage_test.exs b/packages/sync-service/test/electric/shape_cache/file_storage_test.exs
new file mode 100644
index 0000000000..5a19a693cb
--- /dev/null
+++ b/packages/sync-service/test/electric/shape_cache/file_storage_test.exs
@@ -0,0 +1,81 @@
+defmodule Electric.ShapeCache.FileStorageTest do
+  use ExUnit.Case, async: true
+  import Support.ComponentSetup
+  alias Electric.Replication.LogOffset
+  alias Electric.ShapeCache.FileStorage
+
+  @moduletag :tmp_dir
+
+  @shape_handle "the-shape-handle"
+
+  setup :with_stack_id_from_test
+
+  setup %{tmp_dir: tmp_dir, stack_id: stack_id} do
+    opts =
+      FileStorage.shared_opts(
+        db: String.to_atom("shape_mixed_disk_#{stack_id}"),
+        storage_dir: tmp_dir,
+        stack_id: stack_id
+      )
+
+    shape_opts = FileStorage.for_shape(@shape_handle, opts)
+    {:ok, pid} = FileStorage.start_link(shape_opts)
+    {:ok, %{opts: shape_opts, shared_opts: opts, pid: pid, storage: {FileStorage, shape_opts}}}
+  end
+
+  test "returns complete snapshot when writes are partially complete", %{
+    opts: opts
+  } do
+    row_count = 10
+
+    data_stream =
+      for i <- 1..row_count, into: "" do
+        Jason.encode!(%{
+          offset: "0_0",
+          value: %{id: "00000000-0000-0000-0000-00000000000#{i}", title: "row#{i}"},
+          key: ~s|"public"."the-table"/"00000000-0000-0000-0000-00000000000#{i}"|,
+          headers: %{operation: "insert"}
+        }) <> "\n"
+      end
+
+    FileStorage.mark_snapshot_as_started(opts)
+    stream = FileStorage.get_log_stream(LogOffset.before_all(), LogOffset.first(), opts)
+
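+    # Start consuming the snapshot stream before the writer below has finished.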
+    read_task =
+      Task.async(fn ->
+        log = Enum.to_list(stream)
+
+        assert Enum.count(log) == row_count
+
+        for {item, i} <- Enum.with_index(log, 1) do
+          assert Jason.decode!(item, keys: :atoms).value.title == "row#{i}"
+        end
+      end)
+
+    File.open!(FileStorage.snapshot_chunk_path(opts, 0), [:write, :raw], fn file ->
+      data_stream
+      |> chunk_string_every(10)
+      |> Stream.each(fn chunk ->
+        IO.binwrite(file, chunk)
+        # Flush after each chunk so the concurrent reader can observe partially written lines
+        :file.sync(file)
+        Process.sleep(1)
+      end)
+      |> Stream.run()
+
+      # Write EOF marker
+      IO.binwrite(file, <<4::utf8>>)
+      :file.sync(file)
+    end)
+
+    Task.await(read_task)
+  end
+
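+  # Splits the string into fixed-size chunks so each write can end mid-line.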
+  defp chunk_string_every(string, every) do
+    string
+    |> Stream.unfold(&String.split_at(&1, every))
+    |> Stream.take_while(&(&1 != ""))
+  end
+end