diff --git a/lib/src/pool/pool_impl.dart b/lib/src/pool/pool_impl.dart index 5be688b8..ab9a8eb7 100644 --- a/lib/src/pool/pool_impl.dart +++ b/lib/src/pool/pool_impl.dart @@ -212,7 +212,6 @@ class _PoolConnection implements Connection { Duration _elapsedInUse = Duration.zero; DateTime _lastReturned = DateTime.now(); bool _isInUse = false; - int _queryCount = 0; _PoolConnection( this._pool, this._endpoint, this._connectionSettings, this._connection); @@ -235,7 +234,7 @@ class _PoolConnection implements Connection { if (_elapsedInUse >= _pool._settings.maxSessionUse) { return true; } - if (_queryCount >= _pool._settings.maxQueryCount) { + if (_connection.queryCount >= _pool._settings.maxQueryCount) { return true; } return false; @@ -268,7 +267,6 @@ class _PoolConnection implements Connection { QueryMode? queryMode, Duration? timeout, }) { - _queryCount++; return _connection.execute( query, parameters: parameters, @@ -280,7 +278,6 @@ class _PoolConnection implements Connection { @override Future prepare(Object query) { - // TODO: increment query count on statement runs return _connection.prepare(query); } @@ -289,7 +286,6 @@ class _PoolConnection implements Connection { Future Function(Session session) fn, { SessionSettings? settings, }) { - // TODO: increment query count on session callbacks return _connection.run(fn, settings: settings); } @@ -298,7 +294,6 @@ class _PoolConnection implements Connection { Future Function(Session session) fn, { TransactionSettings? settings, }) { - // TODO: increment query count on session callbacks return _connection.runTx( fn, settings: settings, diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 308dfa3f..d45606b1 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -5,8 +5,8 @@ import 'dart:typed_data'; import 'package:async/async.dart' as async; import 'package:charcode/ascii.dart'; +import 'package:meta/meta.dart'; import 'package:pool/pool.dart' as pool; -import 'package:postgres/src/v3/resolved_settings.dart'; import 'package:stream_channel/stream_channel.dart'; import '../../postgres.dart'; @@ -15,10 +15,11 @@ import '../binary_codec.dart'; import '../text_codec.dart'; import 'protocol.dart'; import 'query_description.dart'; +import 'resolved_settings.dart'; const _debugLog = false; -String identifier(String source) { +String _identifier(String source) { // To avoid complex ambiguity rules, we always wrap identifier in double // quotes. That means the only character we need to escape are double quotes // in the source. @@ -110,6 +111,7 @@ abstract class _PgSessionBase implements Session { } if (isSimple || (ignoreRows && variables.isEmpty)) { + _connection._queryCount++; // Great, we can just run a simple query. final controller = StreamController(); final items = []; @@ -294,9 +296,13 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { var _statementCounter = 0; var _portalCounter = 0; + var _queryCount = 0; late final _channels = _Channels(this); + @internal + int get queryCount => _queryCount; + @override Channels get channels => _channels; @@ -472,6 +478,7 @@ class _PreparedStatement extends Statement { Object? parameters, { Duration? timeout, }) async { + _session._connection._queryCount++; timeout ??= _session._settings.queryTimeout; final items = []; final subscription = bind(parameters).listen(items.add); @@ -763,7 +770,7 @@ class _Channels implements Channels { void _subscribe(String channel, MultiStreamController firstListener) { Future(() async { - await _connection.execute(Sql('LISTEN ${identifier(channel)}'), + await _connection.execute(Sql('LISTEN ${_identifier(channel)}'), ignoreRows: true); }).onError((error, stackTrace) { _activeListeners[channel]?.remove(firstListener); @@ -782,7 +789,7 @@ class _Channels implements Channels { _activeListeners.remove(channel); // Send unlisten command - await _connection.execute(Sql('UNLISTEN ${identifier(channel)}'), + await _connection.execute(Sql('UNLISTEN ${_identifier(channel)}'), ignoreRows: true); } }