feat: adds compaction (#2231)
This PR introduces compaction for the FileStorage. Compaction follows these rules:
1. The relative order of insert/update/delete operations over the same row is preserved: a consumer can never see an update before the corresponding insert.
2. Insert/delete operations are never compacted.
3. Updates to the same row are compacted into one (see the sketch after this list).
4. Clients can continue reading from the same offsets as before the compaction and shouldn't see inserts/deletes they've already seen - they may, however, see updates they've already seen folded into another, later update.
5. The live tail is not affected by compaction, in order to preserve idempotent inserts of already-seen transactions from Postgres.
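
For illustration, rules 3 and 4 map onto the `merge_updates/2` helper added to `log_items.ex` below. A minimal sketch of merging two decoded update messages for the same row (keys, offsets, and values here are made-up placeholders):

```elixir
u1 = %{
  "key" => "items/1",
  "offset" => "100_0",
  "headers" => %{"operation" => "update", "relation" => ["public", "items"]},
  "value" => %{"id" => "1", "title" => "old title", "done" => "false"}
}

u2 = %{
  "key" => "items/1",
  "offset" => "200_0",
  "headers" => %{"operation" => "update", "relation" => ["public", "items"]},
  "value" => %{"done" => "true"}
}

Electric.LogItems.merge_updates(u1, u2)
# => %{
#      "key" => "items/1",
#      "offset" => "200_0",
#      "headers" => %{"operation" => "update", "relation" => ["public", "items"]},
#      "value" => %{"id" => "1", "title" => "old title", "done" => "true"}
#    }
```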

Compaction doesn't currently affect "live tail" storage - we're still
using CubDB for that, but compacted data is moved out of CubDB.

The on-disk format for the log is
```elixir
<<tx_offset::64, op_offset::64, key_size::32, key::binary-size(key_size), op_type::8, json_size::64, json::binary-size(json_size)>>
```

With a supporting chunk index
```elixir
<<start_tx_offset::64, start_op_offset::64, start_file_pos::64, end_tx_offset::64, end_op_offset::64, end_file_pos::64>>
```

that both aligns reads across all clients and acts as a sparse index: the client comes in with an offset, we find the chunk that covers it, and then serve only the part of that chunk they haven't seen yet, the same as we do today.
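
For orientation only - this is a sketch of the layout described above, not the actual `FS.LogFile`/`FS.ChunkIndex` implementation - decoding one entry and locating a chunk could look roughly like this:

```elixir
defmodule CompactedLogSketch do
  # Illustrative: decode one log entry from a binary laid out as described above,
  # returning the decoded fields plus the remaining bytes.
  def decode_entry(
        <<tx_offset::64, op_offset::64, key_size::32, key::binary-size(key_size), op_type::8,
          json_size::64, json::binary-size(json_size), rest::binary>>
      ) do
    {%{offset: {tx_offset, op_offset}, key: key, op_type: op_type, json: json}, rest}
  end

  # Illustrative: linear scan over fixed-size chunk index entries, returning the byte range
  # of the first chunk that ends at or after the requested offset.
  def find_chunk(
        <<_start_tx::64, _start_op::64, start_pos::64, end_tx::64, end_op::64, end_pos::64,
          rest::binary>>,
        {tx, op} = offset
      ) do
    if {end_tx, end_op} >= {tx, op}, do: {start_pos, end_pos}, else: find_chunk(rest, offset)
  end

  def find_chunk(<<>>, _offset), do: nil
end
```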
icehaunter authored Jan 30, 2025
1 parent 79a7dce commit edfb9f3
Showing 27 changed files with 1,809 additions and 74 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-dodos-raise.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

feat: add compaction flag to Electric that allows `UPDATE` events within a shape to be compacted to save space and bandwidth
9 changes: 9 additions & 0 deletions packages/sync-service/lib/electric/log_items.ex
@@ -126,4 +126,13 @@ defmodule Electric.LogItems do
|> Enum.zip()
|> Map.new()
end

def merge_updates(u1, u2) do
%{
"key" => u1["key"],
"offset" => u2["offset"],
"headers" => Map.take(u1["headers"], ["operation", "relation"]),
"value" => Map.merge(u1["value"], u2["value"])
}
end
end
15 changes: 13 additions & 2 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -36,6 +38,8 @@ defmodule Electric.Plug.ServeShapePlug do
import Ecto.Changeset
alias Electric.Replication.LogOffset

@tmp_compaction_flag :experimental_compaction

@primary_key false
embedded_schema do
field(:table, :string)
@@ -46,6 +48,7 @@
field(:columns, :string)
field(:shape_definition, :string)
field(:replica, Ecto.Enum, values: [:default, :full], default: :default)
field(@tmp_compaction_flag, :boolean, default: false)
end

def validate(params, opts) do
@@ -135,10 +138,17 @@
where = fetch_field!(changeset, :where)
columns = get_change(changeset, :columns, nil)
replica = fetch_field!(changeset, :replica)
compaction_enabled? = fetch_field!(changeset, @tmp_compaction_flag)

case Shapes.Shape.new(
table,
opts ++ [where: where, columns: columns, replica: replica]
opts ++
[
where: where,
columns: columns,
replica: replica,
storage: %{compaction: if(compaction_enabled?, do: :enabled, else: :disabled)}
]
) do
{:ok, result} ->
put_change(changeset, :shape_definition, result)
@@ -655,7 +665,8 @@
conn
|> fetch_query_params()
|> assign(:error_str, error_str)
|> end_telemetry_span()

# |> end_telemetry_span()

conn
end
49 changes: 49 additions & 0 deletions packages/sync-service/lib/electric/shape_cache/compaction_runner.ex
@@ -0,0 +1,49 @@
defmodule Electric.ShapeCache.CompactionRunner do
use GenServer

require Logger

alias Electric.ShapeCache.Storage

@schema NimbleOptions.new!(
stack_id: [type: :string, required: true],
shape_handle: [type: :string, required: true],
storage: [type: :mod_arg, required: true],
compaction_period: [type: :non_neg_integer, default: :timer.minutes(10)]
)

def start_link(opts) do
with {:ok, opts} <- NimbleOptions.validate(opts, @schema) do
GenServer.start_link(__MODULE__, opts, name: name(opts))
end
end

def name(opts) do
Electric.ProcessRegistry.name(opts[:stack_id], __MODULE__, opts[:shape_handle])
end

@impl GenServer
def init(opts) do
clean_after_period(opts)
Process.set_label({:compaction_runner, opts[:stack_id], opts[:shape_handle]})
Logger.metadata(stack_id: opts[:stack_id], shape_handle: opts[:shape_handle])
{:ok, opts}
end

@impl GenServer
def handle_info(:clean, opts) do
Logger.info("Triggering compaction for shape #{opts[:shape_handle]}")
clean_after_period(opts)
Storage.compact(opts[:storage])
Logger.info("Compaction complete for shape #{opts[:shape_handle]}")

{:noreply, opts}
end

defp clean_after_period(opts) do
# add a large random jitter to avoid all compactions happening at the same time
half_period = div(opts[:compaction_period], 2)
next_msg = opts[:compaction_period] + Enum.random(-half_period..half_period)
Process.send_after(self(), :clean, next_msg)
end
end
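
Based on the options schema above, a hypothetical way to run one compaction runner per shape under a supervisor (all values below are placeholders, and `file_storage_opts` stands in for the shape's actual FileStorage configuration):

```elixir
# Placeholder storage options; the {module, opts} tuple matches the :mod_arg option type.
file_storage_opts = %{}

children = [
  {Electric.ShapeCache.CompactionRunner,
   stack_id: "my_stack",
   shape_handle: "my-shape-handle",
   storage: {Electric.ShapeCache.FileStorage, file_storage_opts},
   compaction_period: :timer.minutes(30)}
]

Supervisor.start_link(children, strategy: :one_for_one)
```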
186 changes: 165 additions & 21 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
@@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.FileStorage do
use Retry
require Logger

alias Electric.ShapeCache.LogChunker
alias Electric.Telemetry.OpenTelemetry
alias Electric.Replication.LogOffset
import Electric.Replication.LogOffset, only: :macros
@@ -17,6 +18,7 @@
@xmin_key :snapshot_xmin
@snapshot_meta_key :snapshot_meta
@snapshot_started_key :snapshot_started
@compaction_info_key :compaction_info

@behaviour Electric.ShapeCache.Storage

@@ -27,8 +29,10 @@
:data_dir,
:cubdb_dir,
:snapshot_dir,
:log_dir,
:stack_id,
:extra_opts,
:chunk_bytes_threshold,
version: @version
]

@@ -38,7 +42,12 @@
storage_dir = Keyword.get(opts, :storage_dir, "./shapes")

# Always scope the provided storage dir by stack id
%{base_path: Path.join(storage_dir, stack_id), stack_id: stack_id}
%{
base_path: Path.join(storage_dir, stack_id),
stack_id: stack_id,
chunk_bytes_threshold:
Keyword.get(opts, :chunk_bytes_threshold, LogChunker.default_chunk_size_threshold())
}
end

@impl Electric.ShapeCache.Storage
@@ -59,8 +68,10 @@
data_dir: data_dir,
cubdb_dir: Path.join([data_dir, "cubdb"]),
snapshot_dir: Path.join([data_dir, "snapshots"]),
log_dir: Path.join([data_dir, "log"]),
stack_id: stack_id,
extra_opts: Map.get(opts, :extra_opts, %{})
extra_opts: Map.get(opts, :extra_opts, %{}),
chunk_bytes_threshold: opts.chunk_bytes_threshold
}
end

@@ -91,7 +102,8 @@
defp initialise_filesystem(opts) do
with :ok <- File.mkdir_p(opts.data_dir),
:ok <- File.mkdir_p(opts.cubdb_dir),
:ok <- File.mkdir_p(opts.snapshot_dir) do
:ok <- File.mkdir_p(opts.snapshot_dir),
:ok <- File.mkdir_p(opts.log_dir) do
:ok
end
end
@@ -324,11 +336,21 @@

@impl Electric.ShapeCache.Storage
def append_to_log!(log_items, %FS{} = opts) do
compaction_boundary = get_compaction_boundary(opts)

retry with: linear_backoff(50, 2) |> expiry(5_000) do
log_items
|> Enum.map(fn
{:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil}
{offset, json_log_item} -> {log_key(offset), json_log_item}
|> Enum.flat_map(fn
{:chunk_boundary, offset} ->
[{chunk_checkpoint_key(offset), nil}]

# We have definitely seen this, but it's not going to be in CubDB after compaction,
# so instead of idempotent insert we just ignore.
{offset, _, _, _} when is_log_offset_lt(offset, compaction_boundary) ->
[]

{offset, key, op_type, json_log_item} ->
[{log_key(offset), {key, op_type, json_log_item}}]
end)
|> then(&CubDB.put_multi(opts.db, &1))
else
@@ -375,6 +397,107 @@
def get_log_stream(%LogOffset{} = offset, max_offset, %FS{} = opts),
do: stream_log_chunk(offset, max_offset, opts)

def compact(%FS{} = opts) do
CubDB.select(opts.db,
min_key: chunk_checkpoint_start(),
max_key: chunk_checkpoint_end(),
reverse: true
)
# Keep the last 2 chunks as-is so that anything that relies on the live stream and
# transactional information/LSNs always has something to work with.
|> Enum.take(3)
|> case do
[_, _, {key, _}] ->
compact(opts, offset(key))

_ ->
# Not enough chunks to warrant compaction
:ok
end
end

def compact(%FS{} = opts, %LogOffset{} = upper_bound) do
# We consider any log not yet covered by the stored upper bound to be live and uncompacted. This means
# that concurrent readers can read out everything they want while the compaction is happening, and we
# only atomically update the pointer to the live portion once the compacted file is ready.

case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {_, ^upper_bound}} ->
:ok

{:ok, {old_log, _}} ->
# compact further
new_log_file_path =
Path.join(
opts.log_dir,
"compact_log_#{DateTime.utc_now() |> DateTime.to_unix(:millisecond)}.electric"
)

new_log =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, op_type, json}} ->
{offset(key), op_key, op_type, json}
end)
|> FS.LogFile.write_log_file(new_log_file_path <> ".new")

merged_log =
FS.Compaction.merge_and_compact(
old_log,
new_log,
new_log_file_path,
opts.chunk_bytes_threshold
)

CubDB.put(opts.db, @compaction_info_key, {merged_log, upper_bound})
delete_compacted_keys(opts, upper_bound)
FS.Compaction.rm_log(new_log)
FS.Compaction.rm_log(old_log)
:ok

:error ->
log_file_path = Path.join(opts.log_dir, "compact_log.electric")

log =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, op_type, json}} ->
{offset(key), op_key, op_type, json}
end)
|> FS.LogFile.write_log_file(log_file_path)
|> FS.Compaction.compact_in_place(opts.chunk_bytes_threshold)

CubDB.put(opts.db, @compaction_info_key, {log, upper_bound})
delete_compacted_keys(opts, upper_bound)
:ok
end
end

defp delete_compacted_keys(%FS{} = opts, upper_bound) do
compacted_chunks =
CubDB.select(opts.db,
min_key: chunk_checkpoint_start(),
max_key: chunk_checkpoint_key(upper_bound),
max_key_inclusive: true
)
|> Enum.map(fn {key, _} -> key end)

compacted_logs =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound)
)
|> Enum.map(fn {key, _} -> key end)

CubDB.delete_multi(opts.db, compacted_chunks ++ compacted_logs)
end

# This function raises if the chunk file doesn't exist.
defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do
Stream.resource(
Expand Down Expand Up @@ -435,13 +558,27 @@ defmodule Electric.ShapeCache.FileStorage do
end

defp stream_log_chunk(%LogOffset{} = offset, max_offset, %FS{} = opts) do
opts.db
|> CubDB.select(
min_key: log_key(offset),
max_key: log_key(max_offset),
min_key_inclusive: false
)
|> Stream.map(fn {_, item} -> item end)
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
FS.LogFile.read_chunk(log, offset)

_ ->
opts.db
|> CubDB.select(
min_key: log_key(offset),
max_key: log_key(max_offset),
min_key_inclusive: false
)
|> Stream.map(fn {_, {_, _, json_log_item}} -> json_log_item end)
end
end

defp get_compaction_boundary(%FS{} = opts) do
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {_, upper_bound}} -> upper_bound
:error -> LogOffset.first()
end
end

defp wait_for_chunk_file_or_snapshot_end(
@@ -503,14 +640,21 @@
def get_chunk_end_log_offset(offset, %FS{} = opts), do: get_chunk_end_for_log(offset, opts)

defp get_chunk_end_for_log(offset, %FS{} = opts) do
CubDB.select(opts.db,
min_key: chunk_checkpoint_key(offset),
max_key: chunk_checkpoint_end(),
min_key_inclusive: false
)
|> Stream.map(fn {key, _} -> offset(key) end)
|> Enum.take(1)
|> Enum.at(0)
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
{:ok, max_offset, _} = FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
max_offset

_ ->
CubDB.select(opts.db,
min_key: chunk_checkpoint_key(offset),
max_key: chunk_checkpoint_end(),
min_key_inclusive: false
)
|> Stream.map(fn {key, _} -> offset(key) end)
|> Enum.take(1)
|> Enum.at(0)
end
end

defp get_last_snapshot_offset(%FS{} = opts) do