Skip to content

Commit

Permalink
Document how insert_events.sql works
Browse files Browse the repository at this point in the history
  • Loading branch information
drteeth committed Jan 11, 2025
1 parent 0904d36 commit 362f087
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions lib/event_store/sql/statements/insert_events.sql.eex
Original file line number Diff line number Diff line change
@@ -1,4 +1,32 @@
<%#
# Elixir template variables:
# schema - string
# stream_id - integer
# number_of_events - integer
# Bind variables
# 1 - stream_id - integer
# 2 - stream_version - integer
# Additionally, we multiply the index by 9 for each event:
# 3 - event_id - uuid
# 4 - event_type - text
# 5 - causation_id - uuid
# 6 - correlation_id - uuid
# 7 - data - <serialized event data format (bytea, jsonb, etc.)>
# 8 - metadata - jsonb (always?)
# 9 - created_at - timestamp
# 10 - index - integer
# 11 - stream_version - integer
%>

WITH
<%#
# create a table variable with:
# event_id - uuid - the id for the new event
# index - integer - the increase in the stream version for any stream it is linked to
# stream_version - integer - the final stream version after all of the events have been inserted
%>
new_events_indexes (event_id, index, stream_version) AS (
VALUES
<%= for i <- 0..(number_of_events - 1) do %>
Expand All @@ -7,6 +35,11 @@ WITH
<% end %>
),
events AS (
<%
# insert the new events into the events table
# using the 7 bind variables from 3 to 9 inclusive
# n.b.: the bind for the event_id is re-generated here
%>
INSERT INTO "<%= schema %>".events
(
event_id,
Expand All @@ -24,13 +57,18 @@ WITH
<% end %>
),
stream AS (
<% # Increase the version to the stream version given %>
<%= cond do %>
<% stream_id -> %>
UPDATE "<%= schema %>".streams
SET stream_version = stream_version + $2::bigint
WHERE stream_id = $1::bigint
returning stream_id
<% created_at -> %>
<%
# the event created_at date has been provided as the last bind variable
# use that instead of generating one
%>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at)
VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>)
returning stream_id
Expand All @@ -41,6 +79,14 @@ WITH
<% end %>
),
source_stream_events AS (
<%
# link the new events into it's source stream
# we're using the passed in event_ids rather than reading/joining from tables
# each insert uses the stream_version calculated for the corresponding event
# we're joining here, so we'll get the product of:
# the stream (1)
# the rows in the table variable (number_of_events)
%>
INSERT INTO "<%= schema %>".stream_events
(
event_id,
Expand All @@ -58,12 +104,21 @@ WITH
FROM new_events_indexes, stream
),
linked_stream AS (
<%
# Update the all streams version by the number of events
# This is the value of the expected version at append time + the number of events
# Returns the version before the update
%>
UPDATE "<%= schema %>".streams
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
<%
# Link the new events into the $all stream
# 1 row for each event
%>
INSERT INTO "<%= schema %>".stream_events
(
event_id,
Expand Down

0 comments on commit 362f087

Please sign in to comment.