-
-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rework error handling #3549
Rework error handling #3549
Changes from all commits
1576f6c
64c6028
f3a4609
81b770f
ff18387
a7f9fb1
7f1e474
76cfd4d
c39639f
bbdfbbe
0e4f349
c64cfa2
6fba70e
eb4abf7
9d6551d
ba4c7c5
0f1c2bb
43a7ca9
0849080
644a87e
ff81141
481d641
1edf345
b09fe99
1dcbd09
ec8b61d
b2c2ae5
04ef182
0c0ebf6
d597758
11e3052
0079140
475ce9d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,111 +17,125 @@ import PostgresKit | |
import Vapor | ||
|
||
|
||
/// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array). | ||
/// | ||
/// - Parameters: | ||
/// - client: `Client` object | ||
/// - database: `Database` object | ||
/// - results: `Joined<Package, Repository>` results to update | ||
/// - stage: Processing stage | ||
func updatePackages(client: Client, | ||
database: Database, | ||
results: [Result<Joined<Package, Repository>, Error>], | ||
stage: Package.ProcessingStage) async throws { | ||
do { | ||
let total = results.count | ||
let errors = results.filter(\.isError).count | ||
let errorRate = total > 0 ? 100.0 * Double(errors) / Double(total) : 0.0 | ||
switch errorRate { | ||
case 0: | ||
Current.logger().info("Updating \(total) packages for stage '\(stage)'") | ||
case 0..<20: | ||
Current.logger().info("Updating \(total) packages for stage '\(stage)' (errors: \(errors))") | ||
default: | ||
Current.logger().critical("updatePackages: unusually high error rate: \(errors)/\(total) = \(errorRate)%") | ||
} | ||
} | ||
for result in results { | ||
do { | ||
try await updatePackage(client: client, database: database, result: result, stage: stage) | ||
} catch { | ||
Current.logger().critical("updatePackage failed: \(error)") | ||
} | ||
} | ||
|
||
Current.logger().debug("updateStatus ops: \(results.count)") | ||
// TODO: Adopt ProcessingError also in Analysis and then factor out generic parts back into Common | ||
protocol ProcessingError: Error, CustomStringConvertible { | ||
associatedtype UnderlyingError: Error & CustomStringConvertible | ||
var packageId: Package.Id { get } | ||
var underlyingError: UnderlyingError { get } | ||
var level: Logger.Level { get } | ||
var status: Package.Status { get } | ||
} | ||
|
||
|
||
func updatePackage(client: Client, | ||
database: Database, | ||
result: Result<Joined<Package, Repository>, Error>, | ||
stage: Package.ProcessingStage) async throws { | ||
switch result { | ||
case .success(let res): | ||
let pkg = res.package | ||
if stage == .ingestion && pkg.status == .new { | ||
// newly ingested package: leave status == .new for fast-track | ||
// analysis | ||
} else { | ||
pkg.status = .ok | ||
// TODO: Leaving this extension here for now in order to group the updating/error reporting in one place for both Ingestion and Analysis. Eventually these should either go to their respective files or move common parts into a Common namespace. | ||
extension Analyze { | ||
/// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array). | ||
/// | ||
/// - Parameters: | ||
/// - client: `Client` object | ||
/// - database: `Database` object | ||
/// - results: `Joined<Package, Repository>` results to update | ||
/// - stage: Processing stage | ||
static func updatePackages(client: Client, | ||
database: Database, | ||
results: [Result<Joined<Package, Repository>, Error>]) async throws { | ||
do { | ||
let total = results.count | ||
let errors = results.filter(\.isError).count | ||
let errorRate = total > 0 ? 100.0 * Double(errors) / Double(total) : 0.0 | ||
switch errorRate { | ||
case 0: | ||
Current.logger().info("Updating \(total) packages for stage 'analysis'") | ||
case 0..<20: | ||
Current.logger().info("Updating \(total) packages for stage 'analysis' (errors: \(errors))") | ||
default: | ||
Current.logger().critical("updatePackages: unusually high error rate: \(errors)/\(total) = \(errorRate)%") | ||
} | ||
pkg.processingStage = stage | ||
} | ||
for result in results { | ||
do { | ||
try await pkg.update(on: database) | ||
try await updatePackage(client: client, database: database, result: result) | ||
} catch { | ||
Current.logger().report(error: error) | ||
Current.logger().critical("updatePackage failed: \(error)") | ||
} | ||
} | ||
|
||
// PSQLError also conforms to DatabaseError but we want to intercept it specifically, | ||
// because it allows us to log more concise error messages via serverInfo[.message] | ||
case let .failure(error) where error is PSQLError: | ||
// Escalate database errors to critical | ||
let error = error as! PSQLError | ||
let msg = error.serverInfo?[.message] ?? String(reflecting: error) | ||
Current.logger().critical("\(msg)") | ||
try await recordError(database: database, error: error, stage: stage) | ||
Current.logger().debug("updateStatus ops: \(results.count)") | ||
} | ||
|
||
case let .failure(error) where error is DatabaseError: | ||
// Escalate database errors to critical | ||
Current.logger().critical("\(String(reflecting: error))") | ||
try await recordError(database: database, error: error, stage: stage) | ||
static func updatePackage(client: Client, | ||
database: Database, | ||
result: Result<Joined<Package, Repository>, Error>) async throws { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We temporarily introduce specific static The plan is to consolidate the two again once analysis moves to typed throws as well and base them both on the shared protocol |
||
switch result { | ||
case .success(let res): | ||
try await res.package.update(on: database, status: .ok, stage: .analysis) | ||
|
||
case let .failure(error): | ||
Current.logger().report(error: error) | ||
try await recordError(database: database, error: error, stage: stage) | ||
// PSQLError also conforms to DatabaseError but we want to intercept it specifically, | ||
// because it allows us to log more concise error messages via serverInfo[.message] | ||
case let .failure(error) where error is PSQLError: | ||
// Escalate database errors to critical | ||
let error = error as! PSQLError | ||
let msg = error.serverInfo?[.message] ?? String(reflecting: error) | ||
Current.logger().critical("\(msg)") | ||
try await recordError(database: database, error: error) | ||
|
||
case let .failure(error) where error is DatabaseError: | ||
// Escalate database errors to critical | ||
Current.logger().critical("\(String(reflecting: error))") | ||
try await recordError(database: database, error: error) | ||
|
||
case let .failure(error): | ||
Current.logger().report(error: error) | ||
try await recordError(database: database, error: error) | ||
} | ||
} | ||
} | ||
|
||
static func recordError(database: Database, error: Error) async throws { | ||
func setStatus(id: Package.Id?, status: Package.Status) async throws { | ||
guard let id = id else { return } | ||
try await Package.query(on: database) | ||
.filter(\.$id == id) | ||
.set(\.$processingStage, to: .analysis) | ||
.set(\.$status, to: status) | ||
.update() | ||
} | ||
|
||
guard let error = error as? AppError else { return } | ||
|
||
func recordError(database: Database, | ||
error: Error, | ||
stage: Package.ProcessingStage) async throws { | ||
func setStatus(id: Package.Id?, status: Package.Status) async throws { | ||
guard let id = id else { return } | ||
try await Package.query(on: database) | ||
.filter(\.$id == id) | ||
.set(\.$processingStage, to: stage) | ||
.set(\.$status, to: status) | ||
.update() | ||
switch error { | ||
case let .analysisError(id, _): | ||
try await setStatus(id: id, status: .analysisFailed) | ||
case .envVariableNotSet, .shellCommandFailed: | ||
break | ||
case let .genericError(id, _): | ||
try await setStatus(id: id, status: .ingestionFailed) | ||
case let .invalidPackageCachePath(id, _): | ||
try await setStatus(id: id, status: .invalidCachePath) | ||
case let .cacheDirectoryDoesNotExist(id, _): | ||
try await setStatus(id: id, status: .cacheDirectoryDoesNotExist) | ||
case let .invalidRevision(id, _): | ||
try await setStatus(id: id, status: .analysisFailed) | ||
case let .noValidVersions(id, _): | ||
try await setStatus(id: id, status: .noValidVersions) | ||
} | ||
} | ||
} | ||
|
||
guard let error = error as? AppError else { return } | ||
|
||
switch error { | ||
case let .analysisError(id, _): | ||
try await setStatus(id: id, status: .analysisFailed) | ||
case .envVariableNotSet, .shellCommandFailed: | ||
break | ||
case let .genericError(id, _): | ||
try await setStatus(id: id, status: .ingestionFailed) | ||
case let .invalidPackageCachePath(id, _): | ||
try await setStatus(id: id, status: .invalidCachePath) | ||
case let .cacheDirectoryDoesNotExist(id, _): | ||
try await setStatus(id: id, status: .cacheDirectoryDoesNotExist) | ||
case let .invalidRevision(id, _): | ||
try await setStatus(id: id, status: .analysisFailed) | ||
case let .noValidVersions(id, _): | ||
try await setStatus(id: id, status: .noValidVersions) | ||
// TODO: Leaving this extension here for now in order to group the updating/error reporting in one place for both Ingestion and Analysis. Eventually these should either go to their respective files or move common parts into a Common namespace. | ||
extension Ingestion { | ||
static func updatePackage(client: Client, | ||
database: Database, | ||
result: Result<Joined<Package, Repository>, Ingestion.Error>, | ||
stage: Package.ProcessingStage) async throws { | ||
switch result { | ||
case .success(let res): | ||
// for newly ingested package leave status == .new in order to fast-track analysis | ||
let updatedStatus: Package.Status = res.package.status == .new ? .new : .ok | ||
try await res.package.update(on: database, status: updatedStatus, stage: stage) | ||
case .failure(let failure): | ||
Current.logger().log(level: failure.level, "\(failure)") | ||
try await Package.update(for: failure.packageId, on: database, status: failure.status, stage: stage) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updatePackages
(note the plural) was only called from analysis. Moving this into theAnalyze
namespace (which I'll update toAnalysis
at some point in the future) makes this explicit.