Skip to content

Commit

Permalink
Improve queuing & cleanup in PDFWorkerController & RecognizerController
Browse files Browse the repository at this point in the history
  • Loading branch information
mvasilak committed Feb 25, 2025
1 parent d1fd62c commit 70bc91b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 94 deletions.
76 changes: 39 additions & 37 deletions Zotero/Controllers/PDFWorkerController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,20 @@ final class PDFWorkerController {

// MARK: Actions
func queue(work: PDFWork) -> Observable<Update> {
return Observable.create { [weak self] subscriber in
guard let self else { return Disposables.create() }
accessQueue.async(flags: .barrier) { [weak self] in
guard let self else { return }
if let subject = queue[work]?.1 {
subject.subscribe(subscriber)
.disposed(by: disposeBag)
return
}
let state: PDFWorkState = .enqueued
let subject: PublishSubject<Update> = PublishSubject()
queue[work] = (state, subject)
subject.subscribe(subscriber)
let subject = PublishSubject<Update>()
accessQueue.async(flags: .barrier) { [weak self] in
guard let self else { return }
if let existingSubject = queue[work]?.1 {
existingSubject.subscribe(subject)
.disposed(by: disposeBag)

startWorkIfNeeded()
return
}
return Disposables.create()
let state: PDFWorkState = .enqueued
queue[work] = (state, subject)

startWorkIfNeeded()
}
return subject.asObservable()
}

private func startWorkIfNeeded() {
Expand Down Expand Up @@ -117,9 +112,8 @@ final class PDFWorkerController {
}
guard let pdfWorkerWebViewHandler else {
DDLogError("PDFWorkerController: can't create PDFWorkerWebViewHandler instance")
cleanupPDFWorker(for: work) { observable in
observable?.on(.next(Update(work: work, kind: .failed)))
}
cleanupPDFWorker(for: work).subscribe(onSuccess: { $0.on(.next(Update(work: work, kind: .failed))) })
.disposed(by: disposeBag)
return
}

Expand Down Expand Up @@ -147,27 +141,24 @@ final class PDFWorkerController {
case .success(let data):
switch data {
case .recognizerData(let data), .fullText(let data):
cleanupPDFWorker(for: work) { observable in
observable?.on(.next(Update(work: work, kind: .extractedData(data: data))))
}
cleanupPDFWorker(for: work).subscribe(onSuccess: { $0.on(.next(Update(work: work, kind: .extractedData(data: data)))) })
.disposed(by: disposeBag)
}

case .failure(let error):
DDLogError("PDFWorkerController: recognizer failed - \(error)")
cleanupPDFWorker(for: work) { observable in
observable?.on(.next(Update(work: work, kind: .failed)))
}
cleanupPDFWorker(for: work).subscribe(onSuccess: { $0.on(.next(Update(work: work, kind: .failed))) })
.disposed(by: disposeBag)
}
}
}
}
}

func cancel(work: PDFWork) {
cleanupPDFWorker(for: work) { observable in
DDLogInfo("PDFWorkerController: cancelled \(work)")
observable?.on(.next(Update(work: work, kind: .cancelled)))
}
DDLogInfo("PDFWorkerController: cancelled \(work)")
cleanupPDFWorker(for: work).subscribe(onSuccess: { $0.on(.next(Update(work: work, kind: .cancelled))) })
.disposed(by: disposeBag)
}

func cancellAllWorks() {
Expand All @@ -187,20 +178,31 @@ final class PDFWorkerController {
}
}

