diff --git a/dataworkspace/dataworkspace/apps/applications/utils.py b/dataworkspace/dataworkspace/apps/applications/utils.py index 1e8975391f..5506deeb81 100644 --- a/dataworkspace/dataworkspace/apps/applications/utils.py +++ b/dataworkspace/dataworkspace/apps/applications/utils.py @@ -24,7 +24,7 @@ from django.db import IntegrityError, connections from django.db.models import Q import gevent -from psycopg2 import connect, sql +from psycopg2 import sql import requests from mohawk import Sender from pytz import utc @@ -56,15 +56,17 @@ GLOBAL_LOCK_ID, close_all_connections_if_not_in_atomic_block, create_tools_access_iam_role, - database_dsn, stable_identification_suffix, source_tables_for_app, source_tables_for_user, + database_engine, + execute_sql, transaction_and_lock, new_private_database_credentials, postgres_user, has_tools_cert_expired, ) + from dataworkspace.apps.applications.gitlab import gitlab_has_developer_access from dataworkspace.apps.datasets.constants import UserAccessType from dataworkspace.apps.datasets.models import ( @@ -529,35 +531,38 @@ def _do_delete_unused_datasets_users(): database_obj = Database.objects.get(memorable_name=memorable_name) database_name = database_data["NAME"] - with connect(database_dsn(database_data)) as conn, conn.cursor() as cur: + with database_engine(database_data).connect() as conn: logger.info("delete_unused_datasets_users: finding database users") - cur.execute( - """ - SELECT usename FROM pg_catalog.pg_user - WHERE - ( - valuntil != 'infinity' - AND usename LIKE 'user_%' - AND usename NOT LIKE '%_qs' - AND usename NOT LIKE '%_quicksight' - AND usename NOT LIKE '%_explorer' - AND usename NOT LIKE '%_superset' - ) - OR - ( - valuntil != 'infinity' - AND valuntil < now() - AND usename LIKE 'user_%' AND + results = execute_sql( + conn, + sql.SQL( + """ + SELECT usename FROM pg_catalog.pg_user + WHERE ( - usename LIKE '%_qs' - OR usename LIKE '%_explorer' - OR usename LIKE '%_superset' + valuntil != 'infinity' + AND 
usename LIKE 'user_%' + AND usename NOT LIKE '%_qs' + AND usename NOT LIKE '%_quicksight' + AND usename NOT LIKE '%_explorer' + AND usename NOT LIKE '%_superset' ) - ) - ORDER BY usename; + OR + ( + valuntil != 'infinity' + AND valuntil < now() + AND usename LIKE 'user_%' AND + ( + usename LIKE '%_qs' + OR usename LIKE '%_explorer' + OR usename LIKE '%_superset' + ) + ) + ORDER BY usename; """ + ), ) - usenames = [result[0] for result in cur.fetchall()] + usenames = [result[0] for result in results.fetchall()] logger.info("delete_unused_datasets_users: waiting in case they were just created") gevent.sleep(15) @@ -596,9 +601,9 @@ def _do_delete_unused_datasets_users(): # Multiple concurrent GRANT or REVOKE on the same object can result in # "tuple concurrently updated" errors - with connect( - database_dsn(database_data) - ) as conn, conn.cursor() as cur, transaction_and_lock(cur, GLOBAL_LOCK_ID): + with database_engine(database_data).connect() as conn, transaction_and_lock( + conn, GLOBAL_LOCK_ID + ): for usename in not_in_use_usernames: try: logger.info( @@ -606,18 +611,20 @@ def _do_delete_unused_datasets_users(): usename, ) - cur.execute( + execute_sql( + conn, sql.SQL("REVOKE CONNECT ON DATABASE {} FROM {};").format( sql.Identifier(database_name), sql.Identifier(usename), - ) + ), ) - cur.execute( + execute_sql( + conn, sql.SQL("REVOKE ALL PRIVILEGES ON DATABASE {} FROM {};").format( sql.Identifier(database_name), sql.Identifier(usename), - ) + ), ) logger.info( @@ -634,49 +641,56 @@ def _do_delete_unused_datasets_users(): # # REASSIGN OWNED requires privileges on both the source role(s) and # the target role so these are granted first. 
- cur.execute( + execute_sql( + conn, sql.SQL("GRANT {} TO {};").format( sql.Identifier(usename), sql.Identifier(database_data["USER"]), - ) + ), ) - cur.execute( + execute_sql( + conn, sql.SQL("GRANT {} TO {};").format( sql.Identifier(db_persistent_role), sql.Identifier(database_data["USER"]), - ) + ), ) # The REASSIGN OWNED BY means any objects like tables that were # owned by the temporary user get transferred to the permanent user - cur.execute( + execute_sql( + conn, sql.SQL("REASSIGN OWNED BY {} TO {};").format( sql.Identifier(usename), sql.Identifier(db_persistent_role), - ) + ), ) # ... so the only effect of DROP OWNED BY is to REVOKE any # remaining permissions by the temporary user, so it can then get # deleted below - cur.execute(sql.SQL("DROP OWNED BY {};").format(sql.Identifier(usename))) + execute_sql( + conn, sql.SQL("DROP OWNED BY {};").format(sql.Identifier(usename)) + ) # ... and then cleanup the roles on the master user (since there are # performance implications for the master user having a lot of roles, # specifically it can cause slowness on connect) - cur.execute( + execute_sql( + conn, sql.SQL("REVOKE {} FROM {};").format( sql.Identifier(usename), sql.Identifier(database_data["USER"]), - ) + ), ) - cur.execute( + execute_sql( + conn, sql.SQL("REVOKE {} FROM {};").format( sql.Identifier(db_persistent_role), sql.Identifier(database_data["USER"]), - ) + ), ) - cur.execute(sql.SQL("DROP USER {};").format(sql.Identifier(usename))) + execute_sql(conn, sql.SQL("DROP USER {};").format(sql.Identifier(usename))) except Exception: # pylint: disable=broad-except logger.exception( "delete_unused_datasets_users: Failed deleting %s", diff --git a/dataworkspace/dataworkspace/apps/core/utils.py b/dataworkspace/dataworkspace/apps/core/utils.py index 67ef1e895d..59c4fce2a9 100644 --- a/dataworkspace/dataworkspace/apps/core/utils.py +++ b/dataworkspace/dataworkspace/apps/core/utils.py @@ -134,19 +134,31 @@ def get_cursor(database_memorable_name): yield cursor 
+def execute_sql(conn, sql_obj): + # This avoids "argument 1 must be psycopg2.extensions.connection, not PGConnectionProxy" + # which can happen when elastic-apm wraps the connection object when using psycopg2 + unwrapped_connection = getattr( + conn.connection.driver_connection, "__wrapped__", conn.connection.driver_connection + ) + return conn.execute(sa.text(sql_obj.as_string(unwrapped_connection))) + + @contextmanager -def transaction_and_lock(cursor, lock_id): +def transaction_and_lock(conn, lock_id): try: - cursor.execute(sql.SQL("BEGIN")) - cursor.execute( - sql.SQL("SELECT pg_advisory_xact_lock({lock_id})").format(lock_id=sql.Literal(lock_id)) + conn.begin() + execute_sql( + conn, + sql.SQL("SELECT pg_advisory_xact_lock({lock_id})").format( + lock_id=sql.Literal(lock_id) + ), ) yield - except Exception: # pylint: disable=broad-except - cursor.execute(sql.SQL("ROLLBACK")) + except Exception: + conn.rollback() raise else: - cursor.execute(sql.SQL("COMMIT")) + conn.commit() def new_private_database_credentials( @@ -200,6 +212,9 @@ def get_new_credentials(database_memorable_name, tables): ) with database_engine(database_data).connect() as sync_roles_conn: + execute_sql(sync_roles_conn, sql.SQL("SET statement_timeout = '120s'")) + sync_roles_conn.commit() + # Temporary database user that can login, and has membership of sync_roles( sync_roles_conn, @@ -243,91 +258,100 @@ def get_new_credentials(database_memorable_name, tables): lock_key=GLOBAL_LOCK_ID, ) - # PostgreSQL doesn't handle concurrent - # - GRANT/REVOKEs on the same database object - # - ALTER USER ... SET - # Either can result in "tuple concurrentl updated" errors. So we lock. 
- with get_cursor(database_memorable_name) as cur, transaction_and_lock(cur, GLOBAL_LOCK_ID): - # Temporarily grant the current user the roles to be able to manage them below - all_roles = [db_role, db_user] + db_shared_roles - cur.execute( - sql.SQL("GRANT {all_roles} TO CURRENT_USER").format( - all_roles=sql.SQL(",").join(sql.Identifier(role) for role in all_roles) + # PostgreSQL doesn't handle concurrent + # - GRANT/REVOKEs on the same database object + # - ALTER USER ... SET + # Either can result in "tuple concurrently updated" errors. So we lock. + with transaction_and_lock(sync_roles_conn, GLOBAL_LOCK_ID): + # Temporarily grant the current user the roles to be able to manage them below + all_roles = [db_role, db_user] + db_shared_roles + execute_sql( + sync_roles_conn, + sql.SQL("GRANT {all_roles} TO CURRENT_USER").format( + all_roles=sql.SQL(",").join(sql.Identifier(role) for role in all_roles) + ), ) - ) - # If the user creates tables in any of the shared schemas, make sure the corresponding - # role for that schema have all privilege on them (which unfortunately does not - # mean ownership) - for db_shared_role in db_shared_roles: - cur.execute( - sql.SQL( - """ - ALTER DEFAULT PRIVILEGES - FOR USER {} - IN SCHEMA {} - GRANT ALL ON TABLES TO {}; - """ - ).format( - sql.Identifier(db_role), - sql.Identifier(db_shared_role), - sql.Identifier(db_shared_role), + # If the user creates tables in any of the shared schemas, make sure the corresponding + # role for that schema have all privilege on them (which unfortunately does not + # mean ownership) + for db_shared_role in db_shared_roles: + execute_sql( + sync_roles_conn, + sql.SQL( + """ + ALTER DEFAULT PRIVILEGES + FOR USER {} + IN SCHEMA {} + GRANT ALL ON TABLES TO {}; + """ + ).format( + sql.Identifier(db_role), + sql.Identifier(db_shared_role), + sql.Identifier(db_shared_role), + ), ) - ) - # Make it so by default, objects created by the user are owned by the role - # This seems to have a horrible 
performance impact on connecting, so we don't do it for - # contexts that can't create objects. The reason for the performance impact on - # connecting is currently unknown, but seems to be related to the number of other roles - # granted - if not ( - db_user.endswith("_qs") - or db_user.endswith("_superset") - or db_user.endswith("_explorer") - ): - cur.execute( - sql.SQL("ALTER USER {} SET ROLE {};").format( - sql.Identifier(db_user), sql.Identifier(db_role) + # Make it so by default, objects created by the user are owned by the role + # This seems to have a horrible performance impact on connecting, so we don't do it for + # contexts that can't create objects. The reason for the performance impact on + # connecting is currently unknown, but seems to be related to the number of other roles + # granted + if not ( + db_user.endswith("_qs") + or db_user.endswith("_superset") + or db_user.endswith("_explorer") + ): + execute_sql( + sync_roles_conn, + sql.SQL("ALTER USER {} SET ROLE {};").format( + sql.Identifier(db_user), sql.Identifier(db_role) + ), ) - ) - # Give the user reasonable timeouts - cur.execute( - sql.SQL("ALTER USER {} SET idle_in_transaction_session_timeout = '60min';").format( - sql.Identifier(db_user) + # Give the user reasonable timeouts + execute_sql( + sync_roles_conn, + sql.SQL( + "ALTER USER {} SET idle_in_transaction_session_timeout = '60min';" + ).format(sql.Identifier(db_user)), ) - ) - cur.execute( - sql.SQL("ALTER USER {} SET statement_timeout = '60min';").format( - sql.Identifier(db_user) + execute_sql( + sync_roles_conn, + sql.SQL("ALTER USER {} SET statement_timeout = '60min';").format( + sql.Identifier(db_user) + ), ) - ) - cur.execute( - sql.SQL("ALTER USER {} SET pgaudit.log = {};").format( - sql.Identifier(db_user), - sql.Literal(settings.PGAUDIT_LOG_SCOPES), + execute_sql( + sync_roles_conn, + sql.SQL("ALTER USER {} SET pgaudit.log = {};").format( + sql.Identifier(db_user), + sql.Literal(settings.PGAUDIT_LOG_SCOPES), + ), ) - ) - 
cur.execute( - sql.SQL("ALTER USER {} SET pgaudit.log_catalog = off;").format( - sql.Identifier(db_user), - sql.Literal(settings.PGAUDIT_LOG_SCOPES), + execute_sql( + sync_roles_conn, + sql.SQL("ALTER USER {} SET pgaudit.log_catalog = off;").format( + sql.Identifier(db_user), + sql.Literal(settings.PGAUDIT_LOG_SCOPES), + ), ) - ) - cur.execute( - sql.SQL("ALTER USER {} WITH CONNECTION LIMIT {};").format( - sql.Identifier(db_user), - sql.Literal(50 if db_user.endswith("_qs") else 10), + execute_sql( + sync_roles_conn, + sql.SQL("ALTER USER {} WITH CONNECTION LIMIT {};").format( + sql.Identifier(db_user), + sql.Literal(50 if db_user.endswith("_qs") else 10), + ), ) - ) - # Make sure we don't keep the roles in the current user (we don't need them, and - # the master user having a lot of roles can slow login) - cur.execute( - sql.SQL("REVOKE {all_roles} FROM CURRENT_USER").format( - all_roles=sql.SQL(",").join(sql.Identifier(role) for role in all_roles) + # Make sure we don't keep the roles in the current user (we don't need them, and + # the master user having a lot of roles can slow login) + execute_sql( + sync_roles_conn, + sql.SQL("REVOKE {all_roles} FROM CURRENT_USER").format( + all_roles=sql.SQL(",").join(sql.Identifier(role) for role in all_roles) + ), ) - ) logger.info( "Generated new credentials for permanent role %s in %s seconds",