Skip to content

Commit

Permalink
Separate and commented logic in _insert_events().
Browse files Browse the repository at this point in the history
  • Loading branch information
johnbywater committed Nov 4, 2023
1 parent 4413d6d commit 4ed4fb7
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions eventsourcing_eventstoredb/recorders.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,38 @@ def insert_events(
def _insert_events(
self, stored_events: List[StoredEvent], **kwargs: Any
) -> Optional[Sequence[int]]:
originator_ids = list(set([e.originator_id for e in stored_events]))
if len(originator_ids) == 0:
return []
elif len(originator_ids) > 1:
raise ProgrammingError(
"EventStoreDB can't atomically store events in more than one stream"
)
else:
stream_name = str(originator_ids[0])

first_originator_version = stored_events[0].originator_version
current_version: Union[int, StreamState]
if first_originator_version == 0:
current_version = StreamState.NO_STREAM
else:
current_version = first_originator_version - 1
new_events: List[NewEvent] = []
for i, stored_event in enumerate(stored_events):
if stored_event.originator_version != i + first_originator_version:
raise IntegrityError("Originator version out of sequence")

# Protect against appending old snapshot after new.
if self.for_snapshotting:
# Protect against appending old snapshot after new.
assert len(stored_events) == 1, len(stored_events)
recorded_snapshots = list(
self.select_events(
originator_id=originator_ids[0],
originator_id=stored_events[0].originator_id,
desc=True,
limit=1,
)
)
if len(recorded_snapshots) > 0:
last_snapshot = recorded_snapshots[0]
if last_snapshot.originator_version > first_originator_version:
if last_snapshot.originator_version > stored_events[0].originator_version:
return []
else:
# Make sure all stored events have same originator ID.
originator_ids = list(set([e.originator_id for e in stored_events]))
if len(originator_ids) == 0:
return []
elif len(originator_ids) > 1:
raise ProgrammingError(
"EventStoreDB can't atomically store events in more than one stream"
)

# Make sure stored events have a gapless sequence of originator_versions.
for i in range(1, len(stored_events)):
if stored_events[i].originator_version != i + stored_events[0].originator_version:
raise IntegrityError("Gap detected in originator versions")

# Convert StoredEvent objects to NewEvent objects.
new_events: List[NewEvent] = []
for stored_event in stored_events:
if self.for_snapshotting:
metadata = json.dumps(
Expand All @@ -91,10 +87,22 @@ def _insert_events(
content_type="application/octet-stream",
)
new_events.append(new_event)

# Decide 'stream_name' argument.
stream_name = str(stored_events[0].originator_id)
if self.for_snapshotting:
stream_name = self.create_snapshot_stream_name(stream_name)

# Decide 'current_version' argument.
if self.for_snapshotting:
current_version: Union[int, StreamState] = StreamState.ANY # Disable OCC.
else:
if stored_events[0].originator_version == 0:
current_version = StreamState.NO_STREAM
else:
current_version = stored_events[0].originator_version - 1

try:
if self.for_snapshotting:
stream_name = self.create_snapshot_stream_name(stream_name)
current_version = StreamState.ANY # Disable OCC.
commit_position = self.client.append_events(
stream_name=stream_name,
current_version=current_version,
Expand All @@ -107,7 +115,6 @@ def _insert_events(
return [commit_position] * len(new_events) # The best we can do?

def create_snapshot_stream_name(self, stream_name: str) -> str:
# return str(uuid5(NAMESPACE_OID, f"/snapshots/{stream_name}"))
return self.SNAPSHOT_STREAM_PREFIX + stream_name

def select_events( # noqa: C901
Expand Down

0 comments on commit 4ed4fb7

Please sign in to comment.