forked from libp2p/js-libp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: crash during DHT query abort when reading is slow (libp2p#2225)
If a DHT query is aborted during reading, the deferred promise can become rejected while nothing is `await`ing it. Switch the implementation to use a `pushable` queue instead. Fixes libp2p#2216
- Loading branch information
1 parent
effcfaa
commit c960eb6
Showing
4 changed files
with
146 additions
and
71 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import { CodeError } from '@libp2p/interface/errors' | ||
import { pushable } from 'it-pushable' | ||
import type { CleanUpEvents } from './manager.js' | ||
import type { QueryEvent } from '../index.js' | ||
import type { TypedEventTarget } from '@libp2p/interface/events' | ||
import type { Logger } from '@libp2p/logger' | ||
import type Queue from 'p-queue' | ||
|
||
export async function * queueToGenerator (queue: Queue, signal: AbortSignal, cleanUp: TypedEventTarget<CleanUpEvents>, log: Logger): AsyncGenerator<QueryEvent, void, undefined> { | ||
const stream = pushable<QueryEvent>({ | ||
objectMode: true | ||
}) | ||
|
||
const cleanup = (err?: Error): void => { | ||
log('clean up queue, results %d, queue size %d, pending tasks %d', stream.readableLength, queue.size, queue.pending) | ||
queue.clear() | ||
stream.end(err) | ||
} | ||
|
||
const onQueueJobComplete = (result: QueryEvent): void => { | ||
if (result != null) { | ||
stream.push(result) | ||
} | ||
} | ||
|
||
const onQueueError = (err: Error): void => { | ||
log('queue error', err) | ||
cleanup(err) | ||
} | ||
|
||
const onQueueIdle = (): void => { | ||
log('queue idle') | ||
cleanup() | ||
} | ||
|
||
// clear the queue and throw if the query is aborted | ||
const onSignalAbort = (): void => { | ||
log('abort queue') | ||
cleanup(new CodeError('Query aborted', 'ERR_QUERY_ABORTED')) | ||
} | ||
|
||
// the user broke out of the loop early, ensure we resolve the deferred result | ||
// promise and clear the queue of any remaining jobs | ||
const onCleanUp = (): void => { | ||
cleanup() | ||
} | ||
|
||
// add listeners | ||
queue.on('completed', onQueueJobComplete) | ||
queue.on('error', onQueueError) | ||
queue.on('idle', onQueueIdle) | ||
signal.addEventListener('abort', onSignalAbort) | ||
cleanUp.addEventListener('cleanup', onCleanUp) | ||
|
||
try { | ||
yield * stream | ||
} finally { | ||
// remove listeners | ||
queue.removeListener('completed', onQueueJobComplete) | ||
queue.removeListener('error', onQueueError) | ||
queue.removeListener('idle', onQueueIdle) | ||
signal.removeEventListener('abort', onSignalAbort) | ||
cleanUp.removeEventListener('cleanup', onCleanUp) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import { TypedEventEmitter } from '@libp2p/interface/events' | ||
import { logger } from '@libp2p/logger' | ||
import { expect } from 'aegir/chai' | ||
import delay from 'delay' | ||
import all from 'it-all' | ||
import Queue from 'p-queue' | ||
import Sinon from 'sinon' | ||
import { isNode } from 'wherearewe' | ||
import { queueToGenerator } from '../../src/query/utils.js' | ||
import type { CleanUpEvents } from '../../src/query/manager.js' | ||
|
||
describe('query utils', () => { | ||
describe('queue to generator', () => { | ||
it('converts a queue to a generator', async () => { | ||
const queue = new Queue() | ||
const controller = new AbortController() | ||
const signal = controller.signal | ||
const cleanUp = new TypedEventEmitter<CleanUpEvents>() | ||
const log = logger('test-logger') | ||
|
||
void queue.add(async () => { | ||
await delay(10) | ||
return true | ||
}) | ||
|
||
const results = await all(queueToGenerator(queue, signal, cleanUp, log)) | ||
|
||
expect(results).to.deep.equal([true]) | ||
}) | ||
|
||
it('aborts during read', async () => { | ||
const listener = Sinon.stub() | ||
|
||
if (isNode) { | ||
process.on('unhandledRejection', listener) | ||
} | ||
|
||
const queue = new Queue({ | ||
concurrency: 1 | ||
}) | ||
const controller = new AbortController() | ||
const signal = controller.signal | ||
const cleanUp = new TypedEventEmitter<CleanUpEvents>() | ||
const log = logger('test-logger') | ||
|
||
void queue.add(async () => { | ||
await delay(10) | ||
return 1 | ||
}) | ||
void queue.add(async () => { | ||
await delay(10) | ||
return 2 | ||
}) | ||
|
||
let count = 1 | ||
|
||
await expect((async () => { | ||
for await (const result of queueToGenerator(queue, signal, cleanUp, log) as any) { | ||
expect(result).to.equal(count) | ||
count++ | ||
|
||
// get the first result | ||
if (result === 1) { | ||
// abort the queue | ||
controller.abort() | ||
} | ||
} | ||
})()).to.eventually.be.rejected | ||
.with.property('code', 'ERR_QUERY_ABORTED') | ||
|
||
if (isNode) { | ||
process.removeListener('unhandledRejection', listener) | ||
expect(listener.called).to.be.false('unhandled promise rejection detected') | ||
} | ||
}) | ||
}) | ||
}) |