Skip to content

Commit

Permalink
Prevent command already in progress errors in the Postgres integrat…
Browse files Browse the repository at this point in the history
…ion (#15489)
  • Loading branch information
jmeunier28 authored Aug 18, 2023
1 parent 35f4601 commit d205671
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 429 deletions.
2 changes: 2 additions & 0 deletions .ddev/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ paramiko = ['LGPL-2.1-only']
oracledb = ['Apache-2.0']
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
psycopg = ['LGPL-3.0-only']
# https://github.com/psycopg/psycopg/blob/master/psycopg_pool/LICENSE.txt
psycopg-pool = ['LGPL-3.0-only']
# https://github.com/psycopg/psycopg2/blob/master/LICENSE
# https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER
psycopg2-binary = ['LGPL-3.0-only', 'BSD-3-Clause']
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protobuf,PyPI,BSD-3-Clause,Copyright 2008 Google Inc.
protobuf,PyPI,BSD-3-Clause,Copyright 2008 Google Inc. All rights reserved.
psutil,PyPI,BSD-3-Clause,"Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola'"
psycopg,PyPI,LGPL-3.0-only,Copyright (C) 2020 The Psycopg Team
psycopg-pool,PyPI,LGPL-3.0-only,Copyright (C) 2020 The Psycopg Team
psycopg2-binary,PyPI,BSD-3-Clause,Copyright 2013 Federico Di Gregorio
psycopg2-binary,PyPI,LGPL-3.0-only,Copyright (C) 2013 Federico Di Gregorio
pyasn1,PyPI,BSD-3-Clause,"Copyright (c) 2005-2019, Ilya Etingof <[email protected]>"
Expand Down
1 change: 1 addition & 0 deletions datadog_checks_base/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

* Downgrade pydantic to 2.0.2 ([#15596](https://github.com/DataDog/integrations-core/pull/15596))
* Bump cryptography to 41.0.3 ([#15517](https://github.com/DataDog/integrations-core/pull/15517))
* Prevent `command already in progress` errors in the Postgres integration ([#15489](https://github.com/DataDog/integrations-core/pull/15489))

## 32.7.0 / 2023-08-10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ protobuf==3.20.2; python_version > '3.0'
psutil==5.9.0
psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'
psycopg[binary]==3.1.10; python_version > '3.0'
psycopg-pool==3.1.7; python_version > '3.0'
pyasn1==0.4.6
pycryptodomex==3.10.1
pydantic==2.0.2; python_version > '3.0'
Expand Down
1 change: 1 addition & 0 deletions datadog_checks_dev/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

* Ignore `pydantic` when bumping the dependencies ([#15597](https://github.com/DataDog/integrations-core/pull/15597))
* Stop using the TOX_ENV_NAME variable ([#15528](https://github.com/DataDog/integrations-core/pull/15528))
* Prevent `command already in progress` errors in the Postgres integration ([#15489](https://github.com/DataDog/integrations-core/pull/15489))

## 23.0.0 / 2023-08-10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
'psycopg2-binary': ['LGPL-3.0-only', 'BSD-3-Clause'],
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
'psycopg': ['LGPL-3.0-only'],
# https://github.com/psycopg/psycopg/blob/master/psycopg_pool/LICENSE.txt
'psycopg-pool': ['LGPL-3.0-only'],
# https://github.com/Legrandin/pycryptodome/blob/master/LICENSE.rst
'pycryptodomex': ['Unlicense', 'BSD-2-Clause'],
# https://github.com/requests/requests-kerberos/pull/123
Expand Down
1 change: 1 addition & 0 deletions postgres/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
***Fixed***:

* Update datadog-checks-base dependency version to 32.6.0 ([#15604](https://github.com/DataDog/integrations-core/pull/15604))
* Prevent `command already in progress` errors in the Postgres integration ([#15489](https://github.com/DataDog/integrations-core/pull/15489))

## 14.1.0 / 2023-08-10

Expand Down
85 changes: 51 additions & 34 deletions postgres/datadog_checks/postgres/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
from typing import Callable, Dict

import psycopg
from psycopg_pool import ConnectionPool

from datadog_checks.base import AgentCheck

Expand All @@ -25,18 +25,16 @@ def __str__(self):
class ConnectionInfo:
def __init__(
self,
connection: psycopg.Connection,
connection: ConnectionPool,
deadline: int,
active: bool,
last_accessed: int,
thread: threading.Thread,
persistent: bool,
):
self.connection = connection
self.deadline = deadline
self.active = active
self.last_accessed = last_accessed
self.thread = thread
self.persistent = persistent


Expand Down Expand Up @@ -68,41 +66,47 @@ def __repr__(self):
def reset(self):
self.__init__()

def __init__(self, check: AgentCheck, connect_fn: Callable[[str], None], max_conns: int = None):
self.log = check.log
def __init__(self, check: AgentCheck, connect_fn: Callable[[str, int, int], None], max_conns: int = None):
self._check = check
self._log = check.log
self._config = check._config
self.max_conns: int = max_conns
self._stats = self.Stats()
self._mu = threading.RLock()
self._query_lock = threading.Lock()
self._conns: Dict[str, ConnectionInfo] = {}

if hasattr(inspect, 'signature'):
connect_sig = inspect.signature(connect_fn)
if len(connect_sig.parameters) != 1:
if not (len(connect_sig.parameters) >= 1):
raise ValueError(
"Invalid signature for the connection function. "
"A single parameter for dbname is expected, got signature: {}".format(connect_sig)
"Expected parameters: dbname, min_pool_size, max_pool_size. "
"Got signature: {}".format(connect_sig)
)
self.connect_fn = connect_fn

def _get_connection_raw(
def _get_connection_pool(
self,
dbname: str,
ttl_ms: int,
timeout: int = None,
startup_fn: Callable[[psycopg.Connection], None] = None,
min_pool_size: int = 1,
max_pool_size: int = None,
startup_fn: Callable[[ConnectionPool], None] = None,
persistent: bool = False,
) -> psycopg.Connection:
) -> ConnectionPool:
"""
Return a connection from the pool.
Return a connection pool for the requested database from the managed pool.
Pass a function to startup_func if there is an action needed with the connection
when re-establishing it.
"""
start = datetime.datetime.now()
self.prune_connections()
with self._mu:
conn = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None, None))
db = conn.connection
if db is None or db.closed:
conn = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None))
db_pool = conn.connection
if db_pool is None or db_pool.closed:
if self.max_conns is not None:
# try to free space until we succeed
while len(self._conns) >= self.max_conns:
Expand All @@ -113,27 +117,22 @@ def _get_connection_raw(
time.sleep(0.01)
continue
self._stats.connection_opened += 1
db = self.connect_fn(dbname)
db_pool = self.connect_fn(dbname, min_pool_size, max_pool_size)
if startup_fn:
startup_fn(db)
startup_fn(db_pool)
else:
# if already in pool, retain persistence status
persistent = conn.persistent

if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()

deadline = datetime.datetime.now() + datetime.timedelta(milliseconds=ttl_ms)
self._conns[dbname] = ConnectionInfo(
connection=db,
connection=db_pool,
deadline=deadline,
active=True,
last_accessed=datetime.datetime.now(),
thread=threading.current_thread(),
persistent=persistent,
)
return db
return db_pool

@contextlib.contextmanager
def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persistent: bool = False):
Expand All @@ -144,16 +143,19 @@ def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persiste
Blocks until a connection can be added to the pool,
and optionally takes a timeout in seconds.
"""
with self._mu:
pool = self._get_connection_pool(dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, persistent=persistent)
db = pool.getconn(timeout=timeout)
try:
with self._mu:
db = self._get_connection_raw(dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, persistent=persistent)
yield db
finally:
with self._mu:
try:
self._conns[dbname].active = False
pool.putconn(db)
if not self._conns[dbname].persistent:
self._conns[dbname].active = False
except KeyError:
# if self._get_connection_raw hit an exception, self._conns[dbname] didn't get populated
# if self._get_connection_raw hit an exception, self._conns[conn_name] didn't get populated
pass

def prune_connections(self):
Expand All @@ -166,10 +168,10 @@ def prune_connections(self):
"""
with self._mu:
now = datetime.datetime.now()
for dbname, conn in list(self._conns.items()):
if conn.deadline < now:
for conn_name, conn in list(self._conns.items()):
if conn.deadline < now and not conn.active and not conn.persistent:
self._stats.connection_pruned += 1
self._terminate_connection_unsafe(dbname)
self._terminate_connection_unsafe(conn_name)

def close_all_connections(self, timeout=None):
"""
Expand Down Expand Up @@ -202,14 +204,29 @@ def evict_lru(self) -> str:
return None

def _terminate_connection_unsafe(self, dbname: str):
db = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None, None)).connection
db = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None)).connection
if db is not None:
try:
self._stats.connection_closed += 1
if not db.closed:
db.close()
self._stats.connection_closed += 1
except Exception:
self._stats.connection_closed_failed += 1
self.log.exception("failed to close DB connection for db=%s", dbname)
self._log.exception("failed to close DB connection for db=%s", dbname)
return False
return True

def get_main_db_pool(self, max_pool_conn_size: int = 3):
"""
Returns a memoized, persistent psycopg connection pool to `self.dbname`.
Is meant to be shared across multiple threads, and opens a preconfigured max number of connections.
:return: a psycopg connection
"""
conn = self._get_connection_pool(
dbname=self._config.dbname,
ttl_ms=self._config.idle_connection_timeout,
max_pool_size=max_pool_conn_size,
startup_fn=self._check.load_pg_settings,
persistent=True,
)
return conn
63 changes: 29 additions & 34 deletions postgres/datadog_checks/postgres/explain_parameterized_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,28 @@ def __init__(self, check, config):
def explain_statement(self, dbname, statement, obfuscated_statement):
if self._check.version < V12:
return None
self._set_plan_cache_mode(dbname)

query_signature = compute_sql_signature(obfuscated_statement)
if not self._create_prepared_statement(dbname, statement, obfuscated_statement, query_signature):
return None
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
self._set_plan_cache_mode(conn)

result = self._explain_prepared_statement(dbname, statement, obfuscated_statement, query_signature)
self._deallocate_prepared_statement(dbname, query_signature)
if result:
return result[0]['explain_statement'][0]
if not self._create_prepared_statement(conn, statement, obfuscated_statement, query_signature):
return None

result = self._explain_prepared_statement(conn, statement, obfuscated_statement, query_signature)
self._deallocate_prepared_statement(conn, query_signature)
if result:
return result[0]['explain_statement'][0]
return None

def _set_plan_cache_mode(self, dbname):
self._execute_query(dbname, "SET plan_cache_mode = force_generic_plan")
def _set_plan_cache_mode(self, conn):
self._execute_query(conn, "SET plan_cache_mode = force_generic_plan")

@tracked_method(agent_check_getter=agent_check_getter)
def _create_prepared_statement(self, dbname, statement, obfuscated_statement, query_signature):
def _create_prepared_statement(self, conn, statement, obfuscated_statement, query_signature):
try:
self._execute_query(
dbname,
PREPARE_STATEMENT_QUERY.format(query_signature=query_signature, statement=statement),
conn, PREPARE_STATEMENT_QUERY.format(query_signature=query_signature, statement=statement)
)
return True
except Exception as e:
Expand All @@ -108,26 +109,24 @@ def _create_prepared_statement(self, dbname, statement, obfuscated_statement, qu
return False

@tracked_method(agent_check_getter=agent_check_getter)
def _get_number_of_parameters_for_prepared_statement(self, dbname, query_signature):
rows = self._execute_query_and_fetch_rows(
dbname, PARAM_TYPES_COUNT_QUERY.format(query_signature=query_signature)
)
def _get_number_of_parameters_for_prepared_statement(self, conn, query_signature):
rows = self._execute_query_and_fetch_rows(conn, PARAM_TYPES_COUNT_QUERY.format(query_signature=query_signature))
count = 0
if rows and 'count' in rows[0]:
count = rows[0]['count']
return count

@tracked_method(agent_check_getter=agent_check_getter)
def _explain_prepared_statement(self, dbname, statement, obfuscated_statement, query_signature):
def _explain_prepared_statement(self, conn, statement, obfuscated_statement, query_signature):
null_parameter = ','.join(
'null' for _ in range(self._get_number_of_parameters_for_prepared_statement(dbname, query_signature))
'null' for _ in range(self._get_number_of_parameters_for_prepared_statement(conn, query_signature))
)
execute_prepared_statement_query = EXECUTE_PREPARED_STATEMENT_QUERY.format(
prepared_statement=query_signature, generic_values=null_parameter
)
try:
return self._execute_query_and_fetch_rows(
dbname,
conn,
EXPLAIN_QUERY.format(
explain_function=self._config.statement_samples_config.get(
'explain_function', 'datadog.explain_statement'
Expand All @@ -147,27 +146,23 @@ def _explain_prepared_statement(self, dbname, statement, obfuscated_statement, q
)
return None

def _deallocate_prepared_statement(self, dbname, query_signature):
def _deallocate_prepared_statement(self, conn, query_signature):
try:
self._execute_query(
dbname, "DEALLOCATE PREPARE dd_{query_signature}".format(query_signature=query_signature)
)
self._execute_query(conn, "DEALLOCATE PREPARE dd_{query_signature}".format(query_signature=query_signature))
except Exception as e:
logger.warning(
'Failed to deallocate prepared statement query_signature=[%s] | err=[%s]',
query_signature,
e,
)

def _execute_query(self, dbname, query):
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
with conn.cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)
def _execute_query(self, conn, query):
with conn.cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)

def _execute_query_and_fetch_rows(self, dbname, query):
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
with conn.cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)
return cursor.fetchall()
def _execute_query_and_fetch_rows(self, conn, query):
with conn.cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)
return cursor.fetchall()
Loading

0 comments on commit d205671

Please sign in to comment.