Skip to content

Commit

Permalink
Use lock to limit concurrent connection opening in pool. (#218)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Oct 31, 2023
1 parent 0e0cea3 commit c671d65
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions lib/src/pool/pool_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import 'dart:async';

import 'package:collection/collection.dart';
import 'package:pool/pool.dart' as pool;
import 'package:postgres/src/v3/resolved_settings.dart';

import '../../postgres.dart';
import '../v3/connection.dart';
import '../v3/resolved_settings.dart';

EndpointSelector roundRobinSelector(List<Endpoint> endpoints) {
int nextIndex = 0;
Expand All @@ -26,6 +26,10 @@ class PoolImplementation<L> implements Pool<L> {
_maxConnectionCount,
timeout: _settings.connectTimeout,
);
late final _connectLock = pool.Pool(
1,
timeout: _settings.connectTimeout,
);

PoolImplementation(this._selector, PoolSettings? settings)
: _settings = ResolvedPoolSettings(settings);
Expand Down Expand Up @@ -173,32 +177,34 @@ class PoolImplementation<L> implements Pool<L> {
return oldc;
}

while (_connections.length == _maxConnectionCount) {
final candidates =
_connections.where((c) => c._isInUse == false).toList();
if (candidates.isEmpty) {
throw StateError('The pool should not be in this state.');
return await _connectLock.withResource(() async {
while (_connections.length >= _maxConnectionCount) {
final candidates =
_connections.where((c) => c._isInUse == false).toList();
if (candidates.isEmpty) {
throw StateError('The pool should not be in this state.');
}
final selected = candidates.reduce(
(a, b) => a._lastReturned.isBefore(b._lastReturned) ? a : b);
await selected._dispose();
}
final selected = candidates
.reduce((a, b) => a._lastReturned.isBefore(b._lastReturned) ? a : b);
await selected._dispose();
}

final newc = _PoolConnection(
this,
endpoint,
settings,
await PgConnectionImplementation.connect(
final newc = _PoolConnection(
this,
endpoint,
connectionSettings: settings,
),
);
newc._isInUse = true;
// NOTE: It is important to update _connections list after the isInUse
// flag is set, otherwise race conditions may create conflicts or
// pool close may miss the connection.
_connections.add(newc);
return newc;
settings,
await PgConnectionImplementation.connect(
endpoint,
connectionSettings: settings,
),
);
newc._isInUse = true;
// NOTE: It is important to update _connections list after the isInUse
// flag is set, otherwise race conditions may create conflicts or
// pool close may miss the connection.
_connections.add(newc);
return newc;
});
}
}

Expand Down

0 comments on commit c671d65

Please sign in to comment.