Skip to content

Commit

Permalink
Query count + proper pool check. (#217)
Browse files Browse the repository at this point in the history
* Query count + proper pool check.

* remove todos
  • Loading branch information
isoos authored Oct 31, 2023
1 parent 1cf35f1 commit 0e0cea3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
7 changes: 1 addition & 6 deletions lib/src/pool/pool_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ class _PoolConnection implements Connection {
Duration _elapsedInUse = Duration.zero;
DateTime _lastReturned = DateTime.now();
bool _isInUse = false;
int _queryCount = 0;

_PoolConnection(
this._pool, this._endpoint, this._connectionSettings, this._connection);
Expand All @@ -235,7 +234,7 @@ class _PoolConnection implements Connection {
if (_elapsedInUse >= _pool._settings.maxSessionUse) {
return true;
}
if (_queryCount >= _pool._settings.maxQueryCount) {
if (_connection.queryCount >= _pool._settings.maxQueryCount) {
return true;
}
return false;
Expand Down Expand Up @@ -268,7 +267,6 @@ class _PoolConnection implements Connection {
QueryMode? queryMode,
Duration? timeout,
}) {
_queryCount++;
return _connection.execute(
query,
parameters: parameters,
Expand All @@ -280,7 +278,6 @@ class _PoolConnection implements Connection {

@override
Future<Statement> prepare(Object query) {
// TODO: increment query count on statement runs
return _connection.prepare(query);
}

Expand All @@ -289,7 +286,6 @@ class _PoolConnection implements Connection {
Future<R> Function(Session session) fn, {
SessionSettings? settings,
}) {
// TODO: increment query count on session callbacks
return _connection.run(fn, settings: settings);
}

Expand All @@ -298,7 +294,6 @@ class _PoolConnection implements Connection {
Future<R> Function(Session session) fn, {
TransactionSettings? settings,
}) {
// TODO: increment query count on session callbacks
return _connection.runTx(
fn,
settings: settings,
Expand Down
15 changes: 11 additions & 4 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import 'dart:typed_data';

import 'package:async/async.dart' as async;
import 'package:charcode/ascii.dart';
import 'package:meta/meta.dart';
import 'package:pool/pool.dart' as pool;
import 'package:postgres/src/v3/resolved_settings.dart';
import 'package:stream_channel/stream_channel.dart';

import '../../postgres.dart';
Expand All @@ -15,10 +15,11 @@ import '../binary_codec.dart';
import '../text_codec.dart';
import 'protocol.dart';
import 'query_description.dart';
import 'resolved_settings.dart';

const _debugLog = false;

String identifier(String source) {
String _identifier(String source) {
// To avoid complex ambiguity rules, we always wrap identifier in double
// quotes. That means the only character we need to escape are double quotes
// in the source.
Expand Down Expand Up @@ -110,6 +111,7 @@ abstract class _PgSessionBase implements Session {
}

if (isSimple || (ignoreRows && variables.isEmpty)) {
_connection._queryCount++;
// Great, we can just run a simple query.
final controller = StreamController<ResultRow>();
final items = <ResultRow>[];
Expand Down Expand Up @@ -294,9 +296,13 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {

var _statementCounter = 0;
var _portalCounter = 0;
var _queryCount = 0;

late final _channels = _Channels(this);

@internal
int get queryCount => _queryCount;

@override
Channels get channels => _channels;

Expand Down Expand Up @@ -472,6 +478,7 @@ class _PreparedStatement extends Statement {
Object? parameters, {
Duration? timeout,
}) async {
_session._connection._queryCount++;
timeout ??= _session._settings.queryTimeout;
final items = <ResultRow>[];
final subscription = bind(parameters).listen(items.add);
Expand Down Expand Up @@ -763,7 +770,7 @@ class _Channels implements Channels {

void _subscribe(String channel, MultiStreamController firstListener) {
Future(() async {
await _connection.execute(Sql('LISTEN ${identifier(channel)}'),
await _connection.execute(Sql('LISTEN ${_identifier(channel)}'),
ignoreRows: true);
}).onError<Object>((error, stackTrace) {
_activeListeners[channel]?.remove(firstListener);
Expand All @@ -782,7 +789,7 @@ class _Channels implements Channels {
_activeListeners.remove(channel);

// Send unlisten command
await _connection.execute(Sql('UNLISTEN ${identifier(channel)}'),
await _connection.execute(Sql('UNLISTEN ${_identifier(channel)}'),
ignoreRows: true);
}
}
Expand Down

0 comments on commit 0e0cea3

Please sign in to comment.