diff --git a/.changeset/brown-dodos-raise.md b/.changeset/brown-dodos-raise.md new file mode 100644 index 0000000000..638effe9de --- /dev/null +++ b/.changeset/brown-dodos-raise.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +feat: add compaction flag to Electric that allows `UPDATE` events within a shape to be compacted to save space and bandwidth diff --git a/packages/sync-service/lib/electric/log_items.ex b/packages/sync-service/lib/electric/log_items.ex index 9f47f3c8b7..607674e080 100644 --- a/packages/sync-service/lib/electric/log_items.ex +++ b/packages/sync-service/lib/electric/log_items.ex @@ -126,4 +126,13 @@ defmodule Electric.LogItems do |> Enum.zip() |> Map.new() end + + def merge_updates(u1, u2) do + %{ + "key" => u1["key"], + "offset" => u2["offset"], + "headers" => Map.take(u1["headers"], ["operation", "relation"]), + "value" => Map.merge(u1["value"], u2["value"]) + } + end end diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 441a224d27..3e207afe31 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -36,6 +36,8 @@ defmodule Electric.Plug.ServeShapePlug do import Ecto.Changeset alias Electric.Replication.LogOffset + @tmp_compaction_flag :experimental_compaction + @primary_key false embedded_schema do field(:table, :string) @@ -46,6 +48,7 @@ defmodule Electric.Plug.ServeShapePlug do field(:columns, :string) field(:shape_definition, :string) field(:replica, Ecto.Enum, values: [:default, :full], default: :default) + field(@tmp_compaction_flag, :boolean, default: false) end def validate(params, opts) do @@ -135,10 +138,17 @@ defmodule Electric.Plug.ServeShapePlug do where = fetch_field!(changeset, :where) columns = get_change(changeset, :columns, nil) replica = fetch_field!(changeset, :replica) + compaction_enabled? 
= fetch_field!(changeset, @tmp_compaction_flag) case Shapes.Shape.new( table, - opts ++ [where: where, columns: columns, replica: replica] + opts ++ + [ + where: where, + columns: columns, + replica: replica, + storage: %{compaction: if(compaction_enabled?, do: :enabled, else: :disabled)} + ] ) do {:ok, result} -> put_change(changeset, :shape_definition, result) @@ -655,7 +665,8 @@ defmodule Electric.Plug.ServeShapePlug do conn |> fetch_query_params() |> assign(:error_str, error_str) - |> end_telemetry_span() + + # |> end_telemetry_span() conn end diff --git a/packages/sync-service/lib/electric/shape_cache/compaction_runner.ex b/packages/sync-service/lib/electric/shape_cache/compaction_runner.ex new file mode 100644 index 0000000000..23a6bcaf12 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/compaction_runner.ex @@ -0,0 +1,49 @@ +defmodule Electric.ShapeCache.CompactionRunner do + use GenServer + + require Logger + + alias Electric.ShapeCache.Storage + + @schema NimbleOptions.new!( + stack_id: [type: :string, required: true], + shape_handle: [type: :string, required: true], + storage: [type: :mod_arg, required: true], + compaction_period: [type: :non_neg_integer, default: :timer.minutes(10)] + ) + + def start_link(opts) do + with {:ok, opts} <- NimbleOptions.validate(opts, @schema) do + GenServer.start_link(__MODULE__, opts, name: name(opts)) + end + end + + def name(opts) do + Electric.ProcessRegistry.name(opts[:stack_id], __MODULE__, opts[:shape_handle]) + end + + @impl GenServer + def init(opts) do + clean_after_period(opts) + Process.set_label({:compaction_runner, opts[:stack_id], opts[:shape_handle]}) + Logger.metadata(stack_id: opts[:stack_id], shape_handle: opts[:shape_handle]) + {:ok, opts} + end + + @impl GenServer + def handle_info(:clean, opts) do + Logger.info("Triggering compaction for shape #{opts[:shape_handle]}") + clean_after_period(opts) + Storage.compact(opts[:storage]) + Logger.info("Compaction complete for shape #{opts[:shape_handle]}") + + {:noreply, opts} + end + + defp clean_after_period(opts) do + # add a large random jitter to avoid all compactions happening at the same time + half_period = div(opts[:compaction_period], 2) + next_msg = opts[:compaction_period] + Enum.random(-half_period..half_period) + Process.send_after(self(), :clean, next_msg) + end +end diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage.ex b/packages/sync-service/lib/electric/shape_cache/file_storage.ex index 368480cc0a..2ee4af8da3 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage.ex @@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.FileStorage do use Retry require Logger + alias Electric.ShapeCache.LogChunker alias Electric.Telemetry.OpenTelemetry alias Electric.Replication.LogOffset import Electric.Replication.LogOffset, only: :macros @@ -17,6 +18,7 @@ defmodule Electric.ShapeCache.FileStorage do @xmin_key :snapshot_xmin @snapshot_meta_key :snapshot_meta @snapshot_started_key :snapshot_started + @compaction_info_key :compaction_info @behaviour Electric.ShapeCache.Storage @@ -27,8 +29,10 @@ defmodule Electric.ShapeCache.FileStorage do :data_dir, :cubdb_dir, :snapshot_dir, + :log_dir, :stack_id, :extra_opts, + :chunk_bytes_threshold, version: @version ] @@ -38,7 +42,12 @@ defmodule Electric.ShapeCache.FileStorage do storage_dir = Keyword.get(opts, :storage_dir, "./shapes") # Always scope the provided storage dir by stack id - %{base_path: Path.join(storage_dir, 
stack_id), stack_id: stack_id} + %{ + base_path: Path.join(storage_dir, stack_id), + stack_id: stack_id, + chunk_bytes_threshold: + Keyword.get(opts, :chunk_bytes_threshold, LogChunker.default_chunk_size_threshold()) + } end @impl Electric.ShapeCache.Storage @@ -59,8 +68,10 @@ defmodule Electric.ShapeCache.FileStorage do data_dir: data_dir, cubdb_dir: Path.join([data_dir, "cubdb"]), snapshot_dir: Path.join([data_dir, "snapshots"]), + log_dir: Path.join([data_dir, "log"]), stack_id: stack_id, - extra_opts: Map.get(opts, :extra_opts, %{}) + extra_opts: Map.get(opts, :extra_opts, %{}), + chunk_bytes_threshold: opts.chunk_bytes_threshold } end @@ -91,7 +102,8 @@ defmodule Electric.ShapeCache.FileStorage do defp initialise_filesystem(opts) do with :ok <- File.mkdir_p(opts.data_dir), :ok <- File.mkdir_p(opts.cubdb_dir), - :ok <- File.mkdir_p(opts.snapshot_dir) do + :ok <- File.mkdir_p(opts.snapshot_dir), + :ok <- File.mkdir_p(opts.log_dir) do :ok end end @@ -324,11 +336,21 @@ defmodule Electric.ShapeCache.FileStorage do @impl Electric.ShapeCache.Storage def append_to_log!(log_items, %FS{} = opts) do + compaction_boundary = get_compaction_boundary(opts) + retry with: linear_backoff(50, 2) |> expiry(5_000) do log_items - |> Enum.map(fn - {:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil} - {offset, json_log_item} -> {log_key(offset), json_log_item} + |> Enum.flat_map(fn + {:chunk_boundary, offset} -> + [{chunk_checkpoint_key(offset), nil}] + + # We have definitely seen this, but it's not going to be in CubDB after compaction, + # so instead of idempotent insert we just ignore. + {offset, _, _, _} when is_log_offset_lt(offset, compaction_boundary) -> + [] + + {offset, key, op_type, json_log_item} -> + [{log_key(offset), {key, op_type, json_log_item}}] end) |> then(&CubDB.put_multi(opts.db, &1)) else @@ -375,6 +397,107 @@ defmodule Electric.ShapeCache.FileStorage do def get_log_stream(%LogOffset{} = offset, max_offset, %FS{} = opts), do: stream_log_chunk(offset, max_offset, opts) + def compact(%FS{} = opts) do + CubDB.select(opts.db, + min_key: chunk_checkpoint_start(), + max_key: chunk_checkpoint_end(), + reverse: true + ) + # Keep the last 2 chunks as-is so that anything that relies on the live stream and + # transactional information/LSNs always has something to work with. + |> Enum.take(3) + |> case do + [_, _, {key, _}] -> + compact(opts, offset(key)) + + _ -> + # Not enough chunks to warrant compaction + :ok + end + end + + def compact(%FS{} = opts, %LogOffset{} = upper_bound) do + # We consider log before the stored upper bound live & uncompacted. This means that concurrent readers + # will be able to read out everything they want while the compaction is happening and we're only + # atomically updating the pointer to the live portion. 
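For illustration only, not part of the patch: the read-side effect of this pointer can be sketched as below, condensed from `stream_log_chunk/3` further down in this module. The name `stream_log_from/2` is hypothetical; `@compaction_info_key`, `log_key/1`, `is_log_offset_lt/2` and `FS.LogFile.read_chunk/2` are the names this module already uses.

```elixir
# Sketch: dispatch a read on the atomically updated compaction pointer.
defp stream_log_from(%LogOffset{} = offset, %FS{} = opts) do
  case CubDB.fetch(opts.db, @compaction_info_key) do
    {:ok, {compacted_log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
      # The offset is below the boundary: serve it from the immutable compacted log file.
      FS.LogFile.read_chunk(compacted_log, offset)

    _ ->
      # The offset is in the live portion (or nothing has been compacted yet): read from CubDB.
      opts.db
      |> CubDB.select(min_key: log_key(offset), min_key_inclusive: false)
      |> Stream.map(fn {_key, {_op_key, _op_type, json}} -> json end)
  end
end
```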
+ + case CubDB.fetch(opts.db, @compaction_info_key) do + {:ok, {_, ^upper_bound}} -> + :ok + + {:ok, {old_log, _}} -> + # compact further + new_log_file_path = + Path.join( + opts.log_dir, + "compact_log_#{DateTime.utc_now() |> DateTime.to_unix(:millisecond)}.electric" + ) + + new_log = + CubDB.select(opts.db, + min_key: log_start(), + max_key: log_key(upper_bound), + max_key_inclusive: true + ) + |> Stream.map(fn {key, {op_key, op_type, json}} -> + {offset(key), op_key, op_type, json} + end) + |> FS.LogFile.write_log_file(new_log_file_path <> ".new") + + merged_log = + FS.Compaction.merge_and_compact( + old_log, + new_log, + new_log_file_path, + opts.chunk_bytes_threshold + ) + + CubDB.put(opts.db, @compaction_info_key, {merged_log, upper_bound}) + delete_compacted_keys(opts, upper_bound) + FS.Compaction.rm_log(new_log) + FS.Compaction.rm_log(old_log) + :ok + + :error -> + log_file_path = Path.join(opts.log_dir, "compact_log.electric") + + log = + CubDB.select(opts.db, + min_key: log_start(), + max_key: log_key(upper_bound), + max_key_inclusive: true + ) + |> Stream.map(fn {key, {op_key, op_type, json}} -> + {offset(key), op_key, op_type, json} + end) + |> FS.LogFile.write_log_file(log_file_path) + |> FS.Compaction.compact_in_place(opts.chunk_bytes_threshold) + + CubDB.put(opts.db, @compaction_info_key, {log, upper_bound}) + delete_compacted_keys(opts, upper_bound) + :ok + end + end + + defp delete_compacted_keys(%FS{} = opts, upper_bound) do + compacted_chunks = + CubDB.select(opts.db, + min_key: chunk_checkpoint_start(), + max_key: chunk_checkpoint_key(upper_bound), + max_key_inclusive: true + ) + |> Enum.map(fn {key, _} -> key end) + + compacted_logs = + CubDB.select(opts.db, + min_key: log_start(), + max_key: log_key(upper_bound) + ) + |> Enum.map(fn {key, _} -> key end) + + CubDB.delete_multi(opts.db, compacted_chunks ++ compacted_logs) + end + # This function raises if the chunk file doesn't exist. 
defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do Stream.resource( @@ -435,13 +558,27 @@ defmodule Electric.ShapeCache.FileStorage do end defp stream_log_chunk(%LogOffset{} = offset, max_offset, %FS{} = opts) do - opts.db - |> CubDB.select( - min_key: log_key(offset), - max_key: log_key(max_offset), - min_key_inclusive: false - ) - |> Stream.map(fn {_, item} -> item end) + case CubDB.fetch(opts.db, @compaction_info_key) do + {:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) -> + FS.ChunkIndex.fetch_chunk(elem(log, 1), offset) + FS.LogFile.read_chunk(log, offset) + + _ -> + opts.db + |> CubDB.select( + min_key: log_key(offset), + max_key: log_key(max_offset), + min_key_inclusive: false + ) + |> Stream.map(fn {_, {_, _, json_log_item}} -> json_log_item end) + end + end + + defp get_compaction_boundary(%FS{} = opts) do + case CubDB.fetch(opts.db, @compaction_info_key) do + {:ok, {_, upper_bound}} -> upper_bound + :error -> LogOffset.first() + end end defp wait_for_chunk_file_or_snapshot_end( @@ -503,14 +640,21 @@ defmodule Electric.ShapeCache.FileStorage do def get_chunk_end_log_offset(offset, %FS{} = opts), do: get_chunk_end_for_log(offset, opts) defp get_chunk_end_for_log(offset, %FS{} = opts) do - CubDB.select(opts.db, - min_key: chunk_checkpoint_key(offset), - max_key: chunk_checkpoint_end(), - min_key_inclusive: false - ) - |> Stream.map(fn {key, _} -> offset(key) end) - |> Enum.take(1) - |> Enum.at(0) + case CubDB.fetch(opts.db, @compaction_info_key) do + {:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) -> + {:ok, max_offset, _} = FS.ChunkIndex.fetch_chunk(elem(log, 1), offset) + max_offset + + _ -> + CubDB.select(opts.db, + min_key: chunk_checkpoint_key(offset), + max_key: chunk_checkpoint_end(), + min_key_inclusive: false + ) + |> Stream.map(fn {key, _} -> offset(key) end) + |> Enum.take(1) + |> Enum.at(0) + end end defp get_last_snapshot_offset(%FS{} = opts) do diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/action_file.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/action_file.ex new file mode 100644 index 0000000000..92e0be7bb4 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/action_file.ex @@ -0,0 +1,143 @@ +defmodule Electric.ShapeCache.FileStorage.ActionFile do + @moduledoc false + alias Electric.Utils + alias Electric.ShapeCache.FileStorage.LogFile + alias Electric.ShapeCache.FileStorage.KeyIndex + import KeyIndex, only: :macros + + @doc """ + Convert a sorted key index to a sorted action file. + + Action file is line-for-line mapping of log file offsets to actions of "keep", "skip" or "compact". + It's ordering should be the same as the log file to allow for sequential reads of both. + + For "keep" lines, we keep the original, for "skip" lines, we skip the original, and for "compact" lines, + we read all specified JSONs from the log file and merge them into one. 
Multiple updates to the the same + key are mapped to be "skipped" for all but the last one, which is then mapped to "compact" + + Action file format is, in elixir binary: + + <> + + Where `operation_type` is one of: + + <> #- Keep + <> #- Skip + <> #- Compact + + And `json_offsets` is `json_offsets_count` of `<>` + """ + def create_from_key_index(key_index_path, action_file_path) do + KeyIndex.stream(key_index_path) + |> Stream.chunk_by(&key_index_item(&1, :key)) + |> Stream.flat_map(fn chunk -> + # Chunk contains all operations for a given key in order + + chunk + |> Enum.chunk_by(&key_index_item(&1, :op_type)) + |> Enum.flat_map(fn + # Keep any single operation, since inserts/deletes won't be duplicated, and one update can't be compacted + [key_index_item(offset: offset)] -> [<>] + # If more than one, then it's definitely an update + updates -> updates_to_actions(updates) + end) + end) + |> Stream.into(File.stream!(action_file_path)) + |> Stream.run() + + Utils.external_merge_sort(action_file_path, &stream_for_sorting/1) + end + + @doc """ + Read the action file and return a stream of tuples `{offset, action}`. + """ + @spec stream(path :: String.t()) :: + Enumerable.t( + {LogFile.offset(), + :keep | :skip | {:compact, [{non_neg_integer(), non_neg_integer()}, ...]}} + ) + def stream(action_file_path) do + Stream.resource( + fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end, + fn file -> + case IO.binread(file, 17) do + :eof -> + {:halt, file} + + <> -> + <> = IO.binread(file, 2) + offsets = for <>, do: {pos, size} + {[{{tx_offset, op_offset}, {:compact, offsets}}], file} + + <> -> + {[{{tx_offset, op_offset}, :keep}], file} + + <> -> + {[{{tx_offset, op_offset}, :skip}], file} + end + end, + &File.close/1 + ) + end + + # acc format: {positions_len, positions, actions} + defp updates_to_actions(updates, acc \\ {0, [], []}) + # We don't care about order being reversed because it's going to be sorted. + defp updates_to_actions([], {_, _, acc}), do: acc + + # The compaction target is either last one, or after we hit 65535 updates. Technically makes it suboptimal, + # but saves us a lot of memory because the position list will take up at most 65535 * 16 = 1048560 bytes ~ 1MB of memory, + # as opposed to 65536MB if we allow int32 positions. 
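Before the clauses that emit these entries, an illustrative decode of a single "compact" entry may help; it is consistent with `stream/1` above and with the fixtures in `action_file_test.exs` later in this diff, and the bound variable names are purely illustrative.

```elixir
# One "compact" action entry: offset {3, 1}, ?c marker, one {position, size} pair.
entry = <<3::64, 1::64, ?c::8, 1::16, 0::64, 10::64>>

<<tx_offset::64, op_offset::64, ?c::8, _count::16, rest::binary>> = entry
offsets = for <<pos::64, size::64 <- rest>>, do: {pos, size}

{{tx_offset, op_offset}, {:compact, offsets}}
#=> {{3, 1}, {:compact, [{0, 10}]}}
```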
+ defp updates_to_actions( + [key_index_item(offset: offset, json: last) | rest], + {total_positions, positions, actions} + ) + when rest == [] + when total_positions > 65534 do + actions = + [ + [ + <>, + Utils.list_reverse_map([last | positions], fn {pos, size} -> <> end) + ] + | actions + ] + + updates_to_actions(rest, {0, [], actions}) + end + + defp updates_to_actions( + [key_index_item(offset: offset, json: position) | rest], + {total_positions, all_positions, actions} + ) do + updates_to_actions( + rest, + {total_positions + 1, [position | all_positions], + [[<>] | actions]} + ) + end + + @spec stream_for_sorting(String.t()) :: + Enumerable.t(Utils.sortable_binary({non_neg_integer(), non_neg_integer()})) + defp stream_for_sorting(action_file_path) do + Stream.resource( + fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end, + fn file -> + case IO.binread(file, 17) do + :eof -> + {:halt, file} + + <> = line -> + <> = IO.binread(file, 2) + + {[{{tx_offset, op_offset}, line <> <> <> IO.binread(file, count * 16)}], + file} + + <> = line -> + {[{{tx_offset, op_offset}, line}], file} + end + end, + &File.close/1 + ) + end +end diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex new file mode 100644 index 0000000000..f0a5f9bad5 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/chunk_index.ex @@ -0,0 +1,118 @@ +defmodule Electric.ShapeCache.FileStorage.ChunkIndex do + @moduledoc false + + alias Electric.Replication.LogOffset + alias Electric.ShapeCache.LogChunker + alias Electric.Utils + alias Electric.ShapeCache.FileStorage.LogFile + + # 16 bytes offset + 8 bytes position + 16 bytes offset + 8 bytes position = 48 bytes + @chunk_entry_size 48 + + @doc """ + Write a chunk index from the stream of log items to the given path. + + A chunk index serves two purposes: it acts as a sparse index for the log file + and chunks are used to align client reads to benefit CDN cache hits. + + The format of the file is: + + <> + + Fixed byte width entries give us an opportunity to use binary search. 
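For illustration, every entry is exactly 48 bytes, so entry `i` can be decoded by seeking to `i * 48`, which is what makes the binary search in `do_binary_search/4` below possible; `file` and `i` are assumed to be an open raw file handle and an entry index.

```elixir
# Sketch: decode chunk-index entry `i` (min offset, start position, max offset, end position).
{:ok, <<min_tx::64, min_op::64, start_pos::64, max_tx::64, max_op::64, end_pos::64>>} =
  :file.pread(file, i * 48, 48)

# The chunk covers log offsets (min_tx, min_op)..(max_tx, max_op) and occupies bytes
# start_pos..end_pos of the log file, so a lookup never needs to scan the log itself.
{LogOffset.new(min_tx, min_op), LogOffset.new(max_tx, max_op), {start_pos, end_pos}}
```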
+ """ + @spec write_from_stream( + Enumerable.t(LogFile.log_item_with_sizes()), + path :: String.t(), + chunk_size :: non_neg_integer + ) :: Enumerable.t(LogFile.log_item_with_sizes()) + def write_from_stream(stream, path, chunk_size) do + Utils.stream_add_side_effect( + stream, + # agg is {file, write_position, byte_count, last_seen_offset} + fn -> {File.open!(path, [:write, :raw]), 0, 0, nil} end, + fn {offset, _, _, _, json_size, _} = line, + {file, write_position, byte_count, last_seen_offset} -> + # Start the chunk if there's no last offset + if is_nil(last_seen_offset), + do: IO.binwrite(file, <>) + + position_after_write = LogFile.expected_position(write_position, line) + + # We're counting bytes only on JSON payloads that are actually sent to the client + case LogChunker.fit_into_chunk(json_size, byte_count, chunk_size) do + {:ok, new_size} -> + {file, position_after_write, new_size, offset} + + {:threshold_exceeded, 0} -> + # Chunk ended, finish writing the entry + IO.binwrite(file, <>) + + {file, position_after_write, 0, nil} + end + end, + fn {file, pos, _, last_offset} = acc -> + # Finish writing the last entry if there is one + if not is_nil(last_offset), + do: IO.binwrite(file, <>) + + acc + end, + &File.close(elem(&1, 0)) + ) + end + + @doc """ + For a given chunk index, find the chunk that contains the first + offset greater than the given one. + + Returns the max offset of the found chunk and reading boundaries for the log file. + """ + @spec fetch_chunk(path :: String.t(), LogOffset.t()) :: + {:ok, max_offset :: LogOffset.t(), + {start_position :: non_neg_integer, end_position :: non_neg_integer}} + | :error + def fetch_chunk(chunk_file_path, %LogOffset{} = exclusive_min_offset) do + file = File.open!(chunk_file_path, [:read, :raw]) + {:ok, size} = :file.position(file, :eof) + + try do + case do_binary_search(file, 0, div(size, @chunk_entry_size) - 1, exclusive_min_offset) do + {:ok, max_offset, start_pos, end_pos} -> {:ok, max_offset, {start_pos, end_pos}} + nil -> :error + end + after + File.close(file) + end + end + + defp do_binary_search(file, left, right, %LogOffset{} = target) + when left <= right do + mid = div(left + right, 2) + + {:ok, <<_min_tx::64, _min_op::64, start_pos::64, max_tx::64, max_op::64, end_pos::64>>} = + :file.pread(file, mid * @chunk_entry_size, @chunk_entry_size) + + max_offset = LogOffset.new(max_tx, max_op) + + case {LogOffset.compare(target, max_offset), mid} do + {:lt, mid} when mid > 0 -> + # Target is less than max_offset, this chunk might be the answer + # but let's check if there's a better one in the left half + do_binary_search(file, left, mid - 1, target) || {:ok, max_offset, start_pos, end_pos} + + {:lt, _} -> + {:ok, max_offset, start_pos, end_pos} + + {_, mid} when mid < right -> + # Target is equal to / greater than max_offset, need to look in right half + do_binary_search(file, mid + 1, right, target) + + _ -> + # Target is greater than max_offset but we're at the end + nil + end + end + + defp do_binary_search(_file, _left, _right, _target), do: nil +end diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/compaction.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/compaction.ex new file mode 100644 index 0000000000..2267118649 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/compaction.ex @@ -0,0 +1,78 @@ +defmodule Electric.ShapeCache.FileStorage.Compaction do + alias Electric.LogItems + alias Electric.Utils + alias Electric.ShapeCache.LogChunker + alias 
Electric.ShapeCache.FileStorage.LogFile + alias Electric.ShapeCache.FileStorage.KeyIndex + alias Electric.ShapeCache.FileStorage.ActionFile + + # Compaction and race conditions + # + # `FileStorage` has a pointer to the last compacted offset (and the compacted log file name) + # which is updated atomically once the compaction is complete, so while it's ongoing, the + # pointer is not updated. + # + # While the log is compacted in place, it's actually a merged copy that's being + # compacted, not the original log. Original log is deleted after the compaction + # is complete and the pointer is updated. + # + # Any concurrent reads of the log that's being replaced are also OK: the `File.rename` + # on linux doesn't close the original file descriptor, so the reader will still see + # the original file, and we don't reuse file names. Any readers mid-file of the + # log that's being replaced but that read the chunk will continue from a correct chunk + # of the new file due to offset ordering being preserved. They might observe some updates + # more than once in a compacted form. + + @spec compact_in_place({String.t(), String.t(), String.t()}, non_neg_integer(), (any(), any() -> + any())) :: + {String.t(), String.t(), String.t()} + def compact_in_place( + {log_file_path, chunk_index_path, key_index_path}, + chunk_size \\ LogChunker.default_chunk_size_threshold(), + merge_fun \\ &LogItems.merge_updates/2 + ) do + KeyIndex.sort(key_index_path) + ActionFile.create_from_key_index(key_index_path, log_file_path <> ".actions") + + {new_log, new_chunk_index, new_key_index} = + LogFile.apply_actions(log_file_path, log_file_path <> ".actions", chunk_size, merge_fun) + + File.rm!(log_file_path <> ".actions") + File.rename!(new_log, log_file_path) + File.rename!(new_chunk_index, chunk_index_path) + File.rename!(new_key_index, key_index_path) + + {log_file_path, chunk_index_path, key_index_path} + end + + def merge_and_compact( + log1, + log2, + merged_log_path, + chunk_size \\ LogChunker.default_chunk_size_threshold() + ) do + {log_file_path1, _, key_index_path1} = log1 + {log_file_path2, _, key_index_path2} = log2 + + second_part_start = File.stat!(log_file_path1).size + Utils.concat_files([log_file_path1, log_file_path2], merged_log_path) + + KeyIndex.merge_with_offset( + key_index_path1, + key_index_path2, + merged_log_path <> ".key_index", + second_part_start + ) + + compact_in_place( + {merged_log_path, merged_log_path <> ".chunk_index", merged_log_path <> ".key_index"}, + chunk_size + ) + end + + def rm_log({log, chunk_index, key_index}) do + File.rm!(log) + File.rm!(chunk_index) + File.rm!(key_index) + end +end diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex new file mode 100644 index 0000000000..5ac36d3a15 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/key_index.ex @@ -0,0 +1,144 @@ +defmodule Electric.ShapeCache.FileStorage.KeyIndex do + @moduledoc false + alias Electric.Replication.LogOffset + alias Electric.Utils + alias Electric.ShapeCache.FileStorage.LogFile + + require Record + + @doc """ + Write an unsorted key index from the stream of log items to the given path. + + Key index maps the keys of operation to the offsets for further processing. + We care about sorted index maps, but it's easier to generate them on the fly + and sort them later. 
+ + Key index format is, in elixir binary: + + <> + """ + @spec write_from_stream(Enumerable.t(LogFile.log_item_with_sizes()), path :: String.t()) :: + Enumerable.t(LogFile.log_item_with_sizes()) + def write_from_stream(stream, path) do + Utils.stream_add_side_effect( + stream, + # We're using delayed writes to avoid interfering with writing the log. Write size here is 64KB or 1s delay + # It's used here because we're writing a line per log line, so this reduces disk contention + fn -> {File.open!(path, [:write, :raw, {:delayed_write, 64 * 1024, 1000}]), 0} end, + fn {log_offset, key_size, key, op_type, json_size, _} = line, {file, write_position} -> + IO.binwrite( + file, + <> + ) + + {file, LogFile.expected_position(write_position, line)} + end, + &File.close(elem(&1, 0)) + ) + end + + Record.defrecord(:key_index_item, key: nil, op_type: nil, offset: nil, json: nil) + + @type key_index_item() :: + record(:key_index_item, + key: binary(), + op_type: LogFile.op_type(), + offset: LogOffset.t(), + json: {json_start_position :: non_neg_integer, json_size :: non_neg_integer} + ) + + @doc """ + Read a key index from the given path. + """ + @spec stream(path :: String.t()) :: Enumerable.t(key_index_item()) + def stream(path) do + Stream.resource( + fn -> File.open!(path, [:read, :raw, :read_ahead]) end, + fn file -> + with <> <- IO.binread(file, 4), + <> <- IO.binread(file, key_size), + <> <- + IO.binread(file, 8 * 4 + 1) do + item = + key_index_item( + key: key, + op_type: op_type, + offset: LogOffset.new(tx_offset, op_offset), + json: {json_start_position, json_size} + ) + + {[item], file} + else + :eof -> {:halt, file} + end + end, + &File.close/1 + ) + end + + @doc """ + Sort the key index file. + + Sorts alpha-numerically by key first and offset second, so within each + key the operations are sorted by offset. + + Uses an external merge sort to support large files, but requires + storage overhead while the sort is in-progress. Rewrites the original + file after the sort is complete. + """ + def sort(path) do + Utils.external_merge_sort(path, &stream_for_sorting/1, &<=/2) + end + + @spec stream_for_sorting(path :: String.t()) :: + Enumerable.t(Utils.sortable_binary({binary(), non_neg_integer(), non_neg_integer()})) + defp stream_for_sorting(path) do + Stream.resource( + fn -> File.open!(path, [:read, :raw, :read_ahead]) end, + fn file -> + with <> <- IO.binread(file, 4), + <> <- IO.binread(file, key_size), + <> <- + IO.binread(file, 17 + 8 + 8) do + full_line = + <> + + {[{{key, tx_offset, op_offset}, full_line}], file} + else + :eof -> {:halt, file} + end + end, + &File.close/1 + ) + end + + @doc """ + Merge two sorted key index files into a third file adjusting the positions of the second file by the given offset. 
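A usage sketch, mirroring how `Compaction.merge_and_compact/4` earlier in this diff calls it: the adjustment is the byte size of the first log file, because the merged log is the first file with the second concatenated after it (the path variables are assumed to be bound).

```elixir
# Positions recorded in the second key index shift by the size of the first log,
# since the merged log file is log1 followed by log2.
second_part_start = File.stat!(log_file_path1).size

KeyIndex.merge_with_offset(
  key_index_path1,
  key_index_path2,
  merged_log_path <> ".key_index",
  second_part_start
)
```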
+ """ + @spec merge_with_offset( + path1 :: String.t(), + path2 :: String.t(), + output_path :: String.t(), + offset :: non_neg_integer() + ) :: :ok + def merge_with_offset(path1, path2, output_path, offset) do + File.cp!(path1, output_path) + + stream(path2) + |> Stream.map(fn key_index_item(json: {start_position, json_size}) = item -> + key_index_item(item, json: {start_position + offset, json_size}) + end) + |> Stream.map(&serialize_key_index_item/1) + |> Stream.into(File.stream!(output_path, [:append])) + |> Stream.run() + end + + defp serialize_key_index_item( + key_index_item(offset: offset, key: key, op_type: op_type, json: {pos, size}) + ) do + <> + end +end diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage/log_file.ex b/packages/sync-service/lib/electric/shape_cache/file_storage/log_file.ex new file mode 100644 index 0000000000..6768a929b7 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/file_storage/log_file.ex @@ -0,0 +1,277 @@ +defmodule Electric.ShapeCache.FileStorage.LogFile do + @moduledoc false + alias Electric.ShapeCache.FileStorage.KeyIndex + alias Electric.LogItems + alias Electric.ShapeCache.FileStorage.ActionFile + alias Electric.ShapeCache.FileStorage.ChunkIndex + alias Electric.Replication.LogOffset + alias Electric.ShapeCache.LogChunker + + # 16 bytes offset + 4 bytes key size + 1 byte op type + 8 bytes json size = 29 bytes + @line_overhead 16 + 4 + 1 + 8 + + @type operation_type() :: :insert | :update | :delete + @type op_type() :: ?u | ?i | ?d + # We're allowing tuple offsets to avoid struct creation in the hot path + @type offset() :: + {tx_offset :: non_neg_integer(), op_offset :: non_neg_integer()} | LogOffset.t() + + @typedoc "Log item that can be written to the log file" + @type normal_log_item() :: + {offset(), key :: String.t(), op_type :: operation_type(), json :: String.t()} + @typedoc """ + Log item that can be read from the log file, but with precomputed + `byte_size(key)` and `byte_size(json)` values, and with `op_type` as a byte + """ + @type log_item_with_sizes() :: + {offset(), key_size :: non_neg_integer(), key :: String.t(), op_type :: op_type(), + json_size :: non_neg_integer(), json :: String.t()} + @type log_item() :: normal_log_item() | log_item_with_sizes() + + @typedoc """ + Paths to the log file, chunk index, and key index files, used in conjuction + """ + @type log_and_supporting() :: + {log_file_path :: String.t(), chunk_index_path :: String.t(), + key_index_path :: String.t()} + + @doc """ + Write a log file based on the stream of log items. + + Writes 2 files: the log file itself and the chunk index alongside it. 
+ + The log file structure is, in elixir binary: + + <> + """ + @spec write_log_file( + log_stream :: Enumerable.t(log_item()), + log_file_path :: String.t(), + chunk_size :: non_neg_integer() + ) :: log_and_supporting() + def write_log_file( + log_stream, + log_file_path, + chunk_size \\ LogChunker.default_chunk_size_threshold() + ) do + log_stream + |> normalize_log_stream() + |> ChunkIndex.write_from_stream(log_file_path <> ".chunk_index", chunk_size) + |> KeyIndex.write_from_stream(log_file_path <> ".key_index") + |> Stream.map(fn + {log_offset, key_size, key, op_type, json_size, json} -> + # avoid constructing a binary that includes the json + [ + <>, + json + ] + end) + |> Stream.into(File.stream!(log_file_path)) + |> Stream.run() + + {log_file_path, log_file_path <> ".chunk_index", log_file_path <> ".key_index"} + end + + @doc """ + Apply the compaction actions to the log file + """ + def apply_actions( + log_file_path, + action_file_path, + chunk_size \\ LogChunker.default_chunk_size_threshold(), + merge_updates_fun \\ &LogItems.merge_updates/2 + ) do + compacted_log_file_path = log_file_path <> ".compacted" + + ActionFile.stream(action_file_path) + |> Stream.transform( + fn -> File.open!(log_file_path, [:read, :raw, :read_ahead]) end, + fn + {_, :skip}, file -> + _ = read_line(file) + {[], file} + + {_, :keep}, file -> + {[read_line(file)], file} + + {_, {:compact, offsets}}, file -> + {[compact_log_file_lines(file, offsets, merge_updates_fun)], file} + end, + &File.close(&1) + ) + |> write_log_file(compacted_log_file_path, chunk_size) + end + + defp read_line(file) do + with <> <- IO.binread(file, 20), + <> <- IO.binread(file, key_size), + <> <- IO.binread(file, 9), + <> <- IO.binread(file, json_size) do + {{tx_offset, op_offset}, key_size, key, op_type, json_size, json} + end + end + + @spec compact_log_file_lines( + :file.io_device(), + [{position :: non_neg_integer(), size :: non_neg_integer()}], + (elem, elem -> elem) + ) :: log_item_with_sizes() + when elem: var + defp compact_log_file_lines(file, file_offsets, merge_updates_fun) do + # The line to be replaced with compaction will keep it's offset & key + {offset, key_size, key, op_type, _, _} = read_line(file) + + # Save position + {:ok, current_position} = :file.position(file, :cur) + + merged_json = + file_offsets + # Group reads to be efficient, but try to limit loading the JSONs to 10MB at a time. + # In the worst case when JSONs exceed 10MB, we'll just read one at a time. + |> chunk_expected_reads(bytes: 1024 * 1024 * 10) + |> Stream.flat_map(fn offsets -> + case :file.pread(file, offsets) do + {:ok, results} -> results + {:error, reason} -> raise inspect(reason) + :eof -> raise "unexpected end of file while reading back jsons from the log" + end + end) + |> Stream.map(&Jason.decode!/1) + |> Enum.reduce(fn new, acc -> merge_updates_fun.(acc, new) end) + |> Jason.encode!() + + # Restore position to continue reading in the outer loop + {:ok, _} = :file.position(file, {:bof, current_position}) + + {offset, key_size, key, op_type, byte_size(merged_json), merged_json} + end + + @doc """ + Normalize the log stream to have precomputed key and json sizes. 
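As a concrete illustration of the clause below, a four-element log item gains the two byte sizes and the single-byte operation type (values chosen arbitrarily):

```elixir
# {offset, key, op_type, json}  becomes  {offset, key_size, key, op_byte, json_size, json}
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|{"a":1}|}
#=> {%LogOffset{tx_offset: 1, op_offset: 1}, 4, "key1", ?i, 7, ~S|{"a":1}|}
```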
+ """ + @spec normalize_log_stream(Enumerable.t(log_item())) :: Enumerable.t(log_item_with_sizes()) + def normalize_log_stream(stream) do + Stream.map(stream, fn + {log_offset, key, op_type, json} -> + {log_offset, byte_size(key), key, get_op_type(op_type), byte_size(json), json} + + {_, _, _, _, _, _} = formed_line -> + formed_line + end) + end + + @spec chunk_expected_reads( + Enumerable.t({position :: non_neg_integer(), size :: non_neg_integer()}), + bytes: non_neg_integer() + ) :: Enumerable.t(list({position :: non_neg_integer(), size :: non_neg_integer()})) + defp chunk_expected_reads(stream, bytes: chunk_size) do + Stream.chunk_while( + stream, + {0, []}, + fn + {_, size} = item, {total_size, acc} when total_size > chunk_size -> + {:cont, Enum.reverse(acc), {size, [item]}} + + {_, size} = item, {total_size, acc} -> + {:cont, {total_size + size, [item | acc]}} + end, + fn + {_, []} -> {:cont, []} + {_, acc} -> {:cont, Enum.reverse(acc), []} + end + ) + end + + @doc """ + Get the expected byte position in the file after the given log item is written. + + Used by other modules that know the log file structure. + """ + @spec expected_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer() + def expected_position( + current_position, + {_log_offset, key_size, _key, _op_type, json_size, _json} + ) do + current_position + key_size + json_size + @line_overhead + end + + @doc """ + Get the expected byte position of the JSON for the given log item after it's written. + + Used by other modules that know the log file structure. + """ + @spec expected_json_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer() + def expected_json_position(current_position, {_, key_size, _, _, _, _}) do + current_position + key_size + @line_overhead + end + + @doc """ + Read a chunk of the log file from the given offset. + + Returns a stream of json strings. 
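A usage sketch based on `log_file_test.exs` further down in this diff; the `/tmp/example_log` path is made up for illustration.

```elixir
# Write a one-entry log (plus its chunk and key index files), then read a chunk back.
paths =
  [{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|"value1"|}]
  |> LogFile.write_log_file("/tmp/example_log")

LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
|> Enum.to_list()
#=> [~S|"value1"|]
```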
+ """ + @spec read_chunk(log :: log_and_supporting(), LogOffset.t()) :: Enumerable.t(String.t()) + def read_chunk({log_file_path, chunk_index_path, _key_index_path}, %LogOffset{} = offset) do + case ChunkIndex.fetch_chunk(chunk_index_path, offset) do + {:ok, _max_offset, {start_position, end_position}} -> + stream_jsons(log_file_path, start_position, end_position, offset) + + :error -> + [] + end + end + + defp stream_jsons(log_file_path, start_position, end_position, exclusive_min_offset) do + # We can read ahead entire chunk into memory since chunk sizes are expected to be ~10MB by default, + file = File.open!(log_file_path, [:read, :raw]) + + try do + with {:ok, data} <- :file.pread(file, start_position, end_position - start_position) do + extract_jsons_from_binary(data, exclusive_min_offset) + else + :eof -> raise "unexpected end of file" + {:error, reason} -> raise "error reading file: #{inspect(reason)}" + end + after + File.close(file) + end + end + + @spec extract_jsons_from_binary(binary(), LogOffset.t()) :: Enumerable.t(String.t()) + defp extract_jsons_from_binary(binary, exclusive_min_offset, acc \\ []) + defp extract_jsons_from_binary(<<>>, _, acc), do: Enum.reverse(acc) + + defp extract_jsons_from_binary( + <>, + %LogOffset{ + tx_offset: tx_offset2, + op_offset: op_offset2 + } = log_offset, + acc + ) + when tx_offset1 < tx_offset2 or (tx_offset1 == tx_offset2 and op_offset1 <= op_offset2), + do: extract_jsons_from_binary(rest, log_offset, acc) + + defp extract_jsons_from_binary( + <<_::128, key_size::32, _::binary-size(key_size), _::binary-size(1), json_size::64, + json::binary-size(json_size), rest::binary>>, + log_offset, + acc + ), + do: extract_jsons_from_binary(rest, log_offset, [json | acc]) + + defp get_op_type(:insert), do: ?i + defp get_op_type(:update), do: ?u + defp get_op_type(:delete), do: ?d + + @doc "Serialize a non-infinite non-negative offset to a 16-byte binary" + @spec offset(offset()) :: binary + def offset(%LogOffset{tx_offset: tx_offset, op_offset: op_offset}), + do: <> + + def offset({tx_offset, op_offset}), do: <> +end diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index c3d394a701..0d776ae37a 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -287,8 +287,11 @@ defmodule Electric.ShapeCache.InMemoryStorage do log_items |> Enum.map(fn - {:chunk_boundary, offset} -> {storage_offset(offset), :checkpoint} - {offset, json_log_item} -> {{:offset, storage_offset(offset)}, json_log_item} + {:chunk_boundary, offset} -> + {storage_offset(offset), :checkpoint} + + {offset, _key, _op_type, json_log_item} -> + {{:offset, storage_offset(offset)}, json_log_item} end) |> Enum.split_with(fn item -> match?({_, :checkpoint}, item) end) |> then(fn {checkpoints, log_items} -> diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index a706360fa2..cd09debcc4 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -15,7 +15,11 @@ defmodule Electric.ShapeCache.Storage do @type storage :: {module(), compiled_opts()} @type shape_storage :: {module(), shape_opts()} - @type log_item :: {LogOffset.t(), Querying.json_iodata()} | {:chunk_boundary | LogOffset.t()} + @type operation_type :: :insert | :update | :delete + 
@type log_item :: + {LogOffset.t(), key :: String.t(), operation_type :: operation_type(), + Querying.json_iodata()} + | {:chunk_boundary | LogOffset.t()} @type log :: Enumerable.t(Querying.json_iodata()) @type row :: list() @@ -199,4 +203,9 @@ defmodule Electric.ShapeCache.Storage do def unsafe_cleanup!({mod, shape_opts}) do mod.unsafe_cleanup!(shape_opts) end + + def compact({mod, shape_opts}), do: mod.compact(shape_opts) + + def compact({mod, shape_opts}, offset) when is_struct(offset, LogOffset), + do: mod.compact(shape_opts, offset) end diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index ac21d128d8..fba26e261c 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -449,15 +449,15 @@ defmodule Electric.Shapes.Consumer do item_byte_size = byte_size(json_log_item) state = %{state | current_txn_bytes: txn_bytes + item_byte_size} + line_tuple = {log_item.offset, log_item.key, log_item.headers.operation, json_log_item} case LogChunker.fit_into_chunk(item_byte_size, chunk_size, chunk_bytes_threshold) do {:ok, new_chunk_size} -> - {[{log_item.offset, json_log_item}], - %{state | current_chunk_byte_size: new_chunk_size}} + {[line_tuple], %{state | current_chunk_byte_size: new_chunk_size}} {:threshold_exceeded, new_chunk_size} -> { - [{log_item.offset, json_log_item}, {:chunk_boundary, log_item.offset}], + [line_tuple, {:chunk_boundary, log_item.offset}], %{state | current_chunk_byte_size: new_chunk_size} } end diff --git a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex index 4172be8135..54e9fed0cf 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex @@ -58,7 +58,7 @@ defmodule Electric.Shapes.ConsumerSupervisor do end def init(config) when is_map(config) do - %{shape_handle: shape_handle, storage: {_, _} = storage} = + %{shape_handle: shape_handle, storage: {_, _} = storage, shape: shape} = config Process.set_label({:consumer_supervisor, shape_handle}) @@ -76,6 +76,19 @@ defmodule Electric.Shapes.ConsumerSupervisor do {Electric.Shapes.Consumer.Snapshotter, shape_config} ] + children = + if should_enable_compaction?(shape, config) do + children ++ + [{Electric.ShapeCache.CompactionRunner, [{:storage, shape_storage} | metadata]}] + else + children + end + Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) end + + defp should_enable_compaction?(%{storage: %{compaction: :enabled}}, _config), do: true + defp should_enable_compaction?(%{storage: %{compaction: :disabled}}, _config), do: false + # Old shapes don't get compaction by default. + defp should_enable_compaction?(_, _), do: false end diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index b6cd7badbc..36137fc713 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -15,6 +15,7 @@ defmodule Electric.Shapes.Shape do :table_info, :where, :selected_columns, + storage: %{compaction: :disabled}, replica: :default ] @@ -23,6 +24,9 @@ defmodule Electric.Shapes.Shape do columns: [Inspector.column_info(), ...], pk: [String.t(), ...] 
} + @type storage_config :: %{ + compaction: :enabled | :disabled + } @type t() :: %__MODULE__{ root_table: Electric.relation(), root_table_id: Electric.relation_id(), @@ -31,7 +35,8 @@ defmodule Electric.Shapes.Shape do }, where: Electric.Replication.Eval.Expr.t() | nil, selected_columns: [String.t(), ...] | nil, - replica: replica() + replica: replica(), + storage: storage_config() | nil } @type json_relation() :: [String.t(), ...] @@ -45,7 +50,8 @@ defmodule Electric.Shapes.Shape do table_info: [json_table_list(), ...] } - def hash(%__MODULE__{} = shape), do: shape |> Map.drop([:table_info]) |> :erlang.phash2() + def hash(%__MODULE__{} = shape), + do: shape |> Map.drop([:table_info, :storage]) |> :erlang.phash2() def generate_id(%__MODULE__{} = shape) do hash = hash(shape) @@ -74,6 +80,10 @@ defmodule Electric.Shapes.Shape do inspector: [ type: :mod_arg, default: {Electric.Postgres.Inspector, Electric.DbPool} + ], + storage: [ + type: {:or, [:map, nil]}, + default: nil ] ) def new(table, opts) do @@ -93,7 +103,8 @@ defmodule Electric.Shapes.Shape do table_info: %{table => %{pk: pk_cols, columns: column_info}}, where: where, selected_columns: selected_columns, - replica: Access.get(opts, :replica, :default) + replica: Access.get(opts, :replica, :default), + storage: Access.get(opts, :storage) }} end end @@ -294,31 +305,16 @@ defmodule Electric.Shapes.Shape do @spec to_json_safe(t()) :: json_safe() def to_json_safe(%__MODULE__{} = shape) do %{ - root_table: {schema, name}, - root_table_id: root_table_id, - where: where, - selected_columns: selected_columns, - table_info: table_info - } = shape - - query = - case where do - %{query: query} -> query - nil -> nil - end - - %{ - root_table: [schema, name], - root_table_id: root_table_id, - where: query, - selected_columns: selected_columns, + root_table: Tuple.to_list(shape.root_table), + root_table_id: shape.root_table_id, + where: if(shape.where, do: shape.where.query), + selected_columns: shape.selected_columns, + replica: shape.replica, + storage: shape.storage, table_info: - if(table_info, - do: - Enum.map(table_info, fn {{schema, name}, columns} -> - [[schema, name], json_safe_columns(columns)] - end) - ) + Enum.map(shape.table_info, fn {relation, columns} -> + [Tuple.to_list(relation), json_safe_columns(columns)] + end) } end @@ -335,15 +331,15 @@ defmodule Electric.Shapes.Shape do defp column_info_to_json_safe({k, v}), do: {k, v} @spec from_json_safe!(json_safe()) :: t() | no_return() - def from_json_safe!(map) do - %{ - "root_table" => [schema, name], - "root_table_id" => root_table_id, - "where" => where, - "selected_columns" => selected_columns, - "table_info" => info - } = map - + def from_json_safe!( + %{ + "root_table" => [schema, name], + "root_table_id" => root_table_id, + "where" => where, + "selected_columns" => selected_columns, + "table_info" => info + } = data + ) do table_info = Enum.reduce(info, %{}, fn [[schema, name], table_info], info -> %{"columns" => columns, "pk" => pk} = table_info @@ -363,10 +359,16 @@ defmodule Electric.Shapes.Shape do root_table_id: root_table_id, where: where, selected_columns: selected_columns, - table_info: table_info + table_info: table_info, + replica: String.to_atom(Map.get(data, "replica", "default")), + storage: storage_config_from_json(Map.get(data, "storage")) } end + defp storage_config_from_json(nil), do: %{compaction: :disabled} + defp storage_config_from_json(%{"compaction" => "enabled"}), do: %{compaction: :enabled} + defp storage_config_from_json(%{"compaction" => "disabled"}), 
do: %{compaction: :disabled} + defp column_info_from_json({"type_id", [id, mod]}), do: {:type_id, {id, mod}} defp column_info_from_json({"type", type}), do: {:type, String.to_atom(type)} defp column_info_from_json({key, value}), do: {String.to_atom(key), value} diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 69104ee773..95b396a63b 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -28,6 +28,7 @@ defmodule Electric.StackSupervisor do 2. `Electric.Replication.ShapeLogCollector` collects transactions from the replication connection, fanning them out to `Electric.Shapes.Consumer` (4.1.1.2) 3. `Electric.ShapeCache` coordinates shape creation and handle allocation, shape metadata """ + alias Electric.ShapeCache.LogChunker use Supervisor, restart: :transient @opts_schema NimbleOptions.new!( @@ -172,8 +173,15 @@ defmodule Electric.StackSupervisor do end @doc false - def storage_mod_arg(%{stack_id: stack_id, storage: {mod, arg}}) do - {mod, arg |> Keyword.put(:stack_id, stack_id) |> mod.shared_opts()} + def storage_mod_arg(%{stack_id: stack_id, storage: {mod, arg}} = opts) do + {mod, + arg + |> Keyword.put(:stack_id, stack_id) + |> Keyword.put( + :chunk_bytes_threshold, + opts[:chunk_bytes_threshold] || LogChunker.default_chunk_size_threshold() + ) + |> mod.shared_opts()} end def registry_name(stack_id) do diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index a14407b251..0260175006 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -149,6 +149,25 @@ defmodule Electric.Utils do Enum.reverse(max_values) end + @doc """ + Map each value of the enumerable using a mapper and reverse the resulting list. + + Equivalent to `Enum.reverse/1` followed by `Enum.map/2`. + + ## Examples + + iex> list_reverse_map([1, 2, 3], &(&1 + 1)) + [4, 3, 2] + """ + @spec list_reverse_map(Enumerable.t(elem), (elem -> result), list(result)) :: list(result) + when elem: var, result: var + def list_reverse_map(list, mapper, acc \\ []) + + def list_reverse_map([], _, acc), do: acc + + def list_reverse_map([head | tail], mapper, acc), + do: list_reverse_map(tail, mapper, [mapper.(head) | acc]) + @doc """ Parse a markdown table from a string @@ -308,4 +327,193 @@ defmodule Electric.Utils do def map_values(map, fun), do: Map.new(map, fn {k, v} -> {k, fun.(v)} end) defp wrap_in_fun(val), do: fn -> val end + + @doc """ + Merge a list of streams by taking the minimum element from each stream and emitting it and its + stream. The streams are compared using the given comparator function. 
+ + ## Examples + + iex> merge_sorted_streams([[1, 2, 3], [2, 3, 4]]) |> Enum.to_list() + [1, 2, 2, 3, 3, 4] + + iex> merge_sorted_streams([[1, 2, 3], [4, 5, 6]]) |> Enum.to_list() + [1, 2, 3, 4, 5, 6] + + iex> merge_sorted_streams([[10], [4, 5, 6]]) |> Enum.to_list() + [4, 5, 6, 10] + """ + def merge_sorted_streams(streams, comparator \\ &<=/2, mapper \\ & &1) do + Stream.resource( + fn -> + Enum.flat_map(streams, fn stream -> + case Enum.take(stream, 1) do + [value] -> [{value, Stream.drop(stream, 1)}] + [] -> [] + end + end) + end, + fn + [] -> + {:halt, nil} + + values_and_streams -> + {val, stream} = Enum.min_by(values_and_streams, fn {value, _} -> value end, comparator) + + acc = + case Enum.take(stream, 1) do + [next_val] -> + List.keyreplace(values_and_streams, val, 0, {next_val, Stream.drop(stream, 1)}) + + [] -> + List.keydelete(values_and_streams, val, 0) + end + + {[mapper.(val)], acc} + end, + fn _ -> nil end + ) + end + + @doc """ + Open a file, retrying if it doesn't exist yet, up to `attempts_left` times, with 20ms delay between + attempts. + """ + @spec open_with_retry(path :: String.t(), opts :: [File.mode()]) :: :file.io_device() + def open_with_retry(path, opts, attempts_left \\ 100) when is_list(opts) do + case File.open(path, opts) do + {:ok, file} -> + file + + {:error, :enoent} -> + Process.sleep(20) + open_with_retry(path, opts, attempts_left - 1) + + {:error, reason} -> + raise IO.StreamError, reason: reason + end + end + + @type sortable_binary(key) :: {key :: key, data :: binary()} + + @doc """ + Performs external merge sort on a file. + + ## Parameters + * `path` - Path to the file to sort + * `reader` - Function that takes a file path and returns a stream of records. Records should be + in the form of `{key, binary}`, where `binary` will be written to the file sorted by `key`. + * `sorter` - Function that compares two keys, should return true if first argument is less than or equal to second + * `chunk_size` - Byte size of each chunk (i.e. how much is sorted in memory at once). Uses 50 MB by default. + + The function will: + 1. Split the input file into sorted temporary chunks + 2. Merge the sorted chunks back into the original file + """ + @spec external_merge_sort( + path :: String.t(), + reader :: (path :: String.t() -> Enumerable.t(sortable_binary(elem))), + sorter :: (elem, elem -> boolean()) + ) :: :ok + when elem: var + def external_merge_sort(path, reader, sorter \\ &<=/2, chunk_size \\ 50 * 1024 * 1024) do + tmp_dir = Path.join(System.tmp_dir!(), "external_sort_#{:erlang.system_time()}") + File.mkdir_p!(tmp_dir) + + try do + chunks = split_into_sorted_chunks(path, reader, sorter, tmp_dir, chunk_size) + merge_sorted_files(chunks, path, reader, sorter) + :ok + after + File.rm_rf!(tmp_dir) + end + end + + defp split_into_sorted_chunks(path, reader, sorter, tmp_dir, chunk_size) do + path + |> reader.() + |> chunk_by_size(chunk_size) + |> Stream.with_index() + |> Stream.map(fn {chunk, idx} -> + chunk_path = Path.join(tmp_dir, "chunk_#{idx}") + + chunk + |> Enum.sort(sorter) + |> Stream.map(fn {_, value} -> value end) + |> Stream.into(File.stream!(chunk_path)) + |> Stream.run() + + chunk_path + end) + |> Enum.to_list() + end + + @doc """ + Merge a list of sorted files into a single file. + + Uses a reader function that takes a path to a file and returns a stream of tuples `{key, binary}`, + where `binary` will be written to the file as sorted by `key`. 
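A minimal sketch of a reader for this function, assuming a simple newline-delimited `key:value` file format (the file names and format are made up for illustration): each element is `{key, raw_line}`, so the merged output preserves the original bytes.

```elixir
# Merge pre-sorted "key:value\n" files by key.
reader = fn path ->
  File.stream!(path)
  |> Stream.map(fn line ->
    [key, _value] = String.split(line, ":", parts: 2)
    {key, line}
  end)
end

Utils.merge_sorted_files(["a.sorted", "b.sorted"], "merged.out", reader)
```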
+ """ + def merge_sorted_files(paths, target_path, reader, sorter \\ &<=/2) + + def merge_sorted_files([path], target_path, _reader, _sorter) do + File.stream!(path) + |> Stream.into(File.stream!(target_path)) + |> Stream.run() + end + + def merge_sorted_files(paths, target_path, reader, sorter) do + paths + |> Enum.map(reader) + |> merge_sorted_streams(sorter, fn {_, binary} -> binary end) + |> Stream.into(File.stream!(target_path)) + |> Stream.run() + end + + defp chunk_by_size(stream, size) do + Stream.chunk_while( + stream, + {0, []}, + fn {_, value} = full_value, {acc_size, acc} -> + value_size = byte_size(value) + + if acc_size + value_size > size do + {:cont, Enum.reverse(acc), {0, [full_value]}} + else + {:cont, {acc_size + value_size, [full_value | acc]}} + end + end, + fn + {_, []} -> {:cont, []} + {_, acc} -> {:cont, Enum.reverse(acc), []} + end + ) + end + + def concat_files(paths, into) do + # `:file.copy` is not optimized to use a syscall, so basic stream forming is good enough + paths + |> Enum.map(&File.stream!/1) + |> Stream.concat() + |> Stream.into(File.stream!(into)) + |> Stream.run() + end + + @doc """ + Transform the stream to call a side-effect function for each element before continuing. + + Acts like `Stream.each/2` but with an aggregate. `start_fun`, `last_fun`, `after_fun` + have the same semantics as in `Stream.transform/5` + """ + def stream_add_side_effect(stream, start_fun, reducer, last_fun \\ & &1, after_fun \\ & &1) do + Stream.transform( + stream, + start_fun, + fn elem, acc -> + {[elem], reducer.(elem, acc)} + end, + fn acc -> {[], last_fun.(acc)} end, + after_fun + ) + end end diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index 4b0589aafc..415c9cb8cf 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -124,6 +124,45 @@ defmodule Electric.Plug.RouterTest do assert [%{"value" => %{"num" => "2"}}, _] = Jason.decode!(conn.resp_body) end + @tag with_sql: [ + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000001', 'test value 1')" + ] + test "GET after a compaction proceeds correctly", + %{opts: opts, db_conn: db_conn} do + conn = conn("GET", "/v1/shape?table=items&offset=-1") |> Router.call(opts) + assert [_] = Jason.decode!(conn.resp_body) + + for x <- 1..10 do + Postgrex.query!( + db_conn, + "UPDATE items SET value = 'test value #{x}' WHERE id = '00000000-0000-0000-0000-000000000001'", + [] + ) + end + + shape_handle = get_resp_shape_handle(conn) + + Process.sleep(500) + + conn = + conn("GET", "/v1/shape?table=items&handle=#{shape_handle}&offset=0_0&live") + |> Router.call(opts) + + assert length(Jason.decode!(conn.resp_body)) == 11 + {:ok, offset} = LogOffset.from_string(get_resp_header(conn, "electric-offset")) + + # Force compaction + Electric.ShapeCache.Storage.for_shape(shape_handle, opts[:storage]) + |> Electric.ShapeCache.Storage.compact(offset) + + conn = + conn("GET", "/v1/shape?table=items&handle=#{shape_handle}&offset=0_0") + |> Router.call(opts) + + assert [%{"value" => %{"value" => "test value 10"}}, _] = Jason.decode!(conn.resp_body) + assert LogOffset.from_string(get_resp_header(conn, "electric-offset")) == {:ok, offset} + end + @tag with_sql: [ "INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')" ] diff --git a/packages/sync-service/test/electric/shape_cache/file_storage/action_file_test.exs 
b/packages/sync-service/test/electric/shape_cache/file_storage/action_file_test.exs new file mode 100644 index 0000000000..96b35903ef --- /dev/null +++ b/packages/sync-service/test/electric/shape_cache/file_storage/action_file_test.exs @@ -0,0 +1,36 @@ +defmodule Electric.ShapeCache.FileStorage.ActionFileTest do + use ExUnit.Case, async: true + alias Electric.ShapeCache.FileStorage.ActionFile + # alias Electric.Replication.LogOffset + # alias Electric.ShapeCache.FileStorage.LogFile + + @moduletag :tmp_dir + + describe "stream/1" do + test "streams actions from file", %{tmp_dir: tmp_dir} do + action_file_path = Path.join(tmp_dir, "action_file") + + # Write test data in the correct binary format + actions = [ + # Keep action + <<1::64, 1::64, ?k::8>>, + # Skip action + <<2::64, 1::64, ?s::8>>, + # Compact action with one position + <<3::64, 1::64, ?c::8, 1::16, 0::64, 10::64>> + ] + + File.write!(action_file_path, Enum.join(actions)) + + # Test streaming + result = ActionFile.stream(action_file_path) |> Enum.to_list() + assert length(result) == 3 + + assert [ + {{1, 1}, :keep}, + {{2, 1}, :skip}, + {{3, 1}, {:compact, [{0, 10}]}} + ] = result + end + end +end diff --git a/packages/sync-service/test/electric/shape_cache/file_storage/chunk_index_test.exs b/packages/sync-service/test/electric/shape_cache/file_storage/chunk_index_test.exs new file mode 100644 index 0000000000..e18074f125 --- /dev/null +++ b/packages/sync-service/test/electric/shape_cache/file_storage/chunk_index_test.exs @@ -0,0 +1,49 @@ +defmodule Electric.ShapeCache.FileStorage.ChunkIndexTest do + use ExUnit.Case, async: true + alias Electric.ShapeCache.FileStorage.LogFile + alias Electric.ShapeCache.FileStorage.ChunkIndex + alias Electric.Replication.LogOffset + + @moduletag :tmp_dir + + describe "write_from_stream/3" do + test "writes a chunk index", %{tmp_dir: tmp_dir} do + chunk_index_path = Path.join(tmp_dir, "chunk_index") + + log_stream = + [ + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"}, + {%LogOffset{tx_offset: 2, op_offset: 2}, "key2", :insert, "value2"}, + {%LogOffset{tx_offset: 3, op_offset: 3}, "key3", :insert, "value3"} + ] + |> LogFile.normalize_log_stream() + + refute File.exists?(chunk_index_path) + + ChunkIndex.write_from_stream(log_stream, chunk_index_path, 10) + |> Stream.run() + + assert File.exists?(chunk_index_path) + end + end + + describe "fetch_chunk/2" do + test "fetches a chunk by offset", %{tmp_dir: tmp_dir} do + chunk_index_path = Path.join(tmp_dir, "chunk_index") + + log_stream = + [ + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"}, + {%LogOffset{tx_offset: 2, op_offset: 2}, "key2", :insert, "value2"} + ] + |> LogFile.normalize_log_stream() + + result_stream = ChunkIndex.write_from_stream(log_stream, chunk_index_path, 10) + # consume the stream to write the file + Enum.to_list(result_stream) + + result = ChunkIndex.fetch_chunk(chunk_index_path, %LogOffset{tx_offset: 0, op_offset: 0}) + assert match?({:ok, %LogOffset{}, {_, _}}, result) + end + end +end diff --git a/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs b/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs new file mode 100644 index 0000000000..3e6e4968be --- /dev/null +++ b/packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs @@ -0,0 +1,44 @@ +defmodule Electric.ShapeCache.FileStorage.CompactionTest do + use ExUnit.Case, async: true + alias Electric.ShapeCache.FileStorage.Compaction + alias 
Electric.ShapeCache.FileStorage.LogFile + alias Electric.Replication.LogOffset + + @moduletag :tmp_dir + + describe "compact_in_place/2" do + test "compacts a log file", %{tmp_dir: tmp_dir} do + log_file_path = Path.join(tmp_dir, "log_file") + + # Write initial log file with supporting files + log_stream = [ + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|"value1"|}, + {%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, ~S|"value2"|}, + {%LogOffset{tx_offset: 3, op_offset: 1}, "key2", :insert, ~S|"value3"|}, + {%LogOffset{tx_offset: 4, op_offset: 1}, "key1", :update, ~S|"value new 1"|}, + {%LogOffset{tx_offset: 5, op_offset: 1}, "key1", :update, ~S|"value new 2"|}, + {%LogOffset{tx_offset: 6, op_offset: 1}, "key1", :update, ~S|"value new 3"|}, + {%LogOffset{tx_offset: 7, op_offset: 1}, "key1", :update, ~S|"value new 4"|}, + {%LogOffset{tx_offset: 8, op_offset: 1}, "key1", :update, ~S|"value new 5"|}, + {%LogOffset{tx_offset: 9, op_offset: 1}, "key2", :delete, ~S|"value"|} + ] + + paths = LogFile.write_log_file(log_stream, log_file_path) + + assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0}) + |> Enum.to_list() + |> length == 9 + + assert {log_file_path, chunk_index_path, key_index_path} = + Compaction.compact_in_place(paths, 1_000_000, &(&1 <> &2)) + + assert File.exists?(log_file_path) + assert File.exists?(chunk_index_path) + assert File.exists?(key_index_path) + + assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0}) + |> Enum.to_list() + |> length == 4 + end + end +end diff --git a/packages/sync-service/test/electric/shape_cache/file_storage/key_index_test.exs b/packages/sync-service/test/electric/shape_cache/file_storage/key_index_test.exs new file mode 100644 index 0000000000..0a54933728 --- /dev/null +++ b/packages/sync-service/test/electric/shape_cache/file_storage/key_index_test.exs @@ -0,0 +1,50 @@ +defmodule Electric.ShapeCache.FileStorage.KeyIndexTest do + use ExUnit.Case, async: true + alias Electric.ShapeCache.FileStorage.KeyIndex + alias Electric.Replication.LogOffset + alias Electric.ShapeCache.FileStorage.LogFile + + @moduletag :tmp_dir + + describe "write_from_stream/2" do + test "writes key index from stream", %{tmp_dir: tmp_dir} do + key_index_path = Path.join(tmp_dir, "key_index") + + log_stream = + [ + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"}, + {%LogOffset{tx_offset: 2, op_offset: 1}, "key2", :insert, "value2"} + ] + |> LogFile.normalize_log_stream() + + refute File.exists?(key_index_path) + result_stream = KeyIndex.write_from_stream(log_stream, key_index_path) + assert is_function(result_stream) + + # Consume the stream to write the file + Enum.to_list(result_stream) + assert File.exists?(key_index_path) + end + end + + describe "stream/1" do + test "streams key index entries", %{tmp_dir: tmp_dir} do + key_index_path = Path.join(tmp_dir, "key_index") + + log_stream = + [ + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"}, + {%LogOffset{tx_offset: 2, op_offset: 1}, "key2", :insert, "value2"} + ] + |> LogFile.normalize_log_stream() + + result_stream = KeyIndex.write_from_stream(log_stream, key_index_path) + # consume the stream to write the file + Enum.to_list(result_stream) + + # Test streaming + result = KeyIndex.stream(key_index_path) |> Enum.to_list() + assert length(result) > 0 + end + end +end diff --git a/packages/sync-service/test/electric/shape_cache/file_storage/log_file_test.exs 
b/packages/sync-service/test/electric/shape_cache/file_storage/log_file_test.exs new file mode 100644 index 0000000000..f62eef984f --- /dev/null +++ b/packages/sync-service/test/electric/shape_cache/file_storage/log_file_test.exs @@ -0,0 +1,58 @@ +defmodule Electric.ShapeCache.FileStorage.LogFileTest do + use ExUnit.Case, async: true + alias Electric.ShapeCache.FileStorage.LogFile + alias Electric.Replication.LogOffset + + @moduletag :tmp_dir + + describe "write_log_file/2" do + test "writes a log file to disk", %{tmp_dir: tmp_dir} do + log_file_path = Path.join(tmp_dir, "log_file") + + log_stream = [ + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"}, + {%LogOffset{tx_offset: 2, op_offset: 2}, "key2", :insert, "value2"}, + {%LogOffset{tx_offset: 3, op_offset: 3}, "key3", :insert, "value3"} + ] + + refute File.exists?(log_file_path) + + assert {^log_file_path, chunk_index_path, key_index_path} = + LogFile.write_log_file(log_stream, log_file_path) + + assert File.exists?(log_file_path) + assert File.exists?(chunk_index_path) + assert File.exists?(key_index_path) + + assert File.read!(log_file_path) =~ "value1" + assert File.read!(log_file_path) =~ "value2" + assert File.read!(log_file_path) =~ "value3" + end + end + + describe "read_chunk/2" do + test "reads a chunk from disk according to the log offset", %{tmp_dir: tmp_dir} do + log_file_path = Path.join(tmp_dir, "log_file") + + log_stream = [ + # Will be in chunk 1 + {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"}, + {%LogOffset{tx_offset: 1, op_offset: 2}, "key2", :insert, "value2"}, + # Will be in chunk 2 + {%LogOffset{tx_offset: 2, op_offset: 1}, "key3", :insert, "value3"}, + {%LogOffset{tx_offset: 2, op_offset: 2}, "key4", :insert, "value4"}, + # Will be in chunk 3 + {%LogOffset{tx_offset: 3, op_offset: 1}, "key5", :insert, "value5"}, + {%LogOffset{tx_offset: 3, op_offset: 2}, "key6", :insert, "value6"} + ] + + refute File.exists?(log_file_path) + # 10-byte chunks + assert {^log_file_path, _, _} = + paths = LogFile.write_log_file(log_stream, log_file_path, 10) + + chunk = LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0}) + assert length(chunk) > 0 + end + end +end diff --git a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs index 6663df24d4..ab17244004 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs @@ -423,6 +423,162 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do for module <- [FileStorage] do module_name = module |> Module.split() |> List.last() + describe "#{module_name}.compact/1" do + setup do + {:ok, %{module: unquote(module)}} + end + + setup :start_storage + + test "can compact operations within a shape", %{storage: storage} do + Storage.initialise(storage) + Storage.mark_snapshot_as_started(storage) + Storage.make_new_snapshot!([], storage) + + for i <- 1..10 do + %Changes.UpdatedRecord{ + relation: {"public", "test_table"}, + old_record: %{"id" => "sameid", "name" => "Test#{i - 1}"}, + record: %{"id" => "sameid", "name" => "Test#{i}"}, + log_offset: LogOffset.new(i, 0), + changed_columns: MapSet.new(["name"]) + } + end + # Super small chunk size so that each update is its own chunk + |> changes_to_log_items(chunk_size: 5) + |> Storage.append_to_log!(storage) + + assert Storage.get_log_stream(LogOffset.first(), 
LogOffset.new(7, 0), storage) + |> Enum.to_list() + |> length() == 7 + + assert :ok = Storage.compact(storage) + + assert [line] = + Storage.get_log_stream(LogOffset.first(), LogOffset.new(7, 0), storage) + |> Enum.to_list() + + assert Jason.decode!(line, keys: :atoms) == %{ + offset: "8_0", + value: %{id: "sameid", name: "Test8"}, + key: ~S|"public"."test_table"/"sameid"|, + headers: %{operation: "update", relation: ["public", "test_table"]} + } + end + + test "compaction doesn't bridge deletes", %{storage: storage} do + Storage.initialise(storage) + Storage.mark_snapshot_as_started(storage) + Storage.make_new_snapshot!([], storage) + + for i <- 1..10 do + update = %Changes.UpdatedRecord{ + relation: {"public", "test_table"}, + old_record: %{"id" => "sameid", "name" => "Test#{i - 1}"}, + record: %{"id" => "sameid", "name" => "Test#{i}"}, + log_offset: LogOffset.new(i, 0), + changed_columns: MapSet.new(["name"]) + } + + if i == 5 do + delete = %Changes.DeletedRecord{ + relation: {"public", "test_table"}, + old_record: %{"id" => "sameid", "name" => "Test#{i}"}, + log_offset: LogOffset.new(i, 1) + } + + insert = %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "sameid", "name" => "Test#{i}"}, + log_offset: LogOffset.new(i, 2) + } + + [update, delete, insert] + else + [update] + end + end + |> List.flatten() + # Super small chunk size so that each update is its own chunk + |> changes_to_log_items(chunk_size: 5) + |> Storage.append_to_log!(storage) + + assert Storage.get_log_stream(LogOffset.first(), LogOffset.new(7, 0), storage) + |> Enum.to_list() + |> length() == 9 + + assert :ok = Storage.compact(storage) + + assert [op1, op2, op3, op4] = + Storage.get_log_stream(LogOffset.first(), LogOffset.new(7, 0), storage) + |> Enum.to_list() + + assert %{value: %{name: "Test5"}} = Jason.decode!(op1, keys: :atoms) + assert %{headers: %{operation: "delete"}} = Jason.decode!(op2, keys: :atoms) + assert %{headers: %{operation: "insert"}} = Jason.decode!(op3, keys: :atoms) + + assert %{headers: %{operation: "update"}, value: %{name: "Test8"}} = + Jason.decode!(op4, keys: :atoms) + end + + test "compaction works multiple times", %{storage: storage} do + Storage.initialise(storage) + Storage.mark_snapshot_as_started(storage) + Storage.make_new_snapshot!([], storage) + + for i <- 1..10 do + %Changes.UpdatedRecord{ + relation: {"public", "test_table"}, + old_record: %{"id" => "sameid", "name" => "Test#{i - 1}"}, + record: %{"id" => "sameid", "name" => "Test#{i}"}, + log_offset: LogOffset.new(i, 0), + changed_columns: MapSet.new(["name"]) + } + end + # Super small chunk size so that each update is its own chunk + |> changes_to_log_items(chunk_size: 5) + |> Storage.append_to_log!(storage) + + assert Storage.get_log_stream(LogOffset.first(), LogOffset.new(7, 0), storage) + |> Enum.to_list() + |> length() == 7 + + # Force compaction of all the lines + assert :ok = Storage.compact(storage, LogOffset.new(10, 0)) + + assert Storage.get_log_stream(LogOffset.first(), LogOffset.new(10, 0), storage) + |> Enum.to_list() + |> length() == 1 + + for i <- 10..20 do + %Changes.UpdatedRecord{ + relation: {"public", "test_table"}, + old_record: %{"id" => "sameid", "other_name" => "Test#{i - 1}"}, + record: %{"id" => "sameid", "other_name" => "Test#{i}"}, + log_offset: LogOffset.new(i, 0), + # Change the other column here to make sure previous are also included in the compaction + changed_columns: MapSet.new(["other_name"]) + } + end + # Super small chunk size so that each update is its own chunk + 
|> changes_to_log_items(chunk_size: 5) + |> Storage.append_to_log!(storage) + + assert :ok = Storage.compact(storage) + + assert [line] = + Storage.get_log_stream(LogOffset.first(), LogOffset.new(17, 0), storage) + |> Enum.to_list() + + assert Jason.decode!(line, keys: :atoms) == %{ + offset: "18_0", + value: %{id: "sameid", name: "Test10", other_name: "Test18"}, + key: ~S|"public"."test_table"/"sameid"|, + headers: %{operation: "update", relation: ["public", "test_table"]} + } + end + end + describe "#{module_name}.initialise/1" do setup do {:ok, %{module: unquote(module)}} @@ -575,7 +731,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do storage.initialise(shape_opts) storage.set_shape_definition(@shape, shape_opts) - assert 2274 = Electric.ShapeCache.Storage.get_total_disk_usage({storage, opts}) + assert 2330 = Electric.ShapeCache.Storage.get_total_disk_usage({storage, opts}) end end end diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 9f83d43031..047b97ef24 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -253,12 +253,12 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {^ref2, :new_changes, ^last_log_offset}, 1000 assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, - [{_offset, serialized_record}]} + [{_offset, _key, _type, serialized_record}]} assert %{"value" => %{"id" => "1"}} = Jason.decode!(serialized_record) assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, - [{_offset, serialized_record}]} + [{_offset, _key, _type, serialized_record}]} assert %{"value" => %{"id" => "2"}} = Jason.decode!(serialized_record) end diff --git a/packages/sync-service/test/electric/utils_test.exs b/packages/sync-service/test/electric/utils_test.exs index dcd1d8fa61..aa3a5526e4 100644 --- a/packages/sync-service/test/electric/utils_test.exs +++ b/packages/sync-service/test/electric/utils_test.exs @@ -2,4 +2,68 @@ defmodule Electric.UtilsTest do alias Electric.Utils use ExUnit.Case, async: true doctest Utils, import: true + + describe "external_merge_sort/4" do + @describetag :tmp_dir + + setup %{tmp_dir: tmp_dir, file_size: size} do + path = Path.join(tmp_dir, "test.txt") + + Stream.unfold(0, fn + bytes when bytes >= size -> + nil + + bytes -> + {<<:rand.uniform(0xFFFFFFFF)::32, :rand.bytes(40)::binary>>, # assumed record layout: 4-byte key + 40-byte random payload + bytes + 4 + 40} + end) + |> Stream.into(File.stream!(path)) + |> Stream.run() + + {:ok, %{path: path}} + end + + @tag file_size: 1_000 + test "sorts a file", %{path: path} do + refute stream_sorted?(stream_test_file(path)) + assert :ok = Utils.external_merge_sort(path, &stream_test_file/1, &<=/2) + assert stream_sorted?(stream_test_file(path)) + end + + @tag file_size: 10_000 + test "sorts a large file externally", %{path: path} do + refute stream_sorted?(stream_test_file(path)) + assert :ok = Utils.external_merge_sort(path, &stream_test_file/1, &<=/2, 1_000) + assert stream_sorted?(stream_test_file(path)) + end + end + + defp stream_test_file(path) do + Stream.resource( + fn -> File.open!(path) end, + fn file -> + case IO.binread(file, 44) do + <<key::32, _rest::binary>> = record -> # 44-byte record: 4-byte sort key, then the payload + {[{key, record}], file} + + :eof -> + {:halt, file} + end + end, + &File.close/1 + ) + end + + defp stream_sorted?(stream, mapper \\ & &1, comparator \\ &<=/2) do + Enum.reduce_while(stream, {true, nil}, fn value, {true, prev_value} -> + new_value = mapper.(value) + + cond do + is_nil(prev_value) -> {:cont, {true, new_value}} + comparator.(prev_value, new_value) 
-> {:cont, {true, new_value}} + true -> {:halt, {false, {prev_value, new_value}}} + end + end) + |> elem(0) + end end diff --git a/packages/sync-service/test/support/test_utils.ex b/packages/sync-service/test/support/test_utils.ex index dcc9d2c7b2..fdb3a2808e 100644 --- a/packages/sync-service/test/support/test_utils.ex +++ b/packages/sync-service/test/support/test_utils.ex @@ -1,4 +1,5 @@ defmodule Support.TestUtils do + alias Electric.ShapeCache.LogChunker alias Electric.LogItems alias Electric.Replication.Changes @@ -6,11 +7,28 @@ defmodule Support.TestUtils do Preprocess a list of `Changes.data_change()` structs in the same way they are preprocessed before reaching storage. """ - def changes_to_log_items(changes, pk \\ ["id"], xid \\ 1, replica \\ :default) do + def changes_to_log_items(changes, opts \\ []) do + pk = Keyword.get(opts, :pk, ["id"]) + xid = Keyword.get(opts, :xid, 1) + replica = Keyword.get(opts, :replica, :default) + chunk_size = Keyword.get(opts, :chunk_size, LogChunker.default_chunk_size_threshold()) + changes |> Enum.map(&Changes.fill_key(&1, pk)) |> Enum.flat_map(&LogItems.from_change(&1, xid, pk, replica)) - |> Enum.map(fn item -> {item.offset, Jason.encode!(item)} end) + |> Enum.map(fn item -> + {item.offset, item.key, item.headers.operation, Jason.encode!(item)} + end) + |> Enum.flat_map_reduce(0, fn {offset, _, _, json_log_item} = line, acc -> + case LogChunker.fit_into_chunk(byte_size(json_log_item), acc, chunk_size) do + {:ok, new_chunk_size} -> + {[line], new_chunk_size} + + {:threshold_exceeded, new_chunk_size} -> + {[line, {:chunk_boundary, offset}], new_chunk_size} + end + end) + |> elem(0) end def with_electric_instance_id(ctx) do
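A note on the `reader` used with `Electric.Utils.external_merge_sort/3` in `utils_test.exs` above: it returns a stream of `{sort_key, binary}` tuples, and `merge_sorted_files/4` writes only the `binary` element back to disk, so any framing that round-trips the original bytes works. The sketch below is illustrative only and not part of the patch; the module and function names are hypothetical, and only the arity-3 `Utils` call mirrors what the tests exercise. It sorts a newline-delimited file in place:

```elixir
defmodule LineSortExample do
  alias Electric.Utils

  # Reader contract: a stream of {sort_key, binary} tuples; the binary
  # (including its trailing newline) is exactly what gets written back out.
  def read_lines(path) do
    path
    |> File.stream!()
    |> Stream.map(fn line -> {line, line} end)
  end

  def sort_file!(path) do
    # Elements are compared with &<=/2, so lines end up in lexicographic order.
    :ok = Utils.external_merge_sort(path, &read_lines/1, &<=/2)
  end
end
```

Similarly, `Support.TestUtils.changes_to_log_items/2` now takes its options as a keyword list (`:pk`, `:xid`, `:replica`, `:chunk_size`) and yields `{offset, key, operation, json}` tuples, interleaving `{:chunk_boundary, offset}` markers whenever the chunk threshold is exceeded. A minimal sketch of calling it from a test in this suite, with made-up change values:

```elixir
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset

log_items =
  [
    %Changes.NewRecord{
      relation: {"public", "items"},
      record: %{"id" => "1", "value" => "a"},
      log_offset: LogOffset.new(1, 0)
    }
  ]
  # A tiny :chunk_size forces a {:chunk_boundary, offset} marker after each item,
  # mirroring the chunk_size: 5 used in the storage tests above.
  |> Support.TestUtils.changes_to_log_items(pk: ["id"], chunk_size: 5)
```

The resulting list can be passed straight to `Storage.append_to_log!/2`, exactly as the storage compaction tests above do.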