private func cleanupPDFWorker(for work: PDFWork, completion: @escaping (_ observable: PublishSubject<Update>?) -> Void) {
if DispatchQueue.getSpecific(key: dispatchSpecificKey) == accessQueueLabel {
cleanup(for: work, completion: completion)
} else {
accessQueue.async(flags: .barrier) {
cleanup(for: work, completion: completion)
private func cleanupPDFWorker(for work: PDFWork) -> Maybe<PublishSubject<Update>> {
return Maybe.create { [weak self] maybe in
guard let self else {
maybe(.completed)
return Disposables.create()
}
if DispatchQueue.getSpecific(key: dispatchSpecificKey) == accessQueueLabel {
cleanup(for: work, maybe: maybe)
} else {
accessQueue.async(flags: .barrier) {
cleanup(for: work, maybe: maybe)
}
}
return Disposables.create()
}

func cleanup(for work: PDFWork, completion: @escaping (_ observable: PublishSubject<Update>?) -> Void) {
func cleanup(for work: PDFWork, maybe: (MaybeEvent<PublishSubject<Update>>) -> Void) {
let observable = queue.removeValue(forKey: work).flatMap({ $0.observable })
DDLogInfo("PDFWorkerController: cleaned up for \(work)")
pdfWorkerWebViewHandlersByPDFWork.removeValue(forKey: work)?.webViewHandler.removeFromSuperviewAsynchronously()
completion(observable)
if let observable {
maybe(.success(observable))
} else {
maybe(.completed)
}
startWorkIfNeeded()
}
}
Expand Down
113 changes: 56 additions & 57 deletions Zotero/Controllers/RecognizerController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,28 +141,24 @@ final class RecognizerController {

// MARK: Actions
func queue(task: Task) -> Observable<Update> {
// Queue task regardless of any subscribers
let subject = PublishSubject<Update>()
accessQueue.async(flags: .barrier) { [weak self] in
guard let self, queue[task] == nil else { return }
guard let self else { return }
if let existingSubject = queue[task]?.1 {
existingSubject.subscribe(subject)
.disposed(by: disposeBag)
return
}
let state: TaskState = .enqueued
let observable: PublishSubject<Update> = PublishSubject()
queue[task] = (state, observable)
observable.subscribe(onNext: { [weak self] update in
queue[task] = (state, subject)
subject.subscribe(onNext: { [weak self] update in
self?.updatesSubject.on(.next(update))
}).disposed(by: disposeBag)

emmitUpdate(for: task, observable: observable, kind: .enqueued)
emmitUpdate(for: task, observable: subject, kind: .enqueued)
startRecognitionIfNeeded()
}
return Observable<Update>.create { [weak self] subscriber in
guard let self else { return Disposables.create() }
accessQueue.async(flags: .barrier) { [weak self] in
guard let self, let subject = queue[task]?.1 else { return }
subject.subscribe(subscriber)
.disposed(by: disposeBag)
}
return Disposables.create()
}
return subject.asObservable()
}

private func emmitUpdate(for task: Task, observable: PublishSubject<Update>, kind: Update.Kind) {
Expand Down Expand Up @@ -214,14 +210,12 @@ final class RecognizerController {
switch update.kind {
case .failed:
DDLogError("RecognizerController: \(task) - recognizer failed")
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(.recognizerFailed))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(.recognizerFailed)))) })
.disposed(by: disposeBag)

case .cancelled:
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .cancelled)))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .cancelled))) })
.disposed(by: disposeBag)

case .inProgress:
break
Expand All @@ -234,9 +228,8 @@ final class RecognizerController {

case .fullText:
DDLogError("RecognizerController: \(task) - PDF worker error")
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(.pdfWorkerError))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(.pdfWorkerError)))) })
.disposed(by: disposeBag)
}
}
}
Expand All @@ -259,10 +252,10 @@ final class RecognizerController {
process(response: response.0)
},
onFailure: { [weak self] error in
guard let self else { return }
DDLogError("RecognizerController: \(task) - remote recognizer request failed: \(error)")
self?.cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(error as! Error))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(error as! Error)))) })
.disposed(by: disposeBag)
}
)
.disposed(by: disposeBag)
Expand All @@ -283,9 +276,8 @@ final class RecognizerController {
identifiers.append(.title(identifier))
}
guard !identifiers.isEmpty else {
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(.remoteRecognizerFailed))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(.remoteRecognizerFailed)))) })
.disposed(by: disposeBag)
return
}
startIdentifiersLookup(for: task, with: response, pendingIdentifiers: identifiers)
Expand All @@ -300,9 +292,8 @@ final class RecognizerController {
return
}
guard case .remoteRecognitionInProgress = state else {
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(.unexpectedState))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(.unexpectedState)))) })
.disposed(by: disposeBag)
return
}
lookupNextIdentifier(for: task, with: response, pendingIdentifiers: pendingIdentifiers)
Expand All @@ -324,9 +315,8 @@ final class RecognizerController {
return
}
guard case .identifiersLookupInProgress(let response, _, let pendingIdentifiers) = state else {
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(.unexpectedState))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(.unexpectedState)))) })
.disposed(by: disposeBag)
return
}
lookupNextIdentifier(for: task, with: response, pendingIdentifiers: pendingIdentifiers)
Expand All @@ -340,9 +330,8 @@ final class RecognizerController {
return
}
guard !pendingIdentifiers.isEmpty else {
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .failed(.noRemainingIdentifiersForLookup))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .failed(.noRemainingIdentifiersForLookup)))) })
.disposed(by: disposeBag)
return
}
var remainingIdentifiers = pendingIdentifiers
Expand Down Expand Up @@ -488,9 +477,8 @@ final class RecognizerController {
func createParentIfNeeded(for task: Task, with itemResponse: ItemResponse, schemaController: SchemaController, dateParser: DateParser) {
switch task.kind {
case .simple:
cleanupTask(for: task) { observable in
observable?.on(.next(Update(task: task, kind: .translated(itemResponse: itemResponse))))
}
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .translated(itemResponse: itemResponse)))) })
.disposed(by: disposeBag)

