diff --git a/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt b/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt new file mode 100644 index 00000000..f3639bfd --- /dev/null +++ b/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2023 Fraktalio D.O.O. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.fraktalio.fmodel.application + +import kotlinx.coroutines.flow.* + +/** + * Extension function - Handles the command message of type [C] + * + * @param command Command message of type [C] + * @return State of type [S] + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +fun I.handle(command: C): Flow where I : StateComputation, + I : EventComputation, + I : StateRepository, + I : EventSnapshottingRepository = + flow { + // 1. Fetch the latest state snapshot or NULL + val latestSnapshotState = command.fetchState() + // 2. Fetch the latest events, since the latest state snapshot + val latestEvents = command.fetchEvents(latestSnapshotState).toList() + // 3. Compute the current state, based on the latest state snapshot and the latest events + val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e) } + // 4. Compute the new events, based on the latest events, latest snapshot state and the command, and save it + val newEvents = latestEvents.asFlow() + .computeNewEvents(command, latestSnapshotState) + .save() + // 5. Compute the new state, based on the current state and the command and save it conditionally + with(currentState.computeNewState(command)) { + if (shouldCreateNewSnapshot(latestSnapshotState)) { + save() + } + } + emitAll(newEvents) + } + +/** + * Extension function - Handles the command message of type [C] to the locking state stored aggregate, optimistically + * + * @param command Command message of type [C] + * @return State of type [Pair]<[S], [V]>, in which [V] is the type of the Version (optimistic locking) + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +suspend fun I.handleOptimistically(command: C): Flow> where I : StateComputation, + I : EventComputation, + I : StateLockingRepository, + I : EventSnapshottingLockingRepository = + flow { + // 1. Fetch the latest state snapshot or NULL + val (latestSnapshotState, latestSnapshotVersion) = command.fetchState() + // 2. Fetch the latest events, since the latest state snapshot + val latestEvents = command.fetchEvents(Pair(latestSnapshotState, latestSnapshotVersion)).toList() + // 3. Compute the current state, based on the latest state snapshot and the latest events + val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e.first) } + // 4. Get the latest event version + val latestEventVersion = latestEvents.map { it.second }.lastOrNull() + // 5. Compute the new events, based on the latest events, latest snapshot state and the command, and save it + val newEvents = latestEvents.asFlow() + .map { it.first } + .computeNewEvents(command, latestSnapshotState) + .save(latestEventVersion) + // 6. Get the new snapshot version = the last/latest event version + val newSnapshotVersion = newEvents.map { it.second }.lastOrNull() + // 7. Compute the new state, based on the current state and the command and save it conditionally + with(currentState.computeNewState(command)) { + if (shouldCreateNewSnapshot( + latestSnapshotState, + latestSnapshotVersion, + newSnapshotVersion + ) + ) + save(latestSnapshotVersion, newSnapshotVersion) + } + emitAll(newEvents) + } diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventRepository.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventRepository.kt index ad4c2bbc..be99195f 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventRepository.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventRepository.kt @@ -45,6 +45,27 @@ interface EventRepository { fun Flow.save(): Flow } +interface EventSnapshottingRepository : EventRepository { + /** + * Fetch events by/after the latest snapshot + * + * @receiver Command of type [C] + * @param latestSnapshotState Latest snapshot of type [S] + * + * @return [Flow] of Events of type [E] + */ + fun C.fetchEvents(latestSnapshotState: S?): Flow + + /** + * Checks if saving of snapshot is needed + * + * @receiver State of type [S] + * @param latestSnapshotState Latest snapshot of type [S] + * @return newly saved State of type [S] + */ + fun S.shouldCreateNewSnapshot(latestSnapshotState: S?): Boolean +} + /** * A type alias for the version provider/function. * It provides the Version of the last Event in the stream. @@ -99,3 +120,26 @@ interface EventLockingRepository { */ fun Flow.save(latestVersion: V?): Flow> } + +interface EventSnapshottingLockingRepository : EventLockingRepository { + /** + * Fetch events by/after the latest snapshot + * + * @receiver Command of type [C] + * @param latestSnapshotState Latest snapshot of type [V] + * + * @return [Flow] of Events of type [Pair]<[E], [V]> + */ + fun C.fetchEvents(latestSnapshotState: Pair): Flow> + + /** + * Checks if saving of snapshot is needed + * + * @receiver State of type [S] + * @param latestSnapshotState Latest snapshot of type [S] + * @param latestSnapshotVersion Latest snapshot version of type [V] + * @param newSnapshotVersion New snapshot version of type [V] + * @return newly saved State of type [S] + */ + fun S.shouldCreateNewSnapshot(latestSnapshotState: S?, latestSnapshotVersion: V?, newSnapshotVersion: V?): Boolean +} diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt index 2be1d930..53d6a50a 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt @@ -25,8 +25,10 @@ import kotlinx.coroutines.flow.* * `EventComputation` interface formalizes the `Event Computation` algorithm / event sourced system by using a `decider` of type [IDecider]<[C], [S], [E]> to handle commands based on the current events, and produce new events. */ interface EventComputation : IDecider { - fun Flow.computeNewEvents(command: C): Flow = flow { - val currentState = fold(initialState) { s, e -> evolve(s, e) } + fun Flow.computeNewEvents(command: C): Flow = computeNewEvents(command, initialState) + + fun Flow.computeNewEvents(command: C, latestSnapshot: S?): Flow = flow { + val currentState = fold(latestSnapshot ?: initialState) { s, e -> evolve(s, e) } val resultingEvents = decide(command, currentState) emitAll(resultingEvents) } @@ -71,6 +73,22 @@ interface EventOrchestratingComputation : ISaga, IDecider : EventComputation, EventRepository +/** + * Event sourcing snapshotting aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]>/ [EventComputation]<[C], [S], [E]> to handle commands and produce events. + * In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result. + * Produced events are then stored via [EventSnapshottingRepository.save] suspending function. + * + * [EventSourcingAggregate] extends [EventComputation] and [EventSnapshottingRepository] interfaces, + * clearly communicating that it is composed out of these two behaviours. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that this aggregate can publish + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +interface EventSourcingSnapshottingAggregate : EventComputation, EventSnapshottingRepository + /** * Locking Event sourcing aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]>/ [EventComputation]<[C], [S], [E]> to handle commands and produce events. * In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventLockingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result. @@ -93,6 +111,29 @@ interface EventSourcingAggregate : EventComputation, EventRepo */ interface EventSourcingLockingAggregate : EventComputation, EventLockingRepository +/** + * Locking Event sourcing aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]>/ [EventComputation]<[C], [S], [E]> to handle commands and produce events. + * In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingLockingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result. + * Produced events are then stored via [EventSnapshottingLockingRepository.save] suspending function. + * + * Locking Event sourcing aggregate enables `optimistic locking` mechanism more explicitly. + * If you fetch events from a storage, the application records the `version` number of that event stream. + * You can append new events, but only if the `version` number in the storage has not changed. + * If there is a `version` mismatch, it means that someone else has added the event(s) before you did. + * + * [EventSourcingLockingAggregate] extends [EventComputation] and [EventSnapshottingLockingRepository] interfaces, + * clearly communicating that it is composed out of these two behaviours. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that this aggregate can publish + * @param V Version + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +interface EventSourcingSnapshottingLockingAggregate : EventComputation, + EventSnapshottingLockingRepository + /** * Orchestrating Event sourcing aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]> to handle commands and produce events. * In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result. @@ -185,6 +226,28 @@ fun EventSourcingAggregate( EventRepository by eventRepository, IDecider by decider {} +/** + * Event Sourced Snapshotting aggregate constructor-like function. + * + * The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that are used internally to build/fold new state + * @param decider A decider component of type [IDecider]<[C], [S], [E]> + * @param eventRepository An aggregate event repository of type [EventSnapshottingRepository]<[C], [S], [E]> + * @return An object/instance of type [EventSourcingSnapshottingAggregate]<[C], [S], [E]> + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +fun EventSourcingSnapshottingAggregate( + decider: IDecider, + eventRepository: EventSnapshottingRepository +): EventSourcingAggregate = + object : EventSourcingAggregate, + EventSnapshottingRepository by eventRepository, + IDecider by decider {} + /** * Event Sourced Locking aggregate factory function. * @@ -235,6 +298,29 @@ fun EventSourcingLockingAggregate( EventLockingRepository by eventRepository, IDecider by decider {} +/** + * Event Sourced Snapshotting and Locking aggregate constructor-like function. + * + * The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that are used internally to build/fold new state + * @param V Version + * @param decider A decider component of type [IDecider]<[C], [S], [E]> + * @param eventRepository An aggregate event repository of type [EventSnapshottingLockingRepository]<[C], [S], [E], [V]> + * @return An object/instance of type [EventSourcingSnapshottingLockingAggregate]<[C], [S], [E], [V]> + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +fun EventSourcingSnapshottingLockingAggregate( + decider: IDecider, + eventRepository: EventSnapshottingLockingRepository +): EventSourcingSnapshottingLockingAggregate = + object : EventSourcingSnapshottingLockingAggregate, + EventSnapshottingLockingRepository by eventRepository, + IDecider by decider {} + /** * Event Sourced Orchestrating aggregate factory function. * diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateRepository.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateRepository.kt index c4327076..d24b3131 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateRepository.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateRepository.kt @@ -76,4 +76,17 @@ interface StateLockingRepository { * @return newly saved State of type [Pair]<[S], [V]> */ suspend fun S.save(currentStateVersion: V?): Pair + + /** + * + * Save state + * You can update/save the item/state, but only if the `version` number in the storage has not changed. + * + * @receiver State/[S] + * @param currentStateVersion The current version of the state + * @param newStateVersion The new version of the state + * @return newly saved State of type [Pair]<[S], [V]> + */ + suspend fun S.save(currentStateVersion: V?, newStateVersion: V?): Pair = + save(currentStateVersion) }