From c671d6558652ae653528ca03ba0c69bac69bdc39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Tue, 31 Oct 2023 23:52:53 +0100 Subject: [PATCH] Use lock to limit concurrent connection opening in pool. (#218) --- lib/src/pool/pool_impl.dart | 54 ++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/lib/src/pool/pool_impl.dart b/lib/src/pool/pool_impl.dart index ab9a8eb7..2c8ec37b 100644 --- a/lib/src/pool/pool_impl.dart +++ b/lib/src/pool/pool_impl.dart @@ -2,10 +2,10 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:pool/pool.dart' as pool; -import 'package:postgres/src/v3/resolved_settings.dart'; import '../../postgres.dart'; import '../v3/connection.dart'; +import '../v3/resolved_settings.dart'; EndpointSelector roundRobinSelector(List endpoints) { int nextIndex = 0; @@ -26,6 +26,10 @@ class PoolImplementation implements Pool { _maxConnectionCount, timeout: _settings.connectTimeout, ); + late final _connectLock = pool.Pool( + 1, + timeout: _settings.connectTimeout, + ); PoolImplementation(this._selector, PoolSettings? settings) : _settings = ResolvedPoolSettings(settings); @@ -173,32 +177,34 @@ class PoolImplementation implements Pool { return oldc; } - while (_connections.length == _maxConnectionCount) { - final candidates = - _connections.where((c) => c._isInUse == false).toList(); - if (candidates.isEmpty) { - throw StateError('The pool should not be in this state.'); + return await _connectLock.withResource(() async { + while (_connections.length >= _maxConnectionCount) { + final candidates = + _connections.where((c) => c._isInUse == false).toList(); + if (candidates.isEmpty) { + throw StateError('The pool should not be in this state.'); + } + final selected = candidates.reduce( + (a, b) => a._lastReturned.isBefore(b._lastReturned) ? a : b); + await selected._dispose(); } - final selected = candidates - .reduce((a, b) => a._lastReturned.isBefore(b._lastReturned) ? a : b); - await selected._dispose(); - } - final newc = _PoolConnection( - this, - endpoint, - settings, - await PgConnectionImplementation.connect( + final newc = _PoolConnection( + this, endpoint, - connectionSettings: settings, - ), - ); - newc._isInUse = true; - // NOTE: It is important to update _connections list after the isInUse - // flag is set, otherwise race conditions may create conflicts or - // pool close may miss the connection. - _connections.add(newc); - return newc; + settings, + await PgConnectionImplementation.connect( + endpoint, + connectionSettings: settings, + ), + ); + newc._isInUse = true; + // NOTE: It is important to update _connections list after the isInUse + // flag is set, otherwise race conditions may create conflicts or + // pool close may miss the connection. + _connections.add(newc); + return newc; + }); } }