Skip to content

Commit

Permalink
use sync in repository
Browse files Browse the repository at this point in the history
  • Loading branch information
johnxnguyen committed Feb 14, 2025
1 parent d1c6235 commit 3f24a5e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ final class UpdateEventsRepository: UpdateEventsRepositoryProtocol {
private let updateEventsLocalStore: any UpdateEventsLocalStoreProtocol
private let encoder = JSONEncoder()
private let decoder = JSONDecoder()
private let pullPendingEventsSync: PullPendingUpdateEventsSync

private let pullLastUpdateEventIDSync: PullLastUpdateEventIDSync

Expand All @@ -58,65 +59,18 @@ final class UpdateEventsRepository: UpdateEventsRepositoryProtocol {
api: updateEventsAPI,
store: updateEventsLocalStore
)
self.pullPendingEventsSync = PullPendingUpdateEventsSync(
selfClientID: selfClientID,
api: updateEventsAPI,
store: updateEventsLocalStore,
decryptor: updateEventDecryptor
)
}

// MARK: - Pull pending events

func pullPendingEvents() async throws {
WireLogger.sync.debug("pulling pending events")
// We want all events since this event.
guard let lastEventID = updateEventsLocalStore.lastEventID() else {
throw UpdateEventsRepositoryError.lastEventIDMissing
}

// We'll insert new events from this index.
var currentIndex = try await updateEventsLocalStore.indexOfLastEventEnvelope() + 1

// Events are fetched in batches.
for try await envelopes in updateEventsAPI.getUpdateEvents(
selfClientID: selfClientID,
sinceEventID: lastEventID
) {
let batchCount = envelopes.count
var count = 0
WireLogger.sync.debug("received batch of \(batchCount) envelopes")

// If we need to abort, do it before processing the next page.
try Task.checkCancellation()

for envelope in envelopes {
count += 1

WireLogger.sync.debug(
"decrypting envelope (\(count) of \(batchCount))",
attributes: [.eventEnvelopeID: envelope.id]
)

// We can only decrypt once so store the decrypted events for later retrieval.
var decryptedEnvelope = envelope
decryptedEnvelope.events = try await updateEventDecryptor.decryptEvents(in: envelope)

WireLogger.sync.debug(
"persisting envelope (\(count) of \(batchCount)",
attributes: [.eventEnvelopeID: envelope.id]
)

let decryptedEnvelopeData = try encoder.encode(decryptedEnvelope)

try await updateEventsLocalStore.persistEventEnvelope(
decryptedEnvelopeData,
index: currentIndex
)

currentIndex += 1

if !envelope.isTransient {
// Update the last event id so we don't refetch the same events.
// Transient events aren't stored in the backend's event stream.
storeLastEventEnvelopeID(envelope.id)
}
}
}
try await pullPendingEventsSync.pull()
}

func pullLastEventID() async throws {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ final class UpdateEventsRepositoryTests: XCTestCase {
// When
try await sut.pullPendingEvents()
XCTFail("expected an error, but none was thrown")
} catch UpdateEventsRepositoryError.lastEventIDMissing {
} catch PullPendingUpdateEventsSyncError.noLastEventID {
// Then it threw the right error.
} catch {
XCTFail("unexpected error: \(error)")
Expand Down

0 comments on commit 3f24a5e

Please sign in to comment.