diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex index bbc2610f..27141d76 100644 --- a/lib/event_store/sql/statements/insert_events.sql.eex +++ b/lib/event_store/sql/statements/insert_events.sql.eex @@ -1,4 +1,32 @@ +<%# + # Elixir template variables: + # schema - string + # stream_id - integer + # number_of_events - integer + + # Bind variables + # 1 - stream_id - integer + # 2 - stream_version - integer + + # Additionally, we multiply the index by 9 for each event: + # 3 - event_id - uuid + # 4 - event_type - text + # 5 - causation_id - uuid + # 6 - correlation_id - uuid + # 7 - data - + # 8 - metadata - jsonb (always?) + # 9 - created_at - timestamp + # 10 - index - integer + # 11 - stream_version - integer +%> + WITH + <%# + # create a table variable with: + # event_id - uuid - the id for the new event + # index - integer - the increase in the stream version for any stream it is linked to + # stream_version - integer - the final stream version after all of the events have been inserted + %> new_events_indexes (event_id, index, stream_version) AS ( VALUES <%= for i <- 0..(number_of_events - 1) do %> @@ -7,6 +35,11 @@ WITH <% end %> ), events AS ( + <% + # insert the new events into the events table + # using the 7 bind variables from 3 to 9 inclusive + # n.b.: the bind for the event_id is re-generated here + %> INSERT INTO "<%= schema %>".events ( event_id, @@ -24,6 +57,7 @@ WITH <% end %> ), stream AS ( + <% # Increase the version to the stream version given %> <%= cond do %> <% stream_id -> %> UPDATE "<%= schema %>".streams @@ -31,6 +65,10 @@ WITH WHERE stream_id = $1::bigint returning stream_id <% created_at -> %> + <% + # the event created_at date has been provided as the last bind variable + # use that instead of generating one + %> INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at) VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>) returning stream_id @@ -41,6 +79,14 @@ WITH <% end %> ), source_stream_events AS ( + <% + # link the new events into it's source stream + # we're using the passed in event_ids rather than reading/joining from tables + # each insert uses the stream_version calculated for the corresponding event + # we're joining here, so we'll get the product of: + # the stream (1) + # the rows in the table variable (number_of_events) + %> INSERT INTO "<%= schema %>".stream_events ( event_id, @@ -58,12 +104,21 @@ WITH FROM new_events_indexes, stream ), linked_stream AS ( + <% + # Update the all streams version by the number of events + # This is the value of the expected version at append time + the number of events + # Returns the version before the update + %> UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version ), linked_stream_events AS ( + <% + # Link the new events into the $all stream + # 1 row for each event + %> INSERT INTO "<%= schema %>".stream_events ( event_id,