Commit

Merge pull request #1 from renatillas/concurrency

concurrency

renatillas authored Jan 24, 2025
2 parents 8500fe1 + 20cf914 commit 2da44f1

Showing 4 changed files with 179 additions and 124 deletions.
4 changes: 2 additions & 2 deletions gleam.toml
@@ -1,5 +1,5 @@
name = "eventsourcing_postgres"
version = "5.0.1"
version = "5.1.0"

# Fill out these fields if you intend to generate HTML documentation or publish
# your project to the Hex package manager.
@@ -13,7 +13,7 @@ repository = { type = "github", user = "renatillas", repo = "eventsourcing_postg

[dependencies]
gleam_stdlib = ">= 0.52.0 and < 2.0.0"
eventsourcing = ">= 6.0.0 and < 7.0.0"
eventsourcing = ">= 6.1.0 and < 7.0.0"
pog = ">= 3.2.0 and < 4.0.0"
gleam_json = ">= 2.3.0 and < 3.0.0"
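# The eventsourcing minimum was raised from 6.0.0 to 6.1.0, presumably for
# the transaction-aware EventStore fields used in src/eventsourcing_postgres.gleam.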

262 changes: 162 additions & 100 deletions src/eventsourcing_postgres.gleam
@@ -69,6 +69,8 @@ const select_snapshot_query = "
AND aggregate_id = $2
"

const isolation_level_query = "LOCK TABLE event IN ACCESS EXCLUSIVE MODE;"
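// Despite its name, this statement takes a table lock rather than setting
// a transaction isolation level: ACCESS EXCLUSIVE conflicts with every
// other lock mode, so no concurrent transaction can read or write the
// event table until the locking transaction ends.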

// TYPES ----

type Metadata =
@@ -105,6 +107,7 @@ pub fn new(
command,
event,
error,
pog.Connection,
) {
let db = pog.connect(pgo_config)

@@ -122,113 +125,64 @@

eventsourcing.EventStore(
eventstore:,
commit: commit,
load_events: load_events,
load_snapshot: load_snapshot,
save_snapshot: save_snapshot,
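// Each callback below receives the transaction connection `tx` and maps
// any failure into eventsourcing.EventStoreError, so callers only ever
// see the library's own error type.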
load_events: fn(postgres_store, tx, aggregate_id, start_from) {
load_events(postgres_store, tx, aggregate_id, start_from)
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to load events: " <> string.inspect(error),
)
})
},
commit_events: fn(tx, aggregate_id, events, metadata) {
commit_events(eventstore, tx, aggregate_id, events, metadata)
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to commit events: " <> string.inspect(error),
)
})
},
load_snapshot: fn(tx, aggregate_id) {
load_snapshot(eventstore, tx, aggregate_id)
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to load snapshot: " <> string.inspect(error),
)
})
},
save_snapshot: fn(tx, snapshot) {
save_snapshot(eventstore, tx, snapshot)
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to save snapshot: " <> string.inspect(error),
)
})
},
execute_transaction: execute_in_transaction(db),
load_aggregate_transaction: execute_in_transaction(db),
get_latest_snapshot_transaction: execute_in_transaction(db),
load_events_transaction: execute_in_transaction(db),
)
}

fn load_snapshot(
postgres_store: PostgresStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) -> Result(
Option(eventsourcing.Snapshot(entity)),
eventsourcing.EventSourcingError(error),
) {
let row_decoder = {
use aggregate_id <- decode.field(1, decode.string)
use sequence <- decode.field(2, decode.int)
use entity <- decode.field(3, {
use entity_string <- decode.then(decode.string)
let assert Ok(entity) =
json.parse(entity_string, postgres_store.entity_decoder)
decode.success(entity)
})
use timestamp <- decode.field(4, decode.int)

decode.success(eventsourcing.Snapshot(
aggregate_id: aggregate_id,
entity: entity,
sequence: sequence,
timestamp: timestamp,
))
}

pog.query(select_snapshot_query)
|> pog.parameter(pog.text(postgres_store.aggregate_type))
|> pog.parameter(pog.text(aggregate_id))
|> pog.returning(row_decoder)
|> pog.execute(postgres_store.db)
|> result.map(fn(response) {
case response.rows {
[] -> None
[snapshot, ..] -> option.Some(snapshot)
}
})
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to load snapshot: " <> pprint.format(error),
)
})
}

fn save_snapshot(
postgres_store: PostgresStore(entity, command, event, error),
snapshot: eventsourcing.Snapshot(entity),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
let eventsourcing.Snapshot(aggregate_id, entity, sequence, timestamp) =
snapshot

pog.query(save_snapshot_query)
|> pog.parameter(pog.text(postgres_store.aggregate_type))
|> pog.parameter(pog.text(aggregate_id))
|> pog.parameter(pog.int(sequence))
|> pog.parameter(pog.text(postgres_store.entity_encoder(entity)))
|> pog.parameter(pog.int(timestamp))
|> pog.execute(postgres_store.db)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to save snapshot: " <> pprint.format(pprint.format(error)),
)
})
}

pub fn create_snapshot_table(
postgres_store: PostgresStore(entity, command, event, error),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
pog.query(create_snapshot_table_query)
|> pog.execute(postgres_store.db)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to create snapshot table: " <> pprint.format(error),
)
})
}

pub fn create_event_table(
postgres_store: PostgresStore(entity, command, event, error),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
pog.query(create_event_table_query)
|> pog.execute(postgres_store.db)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to create snapshot table: " <> pprint.format(error),
)
})
}

fn load_events(
postgres_store: PostgresStore(entity, command, event, error),
tx: pog.Connection,
aggregate_id: eventsourcing.AggregateId,
start_from: Int,
) -> Result(
List(eventsourcing.EventEnvelop(event)),
eventsourcing.EventSourcingError(error),
) {
use _ <- result.try(
pog.query(isolation_level_query)
|> pog.execute(tx)
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to set isolation level: " <> pprint.format(error),
)
}),
)
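// The lock is held until the enclosing transaction ends, so nothing can
// append to the event table between this read and a commit made on the
// same `tx`; this is what serialises concurrent writers.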

let row_decoder = {
use aggregate_id <- decode.field(1, decode.string)
use sequence <- decode.field(2, decode.int)
@@ -258,7 +212,7 @@ fn load_events(
|> pog.parameter(pog.text(aggregate_id))
|> pog.parameter(pog.int(start_from))
|> pog.returning(row_decoder)
|> pog.execute(postgres_store.db)
|> pog.execute(tx)
|> result.map(fn(response) { response.rows })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
@@ -279,8 +233,9 @@ fn metadata_decoder() {
|> decode.success
}

fn commit(
fn commit_events(
postgres_store: PostgresStore(entity, command, event, error),
tx: pog.Connection,
context: eventsourcing.Aggregate(entity, command, event, error),
events: List(event),
metadata: Metadata,
@@ -294,7 +249,7 @@ fn commit(
wrap_events(postgres_store, aggregate_id, events, sequence, metadata)
let assert Ok(last_event) = list.last(wrapped_events)
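// Note: `list.last` fails on an empty list, so this assertion crashes
// when asked to commit zero events.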

persist_events(postgres_store, wrapped_events)
persist_events(postgres_store, tx, wrapped_events)
|> result.map(fn(_) { #(wrapped_events, last_event.sequence) })
}

@@ -329,6 +284,7 @@ fn wrap_events(

fn persist_events(
postgres_store: PostgresStore(entity, command, event, error),
tx: pog.Connection,
wrapped_events: List(eventsourcing.EventEnvelop(event)),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
// Generate the placeholders for batch insert
@@ -393,7 +349,7 @@

// Execute the batch insert
prepared_query
|> pog.execute(postgres_store.db)
|> pog.execute(tx)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
@@ -410,3 +366,109 @@ fn metadata_encoder(metadata: Metadata) -> String {
})
|> json.to_string
}

fn load_snapshot(
postgres_store: PostgresStore(entity, command, event, error),
tx: pog.Connection,
aggregate_id: eventsourcing.AggregateId,
) -> Result(
Option(eventsourcing.Snapshot(entity)),
eventsourcing.EventSourcingError(error),
) {
let row_decoder = {
use aggregate_id <- decode.field(1, decode.string)
use sequence <- decode.field(2, decode.int)
use entity <- decode.field(3, {
use entity_string <- decode.then(decode.string)
let assert Ok(entity) =
json.parse(entity_string, postgres_store.entity_decoder)
decode.success(entity)
})
use timestamp <- decode.field(4, decode.int)

decode.success(eventsourcing.Snapshot(
aggregate_id: aggregate_id,
entity: entity,
sequence: sequence,
timestamp: timestamp,
))
}

pog.query(select_snapshot_query)
|> pog.parameter(pog.text(postgres_store.aggregate_type))
|> pog.parameter(pog.text(aggregate_id))
|> pog.returning(row_decoder)
|> pog.execute(tx)
|> result.map(fn(response) {
case response.rows {
[] -> None
[snapshot, ..] -> option.Some(snapshot)
}
})
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to load snapshot: " <> pprint.format(error),
)
})
}

fn save_snapshot(
postgres_store: PostgresStore(entity, command, event, error),
tx: pog.Connection,
snapshot: eventsourcing.Snapshot(entity),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
let eventsourcing.Snapshot(aggregate_id, entity, sequence, timestamp) =
snapshot

pog.query(save_snapshot_query)
|> pog.parameter(pog.text(postgres_store.aggregate_type))
|> pog.parameter(pog.text(aggregate_id))
|> pog.parameter(pog.int(sequence))
|> pog.parameter(pog.text(postgres_store.entity_encoder(entity)))
|> pog.parameter(pog.int(timestamp))
|> pog.execute(tx)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to save snapshot: " <> pprint.format(pprint.format(error)),
)
})
}

pub fn create_snapshot_table(
postgres_store: PostgresStore(entity, command, event, error),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
pog.query(create_snapshot_table_query)
|> pog.execute(postgres_store.db)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to create snapshot table: " <> pprint.format(error),
)
})
}

pub fn create_event_table(
postgres_store: PostgresStore(entity, command, event, error),
) -> Result(Nil, eventsourcing.EventSourcingError(error)) {
pog.query(create_event_table_query)
|> pog.execute(postgres_store.db)
|> result.map(fn(_) { Nil })
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(
"Failed to create snapshot table: " <> pprint.format(error),
)
})
}

fn execute_in_transaction(db) {
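// The wrapped function's error is stringified first, since pog.transaction
// appears to require a callback that fails with a String; any transaction
// failure is then surfaced as an eventsourcing.EventStoreError.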
fn(f) {
let f = fn(db) {
f(db) |> result.map_error(fn(error) { string.inspect(error) })
}
pog.transaction(db, f)
|> result.map_error(fn(error) {
eventsourcing.EventStoreError(string.inspect(error))
})
}
}
12 changes: 4 additions & 8 deletions test/eventsourcing_postgres_test.gleam
@@ -1,7 +1,6 @@
import eventsourcing
import eventsourcing_postgres
import example_bank_account
import gleam/io
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/otp/task
@@ -321,7 +320,7 @@ fn snapshot_concurrent_updates(event_sourcing) {
eventsourcing.execute_with_metadata(
event_sourcing,
account_id,
example_bank_account.DepositMoney(1.0),
example_bank_account.WithDrawMoney(1.0),
[#("concurrent_operation", string.inspect(i))],
)
})
@@ -331,19 +330,16 @@
// Load events to verify they were all recorded
eventsourcing.load_events(event_sourcing, account_id)
|> should.be_ok
|> fn(events) {
events
|> list.length
|> should.equal(100)
}
|> list.length
|> should.equal(201)
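// Presumably 1 account-opening event plus the 200 concurrent operations,
// none of which are lost now that commits are serialised.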
// Verify final state
eventsourcing.get_latest_snapshot(event_sourcing, account_id)
|> should.be_ok
|> fn(snapshot) {
let assert Some(eventsourcing.Snapshot(_, entity, sequence, _)) = snapshot
let assert example_bank_account.BankAccount(opened: True, balance: 0.0) =
entity
sequence |> should.equal(3)
sequence |> should.equal(201)
}
}
