diff --git a/.cross_sync/transformers.py b/.cross_sync/transformers.py index ab2d5dd63..42ba3f83c 100644 --- a/.cross_sync/transformers.py +++ b/.cross_sync/transformers.py @@ -81,7 +81,11 @@ def visit_FunctionDef(self, node): def visit_Constant(self, node): """Replace string type annotations""" - node.s = self.replacements.get(node.s, node.s) + try: + node.s = self.replacements.get(node.s, node.s) + except TypeError: + # ignore unhashable types (e.g. list) + pass return node diff --git a/docs/data_client/async_data_authorized_view.rst b/docs/data_client/async_data_authorized_view.rst new file mode 100644 index 000000000..7d7312970 --- /dev/null +++ b/docs/data_client/async_data_authorized_view.rst @@ -0,0 +1,11 @@ +Authorized View Async +~~~~~~~~~~~~~~~~~~~~~ + + .. note:: + + It is generally not recommended to use the async client in an otherwise synchronous codebase. To make use of asyncio's + performance benefits, the codebase should be designed to be async from the ground up. + +.. autoclass:: google.cloud.bigtable.data._async.client.AuthorizedViewAsync + :members: + :inherited-members: diff --git a/docs/data_client/async_data_table.rst b/docs/data_client/async_data_table.rst index 3b7973e8e..37c396570 100644 --- a/docs/data_client/async_data_table.rst +++ b/docs/data_client/async_data_table.rst @@ -8,4 +8,4 @@ Table Async .. 
autoclass:: google.cloud.bigtable.data._async.client.TableAsync :members: - :show-inheritance: + :inherited-members: diff --git a/docs/data_client/data_client_usage.rst b/docs/data_client/data_client_usage.rst index f5bbac278..708dafc62 100644 --- a/docs/data_client/data_client_usage.rst +++ b/docs/data_client/data_client_usage.rst @@ -9,6 +9,7 @@ Sync Surface sync_data_client sync_data_table + sync_data_authorized_view sync_data_mutations_batcher sync_data_execute_query_iterator @@ -20,6 +21,7 @@ Async Surface async_data_client async_data_table + async_data_authorized_view async_data_mutations_batcher async_data_execute_query_iterator diff --git a/docs/data_client/sync_data_authorized_view.rst b/docs/data_client/sync_data_authorized_view.rst new file mode 100644 index 000000000..c0ac29721 --- /dev/null +++ b/docs/data_client/sync_data_authorized_view.rst @@ -0,0 +1,6 @@ +Authorized View +~~~~~~~~~~~~~~~ + +.. autoclass:: google.cloud.bigtable.data._sync_autogen.client.AuthorizedView + :members: + :inherited-members: diff --git a/google/cloud/bigtable/data/__init__.py b/google/cloud/bigtable/data/__init__.py index 15f9bc167..9439f0f8d 100644 --- a/google/cloud/bigtable/data/__init__.py +++ b/google/cloud/bigtable/data/__init__.py @@ -17,9 +17,11 @@ from google.cloud.bigtable.data._async.client import BigtableDataClientAsync from google.cloud.bigtable.data._async.client import TableAsync +from google.cloud.bigtable.data._async.client import AuthorizedViewAsync from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync from google.cloud.bigtable.data._sync_autogen.client import BigtableDataClient from google.cloud.bigtable.data._sync_autogen.client import Table +from google.cloud.bigtable.data._sync_autogen.client import AuthorizedView from google.cloud.bigtable.data._sync_autogen.mutations_batcher import MutationsBatcher from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery @@ -76,9 +78,11 @@ __all__ = ( 
"BigtableDataClientAsync", "TableAsync", + "AuthorizedViewAsync", "MutationsBatcherAsync", "BigtableDataClient", "Table", + "AuthorizedView", "MutationsBatcher", "RowKeySamples", "ReadRowsQuery", diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index bf618bf04..01fbe876b 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -15,10 +15,10 @@ from __future__ import annotations from typing import Sequence, TYPE_CHECKING -import functools from google.api_core import exceptions as core_exceptions from google.api_core import retry as retries +import google.cloud.bigtable_v2.types.bigtable as types_pb import google.cloud.bigtable.data.exceptions as bt_exceptions from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _retry_exception_factory @@ -36,12 +36,16 @@ from google.cloud.bigtable_v2.services.bigtable.async_client import ( BigtableAsyncClient as GapicClientType, ) - from google.cloud.bigtable.data._async.client import TableAsync as TableType + from google.cloud.bigtable.data._async.client import ( # type: ignore + _ApiSurfaceAsync as ApiSurfaceType, + ) else: from google.cloud.bigtable_v2.services.bigtable.client import ( # type: ignore BigtableClient as GapicClientType, ) - from google.cloud.bigtable.data._sync_autogen.client import Table as TableType # type: ignore + from google.cloud.bigtable.data._sync_autogen.client import ( # type: ignore + _ApiSurface as ApiSurfaceType, + ) __CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen._mutate_rows" @@ -70,7 +74,7 @@ class _MutateRowsOperationAsync: def __init__( self, gapic_client: GapicClientType, - table: TableType, + table: ApiSurfaceType, mutation_entries: list["RowMutationEntry"], operation_timeout: float, attempt_timeout: float | None, @@ -84,13 +88,8 @@ def __init__( 
f"{_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across " f"all entries. Found {total_mutations}." ) - # create partial function to pass to trigger rpc call - self._gapic_fn = functools.partial( - gapic_client.mutate_rows, - table_name=table.table_name, - app_profile_id=table.app_profile_id, - retry=None, - ) + self._table = table + self._gapic_fn = gapic_client.mutate_rows # create predicate for determining which errors are retryable self.is_retryable = retries.if_exception_type( # RPC level errors @@ -173,8 +172,12 @@ async def _run_attempt(self): # make gapic request try: result_generator = await self._gapic_fn( + request=types_pb.MutateRowsRequest( + entries=request_entries, + app_profile_id=self._table.app_profile_id, + **self._table._request_path, + ), timeout=next(self.timeout_generator), - entries=request_entries, retry=None, ) async for result_list in result_generator: diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 6d2fa3a7d..bf482c4dc 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -37,9 +37,11 @@ if TYPE_CHECKING: if CrossSync.is_async: - from google.cloud.bigtable.data._async.client import TableAsync as TableType + from google.cloud.bigtable.data._async.client import ( + _ApiSurfaceAsync as ApiSurfaceType, + ) else: - from google.cloud.bigtable.data._sync_autogen.client import Table as TableType # type: ignore + from google.cloud.bigtable.data._sync_autogen.client import _ApiSurface as ApiSurfaceType # type: ignore __CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen._read_rows" @@ -78,7 +80,7 @@ class _ReadRowsOperationAsync: def __init__( self, query: ReadRowsQuery, - table: TableType, + table: ApiSurfaceType, operation_timeout: float, attempt_timeout: float, retryable_exceptions: Sequence[type[Exception]] = (), @@ -90,7 +92,7 @@ def __init__( if isinstance(query, dict): self.request = 
ReadRowsRequestPB( **query, - table_name=table.table_name, + **table._request_path, app_profile_id=table.app_profile_id, ) else: diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index c7cc0de6b..5c86e27bd 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -43,6 +43,10 @@ DEFAULT_CLIENT_INFO, ) from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest +from google.cloud.bigtable_v2.types.bigtable import SampleRowKeysRequest +from google.cloud.bigtable_v2.types.bigtable import MutateRowRequest +from google.cloud.bigtable_v2.types.bigtable import CheckAndMutateRowRequest +from google.cloud.bigtable_v2.types.bigtable import ReadModifyWriteRowRequest from google.cloud.client import ClientWithProject from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore from google.api_core import retry as retries @@ -386,10 +390,13 @@ async def _manage_channel( replace_symbols={ "TableAsync": "Table", "ExecuteQueryIteratorAsync": "ExecuteQueryIterator", + "_ApiSurfaceAsync": "_ApiSurface", } ) async def _register_instance( - self, instance_id: str, owner: TableAsync | ExecuteQueryIteratorAsync + self, + instance_id: str, + owner: _ApiSurfaceAsync | ExecuteQueryIteratorAsync, ) -> None: """ Registers an instance with the client, and warms the channel for the instance @@ -422,10 +429,13 @@ async def _register_instance( replace_symbols={ "TableAsync": "Table", "ExecuteQueryIteratorAsync": "ExecuteQueryIterator", + "_ApiSurfaceAsync": "_ApiSurface", } ) async def _remove_instance_registration( - self, instance_id: str, owner: TableAsync | "ExecuteQueryIteratorAsync" + self, + instance_id: str, + owner: _ApiSurfaceAsync | ExecuteQueryIteratorAsync, ) -> bool: """ Removes an instance from the client's registered instances, to prevent @@ -510,6 +520,72 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAs """ return 
TableAsync(self, instance_id, table_id, *args, **kwargs) + @CrossSync.convert( + replace_symbols={"AuthorizedViewAsync": "AuthorizedView"}, + docstring_format_vars={ + "LOOP_MESSAGE": ( + "Must be created within an async context (running event loop)", + "", + ), + "RAISE_NO_LOOP": ( + "RuntimeError: if called outside of an async context (no running event loop)", + "None", + ), + }, + ) + def get_authorized_view( + self, instance_id: str, table_id: str, view_id: str, *args, **kwargs + ) -> AuthorizedViewAsync: + """ + Returns an authorized view instance for making data API requests. All arguments are passed + directly to the AuthorizedViewAsync constructor. + + {LOOP_MESSAGE} + + Args: + instance_id: The Bigtable instance ID to associate with this client. + instance_id is combined with the client's project to fully + specify the instance + table_id: The ID of the table. table_id is combined with the + instance_id and the client's project to fully specify the table + view_id: The id for the authorized view to use for requests + app_profile_id: The app profile to associate with requests. + https://cloud.google.com/bigtable/docs/app-profiles + default_read_rows_operation_timeout: The default timeout for read rows + operations, in seconds. If not set, defaults to Table's value + default_read_rows_attempt_timeout: The default timeout for individual + read rows rpc requests, in seconds. If not set, defaults Table's value + default_mutate_rows_operation_timeout: The default timeout for mutate rows + operations, in seconds. If not set, defaults to Table's value + default_mutate_rows_attempt_timeout: The default timeout for individual + mutate rows rpc requests, in seconds. If not set, defaults Table's value + default_operation_timeout: The default timeout for all other operations, in + seconds. If not set, defaults to Table's value + default_attempt_timeout: The default timeout for all other individual rpc + requests, in seconds. 
If not set, defaults to Table's value + default_read_rows_retryable_errors: a list of errors that will be retried + if encountered during read_rows and related operations. If not set, + defaults to Table's value + default_mutate_rows_retryable_errors: a list of errors that will be retried + if encountered during mutate_rows and related operations. If not set, + defaults to Table's value + default_retryable_errors: a list of errors that will be retried if + encountered during all other operations. If not set, defaults to + Table's value + Returns: + AuthorizedViewAsync: a table instance for making data API requests + Raises: + {RAISE_NO_LOOP} + """ + return CrossSync.AuthorizedView( + self, + instance_id, + table_id, + view_id, + *args, + **kwargs, + ) + @CrossSync.convert( replace_symbols={"ExecuteQueryIteratorAsync": "ExecuteQueryIterator"} ) @@ -610,13 +686,12 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._gapic_client.__aexit__(exc_type, exc_val, exc_tb) -@CrossSync.convert_class(sync_name="Table", add_mapping_for_name="Table") -class TableAsync: +@CrossSync.convert_class(sync_name="_ApiSurface") +class _ApiSurfaceAsync: """ - Main Data API surface + Abstract class containing API surface for BigtableDataClient. 
Should not be created directly - Table object maintains table_id, and app_profile_id context, and passes them with - each call + Can be instantiated as a Table or an AuthorizedView """ @CrossSync.convert( @@ -740,6 +815,9 @@ def __init__( default_mutate_rows_retryable_errors or () ) self.default_retryable_errors = default_retryable_errors or () + # used to populate table_name or authorized_view_name fields in request protos + self._request_path = {"table_name": self.table_name} + try: self._register_instance_future = CrossSync.create_task( self.client._register_instance, @@ -1108,8 +1186,9 @@ async def sample_row_keys( @CrossSync.convert async def execute_rpc(): results = await self.client._gapic_client.sample_row_keys( - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=SampleRowKeysRequest( + app_profile_id=self.app_profile_id, **self._request_path + ), timeout=next(attempt_timeout_gen), retry=None, ) @@ -1236,10 +1315,14 @@ async def mutate_row( target = partial( self.client._gapic_client.mutate_row, - row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, - mutations=[mutation._to_pb() for mutation in mutations_list], - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=MutateRowRequest( + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + mutations=[mutation._to_pb() for mutation in mutations_list], + app_profile_id=self.app_profile_id, + **self._request_path, + ), timeout=attempt_timeout, retry=None, ) @@ -1361,12 +1444,16 @@ async def check_and_mutate_row( false_case_mutations = [false_case_mutations] false_case_list = [m._to_pb() for m in false_case_mutations or []] result = await self.client._gapic_client.check_and_mutate_row( - true_mutations=true_case_list, - false_mutations=false_case_list, - predicate_filter=predicate._to_pb() if predicate is not None else None, - row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, - 
table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=CheckAndMutateRowRequest( + true_mutations=true_case_list, + false_mutations=false_case_list, + predicate_filter=predicate._to_pb() if predicate is not None else None, + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), timeout=operation_timeout, retry=None, ) @@ -1411,10 +1498,14 @@ async def read_modify_write_row( if not rules: raise ValueError("rules must contain at least one item") result = await self.client._gapic_client.read_modify_write_row( - rules=[rule._to_pb() for rule in rules], - row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=ReadModifyWriteRowRequest( + rules=[rule._to_pb() for rule in rules], + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), timeout=operation_timeout, retry=None, ) @@ -1451,3 +1542,100 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): grpc channels will no longer be warmed """ await self.close() + + +@CrossSync.convert_class( + sync_name="Table", + add_mapping_for_name="Table", + replace_symbols={"_ApiSurfaceAsync": "_ApiSurface"}, +) +class TableAsync(_ApiSurfaceAsync): + """ + Main Data API surface for interacting with a Bigtable table. + + Table object maintains table_id, and app_profile_id context, and passes them with + each call + """ + + +@CrossSync.convert_class( + sync_name="AuthorizedView", + add_mapping_for_name="AuthorizedView", + replace_symbols={"_ApiSurfaceAsync": "_ApiSurface"}, +) +class AuthorizedViewAsync(_ApiSurfaceAsync): + """ + Provides access to an authorized view of a table. + + An authorized view is a subset of a table that you configure to include specific table data. 
+ Then you grant access to the authorized view separately from access to the table. + + AuthorizedView object maintains table_id, app_profile_id, and authorized_view_id context, + and passes them with each call + """ + + @CrossSync.convert( + docstring_format_vars={ + "LOOP_MESSAGE": ( + "Must be created within an async context (running event loop)", + "", + ), + "RAISE_NO_LOOP": ( + "RuntimeError: if called outside of an async context (no running event loop)", + "None", + ), + } + ) + def __init__( + self, + client, + instance_id, + table_id, + view_id, + app_profile_id: str | None = None, + **kwargs, + ): + """ + Initialize an AuthorizedView instance + + {LOOP_MESSAGE} + + Args: + instance_id: The Bigtable instance ID to associate with this client. + instance_id is combined with the client's project to fully + specify the instance + table_id: The ID of the table. table_id is combined with the + instance_id and the client's project to fully specify the table + view_id: The id for the authorized view to use for requests + app_profile_id: The app profile to associate with requests. + https://cloud.google.com/bigtable/docs/app-profiles + default_read_rows_operation_timeout: The default timeout for read rows + operations, in seconds. If not set, defaults to 600 seconds (10 minutes) + default_read_rows_attempt_timeout: The default timeout for individual + read rows rpc requests, in seconds. If not set, defaults to 20 seconds + default_mutate_rows_operation_timeout: The default timeout for mutate rows + operations, in seconds. If not set, defaults to 600 seconds (10 minutes) + default_mutate_rows_attempt_timeout: The default timeout for individual + mutate rows rpc requests, in seconds. If not set, defaults to 60 seconds + default_operation_timeout: The default timeout for all other operations, in + seconds. If not set, defaults to 60 seconds + default_attempt_timeout: The default timeout for all other individual rpc + requests, in seconds. 
If not set, defaults to 20 seconds + default_read_rows_retryable_errors: a list of errors that will be retried + if encountered during read_rows and related operations. + Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted) + default_mutate_rows_retryable_errors: a list of errors that will be retried + if encountered during mutate_rows and related operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + default_retryable_errors: a list of errors that will be retried if + encountered during all other operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + Raises: + {RAISE_NO_LOOP} + """ + super().__init__(client, instance_id, table_id, app_profile_id, **kwargs) + self.authorized_view_id = view_id + self.authorized_view_name: str = self.client._gapic_client.authorized_view_path( + self.client.project, instance_id, table_id, view_id + ) + self._request_path = {"authorized_view_name": self.authorized_view_name} diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 6e15bb5f3..b7fc9b9ec 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -37,9 +37,11 @@ from google.cloud.bigtable.data.mutations import RowMutationEntry if CrossSync.is_async: - from google.cloud.bigtable.data._async.client import TableAsync as TableType + from google.cloud.bigtable.data._async.client import ( + _ApiSurfaceAsync as ApiSurfaceType, + ) else: - from google.cloud.bigtable.data._sync_autogen.client import Table as TableType # type: ignore + from google.cloud.bigtable.data._sync_autogen.client import _ApiSurface as ApiSurfaceType # type: ignore __CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.mutations_batcher" @@ -210,7 +212,7 @@ class MutationsBatcherAsync: def __init__( self, - table: TableType, + table: ApiSurfaceType, *, flush_interval: float | None = 5, 
flush_limit_mutation_count: int | None = 1000, diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 4c45e5c1c..3320215dd 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -28,8 +28,8 @@ if TYPE_CHECKING: import grpc - from google.cloud.bigtable.data import TableAsync - from google.cloud.bigtable.data import Table + from google.cloud.bigtable.data._async.client import _ApiSurfaceAsync + from google.cloud.bigtable.data._sync_autogen.client import _ApiSurface """ Helper functions used in various places in the library. @@ -121,7 +121,7 @@ def _retry_exception_factory( def _get_timeouts( operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, - table: "TableAsync" | "Table", + table: "_ApiSurfaceAsync" | "_ApiSurface", ) -> tuple[float, float]: """ Convert passed in timeout values to floats, using table defaults if necessary. @@ -208,7 +208,7 @@ def _get_error_type( def _get_retryable_errors( call_codes: Sequence["grpc.StatusCode" | int | type[Exception]] | TABLE_DEFAULT, - table: "TableAsync" | "Table", + table: "_ApiSurfaceAsync" | "_ApiSurface", ) -> list[type[Exception]]: """ Convert passed in retryable error codes to a list of exception types. 
diff --git a/google/cloud/bigtable/data/_sync_autogen/_mutate_rows.py b/google/cloud/bigtable/data/_sync_autogen/_mutate_rows.py index 8e8c5ca89..2e7a02fe7 100644 --- a/google/cloud/bigtable/data/_sync_autogen/_mutate_rows.py +++ b/google/cloud/bigtable/data/_sync_autogen/_mutate_rows.py @@ -17,9 +17,9 @@ from __future__ import annotations from typing import Sequence, TYPE_CHECKING -import functools from google.api_core import exceptions as core_exceptions from google.api_core import retry as retries +import google.cloud.bigtable_v2.types.bigtable as types_pb import google.cloud.bigtable.data.exceptions as bt_exceptions from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _retry_exception_factory @@ -32,7 +32,9 @@ from google.cloud.bigtable_v2.services.bigtable.client import ( BigtableClient as GapicClientType, ) - from google.cloud.bigtable.data._sync_autogen.client import Table as TableType + from google.cloud.bigtable.data._sync_autogen.client import ( + _ApiSurface as ApiSurfaceType, + ) class _MutateRowsOperation: @@ -57,7 +59,7 @@ class _MutateRowsOperation: def __init__( self, gapic_client: GapicClientType, - table: TableType, + table: ApiSurfaceType, mutation_entries: list["RowMutationEntry"], operation_timeout: float, attempt_timeout: float | None, @@ -68,12 +70,8 @@ def __init__( raise ValueError( f"mutate_rows requests can contain at most {_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across all entries. Found {total_mutations}." 
) - self._gapic_fn = functools.partial( - gapic_client.mutate_rows, - table_name=table.table_name, - app_profile_id=table.app_profile_id, - retry=None, - ) + self._table = table + self._gapic_fn = gapic_client.mutate_rows self.is_retryable = retries.if_exception_type( *retryable_exceptions, bt_exceptions._MutateRowsIncomplete ) @@ -140,8 +138,12 @@ def _run_attempt(self): return try: result_generator = self._gapic_fn( + request=types_pb.MutateRowsRequest( + entries=request_entries, + app_profile_id=self._table.app_profile_id, + **self._table._request_path, + ), timeout=next(self.timeout_generator), - entries=request_entries, retry=None, ) for result_list in result_generator: diff --git a/google/cloud/bigtable/data/_sync_autogen/_read_rows.py b/google/cloud/bigtable/data/_sync_autogen/_read_rows.py index 92619c6a4..9c80e897b 100644 --- a/google/cloud/bigtable/data/_sync_autogen/_read_rows.py +++ b/google/cloud/bigtable/data/_sync_autogen/_read_rows.py @@ -34,7 +34,9 @@ from google.cloud.bigtable.data._cross_sync import CrossSync if TYPE_CHECKING: - from google.cloud.bigtable.data._sync_autogen.client import Table as TableType + from google.cloud.bigtable.data._sync_autogen.client import ( + _ApiSurface as ApiSurfaceType, + ) class _ReadRowsOperation: @@ -70,7 +72,7 @@ class _ReadRowsOperation: def __init__( self, query: ReadRowsQuery, - table: TableType, + table: ApiSurfaceType, operation_timeout: float, attempt_timeout: float, retryable_exceptions: Sequence[type[Exception]] = (), @@ -81,9 +83,7 @@ def __init__( self.operation_timeout = operation_timeout if isinstance(query, dict): self.request = ReadRowsRequestPB( - **query, - table_name=table.table_name, - app_profile_id=table.app_profile_id, + **query, **table._request_path, app_profile_id=table.app_profile_id ) else: self.request = query._to_pb(table) diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 37e192147..260c81f66 100644 --- 
a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -34,6 +34,10 @@ DEFAULT_CLIENT_INFO, ) from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest +from google.cloud.bigtable_v2.types.bigtable import SampleRowKeysRequest +from google.cloud.bigtable_v2.types.bigtable import MutateRowRequest +from google.cloud.bigtable_v2.types.bigtable import CheckAndMutateRowRequest +from google.cloud.bigtable_v2.types.bigtable import ReadModifyWriteRowRequest from google.cloud.client import ClientWithProject from google.cloud.environment_vars import BIGTABLE_EMULATOR from google.api_core import retry as retries @@ -291,7 +295,7 @@ def _manage_channel( next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0) def _register_instance( - self, instance_id: str, owner: Table | ExecuteQueryIterator + self, instance_id: str, owner: _ApiSurface | ExecuteQueryIterator ) -> None: """Registers an instance with the client, and warms the channel for the instance The client will periodically refresh grpc channel used to make @@ -316,7 +320,7 @@ def _register_instance( self._start_background_channel_refresh() def _remove_instance_registration( - self, instance_id: str, owner: Table | "ExecuteQueryIterator" + self, instance_id: str, owner: _ApiSurface | ExecuteQueryIterator ) -> bool: """Removes an instance from the client's registered instances, to prevent warming new channels for the instance @@ -384,6 +388,52 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> Table: None""" return Table(self, instance_id, table_id, *args, **kwargs) + def get_authorized_view( + self, instance_id: str, table_id: str, view_id: str, *args, **kwargs + ) -> AuthorizedView: + """Returns an authorized view instance for making data API requests. All arguments are passed + directly to the AuthorizedView constructor. + + + + Args: + instance_id: The Bigtable instance ID to associate with this client. 
+ instance_id is combined with the client's project to fully + specify the instance + table_id: The ID of the table. table_id is combined with the + instance_id and the client's project to fully specify the table + view_id: The id for the authorized view to use for requests + app_profile_id: The app profile to associate with requests. + https://cloud.google.com/bigtable/docs/app-profiles + default_read_rows_operation_timeout: The default timeout for read rows + operations, in seconds. If not set, defaults to Table's value + default_read_rows_attempt_timeout: The default timeout for individual + read rows rpc requests, in seconds. If not set, defaults Table's value + default_mutate_rows_operation_timeout: The default timeout for mutate rows + operations, in seconds. If not set, defaults to Table's value + default_mutate_rows_attempt_timeout: The default timeout for individual + mutate rows rpc requests, in seconds. If not set, defaults Table's value + default_operation_timeout: The default timeout for all other operations, in + seconds. If not set, defaults to Table's value + default_attempt_timeout: The default timeout for all other individual rpc + requests, in seconds. If not set, defaults to Table's value + default_read_rows_retryable_errors: a list of errors that will be retried + if encountered during read_rows and related operations. If not set, + defaults to Table's value + default_mutate_rows_retryable_errors: a list of errors that will be retried + if encountered during mutate_rows and related operations. If not set, + defaults to Table's value + default_retryable_errors: a list of errors that will be retried if + encountered during all other operations. 
If not set, defaults to + Table's value + Returns: + AuthorizedView: a table instance for making data API requests + Raises: + None""" + return CrossSync._Sync_Impl.AuthorizedView( + self, instance_id, table_id, view_id, *args, **kwargs + ) + def execute_query( self, query: str, @@ -473,13 +523,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._gapic_client.__exit__(exc_type, exc_val, exc_tb) -@CrossSync._Sync_Impl.add_mapping_decorator("Table") -class Table: +class _ApiSurface: """ - Main Data API surface + Abstract class containing API surface for BigtableDataClient. Should not be created directly - Table object maintains table_id, and app_profile_id context, and passes them with - each call + Can be instantiated as a Table or an AuthorizedView """ def __init__( @@ -582,6 +630,7 @@ def __init__( default_mutate_rows_retryable_errors or () ) self.default_retryable_errors = default_retryable_errors or () + self._request_path = {"table_name": self.table_name} try: self._register_instance_future = CrossSync._Sync_Impl.create_task( self.client._register_instance, @@ -920,8 +969,9 @@ def sample_row_keys( def execute_rpc(): results = self.client._gapic_client.sample_row_keys( - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=SampleRowKeysRequest( + app_profile_id=self.app_profile_id, **self._request_path + ), timeout=next(attempt_timeout_gen), retry=None, ) @@ -1037,10 +1087,14 @@ def mutate_row( sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) target = partial( self.client._gapic_client.mutate_row, - row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, - mutations=[mutation._to_pb() for mutation in mutations_list], - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=MutateRowRequest( + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + mutations=[mutation._to_pb() for mutation in mutations_list], + app_profile_id=self.app_profile_id, + 
**self._request_path, + ), timeout=attempt_timeout, retry=None, ) @@ -1155,12 +1209,16 @@ def check_and_mutate_row( false_case_mutations = [false_case_mutations] false_case_list = [m._to_pb() for m in false_case_mutations or []] result = self.client._gapic_client.check_and_mutate_row( - true_mutations=true_case_list, - false_mutations=false_case_list, - predicate_filter=predicate._to_pb() if predicate is not None else None, - row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=CheckAndMutateRowRequest( + true_mutations=true_case_list, + false_mutations=false_case_list, + predicate_filter=predicate._to_pb() if predicate is not None else None, + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), timeout=operation_timeout, retry=None, ) @@ -1202,10 +1260,14 @@ def read_modify_write_row( if not rules: raise ValueError("rules must contain at least one item") result = self.client._gapic_client.read_modify_write_row( - rules=[rule._to_pb() for rule in rules], - row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, - table_name=self.table_name, - app_profile_id=self.app_profile_id, + request=ReadModifyWriteRowRequest( + rules=[rule._to_pb() for rule in rules], + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), timeout=operation_timeout, retry=None, ) @@ -1232,3 +1294,78 @@ def __exit__(self, exc_type, exc_val, exc_tb): Unregister this instance with the client, so that grpc channels will no longer be warmed""" self.close() + + +@CrossSync._Sync_Impl.add_mapping_decorator("Table") +class Table(_ApiSurface): + """ + Main Data API surface for interacting with a Bigtable table. 
+ + Table object maintains table_id, and app_profile_id context, and passes them with + each call + """ + + +@CrossSync._Sync_Impl.add_mapping_decorator("AuthorizedView") +class AuthorizedView(_ApiSurface): + """ + Provides access to an authorized view of a table. + + An authorized view is a subset of a table that you configure to include specific table data. + Then you grant access to the authorized view separately from access to the table. + + AuthorizedView object maintains table_id, app_profile_id, and authorized_view_id context, + and passes them with each call + """ + + def __init__( + self, + client, + instance_id, + table_id, + view_id, + app_profile_id: str | None = None, + **kwargs, + ): + """Initialize an AuthorizedView instance + + + + Args: + instance_id: The Bigtable instance ID to associate with this client. + instance_id is combined with the client's project to fully + specify the instance + table_id: The ID of the table. table_id is combined with the + instance_id and the client's project to fully specify the table + view_id: The id for the authorized view to use for requests + app_profile_id: The app profile to associate with requests. + https://cloud.google.com/bigtable/docs/app-profiles + default_read_rows_operation_timeout: The default timeout for read rows + operations, in seconds. If not set, defaults to 600 seconds (10 minutes) + default_read_rows_attempt_timeout: The default timeout for individual + read rows rpc requests, in seconds. If not set, defaults to 20 seconds + default_mutate_rows_operation_timeout: The default timeout for mutate rows + operations, in seconds. If not set, defaults to 600 seconds (10 minutes) + default_mutate_rows_attempt_timeout: The default timeout for individual + mutate rows rpc requests, in seconds. If not set, defaults to 60 seconds + default_operation_timeout: The default timeout for all other operations, in + seconds.
If not set, defaults to 60 seconds + default_attempt_timeout: The default timeout for all other individual rpc + requests, in seconds. If not set, defaults to 20 seconds + default_read_rows_retryable_errors: a list of errors that will be retried + if encountered during read_rows and related operations. + Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted) + default_mutate_rows_retryable_errors: a list of errors that will be retried + if encountered during mutate_rows and related operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + default_retryable_errors: a list of errors that will be retried if + encountered during all other operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + Raises: + None""" + super().__init__(client, instance_id, table_id, app_profile_id, **kwargs) + self.authorized_view_id = view_id + self.authorized_view_name: str = self.client._gapic_client.authorized_view_path( + self.client.project, instance_id, table_id, view_id + ) + self._request_path = {"authorized_view_name": self.authorized_view_name} diff --git a/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py b/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py index 2e4237b74..a9ebaa9d8 100644 --- a/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py +++ b/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py @@ -32,7 +32,9 @@ if TYPE_CHECKING: from google.cloud.bigtable.data.mutations import RowMutationEntry - from google.cloud.bigtable.data._sync_autogen.client import Table as TableType + from google.cloud.bigtable.data._sync_autogen.client import ( + _ApiSurface as ApiSurfaceType, + ) _MB_SIZE = 1024 * 1024 @@ -179,7 +181,7 @@ class MutationsBatcher: def __init__( self, - table: TableType, + table: ApiSurfaceType, *, flush_interval: float | None = 5, flush_limit_mutation_count: int | None = 1000, diff --git a/google/cloud/bigtable/data/read_rows_query.py 
b/google/cloud/bigtable/data/read_rows_query.py index e0839a2af..7652bfbb9 100644 --- a/google/cloud/bigtable/data/read_rows_query.py +++ b/google/cloud/bigtable/data/read_rows_query.py @@ -489,11 +489,11 @@ def _to_pb(self, table) -> ReadRowsRequestPB: ReadRowsRequest protobuf """ return ReadRowsRequestPB( - table_name=table.table_name, app_profile_id=table.app_profile_id, filter=self.filter._to_pb() if self.filter else None, rows_limit=self.limit or 0, rows=self._row_set, + **table._request_path, ) def __eq__(self, other): diff --git a/tests/system/data/setup_fixtures.py b/tests/system/data/setup_fixtures.py index 3b5a0af06..9dd3b5d73 100644 --- a/tests/system/data/setup_fixtures.py +++ b/tests/system/data/setup_fixtures.py @@ -20,6 +20,12 @@ import os import uuid +from . import TEST_FAMILY, TEST_FAMILY_2 + +# authorized view subset to allow all qualifiers +ALLOW_ALL = "" +ALL_QUALIFIERS = {"qualifier_prefixes": [ALLOW_ALL]} + @pytest.fixture(scope="session") def admin_client(): @@ -140,6 +146,59 @@ def table_id( print(f"Table {init_table_id} not found, skipping deletion") +@pytest.fixture(scope="session") +def authorized_view_id( + admin_client, + project_id, + instance_id, + table_id, +): + """ + Creates and returns a new temporary authorized view for the test session + + Args: + - admin_client: Client for interacting with the Table Admin API. Supplied by the admin_client fixture. + - project_id: The project ID of the GCP project to test against. Supplied by the project_id fixture. + - instance_id: The ID of the Bigtable instance to test against. Supplied by the instance_id fixture. + - table_id: The ID of the table to create the authorized view for. Supplied by the table_id fixture. 
+ """ + from google.api_core import exceptions + from google.api_core import retry + + retry = retry.Retry( + predicate=retry.if_exception_type(exceptions.FailedPrecondition) + ) + new_view_id = uuid.uuid4().hex[:8] + parent_path = f"projects/{project_id}/instances/{instance_id}/tables/{table_id}" + new_path = f"{parent_path}/authorizedViews/{new_view_id}" + try: + print(f"Creating view: {new_path}") + admin_client.table_admin_client.create_authorized_view( + request={ + "parent": parent_path, + "authorized_view_id": new_view_id, + "authorized_view": { + "subset_view": { + "row_prefixes": [ALLOW_ALL], + "family_subsets": { + TEST_FAMILY: ALL_QUALIFIERS, + TEST_FAMILY_2: ALL_QUALIFIERS, + }, + }, + }, + }, + retry=retry, + ) + except exceptions.AlreadyExists: + pass + yield new_view_id + print(f"Deleting view: {new_path}") + try: + admin_client.table_admin_client.delete_authorized_view(name=new_path) + except exceptions.NotFound: + print(f"View {new_view_id} not found, skipping deletion") + + @pytest.fixture(scope="session") def project_id(client): """Returns the project ID from the client.""" diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index b97859de1..bb95feef9 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -17,7 +17,7 @@ import uuid import os from google.api_core import retry -from google.api_core.exceptions import ClientError +from google.api_core.exceptions import ClientError, PermissionDenied from google.cloud.bigtable.data.read_modify_write_rules import _MAX_INCREMENT_VALUE from google.cloud.environment_vars import BIGTABLE_EMULATOR @@ -90,10 +90,21 @@ async def client(self): yield client @CrossSync.convert - @CrossSync.pytest_fixture(scope="session") - async def table(self, client, table_id, instance_id): - async with client.get_table(instance_id, table_id) as table: - yield table + @CrossSync.pytest_fixture(scope="session", params=["table", 
"authorized_view"]) + async def table(self, client, table_id, authorized_view_id, instance_id, request): + """ + This fixture runs twice: once for a standard table, and once with an authorized view + """ + if request.param == "table": + async with client.get_table(instance_id, table_id) as table: + yield table + elif request.param == "authorized_view": + async with client.get_authorized_view( + instance_id, table_id, authorized_view_id + ) as view: + yield view + else: + raise ValueError(f"unknown table type: {request.param}") @CrossSync.drop @pytest.fixture(scope="session") @@ -1014,3 +1025,20 @@ async def test_literal_value_filter( assert len(row_list) == bool( expect_match ), f"row {type(cell_value)}({cell_value}) not found with {type(filter_input)}({filter_input}) filter" + + @CrossSync.pytest + async def test_authorized_view_unauthenticated( + self, client, authorized_view_id, instance_id, table_id + ): + """ + Requesting family outside authorized family_subset should raise exception + """ + from google.cloud.bigtable.data.mutations import SetCell + + async with client.get_authorized_view( + instance_id, table_id, authorized_view_id + ) as view: + mutation = SetCell(family="unauthorized", qualifier="q", new_value="v") + with pytest.raises(PermissionDenied) as e: + await view.mutate_row(b"row-key", mutation) + assert "outside the Authorized View" in e.value.message diff --git a/tests/system/data/test_system_autogen.py b/tests/system/data/test_system_autogen.py index 2dde82bf1..45d07dac9 100644 --- a/tests/system/data/test_system_autogen.py +++ b/tests/system/data/test_system_autogen.py @@ -19,7 +19,7 @@ import uuid import os from google.api_core import retry -from google.api_core.exceptions import ClientError +from google.api_core.exceptions import ClientError, PermissionDenied from google.cloud.bigtable.data.read_modify_write_rules import _MAX_INCREMENT_VALUE from google.cloud.environment_vars import BIGTABLE_EMULATOR from 
google.cloud.bigtable.data._cross_sync import CrossSync @@ -78,10 +78,19 @@ def client(self): with CrossSync._Sync_Impl.DataClient(project=project) as client: yield client - @pytest.fixture(scope="session") - def table(self, client, table_id, instance_id): - with client.get_table(instance_id, table_id) as table: - yield table + @pytest.fixture(scope="session", params=["table", "authorized_view"]) + def table(self, client, table_id, authorized_view_id, instance_id, request): + """This fixture runs twice: once for a standard table, and once with an authorized view""" + if request.param == "table": + with client.get_table(instance_id, table_id) as table: + yield table + elif request.param == "authorized_view": + with client.get_authorized_view( + instance_id, table_id, authorized_view_id + ) as view: + yield view + else: + raise ValueError(f"unknown table type: {request.param}") @pytest.fixture(scope="session") def column_family_config(self): @@ -826,3 +835,17 @@ def test_literal_value_filter( assert len(row_list) == bool( expect_match ), f"row {type(cell_value)}({cell_value}) not found with {type(filter_input)}({filter_input}) filter" + + def test_authorized_view_unauthenticated( + self, client, authorized_view_id, instance_id, table_id + ): + """Requesting family outside authorized family_subset should raise exception""" + from google.cloud.bigtable.data.mutations import SetCell + + with client.get_authorized_view( + instance_id, table_id, authorized_view_id + ) as view: + mutation = SetCell(family="unauthorized", qualifier="q", new_value="v") + with pytest.raises(PermissionDenied) as e: + view.mutate_row(b"row-key", mutation) + assert "outside the Authorized View" in e.value.message diff --git a/tests/unit/data/_async/test__mutate_rows.py b/tests/unit/data/_async/test__mutate_rows.py index 13f668fd3..325a6470f 100644 --- a/tests/unit/data/_async/test__mutate_rows.py +++ b/tests/unit/data/_async/test__mutate_rows.py @@ -15,6 +15,8 @@ import pytest from 
google.cloud.bigtable_v2.types import MutateRowsResponse +from google.cloud.bigtable.data.mutations import RowMutationEntry +from google.cloud.bigtable.data.mutations import DeleteAllFromRow from google.rpc import status_pb2 from google.api_core.exceptions import DeadlineExceeded from google.api_core.exceptions import Forbidden @@ -37,8 +39,11 @@ def _target_class(self): def _make_one(self, *args, **kwargs): if not args: + fake_table = CrossSync.Mock() + fake_table._request_path = {"table_name": "table"} + fake_table.app_profile_id = None kwargs["gapic_client"] = kwargs.pop("gapic_client", mock.Mock()) - kwargs["table"] = kwargs.pop("table", CrossSync.Mock()) + kwargs["table"] = kwargs.pop("table", fake_table) kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5) kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1) kwargs["retryable_exceptions"] = kwargs.pop("retryable_exceptions", ()) @@ -46,9 +51,8 @@ def _make_one(self, *args, **kwargs): return self._target_class()(*args, **kwargs) def _make_mutation(self, count=1, size=1): - mutation = mock.Mock() - mutation.size.return_value = size - mutation.mutations = [mock.Mock()] * count + mutation = RowMutationEntry("k", [DeleteAllFromRow() for _ in range(count)]) + mutation.size = lambda: size return mutation @CrossSync.convert @@ -95,16 +99,10 @@ def test_ctor(self): attempt_timeout, retryable_exceptions, ) - # running gapic_fn should trigger a client call + # running gapic_fn should trigger a client call with baked-in args assert client.mutate_rows.call_count == 0 instance._gapic_fn() assert client.mutate_rows.call_count == 1 - # gapic_fn should call with table details - inner_kwargs = client.mutate_rows.call_args[1] - assert len(inner_kwargs) == 3 - assert inner_kwargs["table_name"] == table.table_name - assert inner_kwargs["app_profile_id"] == table.app_profile_id - assert inner_kwargs["retry"] is None # entries should be passed down entries_w_pb = [_EntryWithProto(e, e._to_pb()) for e in 
entries] assert instance.mutations == entries_w_pb @@ -174,6 +172,8 @@ async def test_mutate_rows_attempt_exception(self, exc_type): """ client = CrossSync.Mock() table = mock.Mock() + table._request_path = {"table_name": "table"} + table.app_profile_id = None entries = [self._make_mutation(), self._make_mutation()] operation_timeout = 0.05 expected_exception = exc_type("test") @@ -307,7 +307,8 @@ async def test_run_attempt_single_entry_success(self): assert mock_gapic_fn.call_count == 1 _, kwargs = mock_gapic_fn.call_args assert kwargs["timeout"] == expected_timeout - assert kwargs["entries"] == [mutation._to_pb()] + request = kwargs["request"] + assert request.entries == [mutation._to_pb()] @CrossSync.pytest async def test_run_attempt_empty_request(self): diff --git a/tests/unit/data/_async/test__read_rows.py b/tests/unit/data/_async/test__read_rows.py index 944681a84..c43f46d5a 100644 --- a/tests/unit/data/_async/test__read_rows.py +++ b/tests/unit/data/_async/test__read_rows.py @@ -54,7 +54,7 @@ def test_ctor(self): client.read_rows.return_value = None table = mock.Mock() table._client = client - table.table_name = "test_table" + table._request_path = {"table_name": "test_table"} table.app_profile_id = "test_profile" expected_operation_timeout = 42 expected_request_timeout = 44 @@ -78,7 +78,7 @@ def test_ctor(self): assert instance._remaining_count == row_limit assert instance.operation_timeout == expected_operation_timeout assert client.read_rows.call_count == 0 - assert instance.request.table_name == table.table_name + assert instance.request.table_name == "test_table" assert instance.request.app_profile_id == table.app_profile_id assert instance.request.rows_limit == row_limit @@ -267,7 +267,7 @@ async def mock_stream(): query = ReadRowsQuery(limit=start_limit) table = mock.Mock() - table.table_name = "table_name" + table._request_path = {"table_name": "table_name"} table.app_profile_id = "app_profile_id" instance = self._make_one(query, table, 10, 10) 
assert instance._remaining_count == start_limit @@ -306,7 +306,7 @@ async def mock_stream(): query = ReadRowsQuery(limit=start_limit) table = mock.Mock() - table.table_name = "table_name" + table._request_path = {"table_name": "table_name"} table.app_profile_id = "app_profile_id" instance = self._make_one(query, table, 10, 10) assert instance._remaining_count == start_limit diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 18ff69ffd..d9d50b7af 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -28,6 +28,7 @@ from google.api_core import exceptions as core_exceptions from google.cloud.bigtable.data.exceptions import InvalidChunk from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.cloud.bigtable.data.mutations import DeleteAllFromRow from google.cloud.bigtable.data import TABLE_DEFAULT from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule @@ -803,8 +804,12 @@ async def test__multiple_instance_registration(self): assert len(client._instance_owners[instance_1_key]) == 0 assert len(client._instance_owners[instance_2_key]) == 0 + @pytest.mark.parametrize("method", ["get_table", "get_authorized_view"]) @CrossSync.pytest - async def test_get_table(self): + async def test_get_api_surface(self, method): + """ + test client.get_table and client.get_authorized_view + """ from google.cloud.bigtable.data._helpers import _WarmedInstanceKey client = self._make_client(project="project-id") @@ -812,67 +817,92 @@ async def test_get_table(self): expected_table_id = "table-id" expected_instance_id = "instance-id" expected_app_profile_id = "app-profile-id" - table = client.get_table( - expected_instance_id, - expected_table_id, - expected_app_profile_id, - ) + if method == "get_table": + surface = client.get_table( + expected_instance_id, + expected_table_id, + expected_app_profile_id, + ) + assert isinstance(surface, 
CrossSync.TestTable._get_target_class()) + elif method == "get_authorized_view": + surface = client.get_authorized_view( + expected_instance_id, + expected_table_id, + "view_id", + expected_app_profile_id, + ) + assert isinstance(surface, CrossSync.TestAuthorizedView._get_target_class()) + assert ( + surface.authorized_view_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}/authorizedViews/view_id" + ) + else: + raise TypeError(f"unexpected method: {method}") await CrossSync.yield_to_event_loop() - assert isinstance(table, CrossSync.TestTable._get_target_class()) - assert table.table_id == expected_table_id + assert surface.table_id == expected_table_id assert ( - table.table_name + surface.table_name == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}" ) - assert table.instance_id == expected_instance_id + assert surface.instance_id == expected_instance_id assert ( - table.instance_name + surface.instance_name == f"projects/{client.project}/instances/{expected_instance_id}" ) - assert table.app_profile_id == expected_app_profile_id - assert table.client is client + assert surface.app_profile_id == expected_app_profile_id + assert surface.client is client instance_key = _WarmedInstanceKey( - table.instance_name, table.table_name, table.app_profile_id + surface.instance_name, surface.table_name, surface.app_profile_id ) assert instance_key in client._active_instances - assert client._instance_owners[instance_key] == {id(table)} + assert client._instance_owners[instance_key] == {id(surface)} await client.close() + @pytest.mark.parametrize("method", ["get_table", "get_authorized_view"]) @CrossSync.pytest - async def test_get_table_arg_passthrough(self): + async def test_api_surface_arg_passthrough(self, method): """ - All arguments passed in get_table should be sent to constructor + All arguments passed in get_table and get_authorized_view should be sent to constructor """ + if 
method == "get_table": + surface_type = CrossSync.TestTable._get_target_class() + elif method == "get_authorized_view": + surface_type = CrossSync.TestAuthorizedView._get_target_class() + else: + raise TypeError(f"unexpected method: {method}") + async with self._make_client(project="project-id") as client: - with mock.patch.object( - CrossSync.TestTable._get_target_class(), "__init__" - ) as mock_constructor: + with mock.patch.object(surface_type, "__init__") as mock_constructor: mock_constructor.return_value = None assert not client._active_instances - expected_table_id = "table-id" - expected_instance_id = "instance-id" - expected_app_profile_id = "app-profile-id" - expected_args = (1, "test", {"test": 2}) + expected_args = ( + "table", + "instance", + "view", + "app_profile", + 1, + "test", + {"test": 2}, + ) expected_kwargs = {"hello": "world", "test": 2} - client.get_table( - expected_instance_id, - expected_table_id, - expected_app_profile_id, + getattr(client, method)( *expected_args, **expected_kwargs, ) mock_constructor.assert_called_once_with( client, - expected_instance_id, - expected_table_id, - expected_app_profile_id, *expected_args, **expected_kwargs, ) + @pytest.mark.parametrize("method", ["get_table", "get_authorized_view"]) @CrossSync.pytest - async def test_get_table_context_manager(self): + async def test_api_surface_context_manager(self, method): + """ + get_table and get_authorized_view should work as context managers + """ + from functools import partial from google.cloud.bigtable.data._helpers import _WarmedInstanceKey expected_table_id = "table-id" @@ -880,17 +910,35 @@ async def test_get_table_context_manager(self): expected_app_profile_id = "app-profile-id" expected_project_id = "project-id" - with mock.patch.object( - CrossSync.TestTable._get_target_class(), "close" - ) as close_mock: + if method == "get_table": + surface_type = CrossSync.TestTable._get_target_class() + elif method == "get_authorized_view": + surface_type = 
CrossSync.TestAuthorizedView._get_target_class() + else: + raise TypeError(f"unexpected method: {method}") + + with mock.patch.object(surface_type, "close") as close_mock: async with self._make_client(project=expected_project_id) as client: - async with client.get_table( - expected_instance_id, - expected_table_id, - expected_app_profile_id, - ) as table: + if method == "get_table": + fn = partial( + client.get_table, + expected_instance_id, + expected_table_id, + expected_app_profile_id, + ) + elif method == "get_authorized_view": + fn = partial( + client.get_authorized_view, + expected_instance_id, + expected_table_id, + "view_id", + expected_app_profile_id, + ) + else: + raise TypeError(f"unexpected method: {method}") + async with fn() as table: await CrossSync.yield_to_event_loop() - assert isinstance(table, CrossSync.TestTable._get_target_class()) + assert isinstance(table, surface_type) assert table.table_id == expected_table_id assert ( table.table_name @@ -988,8 +1036,20 @@ def _make_client(self, *args, **kwargs): def _get_target_class(): return CrossSync.Table + def _make_one( + self, + client, + instance_id="instance", + table_id="table", + app_profile_id=None, + **kwargs, + ): + return self._get_target_class()( + client, instance_id, table_id, app_profile_id, **kwargs + ) + @CrossSync.pytest - async def test_table_ctor(self): + async def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey expected_table_id = "table-id" @@ -1019,6 +1079,14 @@ async def test_table_ctor(self): await CrossSync.yield_to_event_loop() assert table.table_id == expected_table_id assert table.instance_id == expected_instance_id + assert ( + table.table_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}" + ) + assert ( + table.instance_name + == f"projects/{client.project}/instances/{expected_instance_id}" + ) assert table.app_profile_id == expected_app_profile_id assert table.client is client 
instance_key = _WarmedInstanceKey( @@ -1052,23 +1120,15 @@ async def test_table_ctor(self): await client.close() @CrossSync.pytest - async def test_table_ctor_defaults(self): + async def test_ctor_defaults(self): """ should provide default timeout values and app_profile_id """ - expected_table_id = "table-id" - expected_instance_id = "instance-id" client = self._make_client() assert not client._active_instances - table = self._get_target_class()( - client, - expected_instance_id, - expected_table_id, - ) + table = self._make_one(client) await CrossSync.yield_to_event_loop() - assert table.table_id == expected_table_id - assert table.instance_id == expected_instance_id assert table.app_profile_id is None assert table.client is client assert table.default_operation_timeout == 60 @@ -1080,7 +1140,7 @@ async def test_table_ctor_defaults(self): await client.close() @CrossSync.pytest - async def test_table_ctor_invalid_timeout_values(self): + async def test_ctor_invalid_timeout_values(self): """ bad timeout values should raise ValueError """ @@ -1099,10 +1159,10 @@ async def test_table_ctor_invalid_timeout_values(self): ] for operation_timeout, attempt_timeout in timeout_pairs: with pytest.raises(ValueError) as e: - self._get_target_class()(client, "", "", **{attempt_timeout: -1}) + self._make_one(client, **{attempt_timeout: -1}) assert "attempt_timeout must be greater than 0" in str(e.value) with pytest.raises(ValueError) as e: - self._get_target_class()(client, "", "", **{operation_timeout: -1}) + self._make_one(client, **{operation_timeout: -1}) assert "operation_timeout must be greater than 0" in str(e.value) await client.close() @@ -1152,13 +1212,13 @@ def test_table_ctor_sync(self): ("sample_row_keys", (), False, ()), ( "mutate_row", - (b"row_key", [mock.Mock()]), + (b"row_key", [DeleteAllFromRow()]), False, (), ), ( "bulk_mutate_rows", - ([mutations.RowMutationEntry(b"key", [mutations.DeleteAllFromRow()])],), + ([mutations.RowMutationEntry(b"key", 
[DeleteAllFromRow()])],), False, (_MutateRowsIncomplete,), ), @@ -1270,7 +1330,7 @@ async def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_ gapic_client = gapic_client._client gapic_client._transport = transport_mock gapic_client._is_universe_domain_valid = True - table = self._get_target_class()(client, "instance-id", "table-id", profile) + table = self._make_one(client, app_profile_id=profile) try: test_fn = table.__getattribute__(fn_name) maybe_stream = await test_fn(*fn_args) @@ -1286,12 +1346,131 @@ async def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_ # expect x-goog-request-params tag assert metadata[0][0] == "x-goog-request-params" routing_str = metadata[0][1] - assert "table_name=" + table.table_name in routing_str + assert self._expected_routing_header(table) in routing_str if include_app_profile: assert "app_profile_id=profile" in routing_str else: assert "app_profile_id=" not in routing_str + @staticmethod + def _expected_routing_header(table): + """ + the expected routing header for this _ApiSurface type + """ + return f"table_name={table.table_name}" + + +@CrossSync.convert_class( + "TestAuthorizedView", add_mapping_for_name="TestAuthorizedView" +) +class TestAuthorizedViewsAsync(CrossSync.TestTable): + """ + Inherit tests from TestTableAsync, with some modifications + """ + + @staticmethod + @CrossSync.convert + def _get_target_class(): + return CrossSync.AuthorizedView + + def _make_one( + self, + client, + instance_id="instance", + table_id="table", + view_id="view", + app_profile_id=None, + **kwargs, + ): + return self._get_target_class()( + client, instance_id, table_id, view_id, app_profile_id, **kwargs + ) + + @staticmethod + def _expected_routing_header(view): + """ + the expected routing header for this _ApiSurface type + """ + return f"authorized_view_name={view.authorized_view_name}" + + @CrossSync.pytest + async def test_ctor(self): + from google.cloud.bigtable.data._helpers import 
_WarmedInstanceKey + + expected_table_id = "table-id" + expected_instance_id = "instance-id" + expected_view_id = "view_id" + expected_app_profile_id = "app-profile-id" + expected_operation_timeout = 123 + expected_attempt_timeout = 12 + expected_read_rows_operation_timeout = 1.5 + expected_read_rows_attempt_timeout = 0.5 + expected_mutate_rows_operation_timeout = 2.5 + expected_mutate_rows_attempt_timeout = 0.75 + client = self._make_client() + assert not client._active_instances + + table = self._get_target_class()( + client, + expected_instance_id, + expected_table_id, + expected_view_id, + expected_app_profile_id, + default_operation_timeout=expected_operation_timeout, + default_attempt_timeout=expected_attempt_timeout, + default_read_rows_operation_timeout=expected_read_rows_operation_timeout, + default_read_rows_attempt_timeout=expected_read_rows_attempt_timeout, + default_mutate_rows_operation_timeout=expected_mutate_rows_operation_timeout, + default_mutate_rows_attempt_timeout=expected_mutate_rows_attempt_timeout, + ) + await CrossSync.yield_to_event_loop() + assert table.table_id == expected_table_id + assert ( + table.table_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}" + ) + assert table.instance_id == expected_instance_id + assert ( + table.instance_name + == f"projects/{client.project}/instances/{expected_instance_id}" + ) + assert table.authorized_view_id == expected_view_id + assert ( + table.authorized_view_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}/authorizedViews/{expected_view_id}" + ) + assert table.app_profile_id == expected_app_profile_id + assert table.client is client + instance_key = _WarmedInstanceKey( + table.instance_name, table.table_name, table.app_profile_id + ) + assert instance_key in client._active_instances + assert client._instance_owners[instance_key] == {id(table)} + assert table.default_operation_timeout == 
expected_operation_timeout + assert table.default_attempt_timeout == expected_attempt_timeout + assert ( + table.default_read_rows_operation_timeout + == expected_read_rows_operation_timeout + ) + assert ( + table.default_read_rows_attempt_timeout + == expected_read_rows_attempt_timeout + ) + assert ( + table.default_mutate_rows_operation_timeout + == expected_mutate_rows_operation_timeout + ) + assert ( + table.default_mutate_rows_attempt_timeout + == expected_mutate_rows_attempt_timeout + ) + # ensure task reaches completion + await table._register_instance_future + assert table._register_instance_future.done() + assert not table._register_instance_future.cancelled() + assert table._register_instance_future.exception() is None + await client.close() + @CrossSync.convert_class( "TestReadRows", @@ -2123,11 +2302,12 @@ async def test_sample_row_keys_gapic_params(self): await table.sample_row_keys(attempt_timeout=expected_timeout) args, kwargs = sample_row_keys.call_args assert len(args) == 0 - assert len(kwargs) == 4 + assert len(kwargs) == 3 assert kwargs["timeout"] == expected_timeout - assert kwargs["app_profile_id"] == expected_profile - assert kwargs["table_name"] == table.table_name assert kwargs["retry"] is None + request = kwargs["request"] + assert request.app_profile_id == expected_profile + assert request.table_name == table.table_name @pytest.mark.parametrize( "retryable_exception", @@ -2223,17 +2403,18 @@ async def test_mutate_row(self, mutation_arg): ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args_list[0].kwargs + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == "projects/project/instances/instance/tables/table" ) - assert kwargs["row_key"] == b"row_key" + assert request.row_key == b"row_key" formatted_mutations = ( [mutation._to_pb() for mutation in mutation_arg] if isinstance(mutation_arg, list) else [mutation_arg._to_pb()] ) - assert kwargs["mutations"] == formatted_mutations + assert 
request.mutations == formatted_mutations assert kwargs["timeout"] == expected_attempt_timeout # make sure gapic layer is not retrying assert kwargs["retry"] is None @@ -2405,11 +2586,12 @@ async def test_bulk_mutate_rows(self, mutation_arg): ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args[1] + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == "projects/project/instances/instance/tables/table" ) - assert kwargs["entries"] == [bulk_mutation._to_pb()] + assert request.entries == [bulk_mutation._to_pb()] assert kwargs["timeout"] == expected_attempt_timeout assert kwargs["retry"] is None @@ -2430,12 +2612,13 @@ async def test_bulk_mutate_rows_multiple_entries(self): ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args[1] + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == "projects/project/instances/instance/tables/table" ) - assert kwargs["entries"][0] == entry_1._to_pb() - assert kwargs["entries"][1] == entry_2._to_pb() + assert request.entries[0] == entry_1._to_pb() + assert request.entries[1] == entry_2._to_pb() @CrossSync.pytest @pytest.mark.parametrize( @@ -2743,8 +2926,8 @@ async def test_check_and_mutate(self, gapic_result): ) row_key = b"row_key" predicate = None - true_mutations = [mock.Mock()] - false_mutations = [mock.Mock(), mock.Mock()] + true_mutations = [DeleteAllFromRow()] + false_mutations = [DeleteAllFromRow(), DeleteAllFromRow()] operation_timeout = 0.2 found = await table.check_and_mutate_row( row_key, @@ -2755,16 +2938,17 @@ async def test_check_and_mutate(self, gapic_result): ) assert found == gapic_result kwargs = mock_gapic.call_args[1] - assert kwargs["table_name"] == table.table_name - assert kwargs["row_key"] == row_key - assert kwargs["predicate_filter"] == predicate - assert kwargs["true_mutations"] == [ + request = kwargs["request"] + assert request.table_name == table.table_name + assert request.row_key == row_key + assert 
bool(request.predicate_filter) is False + assert request.true_mutations == [ m._to_pb() for m in true_mutations ] - assert kwargs["false_mutations"] == [ + assert request.false_mutations == [ m._to_pb() for m in false_mutations ] - assert kwargs["app_profile_id"] == app_profile + assert request.app_profile_id == app_profile assert kwargs["timeout"] == operation_timeout assert kwargs["retry"] is None @@ -2806,16 +2990,18 @@ async def test_check_and_mutate_single_mutations(self): false_case_mutations=false_mutation, ) kwargs = mock_gapic.call_args[1] - assert kwargs["true_mutations"] == [true_mutation._to_pb()] - assert kwargs["false_mutations"] == [false_mutation._to_pb()] + request = kwargs["request"] + assert request.true_mutations == [true_mutation._to_pb()] + assert request.false_mutations == [false_mutation._to_pb()] @CrossSync.pytest async def test_check_and_mutate_predicate_object(self): """predicate filter should be passed to gapic request""" from google.cloud.bigtable_v2.types import CheckAndMutateRowResponse + from google.cloud.bigtable_v2.types.data import RowFilter mock_predicate = mock.Mock() - predicate_pb = {"predicate": "dict"} + predicate_pb = RowFilter({"sink": True}) mock_predicate._to_pb.return_value = predicate_pb async with self._make_client() as client: async with client.get_table("instance", "table") as table: @@ -2828,10 +3014,11 @@ async def test_check_and_mutate_predicate_object(self): await table.check_and_mutate_row( b"row_key", mock_predicate, - false_case_mutations=[mock.Mock()], + false_case_mutations=[DeleteAllFromRow()], ) kwargs = mock_gapic.call_args[1] - assert kwargs["predicate_filter"] == predicate_pb + request = kwargs["request"] + assert request.predicate_filter == predicate_pb assert mock_predicate._to_pb.call_count == 1 assert kwargs["retry"] is None @@ -2839,11 +3026,11 @@ async def test_check_and_mutate_predicate_object(self): async def test_check_and_mutate_mutations_parsing(self): """mutations objects should be 
converted to protos""" from google.cloud.bigtable_v2.types import CheckAndMutateRowResponse - from google.cloud.bigtable.data.mutations import DeleteAllFromRow + from google.cloud.bigtable.data.mutations import DeleteAllFromFamily mutations = [mock.Mock() for _ in range(5)] for idx, mutation in enumerate(mutations): - mutation._to_pb.return_value = f"fake {idx}" + mutation._to_pb.return_value = DeleteAllFromFamily(f"fake {idx}")._to_pb() mutations.append(DeleteAllFromRow()) async with self._make_client() as client: async with client.get_table("instance", "table") as table: @@ -2860,11 +3047,15 @@ async def test_check_and_mutate_mutations_parsing(self): false_case_mutations=mutations[2:], ) kwargs = mock_gapic.call_args[1] - assert kwargs["true_mutations"] == ["fake 0", "fake 1"] - assert kwargs["false_mutations"] == [ - "fake 2", - "fake 3", - "fake 4", + request = kwargs["request"] + assert request.true_mutations == [ + DeleteAllFromFamily("fake 0")._to_pb(), + DeleteAllFromFamily("fake 1")._to_pb(), + ] + assert request.false_mutations == [ + DeleteAllFromFamily("fake 2")._to_pb(), + DeleteAllFromFamily("fake 3")._to_pb(), + DeleteAllFromFamily("fake 4")._to_pb(), DeleteAllFromRow()._to_pb(), ] assert all( @@ -2912,7 +3103,8 @@ async def test_read_modify_write_call_rule_args(self, call_rules, expected_rules await table.read_modify_write_row("key", call_rules) assert mock_gapic.call_count == 1 found_kwargs = mock_gapic.call_args_list[0][1] - assert found_kwargs["rules"] == expected_rules + request = found_kwargs["request"] + assert request.rules == expected_rules assert found_kwargs["retry"] is None @pytest.mark.parametrize("rules", [[], None]) @@ -2935,15 +3127,16 @@ async def test_read_modify_write_call_defaults(self): with mock.patch.object( client._gapic_client, "read_modify_write_row" ) as mock_gapic: - await table.read_modify_write_row(row_key, mock.Mock()) + await table.read_modify_write_row(row_key, IncrementRule("f", "q")) assert mock_gapic.call_count == 
1 kwargs = mock_gapic.call_args_list[0][1] + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == f"projects/{project}/instances/{instance}/tables/{table_id}" ) - assert kwargs["app_profile_id"] is None - assert kwargs["row_key"] == row_key.encode() + assert bool(request.app_profile_id) is False + assert request.row_key == row_key.encode() assert kwargs["timeout"] > 1 @CrossSync.pytest @@ -2960,13 +3153,14 @@ async def test_read_modify_write_call_overrides(self): ) as mock_gapic: await table.read_modify_write_row( row_key, - mock.Mock(), + IncrementRule("f", "q"), operation_timeout=expected_timeout, ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args_list[0][1] - assert kwargs["app_profile_id"] is profile_id - assert kwargs["row_key"] == row_key + request = kwargs["request"] + assert request.app_profile_id is profile_id + assert request.row_key == row_key assert kwargs["timeout"] == expected_timeout @CrossSync.pytest @@ -2977,10 +3171,11 @@ async def test_read_modify_write_string_key(self): with mock.patch.object( client._gapic_client, "read_modify_write_row" ) as mock_gapic: - await table.read_modify_write_row(row_key, mock.Mock()) + await table.read_modify_write_row(row_key, IncrementRule("f", "q")) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args_list[0][1] - assert kwargs["row_key"] == row_key.encode() + request = kwargs["request"] + assert request.row_key == row_key.encode() @CrossSync.pytest async def test_read_modify_write_row_building(self): @@ -2999,7 +3194,9 @@ async def test_read_modify_write_row_building(self): ) as mock_gapic: with mock.patch.object(Row, "_from_pb") as constructor_mock: mock_gapic.return_value = mock_response - await table.read_modify_write_row("key", mock.Mock()) + await table.read_modify_write_row( + "key", IncrementRule("f", "q") + ) assert constructor_mock.call_count == 1 constructor_mock.assert_called_once_with(mock_response.row) diff --git 
a/tests/unit/data/_async/test_mutations_batcher.py b/tests/unit/data/_async/test_mutations_batcher.py index 2df8dde6d..8bc5f0474 100644 --- a/tests/unit/data/_async/test_mutations_batcher.py +++ b/tests/unit/data/_async/test_mutations_batcher.py @@ -19,6 +19,8 @@ import google.api_core.exceptions as core_exceptions import google.api_core.retry from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.cloud.bigtable.data.mutations import RowMutationEntry +from google.cloud.bigtable.data.mutations import DeleteAllFromRow from google.cloud.bigtable.data import TABLE_DEFAULT from google.cloud.bigtable.data._cross_sync import CrossSync @@ -38,9 +40,9 @@ def _make_one(self, max_mutation_count=10, max_mutation_bytes=100): @staticmethod def _make_mutation(count=1, size=1): - mutation = mock.Mock() - mutation.size.return_value = size - mutation.mutations = [mock.Mock()] * count + mutation = RowMutationEntry("k", DeleteAllFromRow()) + mutation.mutations = [DeleteAllFromRow() for _ in range(count)] + mutation.size = lambda: size return mutation def test_ctor(self): @@ -308,6 +310,8 @@ def _make_one(self, table=None, **kwargs): if table is None: table = mock.Mock() + table._request_path = {"table_name": "table"} + table.app_profile_id = None table.default_mutate_rows_operation_timeout = 10 table.default_mutate_rows_attempt_timeout = 10 table.default_mutate_rows_retryable_errors = ( @@ -319,9 +323,9 @@ def _make_one(self, table=None, **kwargs): @staticmethod def _make_mutation(count=1, size=1): - mutation = mock.Mock() - mutation.size.return_value = size - mutation.mutations = [mock.Mock()] * count + mutation = RowMutationEntry("k", DeleteAllFromRow()) + mutation.size = lambda: size + mutation.mutations = [DeleteAllFromRow() for _ in range(count)] return mutation @CrossSync.pytest diff --git a/tests/unit/data/_sync_autogen/test__mutate_rows.py b/tests/unit/data/_sync_autogen/test__mutate_rows.py index 2173c88fb..b83cea65f 100644 --- 
a/tests/unit/data/_sync_autogen/test__mutate_rows.py +++ b/tests/unit/data/_sync_autogen/test__mutate_rows.py @@ -17,6 +17,8 @@ import pytest from google.cloud.bigtable_v2.types import MutateRowsResponse +from google.cloud.bigtable.data.mutations import RowMutationEntry +from google.cloud.bigtable.data.mutations import DeleteAllFromRow from google.rpc import status_pb2 from google.api_core.exceptions import DeadlineExceeded from google.api_core.exceptions import Forbidden @@ -34,8 +36,11 @@ def _target_class(self): def _make_one(self, *args, **kwargs): if not args: + fake_table = CrossSync._Sync_Impl.Mock() + fake_table._request_path = {"table_name": "table"} + fake_table.app_profile_id = None kwargs["gapic_client"] = kwargs.pop("gapic_client", mock.Mock()) - kwargs["table"] = kwargs.pop("table", CrossSync._Sync_Impl.Mock()) + kwargs["table"] = kwargs.pop("table", fake_table) kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5) kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1) kwargs["retryable_exceptions"] = kwargs.pop("retryable_exceptions", ()) @@ -43,9 +48,8 @@ def _make_one(self, *args, **kwargs): return self._target_class()(*args, **kwargs) def _make_mutation(self, count=1, size=1): - mutation = mock.Mock() - mutation.size.return_value = size - mutation.mutations = [mock.Mock()] * count + mutation = RowMutationEntry("k", [DeleteAllFromRow() for _ in range(count)]) + mutation.size = lambda: size return mutation def _mock_stream(self, mutation_list, error_dict): @@ -92,11 +96,6 @@ def test_ctor(self): assert client.mutate_rows.call_count == 0 instance._gapic_fn() assert client.mutate_rows.call_count == 1 - inner_kwargs = client.mutate_rows.call_args[1] - assert len(inner_kwargs) == 3 - assert inner_kwargs["table_name"] == table.table_name - assert inner_kwargs["app_profile_id"] == table.app_profile_id - assert inner_kwargs["retry"] is None entries_w_pb = [_EntryWithProto(e, e._to_pb()) for e in entries] assert instance.mutations == 
entries_w_pb assert next(instance.timeout_generator) == attempt_timeout @@ -148,6 +147,8 @@ def test_mutate_rows_attempt_exception(self, exc_type): """exceptions raised from attempt should be raised in MutationsExceptionGroup""" client = CrossSync._Sync_Impl.Mock() table = mock.Mock() + table._request_path = {"table_name": "table"} + table.app_profile_id = None entries = [self._make_mutation(), self._make_mutation()] operation_timeout = 0.05 expected_exception = exc_type("test") @@ -260,7 +261,8 @@ def test_run_attempt_single_entry_success(self): assert mock_gapic_fn.call_count == 1 (_, kwargs) = mock_gapic_fn.call_args assert kwargs["timeout"] == expected_timeout - assert kwargs["entries"] == [mutation._to_pb()] + request = kwargs["request"] + assert request.entries == [mutation._to_pb()] def test_run_attempt_empty_request(self): """Calling with no mutations should result in no API calls""" diff --git a/tests/unit/data/_sync_autogen/test__read_rows.py b/tests/unit/data/_sync_autogen/test__read_rows.py index 973b07bcb..a545142d3 100644 --- a/tests/unit/data/_sync_autogen/test__read_rows.py +++ b/tests/unit/data/_sync_autogen/test__read_rows.py @@ -48,7 +48,7 @@ def test_ctor(self): client.read_rows.return_value = None table = mock.Mock() table._client = client - table.table_name = "test_table" + table._request_path = {"table_name": "test_table"} table.app_profile_id = "test_profile" expected_operation_timeout = 42 expected_request_timeout = 44 @@ -72,7 +72,7 @@ def test_ctor(self): assert instance._remaining_count == row_limit assert instance.operation_timeout == expected_operation_timeout assert client.read_rows.call_count == 0 - assert instance.request.table_name == table.table_name + assert instance.request.table_name == "test_table" assert instance.request.app_profile_id == table.app_profile_id assert instance.request.rows_limit == row_limit @@ -252,7 +252,7 @@ def mock_stream(): query = ReadRowsQuery(limit=start_limit) table = mock.Mock() - table.table_name = 
"table_name" + table._request_path = {"table_name": "table_name"} table.app_profile_id = "app_profile_id" instance = self._make_one(query, table, 10, 10) assert instance._remaining_count == start_limit @@ -287,7 +287,7 @@ def mock_stream(): query = ReadRowsQuery(limit=start_limit) table = mock.Mock() - table.table_name = "table_name" + table._request_path = {"table_name": "table_name"} table.app_profile_id = "app_profile_id" instance = self._make_one(query, table, 10, 10) assert instance._remaining_count == start_limit diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py index c5c6bac30..acb5013c1 100644 --- a/tests/unit/data/_sync_autogen/test_client.py +++ b/tests/unit/data/_sync_autogen/test_client.py @@ -27,6 +27,7 @@ from google.api_core import exceptions as core_exceptions from google.cloud.bigtable.data.exceptions import InvalidChunk from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.cloud.bigtable.data.mutations import DeleteAllFromRow from google.cloud.bigtable.data import TABLE_DEFAULT from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule @@ -659,7 +660,9 @@ def test__multiple_instance_registration(self): assert len(client._instance_owners[instance_1_key]) == 0 assert len(client._instance_owners[instance_2_key]) == 0 - def test_get_table(self): + @pytest.mark.parametrize("method", ["get_table", "get_authorized_view"]) + def test_get_api_surface(self, method): + """test client.get_table and client.get_authorized_view""" from google.cloud.bigtable.data._helpers import _WarmedInstanceKey client = self._make_client(project="project-id") @@ -667,77 +670,115 @@ def test_get_table(self): expected_table_id = "table-id" expected_instance_id = "instance-id" expected_app_profile_id = "app-profile-id" - table = client.get_table( - expected_instance_id, expected_table_id, 
expected_app_profile_id - ) + if method == "get_table": + surface = client.get_table( + expected_instance_id, expected_table_id, expected_app_profile_id + ) + assert isinstance( + surface, CrossSync._Sync_Impl.TestTable._get_target_class() + ) + elif method == "get_authorized_view": + surface = client.get_authorized_view( + expected_instance_id, + expected_table_id, + "view_id", + expected_app_profile_id, + ) + assert isinstance( + surface, CrossSync._Sync_Impl.TestAuthorizedView._get_target_class() + ) + assert ( + surface.authorized_view_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}/authorizedViews/view_id" + ) + else: + raise TypeError(f"unexpected method: {method}") CrossSync._Sync_Impl.yield_to_event_loop() - assert isinstance(table, CrossSync._Sync_Impl.TestTable._get_target_class()) - assert table.table_id == expected_table_id + assert surface.table_id == expected_table_id assert ( - table.table_name + surface.table_name == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}" ) - assert table.instance_id == expected_instance_id + assert surface.instance_id == expected_instance_id assert ( - table.instance_name + surface.instance_name == f"projects/{client.project}/instances/{expected_instance_id}" ) - assert table.app_profile_id == expected_app_profile_id - assert table.client is client + assert surface.app_profile_id == expected_app_profile_id + assert surface.client is client instance_key = _WarmedInstanceKey( - table.instance_name, table.table_name, table.app_profile_id + surface.instance_name, surface.table_name, surface.app_profile_id ) assert instance_key in client._active_instances - assert client._instance_owners[instance_key] == {id(table)} + assert client._instance_owners[instance_key] == {id(surface)} client.close() - def test_get_table_arg_passthrough(self): - """All arguments passed in get_table should be sent to constructor""" + 
@pytest.mark.parametrize("method", ["get_table", "get_authorized_view"]) + def test_api_surface_arg_passthrough(self, method): + """All arguments passed in get_table and get_authorized_view should be sent to constructor""" + if method == "get_table": + surface_type = CrossSync._Sync_Impl.TestTable._get_target_class() + elif method == "get_authorized_view": + surface_type = CrossSync._Sync_Impl.TestAuthorizedView._get_target_class() + else: + raise TypeError(f"unexpected method: {method}") with self._make_client(project="project-id") as client: - with mock.patch.object( - CrossSync._Sync_Impl.TestTable._get_target_class(), "__init__" - ) as mock_constructor: + with mock.patch.object(surface_type, "__init__") as mock_constructor: mock_constructor.return_value = None assert not client._active_instances - expected_table_id = "table-id" - expected_instance_id = "instance-id" - expected_app_profile_id = "app-profile-id" - expected_args = (1, "test", {"test": 2}) - expected_kwargs = {"hello": "world", "test": 2} - client.get_table( - expected_instance_id, - expected_table_id, - expected_app_profile_id, - *expected_args, - **expected_kwargs, + expected_args = ( + "table", + "instance", + "view", + "app_profile", + 1, + "test", + {"test": 2}, ) + expected_kwargs = {"hello": "world", "test": 2} + getattr(client, method)(*expected_args, **expected_kwargs) mock_constructor.assert_called_once_with( - client, - expected_instance_id, - expected_table_id, - expected_app_profile_id, - *expected_args, - **expected_kwargs, + client, *expected_args, **expected_kwargs ) - def test_get_table_context_manager(self): + @pytest.mark.parametrize("method", ["get_table", "get_authorized_view"]) + def test_api_surface_context_manager(self, method): + """get_table and get_authorized_view should work as context managers""" + from functools import partial from google.cloud.bigtable.data._helpers import _WarmedInstanceKey expected_table_id = "table-id" expected_instance_id = "instance-id" 
expected_app_profile_id = "app-profile-id" expected_project_id = "project-id" - with mock.patch.object( - CrossSync._Sync_Impl.TestTable._get_target_class(), "close" - ) as close_mock: + if method == "get_table": + surface_type = CrossSync._Sync_Impl.TestTable._get_target_class() + elif method == "get_authorized_view": + surface_type = CrossSync._Sync_Impl.TestAuthorizedView._get_target_class() + else: + raise TypeError(f"unexpected method: {method}") + with mock.patch.object(surface_type, "close") as close_mock: with self._make_client(project=expected_project_id) as client: - with client.get_table( - expected_instance_id, expected_table_id, expected_app_profile_id - ) as table: - CrossSync._Sync_Impl.yield_to_event_loop() - assert isinstance( - table, CrossSync._Sync_Impl.TestTable._get_target_class() + if method == "get_table": + fn = partial( + client.get_table, + expected_instance_id, + expected_table_id, + expected_app_profile_id, ) + elif method == "get_authorized_view": + fn = partial( + client.get_authorized_view, + expected_instance_id, + expected_table_id, + "view_id", + expected_app_profile_id, + ) + else: + raise TypeError(f"unexpected method: {method}") + with fn() as table: + CrossSync._Sync_Impl.yield_to_event_loop() + assert isinstance(table, surface_type) assert table.table_id == expected_table_id assert ( table.table_name @@ -806,7 +847,19 @@ def _make_client(self, *args, **kwargs): def _get_target_class(): return CrossSync._Sync_Impl.Table - def test_table_ctor(self): + def _make_one( + self, + client, + instance_id="instance", + table_id="table", + app_profile_id=None, + **kwargs, + ): + return self._get_target_class()( + client, instance_id, table_id, app_profile_id, **kwargs + ) + + def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey expected_table_id = "table-id" @@ -835,6 +888,14 @@ def test_table_ctor(self): CrossSync._Sync_Impl.yield_to_event_loop() assert table.table_id == expected_table_id assert 
table.instance_id == expected_instance_id + assert ( + table.table_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}" + ) + assert ( + table.instance_name + == f"projects/{client.project}/instances/{expected_instance_id}" + ) assert table.app_profile_id == expected_app_profile_id assert table.client is client instance_key = _WarmedInstanceKey( @@ -866,18 +927,12 @@ def test_table_ctor(self): assert table._register_instance_future.exception() is None client.close() - def test_table_ctor_defaults(self): + def test_ctor_defaults(self): """should provide default timeout values and app_profile_id""" - expected_table_id = "table-id" - expected_instance_id = "instance-id" client = self._make_client() assert not client._active_instances - table = self._get_target_class()( - client, expected_instance_id, expected_table_id - ) + table = self._make_one(client) CrossSync._Sync_Impl.yield_to_event_loop() - assert table.table_id == expected_table_id - assert table.instance_id == expected_instance_id assert table.app_profile_id is None assert table.client is client assert table.default_operation_timeout == 60 @@ -888,7 +943,7 @@ def test_table_ctor_defaults(self): assert table.default_mutate_rows_attempt_timeout == 60 client.close() - def test_table_ctor_invalid_timeout_values(self): + def test_ctor_invalid_timeout_values(self): """bad timeout values should raise ValueError""" client = self._make_client() timeout_pairs = [ @@ -904,10 +959,10 @@ def test_table_ctor_invalid_timeout_values(self): ] for operation_timeout, attempt_timeout in timeout_pairs: with pytest.raises(ValueError) as e: - self._get_target_class()(client, "", "", **{attempt_timeout: -1}) + self._make_one(client, **{attempt_timeout: -1}) assert "attempt_timeout must be greater than 0" in str(e.value) with pytest.raises(ValueError) as e: - self._get_target_class()(client, "", "", **{operation_timeout: -1}) + self._make_one(client, **{operation_timeout: -1}) assert 
"operation_timeout must be greater than 0" in str(e.value) client.close() @@ -920,10 +975,10 @@ def test_table_ctor_invalid_timeout_values(self): ("read_rows_sharded", ([ReadRowsQuery()],), True, ()), ("row_exists", (b"row_key",), True, ()), ("sample_row_keys", (), False, ()), - ("mutate_row", (b"row_key", [mock.Mock()]), False, ()), + ("mutate_row", (b"row_key", [DeleteAllFromRow()]), False, ()), ( "bulk_mutate_rows", - ([mutations.RowMutationEntry(b"key", [mutations.DeleteAllFromRow()])],), + ([mutations.RowMutationEntry(b"key", [DeleteAllFromRow()])],), False, (_MutateRowsIncomplete,), ), @@ -1020,7 +1075,7 @@ def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_fn): gapic_client = client._gapic_client gapic_client._transport = transport_mock gapic_client._is_universe_domain_valid = True - table = self._get_target_class()(client, "instance-id", "table-id", profile) + table = self._make_one(client, app_profile_id=profile) try: test_fn = table.__getattribute__(fn_name) maybe_stream = test_fn(*fn_args) @@ -1033,12 +1088,121 @@ def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_fn): assert len(metadata) == 1 assert metadata[0][0] == "x-goog-request-params" routing_str = metadata[0][1] - assert "table_name=" + table.table_name in routing_str + assert self._expected_routing_header(table) in routing_str if include_app_profile: assert "app_profile_id=profile" in routing_str else: assert "app_profile_id=" not in routing_str + @staticmethod + def _expected_routing_header(table): + """the expected routing header for this _ApiSurface type""" + return f"table_name={table.table_name}" + + +@CrossSync._Sync_Impl.add_mapping_decorator("TestAuthorizedView") +class TestAuthorizedView(CrossSync._Sync_Impl.TestTable): + """ + Inherit tests from TestTableAsync, with some modifications + """ + + @staticmethod + def _get_target_class(): + return CrossSync._Sync_Impl.AuthorizedView + + def _make_one( + self, + client, + 
instance_id="instance", + table_id="table", + view_id="view", + app_profile_id=None, + **kwargs, + ): + return self._get_target_class()( + client, instance_id, table_id, view_id, app_profile_id, **kwargs + ) + + @staticmethod + def _expected_routing_header(view): + """the expected routing header for this _ApiSurface type""" + return f"authorized_view_name={view.authorized_view_name}" + + def test_ctor(self): + from google.cloud.bigtable.data._helpers import _WarmedInstanceKey + + expected_table_id = "table-id" + expected_instance_id = "instance-id" + expected_view_id = "view_id" + expected_app_profile_id = "app-profile-id" + expected_operation_timeout = 123 + expected_attempt_timeout = 12 + expected_read_rows_operation_timeout = 1.5 + expected_read_rows_attempt_timeout = 0.5 + expected_mutate_rows_operation_timeout = 2.5 + expected_mutate_rows_attempt_timeout = 0.75 + client = self._make_client() + assert not client._active_instances + table = self._get_target_class()( + client, + expected_instance_id, + expected_table_id, + expected_view_id, + expected_app_profile_id, + default_operation_timeout=expected_operation_timeout, + default_attempt_timeout=expected_attempt_timeout, + default_read_rows_operation_timeout=expected_read_rows_operation_timeout, + default_read_rows_attempt_timeout=expected_read_rows_attempt_timeout, + default_mutate_rows_operation_timeout=expected_mutate_rows_operation_timeout, + default_mutate_rows_attempt_timeout=expected_mutate_rows_attempt_timeout, + ) + CrossSync._Sync_Impl.yield_to_event_loop() + assert table.table_id == expected_table_id + assert ( + table.table_name + == f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}" + ) + assert table.instance_id == expected_instance_id + assert ( + table.instance_name + == f"projects/{client.project}/instances/{expected_instance_id}" + ) + assert table.authorized_view_id == expected_view_id + assert ( + table.authorized_view_name + == 
f"projects/{client.project}/instances/{expected_instance_id}/tables/{expected_table_id}/authorizedViews/{expected_view_id}" + ) + assert table.app_profile_id == expected_app_profile_id + assert table.client is client + instance_key = _WarmedInstanceKey( + table.instance_name, table.table_name, table.app_profile_id + ) + assert instance_key in client._active_instances + assert client._instance_owners[instance_key] == {id(table)} + assert table.default_operation_timeout == expected_operation_timeout + assert table.default_attempt_timeout == expected_attempt_timeout + assert ( + table.default_read_rows_operation_timeout + == expected_read_rows_operation_timeout + ) + assert ( + table.default_read_rows_attempt_timeout + == expected_read_rows_attempt_timeout + ) + assert ( + table.default_mutate_rows_operation_timeout + == expected_mutate_rows_operation_timeout + ) + assert ( + table.default_mutate_rows_attempt_timeout + == expected_mutate_rows_attempt_timeout + ) + table._register_instance_future + assert table._register_instance_future.done() + assert not table._register_instance_future.cancelled() + assert table._register_instance_future.exception() is None + client.close() + @CrossSync._Sync_Impl.add_mapping_decorator("TestReadRows") class TestReadRows: @@ -1772,11 +1936,12 @@ def test_sample_row_keys_gapic_params(self): table.sample_row_keys(attempt_timeout=expected_timeout) (args, kwargs) = sample_row_keys.call_args assert len(args) == 0 - assert len(kwargs) == 4 + assert len(kwargs) == 3 assert kwargs["timeout"] == expected_timeout - assert kwargs["app_profile_id"] == expected_profile - assert kwargs["table_name"] == table.table_name assert kwargs["retry"] is None + request = kwargs["request"] + assert request.app_profile_id == expected_profile + assert request.table_name == table.table_name @pytest.mark.parametrize( "retryable_exception", @@ -1864,17 +2029,18 @@ def test_mutate_row(self, mutation_arg): ) assert mock_gapic.call_count == 1 kwargs = 
mock_gapic.call_args_list[0].kwargs + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == "projects/project/instances/instance/tables/table" ) - assert kwargs["row_key"] == b"row_key" + assert request.row_key == b"row_key" formatted_mutations = ( [mutation._to_pb() for mutation in mutation_arg] if isinstance(mutation_arg, list) else [mutation_arg._to_pb()] ) - assert kwargs["mutations"] == formatted_mutations + assert request.mutations == formatted_mutations assert kwargs["timeout"] == expected_attempt_timeout assert kwargs["retry"] is None @@ -2018,11 +2184,12 @@ def test_bulk_mutate_rows(self, mutation_arg): ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args[1] + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == "projects/project/instances/instance/tables/table" ) - assert kwargs["entries"] == [bulk_mutation._to_pb()] + assert request.entries == [bulk_mutation._to_pb()] assert kwargs["timeout"] == expected_attempt_timeout assert kwargs["retry"] is None @@ -2040,12 +2207,13 @@ def test_bulk_mutate_rows_multiple_entries(self): table.bulk_mutate_rows([entry_1, entry_2]) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args[1] + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == "projects/project/instances/instance/tables/table" ) - assert kwargs["entries"][0] == entry_1._to_pb() - assert kwargs["entries"][1] == entry_2._to_pb() + assert request.entries[0] == entry_1._to_pb() + assert request.entries[1] == entry_2._to_pb() @pytest.mark.parametrize( "exception", @@ -2313,8 +2481,8 @@ def test_check_and_mutate(self, gapic_result): ) row_key = b"row_key" predicate = None - true_mutations = [mock.Mock()] - false_mutations = [mock.Mock(), mock.Mock()] + true_mutations = [DeleteAllFromRow()] + false_mutations = [DeleteAllFromRow(), DeleteAllFromRow()] operation_timeout = 0.2 found = table.check_and_mutate_row( row_key, @@ -2325,16 +2493,17 @@ def 
test_check_and_mutate(self, gapic_result): ) assert found == gapic_result kwargs = mock_gapic.call_args[1] - assert kwargs["table_name"] == table.table_name - assert kwargs["row_key"] == row_key - assert kwargs["predicate_filter"] == predicate - assert kwargs["true_mutations"] == [ + request = kwargs["request"] + assert request.table_name == table.table_name + assert request.row_key == row_key + assert bool(request.predicate_filter) is False + assert request.true_mutations == [ m._to_pb() for m in true_mutations ] - assert kwargs["false_mutations"] == [ + assert request.false_mutations == [ m._to_pb() for m in false_mutations ] - assert kwargs["app_profile_id"] == app_profile + assert request.app_profile_id == app_profile assert kwargs["timeout"] == operation_timeout assert kwargs["retry"] is None @@ -2374,15 +2543,17 @@ def test_check_and_mutate_single_mutations(self): false_case_mutations=false_mutation, ) kwargs = mock_gapic.call_args[1] - assert kwargs["true_mutations"] == [true_mutation._to_pb()] - assert kwargs["false_mutations"] == [false_mutation._to_pb()] + request = kwargs["request"] + assert request.true_mutations == [true_mutation._to_pb()] + assert request.false_mutations == [false_mutation._to_pb()] def test_check_and_mutate_predicate_object(self): """predicate filter should be passed to gapic request""" from google.cloud.bigtable_v2.types import CheckAndMutateRowResponse + from google.cloud.bigtable_v2.types.data import RowFilter mock_predicate = mock.Mock() - predicate_pb = {"predicate": "dict"} + predicate_pb = RowFilter({"sink": True}) mock_predicate._to_pb.return_value = predicate_pb with self._make_client() as client: with client.get_table("instance", "table") as table: @@ -2393,21 +2564,24 @@ def test_check_and_mutate_predicate_object(self): predicate_matched=True ) table.check_and_mutate_row( - b"row_key", mock_predicate, false_case_mutations=[mock.Mock()] + b"row_key", + mock_predicate, + false_case_mutations=[DeleteAllFromRow()], ) kwargs = 
mock_gapic.call_args[1] - assert kwargs["predicate_filter"] == predicate_pb + request = kwargs["request"] + assert request.predicate_filter == predicate_pb assert mock_predicate._to_pb.call_count == 1 assert kwargs["retry"] is None def test_check_and_mutate_mutations_parsing(self): """mutations objects should be converted to protos""" from google.cloud.bigtable_v2.types import CheckAndMutateRowResponse - from google.cloud.bigtable.data.mutations import DeleteAllFromRow + from google.cloud.bigtable.data.mutations import DeleteAllFromFamily mutations = [mock.Mock() for _ in range(5)] for idx, mutation in enumerate(mutations): - mutation._to_pb.return_value = f"fake {idx}" + mutation._to_pb.return_value = DeleteAllFromFamily(f"fake {idx}")._to_pb() mutations.append(DeleteAllFromRow()) with self._make_client() as client: with client.get_table("instance", "table") as table: @@ -2424,11 +2598,15 @@ def test_check_and_mutate_mutations_parsing(self): false_case_mutations=mutations[2:], ) kwargs = mock_gapic.call_args[1] - assert kwargs["true_mutations"] == ["fake 0", "fake 1"] - assert kwargs["false_mutations"] == [ - "fake 2", - "fake 3", - "fake 4", + request = kwargs["request"] + assert request.true_mutations == [ + DeleteAllFromFamily("fake 0")._to_pb(), + DeleteAllFromFamily("fake 1")._to_pb(), + ] + assert request.false_mutations == [ + DeleteAllFromFamily("fake 2")._to_pb(), + DeleteAllFromFamily("fake 3")._to_pb(), + DeleteAllFromFamily("fake 4")._to_pb(), DeleteAllFromRow()._to_pb(), ] assert all( @@ -2471,7 +2649,8 @@ def test_read_modify_write_call_rule_args(self, call_rules, expected_rules): table.read_modify_write_row("key", call_rules) assert mock_gapic.call_count == 1 found_kwargs = mock_gapic.call_args_list[0][1] - assert found_kwargs["rules"] == expected_rules + request = found_kwargs["request"] + assert request.rules == expected_rules assert found_kwargs["retry"] is None @pytest.mark.parametrize("rules", [[], None]) @@ -2492,15 +2671,16 @@ def 
test_read_modify_write_call_defaults(self): with mock.patch.object( client._gapic_client, "read_modify_write_row" ) as mock_gapic: - table.read_modify_write_row(row_key, mock.Mock()) + table.read_modify_write_row(row_key, IncrementRule("f", "q")) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args_list[0][1] + request = kwargs["request"] assert ( - kwargs["table_name"] + request.table_name == f"projects/{project}/instances/{instance}/tables/{table_id}" ) - assert kwargs["app_profile_id"] is None - assert kwargs["row_key"] == row_key.encode() + assert bool(request.app_profile_id) is False + assert request.row_key == row_key.encode() assert kwargs["timeout"] > 1 def test_read_modify_write_call_overrides(self): @@ -2515,12 +2695,15 @@ def test_read_modify_write_call_overrides(self): client._gapic_client, "read_modify_write_row" ) as mock_gapic: table.read_modify_write_row( - row_key, mock.Mock(), operation_timeout=expected_timeout + row_key, + IncrementRule("f", "q"), + operation_timeout=expected_timeout, ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args_list[0][1] - assert kwargs["app_profile_id"] is profile_id - assert kwargs["row_key"] == row_key + request = kwargs["request"] + assert request.app_profile_id is profile_id + assert request.row_key == row_key assert kwargs["timeout"] == expected_timeout def test_read_modify_write_string_key(self): @@ -2530,10 +2713,11 @@ def test_read_modify_write_string_key(self): with mock.patch.object( client._gapic_client, "read_modify_write_row" ) as mock_gapic: - table.read_modify_write_row(row_key, mock.Mock()) + table.read_modify_write_row(row_key, IncrementRule("f", "q")) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args_list[0][1] - assert kwargs["row_key"] == row_key.encode() + request = kwargs["request"] + assert request.row_key == row_key.encode() def test_read_modify_write_row_building(self): """results from gapic call should be used to construct row""" @@ -2549,7 +2733,7 @@ def 
test_read_modify_write_row_building(self): ) as mock_gapic: with mock.patch.object(Row, "_from_pb") as constructor_mock: mock_gapic.return_value = mock_response - table.read_modify_write_row("key", mock.Mock()) + table.read_modify_write_row("key", IncrementRule("f", "q")) assert constructor_mock.call_count == 1 constructor_mock.assert_called_once_with(mock_response.row) diff --git a/tests/unit/data/_sync_autogen/test_mutations_batcher.py b/tests/unit/data/_sync_autogen/test_mutations_batcher.py index 59ea621ac..d7668ff7a 100644 --- a/tests/unit/data/_sync_autogen/test_mutations_batcher.py +++ b/tests/unit/data/_sync_autogen/test_mutations_batcher.py @@ -22,6 +22,8 @@ import google.api_core.exceptions as core_exceptions import google.api_core.retry from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.cloud.bigtable.data.mutations import RowMutationEntry +from google.cloud.bigtable.data.mutations import DeleteAllFromRow from google.cloud.bigtable.data import TABLE_DEFAULT from google.cloud.bigtable.data._cross_sync import CrossSync @@ -36,9 +38,9 @@ def _make_one(self, max_mutation_count=10, max_mutation_bytes=100): @staticmethod def _make_mutation(count=1, size=1): - mutation = mock.Mock() - mutation.size.return_value = size - mutation.mutations = [mock.Mock()] * count + mutation = RowMutationEntry("k", DeleteAllFromRow()) + mutation.mutations = [DeleteAllFromRow() for _ in range(count)] + mutation.size = lambda: size return mutation def test_ctor(self): @@ -258,6 +260,8 @@ def _make_one(self, table=None, **kwargs): if table is None: table = mock.Mock() + table._request_path = {"table_name": "table"} + table.app_profile_id = None table.default_mutate_rows_operation_timeout = 10 table.default_mutate_rows_attempt_timeout = 10 table.default_mutate_rows_retryable_errors = ( @@ -268,9 +272,9 @@ def _make_one(self, table=None, **kwargs): @staticmethod def _make_mutation(count=1, size=1): - mutation = mock.Mock() - mutation.size.return_value 
= size - mutation.mutations = [mock.Mock()] * count + mutation = RowMutationEntry("k", DeleteAllFromRow()) + mutation.size = lambda: size + mutation.mutations = [DeleteAllFromRow() for _ in range(count)] return mutation def test_ctor_defaults(self):