case .createParentForItem(let libraryId, let key):
backgroundQueue.async { [weak self] in
Expand All @@ -512,21 +500,21 @@ final class RecognizerController {
DDLogError("RecognizerController: can't create parent for item - \(error)")
update = Update(task: task, kind: .failed(error as! Error))
}
cleanupTask(for: task) { observable in
cleanupTask(for: task).subscribe(onSuccess: {
if let update {
observable?.on(.next(update))
$0.on(.next(update))
}
}
})
.disposed(by: disposeBag)
}
}
}
}

func cancel(task: Task) {
cleanupTask(for: task) { observable in
DDLogInfo("RecognizerController: cancelled \(task)")
observable?.on(.next(Update(task: task, kind: .cancelled)))
}
DDLogInfo("RecognizerController: cancelled \(task)")
cleanupTask(for: task).subscribe(onSuccess: { $0.on(.next(Update(task: task, kind: .cancelled))) })
.disposed(by: disposeBag)
}

func cancellAllTasks() {
Expand All @@ -546,24 +534,35 @@ final class RecognizerController {
}
}

private func cleanupTask(for task: Task, completion: @escaping (_ observable: PublishSubject<Update>?) -> Void) {
if DispatchQueue.getSpecific(key: dispatchSpecificKey) == accessQueueLabel {
cleanup(for: task, completion: completion)
} else {
accessQueue.async(flags: .barrier) {
cleanup(for: task, completion: completion)
private func cleanupTask(for task: Task) -> Maybe<PublishSubject<Update>> {
return Maybe.create { [weak self] maybe in
guard let self else {
maybe(.completed)
return Disposables.create()
}
if DispatchQueue.getSpecific(key: dispatchSpecificKey) == accessQueueLabel {
cleanup(for: task, maybe: maybe)
} else {
accessQueue.async(flags: .barrier) {
cleanup(for: task, maybe: maybe)
}
}
return Disposables.create()
}

func cleanup(for task: Task, completion: @escaping (_ observable: PublishSubject<Update>?) -> Void) {
func cleanup(for task: Task, maybe: (MaybeEvent<PublishSubject<Update>>) -> Void) {
let observable = queue.removeValue(forKey: task).flatMap({ $0.observable })
if case .createParentForItem(let libraryId, let key) = task.kind, var libraryLatestUpdates = latestUpdates[libraryId] {
libraryLatestUpdates[key] = nil
latestUpdates[libraryId] = libraryLatestUpdates
}
DDLogInfo("RecognizerController: \(task) - cleaned up")
lookupWebViewHandlersByTask.removeValue(forKey: task)?.webViewHandler.removeFromSuperviewAsynchronously()
completion(observable)
if let observable {
maybe(.success(observable))
} else {
maybe(.completed)
}
startRecognitionIfNeeded()
}
}
Expand Down

0 comments on commit 70bc91b

Please sign in to comment.