diff --git a/.github/ISSUE_TEMPLATE/pool.md b/.github/ISSUE_TEMPLATE/pool.md new file mode 100644 index 000000000..7af32c4a6 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/pool.md @@ -0,0 +1,5 @@ +--- +name: "package:pool" +about: "Create a bug or file a feature request against package:pool." +labels: "package:pool" +--- \ No newline at end of file diff --git a/.github/labeler.yml b/.github/labeler.yml index 39d78aa01..c4d658fba 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -84,6 +84,10 @@ - changed-files: - any-glob-to-any-file: 'pkgs/package_config/**' +'package:pool': + - changed-files: + - any-glob-to-any-file: 'pkgs/pool/**' + 'package:source_map_stack_trace': - changed-files: - any-glob-to-any-file: 'pkgs/source_map_stack_trace/**' diff --git a/.github/workflows/pool.yaml b/.github/workflows/pool.yaml new file mode 100644 index 000000000..6d64062a8 --- /dev/null +++ b/.github/workflows/pool.yaml @@ -0,0 +1,78 @@ +name: package:pool + +on: + # Run on PRs and pushes to the default branch. + push: + branches: [ main ] + paths: + - '.github/workflows/pool.yaml' + - 'pkgs/pool/**' + pull_request: + branches: [ main ] + paths: + - '.github/workflows/pool.yaml' + - 'pkgs/pool/**' + schedule: + - cron: "0 0 * * 0" + +env: + PUB_ENVIRONMENT: bot.github + + +defaults: + run: + working-directory: pkgs/pool/ + +jobs: + # Check code formatting and static analysis on a single OS (linux) + # against Dart dev. + analyze: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + sdk: [dev] + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 + - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94 + with: + sdk: ${{ matrix.sdk }} + - id: install + name: Install dependencies + run: dart pub get + - name: Check formatting + run: dart format --output=none --set-exit-if-changed . + if: always() && steps.install.outcome == 'success' + - name: Analyze code + run: dart analyze --fatal-infos + if: always() && steps.install.outcome == 'success' + + # Run tests on a matrix consisting of two dimensions: + # 1. OS: ubuntu-latest, (macos-latest, windows-latest) + # 2. release channel: dev + test: + needs: analyze + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + # Add macos-latest and/or windows-latest if relevant for this package. + os: [ubuntu-latest] + sdk: [3.4, dev] + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 + - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94 + with: + sdk: ${{ matrix.sdk }} + - id: install + name: Install dependencies + run: dart pub get + - name: Run VM tests + run: dart test --platform vm + if: always() && steps.install.outcome == 'success' + - name: Run Chrome tests + run: dart test --platform chrome + if: always() && steps.install.outcome == 'success' + - name: Run Chrome tests - wasm + run: dart test --platform chrome -c dart2wasm + if: always() && steps.install.outcome == 'success' && matrix.sdk == 'dev' diff --git a/README.md b/README.md index 0acab2c58..0b97b210f 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ don't naturally belong to other topic monorepos (like | [mime](pkgs/mime/) | Utilities for handling media (MIME) types, including determining a type from a file extension and file contents. | [![package issues](https://img.shields.io/badge/package:mime-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Amime) | [![pub package](https://img.shields.io/pub/v/mime.svg)](https://pub.dev/packages/mime) | | [oauth2](pkgs/oauth2/) | A client library for authenticating with a remote service via OAuth2 on behalf of a user, and making authorized HTTP requests with the user's OAuth2 credentials. | [![package issues](https://img.shields.io/badge/package:oauth2-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aoauth2) | [![pub package](https://img.shields.io/pub/v/oauth2.svg)](https://pub.dev/packages/oauth2) | | [package_config](pkgs/package_config/) | Support for reading and writing Dart Package Configuration files. | [![package issues](https://img.shields.io/badge/package:package_config-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Apackage_config) | [![pub package](https://img.shields.io/pub/v/package_config.svg)](https://pub.dev/packages/package_config) | +| [pool](pkgs/pool/) | Manage a finite pool of resources. Useful for controlling concurrent file system or network requests. | [![package issues](https://img.shields.io/badge/package:pool-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Apool) | [![pub package](https://img.shields.io/pub/v/pool.svg)](https://pub.dev/packages/pool) | | [source_map_stack_trace](pkgs/source_map_stack_trace/) | A package for applying source maps to stack traces. | [![package issues](https://img.shields.io/badge/package:source_map_stack_trace-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_map_stack_trace) | [![pub package](https://img.shields.io/pub/v/source_map_stack_trace.svg)](https://pub.dev/packages/source_map_stack_trace) | | [unified_analytics](pkgs/unified_analytics/) | A package for logging analytics for all Dart and Flutter related tooling to Google Analytics. | [![package issues](https://img.shields.io/badge/package:unified_analytics-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aunified_analytics) | [![pub package](https://img.shields.io/pub/v/unified_analytics.svg)](https://pub.dev/packages/unified_analytics) | diff --git a/pkgs/pool/.gitignore b/pkgs/pool/.gitignore new file mode 100644 index 000000000..e450c836d --- /dev/null +++ b/pkgs/pool/.gitignore @@ -0,0 +1,5 @@ +# Don’t commit the following directories created by pub. +.dart_tool/ +.packages +.pub/ +pubspec.lock diff --git a/pkgs/pool/CHANGELOG.md b/pkgs/pool/CHANGELOG.md new file mode 100644 index 000000000..56424fc6f --- /dev/null +++ b/pkgs/pool/CHANGELOG.md @@ -0,0 +1,105 @@ +## 1.5.2-wip + +* Require Dart 3.4. +* Move to `dart-lang/tools` monorepo. + +## 1.5.1 + +* Populate the pubspec `repository` field. + +## 1.5.0 + +* Stable release for null safety. + +## 1.5.0-nullsafety.3 + +* Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release + guidelines. + +## 1.5.0-nullsafety.2 + +* Allow prerelease versions of the 2.12 sdk. + +## 1.5.0-nullsafety.1 + +* Allow 2.10 stable and 2.11.0 dev SDK versions. + +## 1.5.0-nullsafety + +* Migrate to null safety. +* `forEach`: Avoid `await null` if the `Stream` is not paused. + Improves trivial benchmark by 40%. + +## 1.4.0 + +* Add `forEach` to `Pool` to support efficient async processing of an + `Iterable`. + +* Throw ArgumentError if poolSize <= 0 + +## 1.3.6 + +* Set max SDK version to `<3.0.0`, and adjust other dependencies. + +## 1.3.5 + +- Updated SDK version to 2.0.0-dev.17.0 + +## 1.3.4 + +* Modify code to eliminate Future flattening. + +## 1.3.3 + +* Declare support for `async` 2.0.0. + +## 1.3.2 + +* Update to make the code work with strong-mode clean Zone API. + +* Required minimum SDK of 1.23.0. + +## 1.3.1 + +* Fix the type annotation of `Pool.withResource()` to indicate that it takes + `() -> FutureOr`. + +## 1.3.0 + +* Add a `Pool.done` getter that returns the same future returned by + `Pool.close()`. + +## 1.2.4 + +* Fix a strong-mode error. + +## 1.2.3 + +* Fix a bug in which `Pool.withResource()` could throw a `StateError` when + called immediately before closing the pool. + +## 1.2.2 + +* Fix strong mode warnings and add generic method annotations. + +## 1.2.1 + +* Internal changes only. + +## 1.2.0 + +* Add `Pool.close()`, which forbids new resource requests and releases all + releasable resources. + +## 1.1.0 + +* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it + can be released without forcing it to deallocate immediately. + +## 1.0.2 + +* Fixed the homepage. + +## 1.0.1 + +* A `TimeoutException` is now correctly thrown if the pool detects a deadlock. diff --git a/pkgs/pool/LICENSE b/pkgs/pool/LICENSE new file mode 100644 index 000000000..000cd7bec --- /dev/null +++ b/pkgs/pool/LICENSE @@ -0,0 +1,27 @@ +Copyright 2014, the Dart project authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + * Neither the name of Google LLC nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/pkgs/pool/README.md b/pkgs/pool/README.md new file mode 100644 index 000000000..461e872b8 --- /dev/null +++ b/pkgs/pool/README.md @@ -0,0 +1,57 @@ +[![Build Status](https://github.com/dart-lang/tools/actions/workflows/pool.yaml/badge.svg)](https://github.com/dart-lang/tools/actions/workflows/pool.yaml) +[![pub package](https://img.shields.io/pub/v/pool.svg)](https://pub.dev/packages/pool) +[![package publisher](https://img.shields.io/pub/publisher/pool.svg)](https://pub.dev/packages/pool/publisher) + +The pool package exposes a `Pool` class which makes it easy to manage a limited +pool of resources. + +The easiest way to use a pool is by calling `withResource`. This runs a callback +and returns its result, but only once there aren't too many other callbacks +currently running. + +```dart +// Create a Pool that will only allocate 10 resources at once. After 30 seconds +// of inactivity with all resources checked out, the pool will throw an error. +final pool = new Pool(10, timeout: new Duration(seconds: 30)); + +Future readFile(String path) { + // Since the call to [File.readAsString] is within [withResource], no more + // than ten files will be open at once. + return pool.withResource(() => new File(path).readAsString()); +} +``` + +For more fine-grained control, the user can also explicitly request generic +`PoolResource` objects that can later be released back into the pool. This is +what `withResource` does under the covers: requests a resource, then releases it +once the callback completes. + +`Pool` ensures that only a limited number of resources are allocated at once. +It's the caller's responsibility to ensure that the corresponding physical +resource is only consumed when a `PoolResource` is allocated. + +```dart +class PooledFile implements RandomAccessFile { + final RandomAccessFile _file; + final PoolResource _resource; + + static Future open(String path) { + return pool.request().then((resource) { + return new File(path).open().then((file) { + return new PooledFile._(file, resource); + }); + }); + } + + PooledFile(this._file, this._resource); + + // ... + + Future close() { + return _file.close.then((_) { + _resource.release(); + return this; + }); + } +} +``` diff --git a/pkgs/pool/analysis_options.yaml b/pkgs/pool/analysis_options.yaml new file mode 100644 index 000000000..44cda4da2 --- /dev/null +++ b/pkgs/pool/analysis_options.yaml @@ -0,0 +1,5 @@ +include: package:dart_flutter_team_lints/analysis_options.yaml + +analyzer: + language: + strict-casts: true diff --git a/pkgs/pool/benchmark/for_each_benchmark.dart b/pkgs/pool/benchmark/for_each_benchmark.dart new file mode 100644 index 000000000..0cd2543e2 --- /dev/null +++ b/pkgs/pool/benchmark/for_each_benchmark.dart @@ -0,0 +1,55 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:pool/pool.dart'; + +void main(List args) async { + var poolSize = args.isEmpty ? 5 : int.parse(args.first); + print('Pool size: $poolSize'); + + final pool = Pool(poolSize); + final watch = Stopwatch()..start(); + final start = DateTime.now(); + + DateTime? lastLog; + Duration? fastest; + late int fastestIteration; + var i = 1; + + void log(bool force) { + var now = DateTime.now(); + if (force || + lastLog == null || + now.difference(lastLog!) > const Duration(seconds: 1)) { + lastLog = now; + print([ + now.difference(start), + i.toString().padLeft(10), + fastestIteration.toString().padLeft(7), + fastest!.inMicroseconds.toString().padLeft(9) + ].join(' ')); + } + } + + print(['Elapsed ', 'Iterations', 'Fastest', 'Time (us)'].join(' ')); + + for (;; i++) { + watch.reset(); + + var sum = await pool + .forEach(Iterable.generate(100000), (i) => i) + .reduce((a, b) => a + b); + + assert(sum == 4999950000, 'was $sum'); + + var elapsed = watch.elapsed; + if (fastest == null || fastest > elapsed) { + fastest = elapsed; + fastestIteration = i; + log(true); + } else { + log(false); + } + } +} diff --git a/pkgs/pool/lib/pool.dart b/pkgs/pool/lib/pool.dart new file mode 100644 index 000000000..70e9df158 --- /dev/null +++ b/pkgs/pool/lib/pool.dart @@ -0,0 +1,380 @@ +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:collection'; + +import 'package:async/async.dart'; +import 'package:stack_trace/stack_trace.dart'; + +/// Manages an abstract pool of resources with a limit on how many may be in use +/// at once. +/// +/// When a resource is needed, the user should call [request]. When the returned +/// future completes with a [PoolResource], the resource may be allocated. Once +/// the resource has been released, the user should call [PoolResource.release]. +/// The pool will ensure that only a certain number of [PoolResource]s may be +/// allocated at once. +class Pool { + /// Completers for requests beyond the first [_maxAllocatedResources]. + /// + /// When an item is released, the next element of [_requestedResources] will + /// be completed. + final _requestedResources = Queue>(); + + /// Callbacks that must be called before additional resources can be + /// allocated. + /// + /// See [PoolResource.allowRelease]. + final _onReleaseCallbacks = Queue(); + + /// Completers that will be completed once `onRelease` callbacks are done + /// running. + /// + /// These are kept in a queue to ensure that the earliest request completes + /// first regardless of what order the `onRelease` callbacks complete in. + final _onReleaseCompleters = Queue>(); + + /// The maximum number of resources that may be allocated at once. + final int _maxAllocatedResources; + + /// The number of resources that are currently allocated. + int _allocatedResources = 0; + + /// The timeout timer. + /// + /// This timer is canceled as long as the pool is below the resource limit. + /// It's reset once the resource limit is reached and again every time an + /// resource is released or a new resource is requested. If it fires, that + /// indicates that the caller became deadlocked, likely due to files waiting + /// for additional files to be read before they could be closed. + /// + /// This is `null` if this pool shouldn't time out. + RestartableTimer? _timer; + + /// The amount of time to wait before timing out the pending resources. + final Duration? _timeout; + + /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources + /// that have been marked releasable. + /// + /// This is `null` until [close] is called. + FutureGroup? _closeGroup; + + /// Whether [close] has been called. + bool get isClosed => _closeMemo.hasRun; + + /// A future that completes once the pool is closed and all its outstanding + /// resources have been released. + /// + /// If any [PoolResource.allowRelease] callback throws an exception after the + /// pool is closed, this completes with that exception. + Future get done => _closeMemo.future; + + /// Creates a new pool with the given limit on how many resources may be + /// allocated at once. + /// + /// If [timeout] is passed, then if that much time passes without any activity + /// all pending [request] futures will throw a [TimeoutException]. This is + /// intended to avoid deadlocks. + Pool(this._maxAllocatedResources, {Duration? timeout}) : _timeout = timeout { + if (_maxAllocatedResources <= 0) { + throw ArgumentError.value(_maxAllocatedResources, 'maxAllocatedResources', + 'Must be greater than zero.'); + } + + if (timeout != null) { + // Start the timer canceled since we only want to start counting down once + // we've run out of available resources. + _timer = RestartableTimer(timeout, _onTimeout)..cancel(); + } + } + + /// Request a [PoolResource]. + /// + /// If the maximum number of resources is already allocated, this will delay + /// until one of them is released. + Future request() { + if (isClosed) { + throw StateError('request() may not be called on a closed Pool.'); + } + + if (_allocatedResources < _maxAllocatedResources) { + _allocatedResources++; + return Future.value(PoolResource._(this)); + } else if (_onReleaseCallbacks.isNotEmpty) { + return _runOnRelease(_onReleaseCallbacks.removeFirst()); + } else { + var completer = Completer(); + _requestedResources.add(completer); + _resetTimer(); + return completer.future; + } + } + + /// Requests a resource for the duration of [callback], which may return a + /// Future. + /// + /// The return value of [callback] is piped to the returned Future. + Future withResource(FutureOr Function() callback) async { + if (isClosed) { + throw StateError('withResource() may not be called on a closed Pool.'); + } + + var resource = await request(); + try { + return await callback(); + } finally { + resource.release(); + } + } + + /// Returns a [Stream] containing the result of [action] applied to each + /// element of [elements]. + /// + /// While [action] is invoked on each element of [elements] in order, + /// it's possible the return [Stream] may have items out-of-order – especially + /// if the completion time of [action] varies. + /// + /// If [action] throws an error the source item along with the error object + /// and [StackTrace] are passed to [onError], if it is provided. If [onError] + /// returns `true`, the error is added to the returned [Stream], otherwise + /// it is ignored. + /// + /// Errors thrown from iterating [elements] will not be passed to + /// [onError]. They will always be added to the returned stream as an error. + /// + /// Note: all of the resources of the this [Pool] will be used when the + /// returned [Stream] is listened to until it is completed or canceled. + /// + /// Note: if this [Pool] is closed before the returned [Stream] is listened + /// to, a [StateError] is thrown. + Stream forEach( + Iterable elements, FutureOr Function(S source) action, + {bool Function(S item, Object error, StackTrace stack)? onError}) { + onError ??= (item, e, s) => true; + + var cancelPending = false; + + Completer? resumeCompleter; + late StreamController controller; + + late Iterator iterator; + + Future run(int _) async { + while (iterator.moveNext()) { + // caching `current` is necessary because there are async breaks + // in this code and `iterator` is shared across many workers + final current = iterator.current; + + _resetTimer(); + + if (resumeCompleter != null) { + await resumeCompleter!.future; + } + + if (cancelPending) { + break; + } + + T value; + try { + value = await action(current); + } catch (e, stack) { + if (onError!(current, e, stack)) { + controller.addError(e, stack); + } + continue; + } + controller.add(value); + } + } + + Future? doneFuture; + + void onListen() { + iterator = elements.iterator; + + assert(doneFuture == null); + var futures = Iterable>.generate( + _maxAllocatedResources, (i) => withResource(() => run(i))); + doneFuture = Future.wait(futures, eagerError: true) + .then((_) {}) + .catchError(controller.addError); + + doneFuture!.whenComplete(controller.close); + } + + controller = StreamController( + sync: true, + onListen: onListen, + onCancel: () async { + assert(!cancelPending); + cancelPending = true; + await doneFuture; + }, + onPause: () { + assert(resumeCompleter == null); + resumeCompleter = Completer(); + }, + onResume: () { + assert(resumeCompleter != null); + resumeCompleter!.complete(); + resumeCompleter = null; + }, + ); + + return controller.stream; + } + + /// Closes the pool so that no more resources are requested. + /// + /// Existing resource requests remain unchanged. + /// + /// Any resources that are marked as releasable using + /// [PoolResource.allowRelease] are released immediately. Once all resources + /// have been released and any `onRelease` callbacks have completed, the + /// returned future completes successfully. If any `onRelease` callback throws + /// an error, the returned future completes with that error. + /// + /// This may be called more than once; it returns the same [Future] each time. + Future close() => _closeMemo.runOnce(_close); + + Future _close() { + if (_closeGroup != null) return _closeGroup!.future; + + _resetTimer(); + + _closeGroup = FutureGroup(); + for (var callback in _onReleaseCallbacks) { + _closeGroup!.add(Future.sync(callback)); + } + + _allocatedResources -= _onReleaseCallbacks.length; + _onReleaseCallbacks.clear(); + + if (_allocatedResources == 0) _closeGroup!.close(); + return _closeGroup!.future; + } + + final _closeMemo = AsyncMemoizer(); + + /// If there are any pending requests, this will fire the oldest one. + void _onResourceReleased() { + _resetTimer(); + + if (_requestedResources.isNotEmpty) { + var pending = _requestedResources.removeFirst(); + pending.complete(PoolResource._(this)); + } else { + _allocatedResources--; + if (isClosed && _allocatedResources == 0) _closeGroup!.close(); + } + } + + /// If there are any pending requests, this will fire the oldest one after + /// running [onRelease]. + void _onResourceReleaseAllowed(void Function() onRelease) { + _resetTimer(); + + if (_requestedResources.isNotEmpty) { + var pending = _requestedResources.removeFirst(); + pending.complete(_runOnRelease(onRelease)); + } else if (isClosed) { + _closeGroup!.add(Future.sync(onRelease)); + _allocatedResources--; + if (_allocatedResources == 0) _closeGroup!.close(); + } else { + var zone = Zone.current; + var registered = zone.registerCallback(onRelease); + _onReleaseCallbacks.add(() => zone.run(registered)); + } + } + + /// Runs [onRelease] and returns a Future that completes to a resource once an + /// [onRelease] callback completes. + /// + /// Futures returned by [_runOnRelease] always complete in the order they were + /// created, even if earlier [onRelease] callbacks take longer to run. + Future _runOnRelease(void Function() onRelease) { + Future.sync(onRelease).then((value) { + _onReleaseCompleters.removeFirst().complete(PoolResource._(this)); + }).catchError((Object error, StackTrace stackTrace) { + _onReleaseCompleters.removeFirst().completeError(error, stackTrace); + }); + + var completer = Completer.sync(); + _onReleaseCompleters.add(completer); + return completer.future; + } + + /// A resource has been requested, allocated, or released. + void _resetTimer() { + if (_timer == null) return; + + if (_requestedResources.isEmpty) { + _timer!.cancel(); + } else { + _timer!.reset(); + } + } + + /// Handles [_timer] timing out by causing all pending resource completers to + /// emit exceptions. + void _onTimeout() { + for (var completer in _requestedResources) { + completer.completeError( + TimeoutException( + 'Pool deadlock: all resources have been ' + 'allocated for too long.', + _timeout), + Chain.current()); + } + _requestedResources.clear(); + _timer = null; + } +} + +/// A member of a [Pool]. +/// +/// A [PoolResource] is a token that indicates that a resource is allocated. +/// When the associated resource is released, the user should call [release]. +class PoolResource { + final Pool _pool; + + /// Whether `this` has been released yet. + bool _released = false; + + PoolResource._(this._pool); + + /// Tells the parent [Pool] that the resource associated with this resource is + /// no longer allocated, and that a new [PoolResource] may be allocated. + void release() { + if (_released) { + throw StateError('A PoolResource may only be released once.'); + } + _released = true; + _pool._onResourceReleased(); + } + + /// Tells the parent [Pool] that the resource associated with this resource is + /// no longer necessary, but should remain allocated until more resources are + /// needed. + /// + /// When [Pool.request] is called and there are no remaining available + /// resources, the [onRelease] callback is called. It should free the + /// resource, and it may return a Future or `null`. Once that completes, the + /// [Pool.request] call will complete to a new [PoolResource]. + /// + /// This is useful when a resource's main function is complete, but it may + /// produce additional information later on. For example, an isolate's task + /// may be complete, but it could still emit asynchronous errors. + void allowRelease(FutureOr Function() onRelease) { + if (_released) { + throw StateError('A PoolResource may only be released once.'); + } + _released = true; + _pool._onResourceReleaseAllowed(onRelease); + } +} diff --git a/pkgs/pool/pubspec.yaml b/pkgs/pool/pubspec.yaml new file mode 100644 index 000000000..a205b7494 --- /dev/null +++ b/pkgs/pool/pubspec.yaml @@ -0,0 +1,18 @@ +name: pool +version: 1.5.2-wip +description: >- + Manage a finite pool of resources. + Useful for controlling concurrent file system or network requests. +repository: https://github.com/dart-lang/tools/tree/main/pkgs/pool + +environment: + sdk: ^3.4.0 + +dependencies: + async: ^2.5.0 + stack_trace: ^1.10.0 + +dev_dependencies: + dart_flutter_team_lints: ^3.0.0 + fake_async: ^1.2.0 + test: ^1.16.6 diff --git a/pkgs/pool/test/pool_test.dart b/pkgs/pool/test/pool_test.dart new file mode 100644 index 000000000..6334a8abd --- /dev/null +++ b/pkgs/pool/test/pool_test.dart @@ -0,0 +1,745 @@ +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:pool/pool.dart'; +import 'package:stack_trace/stack_trace.dart'; +import 'package:test/test.dart'; + +void main() { + group('request()', () { + test('resources can be requested freely up to the limit', () { + var pool = Pool(50); + for (var i = 0; i < 50; i++) { + expect(pool.request(), completes); + } + }); + + test('resources block past the limit', () { + FakeAsync().run((async) { + var pool = Pool(50); + for (var i = 0; i < 50; i++) { + expect(pool.request(), completes); + } + expect(pool.request(), doesNotComplete); + + async.elapse(const Duration(seconds: 1)); + }); + }); + + test('a blocked resource is allocated when another is released', () { + FakeAsync().run((async) { + var pool = Pool(50); + for (var i = 0; i < 49; i++) { + expect(pool.request(), completes); + } + + pool.request().then((lastAllocatedResource) { + // This will only complete once [lastAllocatedResource] is released. + expect(pool.request(), completes); + + Future.delayed(const Duration(microseconds: 1)).then((_) { + lastAllocatedResource.release(); + }); + }); + + async.elapse(const Duration(seconds: 1)); + }); + }); + }); + + group('withResource()', () { + test('can be called freely up to the limit', () { + var pool = Pool(50); + for (var i = 0; i < 50; i++) { + pool.withResource(expectAsync0(() => Completer().future)); + } + }); + + test('blocks the callback past the limit', () { + FakeAsync().run((async) { + var pool = Pool(50); + for (var i = 0; i < 50; i++) { + pool.withResource(expectAsync0(() => Completer().future)); + } + pool.withResource(expectNoAsync()); + + async.elapse(const Duration(seconds: 1)); + }); + }); + + test('a blocked resource is allocated when another is released', () { + FakeAsync().run((async) { + var pool = Pool(50); + for (var i = 0; i < 49; i++) { + pool.withResource(expectAsync0(() => Completer().future)); + } + + var completer = Completer(); + pool.withResource(() => completer.future); + var blockedResourceAllocated = false; + pool.withResource(() { + blockedResourceAllocated = true; + }); + + Future.delayed(const Duration(microseconds: 1)).then((_) { + expect(blockedResourceAllocated, isFalse); + completer.complete(); + return Future.delayed(const Duration(microseconds: 1)); + }).then((_) { + expect(blockedResourceAllocated, isTrue); + }); + + async.elapse(const Duration(seconds: 1)); + }); + }); + + // Regression test for #3. + test('can be called immediately before close()', () async { + var pool = Pool(1); + unawaited(pool.withResource(expectAsync0(() {}))); + await pool.close(); + }); + }); + + group('with a timeout', () { + test("doesn't time out if there are no pending requests", () { + FakeAsync().run((async) { + var pool = Pool(50, timeout: const Duration(seconds: 5)); + for (var i = 0; i < 50; i++) { + expect(pool.request(), completes); + } + + async.elapse(const Duration(seconds: 6)); + }); + }); + + test('resets the timer if a resource is returned', () { + FakeAsync().run((async) { + var pool = Pool(50, timeout: const Duration(seconds: 5)); + for (var i = 0; i < 49; i++) { + expect(pool.request(), completes); + } + + pool.request().then((lastAllocatedResource) { + // This will only complete once [lastAllocatedResource] is released. + expect(pool.request(), completes); + + Future.delayed(const Duration(seconds: 3)).then((_) { + lastAllocatedResource.release(); + expect(pool.request(), doesNotComplete); + }); + }); + + async.elapse(const Duration(seconds: 6)); + }); + }); + + test('resets the timer if a resource is requested', () { + FakeAsync().run((async) { + var pool = Pool(50, timeout: const Duration(seconds: 5)); + for (var i = 0; i < 50; i++) { + expect(pool.request(), completes); + } + expect(pool.request(), doesNotComplete); + + Future.delayed(const Duration(seconds: 3)).then((_) { + expect(pool.request(), doesNotComplete); + }); + + async.elapse(const Duration(seconds: 6)); + }); + }); + + test('times out if nothing happens', () { + FakeAsync().run((async) { + var pool = Pool(50, timeout: const Duration(seconds: 5)); + for (var i = 0; i < 50; i++) { + expect(pool.request(), completes); + } + expect(pool.request(), throwsA(const TypeMatcher())); + + async.elapse(const Duration(seconds: 6)); + }); + }); + }); + + group('allowRelease()', () { + test('runs the callback once the resource limit is exceeded', () async { + var pool = Pool(50); + for (var i = 0; i < 49; i++) { + expect(pool.request(), completes); + } + + var resource = await pool.request(); + var onReleaseCalled = false; + resource.allowRelease(() => onReleaseCalled = true); + await Future.delayed(Duration.zero); + expect(onReleaseCalled, isFalse); + + expect(pool.request(), completes); + await Future.delayed(Duration.zero); + expect(onReleaseCalled, isTrue); + }); + + test('runs the callback immediately if there are blocked requests', + () async { + var pool = Pool(1); + var resource = await pool.request(); + + // This will be blocked until [resource.allowRelease] is called. + expect(pool.request(), completes); + + var onReleaseCalled = false; + resource.allowRelease(() => onReleaseCalled = true); + await Future.delayed(Duration.zero); + expect(onReleaseCalled, isTrue); + }); + + test('blocks the request until the callback completes', () async { + var pool = Pool(1); + var resource = await pool.request(); + + var requestComplete = false; + unawaited(pool.request().then((_) => requestComplete = true)); + + var completer = Completer(); + resource.allowRelease(() => completer.future); + await Future.delayed(Duration.zero); + expect(requestComplete, isFalse); + + completer.complete(); + await Future.delayed(Duration.zero); + expect(requestComplete, isTrue); + }); + + test('completes requests in request order regardless of callback order', + () async { + var pool = Pool(2); + var resource1 = await pool.request(); + var resource2 = await pool.request(); + + var request1Complete = false; + unawaited(pool.request().then((_) => request1Complete = true)); + var request2Complete = false; + unawaited(pool.request().then((_) => request2Complete = true)); + + var onRelease1Called = false; + var completer1 = Completer(); + resource1.allowRelease(() { + onRelease1Called = true; + return completer1.future; + }); + await Future.delayed(Duration.zero); + expect(onRelease1Called, isTrue); + + var onRelease2Called = false; + var completer2 = Completer(); + resource2.allowRelease(() { + onRelease2Called = true; + return completer2.future; + }); + await Future.delayed(Duration.zero); + expect(onRelease2Called, isTrue); + expect(request1Complete, isFalse); + expect(request2Complete, isFalse); + + // Complete the second resource's onRelease callback first. Even though it + // was triggered by the second blocking request, it should complete the + // first one to preserve ordering. + completer2.complete(); + await Future.delayed(Duration.zero); + expect(request1Complete, isTrue); + expect(request2Complete, isFalse); + + completer1.complete(); + await Future.delayed(Duration.zero); + expect(request1Complete, isTrue); + expect(request2Complete, isTrue); + }); + + test('runs onRequest in the zone it was created', () async { + var pool = Pool(1); + var resource = await pool.request(); + + var outerZone = Zone.current; + runZoned(() { + var innerZone = Zone.current; + expect(innerZone, isNot(equals(outerZone))); + + resource.allowRelease(expectAsync0(() { + expect(Zone.current, equals(innerZone)); + })); + }); + + await pool.request(); + }); + }); + + test("done doesn't complete without close", () async { + var pool = Pool(1); + unawaited(pool.done.then(expectAsync1((_) {}, count: 0))); + + var resource = await pool.request(); + resource.release(); + + await Future.delayed(Duration.zero); + }); + + group('close()', () { + test('disallows request() and withResource()', () { + var pool = Pool(1)..close(); + expect(pool.request, throwsStateError); + expect(() => pool.withResource(() {}), throwsStateError); + }); + + test('pending requests are fulfilled', () async { + var pool = Pool(1); + var resource1 = await pool.request(); + expect( + pool.request().then((resource2) { + resource2.release(); + }), + completes); + expect(pool.done, completes); + expect(pool.close(), completes); + resource1.release(); + }); + + test('pending requests are fulfilled with allowRelease', () async { + var pool = Pool(1); + var resource1 = await pool.request(); + + var completer = Completer(); + expect( + pool.request().then((resource2) { + expect(completer.isCompleted, isTrue); + resource2.release(); + }), + completes); + expect(pool.close(), completes); + + resource1.allowRelease(() => completer.future); + await Future.delayed(Duration.zero); + + completer.complete(); + }); + + test("doesn't complete until all resources are released", () async { + var pool = Pool(2); + var resource1 = await pool.request(); + var resource2 = await pool.request(); + var resource3Future = pool.request(); + + var resource1Released = false; + var resource2Released = false; + var resource3Released = false; + expect( + pool.close().then((_) { + expect(resource1Released, isTrue); + expect(resource2Released, isTrue); + expect(resource3Released, isTrue); + }), + completes); + + resource1Released = true; + resource1.release(); + await Future.delayed(Duration.zero); + + resource2Released = true; + resource2.release(); + await Future.delayed(Duration.zero); + + var resource3 = await resource3Future; + resource3Released = true; + resource3.release(); + }); + + test('active onReleases complete as usual', () async { + var pool = Pool(1); + var resource = await pool.request(); + + // Set up an onRelease callback whose completion is controlled by + // [completer]. + var completer = Completer(); + resource.allowRelease(() => completer.future); + expect( + pool.request().then((_) { + expect(completer.isCompleted, isTrue); + }), + completes); + + await Future.delayed(Duration.zero); + unawaited(pool.close()); + + await Future.delayed(Duration.zero); + completer.complete(); + }); + + test('inactive onReleases fire', () async { + var pool = Pool(2); + var resource1 = await pool.request(); + var resource2 = await pool.request(); + + var completer1 = Completer(); + resource1.allowRelease(() => completer1.future); + var completer2 = Completer(); + resource2.allowRelease(() => completer2.future); + + expect( + pool.close().then((_) { + expect(completer1.isCompleted, isTrue); + expect(completer2.isCompleted, isTrue); + }), + completes); + + await Future.delayed(Duration.zero); + completer1.complete(); + + await Future.delayed(Duration.zero); + completer2.complete(); + }); + + test('new allowReleases fire immediately', () async { + var pool = Pool(1); + var resource = await pool.request(); + + var completer = Completer(); + expect( + pool.close().then((_) { + expect(completer.isCompleted, isTrue); + }), + completes); + + await Future.delayed(Duration.zero); + resource.allowRelease(() => completer.future); + + await Future.delayed(Duration.zero); + completer.complete(); + }); + + test('an onRelease error is piped to the return value', () async { + var pool = Pool(1); + var resource = await pool.request(); + + var completer = Completer(); + resource.allowRelease(() => completer.future); + + expect(pool.done, throwsA('oh no!')); + expect(pool.close(), throwsA('oh no!')); + + await Future.delayed(Duration.zero); + completer.completeError('oh no!'); + }); + }); + + group('forEach', () { + late Pool pool; + + tearDown(() async { + await pool.close(); + }); + + const delayedToStringDuration = Duration(milliseconds: 10); + + Future delayedToString(int i) => + Future.delayed(delayedToStringDuration, () => i.toString()); + + for (var itemCount in [0, 5]) { + for (var poolSize in [1, 5, 6]) { + test('poolSize: $poolSize, itemCount: $itemCount', () async { + pool = Pool(poolSize); + + var finishedItems = 0; + + await for (var item in pool.forEach( + Iterable.generate(itemCount, (i) { + expect(i, lessThanOrEqualTo(finishedItems + poolSize), + reason: 'the iterator should be called lazily'); + return i; + }), + delayedToString)) { + expect(int.parse(item), lessThan(itemCount)); + finishedItems++; + } + + expect(finishedItems, itemCount); + }); + } + } + + test('pool closed before listen', () async { + pool = Pool(2); + + var stream = pool.forEach(Iterable.generate(5), delayedToString); + + await pool.close(); + + expect(stream.toList(), throwsStateError); + }); + + test('completes even if the pool is partially used', () async { + pool = Pool(2); + + var resource = await pool.request(); + + var stream = pool.forEach([], delayedToString); + + expect(await stream.length, 0); + + resource.release(); + }); + + test('stream paused longer than timeout', () async { + pool = Pool(2, timeout: delayedToStringDuration); + + var resource = await pool.request(); + + var stream = pool.forEach( + Iterable.generate(100, (i) { + expect(i, lessThan(20), + reason: 'The timeout should happen ' + 'before the entire iterable is iterated.'); + return i; + }), (i) async { + await Future.delayed(Duration(milliseconds: i)); + return i; + }); + + await expectLater( + stream.toList, + throwsA(const TypeMatcher().having( + (te) => te.message, + 'message', + contains('Pool deadlock: ' + 'all resources have been allocated for too long.')))); + + resource.release(); + }); + + group('timing and timeout', () { + for (var poolSize in [2, 8, 64]) { + for (var otherTaskCount + in [0, 1, 7, 63].where((otc) => otc < poolSize)) { + test('poolSize: $poolSize, otherTaskCount: $otherTaskCount', + () async { + final itemCount = 128; + pool = Pool(poolSize, timeout: const Duration(milliseconds: 20)); + + var otherTasks = await Future.wait( + Iterable.generate(otherTaskCount) + .map((i) => pool.request())); + + try { + var finishedItems = 0; + + var watch = Stopwatch()..start(); + + await for (var item in pool.forEach( + Iterable.generate(itemCount, (i) { + expect(i, lessThanOrEqualTo(finishedItems + poolSize), + reason: 'the iterator should be called lazily'); + return i; + }), + delayedToString)) { + expect(int.parse(item), lessThan(itemCount)); + finishedItems++; + } + + expect(finishedItems, itemCount); + + final expectedElapsed = + delayedToStringDuration.inMicroseconds * 4; + + expect((watch.elapsed ~/ itemCount).inMicroseconds, + lessThan(expectedElapsed / (poolSize - otherTaskCount)), + reason: 'Average time per task should be ' + 'proportionate to the available pool resources.'); + } finally { + for (var task in otherTasks) { + task.release(); + } + } + }); + } + } + }, testOn: 'vm'); + + test('partial iteration', () async { + pool = Pool(5); + var stream = pool.forEach(Iterable.generate(100), delayedToString); + expect(await stream.take(10).toList(), hasLength(10)); + }); + + test('pool close during data with waiting to be done', () async { + pool = Pool(5); + + var stream = pool.forEach(Iterable.generate(100), delayedToString); + + var dataCount = 0; + var subscription = stream.listen((data) { + dataCount++; + pool.close(); + }); + + await subscription.asFuture(); + expect(dataCount, 100); + await subscription.cancel(); + }); + + test('pause and resume ', () async { + var generatedCount = 0; + var dataCount = 0; + final poolSize = 5; + + pool = Pool(poolSize); + + var stream = pool.forEach( + Iterable.generate(40, (i) { + expect(generatedCount, lessThanOrEqualTo(dataCount + 2 * poolSize), + reason: 'The iterator should not be called ' + 'much faster than the data is consumed.'); + generatedCount++; + return i; + }), + delayedToString); + + // ignore: cancel_subscriptions + late StreamSubscription subscription; + + subscription = stream.listen( + (data) { + dataCount++; + + if (int.parse(data) % 3 == 1) { + subscription.pause(Future(() async { + await Future.delayed(const Duration(milliseconds: 100)); + })); + } + }, + onError: registerException, + onDone: expectAsync0(() { + expect(dataCount, 40); + }), + ); + }); + + group('cancel', () { + final dataSize = 32; + for (var i = 1; i < 5; i++) { + test('with pool size $i', () async { + pool = Pool(i); + + var stream = + pool.forEach(Iterable.generate(dataSize), delayedToString); + + var cancelCompleter = Completer(); + + StreamSubscription subscription; + + var eventCount = 0; + subscription = stream.listen((data) { + eventCount++; + if (int.parse(data) == dataSize ~/ 2) { + cancelCompleter.complete(); + } + }, onError: registerException); + + await cancelCompleter.future; + + await subscription.cancel(); + + expect(eventCount, 1 + dataSize ~/ 2); + }); + } + }); + + group('errors', () { + Future errorInIterator({ + bool Function(int item, Object error, StackTrace stack)? onError, + }) async { + pool = Pool(20); + + var listFuture = pool + .forEach( + Iterable.generate(100, (i) { + if (i == 50) { + throw StateError('error while generating item in iterator'); + } + + return i; + }), + delayedToString, + onError: onError) + .toList(); + + await expectLater(() async => listFuture, throwsStateError); + } + + test('iteration, no onError', () async { + await errorInIterator(); + }); + test('iteration, with onError', () async { + await errorInIterator(onError: (i, e, s) => false); + }); + + test('error in action, no onError', () async { + pool = Pool(20); + + var listFuture = pool.forEach(Iterable.generate(100), (i) async { + await Future.delayed(const Duration(milliseconds: 10)); + if (i == 10) { + throw UnsupportedError('10 is not supported'); + } + return i.toString(); + }).toList(); + + await expectLater(() async => listFuture, throwsUnsupportedError); + }); + + test('error in action, no onError', () async { + pool = Pool(20); + + var list = await pool.forEach(Iterable.generate(100), + (int i) async { + await Future.delayed(const Duration(milliseconds: 10)); + if (i % 10 == 0) { + throw UnsupportedError('Multiples of 10 not supported'); + } + return i.toString(); + }, + onError: (item, error, stack) => + error is! UnsupportedError).toList(); + + expect(list, hasLength(90)); + }); + }); + }); + + test('throw error when pool limit <= 0', () { + expect(() => Pool(-1), throwsArgumentError); + expect(() => Pool(0), throwsArgumentError); + }); +} + +/// Returns a function that will cause the test to fail if it's called. +/// +/// This should only be called within a [FakeAsync.run] zone. +void Function() expectNoAsync() { + var stack = Trace.current(1); + return () => registerException( + TestFailure('Expected function not to be called.'), stack); +} + +/// A matcher for Futures that asserts that they don't complete. +/// +/// This should only be called within a [FakeAsync.run] zone. +Matcher get doesNotComplete => predicate((Future future) { + var stack = Trace.current(1); + future.then((_) => registerException( + TestFailure('Expected future not to complete.'), stack)); + return true; + });