diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 0366ea90f3..04fb0db906 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -19,6 +19,12 @@
 ## Deprecations
 
 ## New additions
+* Add `snow spcs service events` command to retrieve service-specific events:
+  * Supports filtering by service name, container name, instance ID, time intervals (`--since`, `--until`), and pagination (`--first`, `--last`).
+  * Use `--all` to fetch all columns.
+* Add `snow spcs service metrics` command to fetch service metrics:
+  * Supports filtering by service name, container name, instance ID, and time intervals (`--since`, `--until`).
+  * Use `--all` to fetch all columns.
 
 ## Fixes and improvements
 
@@ -48,7 +54,6 @@
 ## Fixes and improvements
 * Fixed inability to add patches to lowercase quoted versions.
 * Fixes label being set to blank instead of None when not provided.
-* Added a feature flag `ENABLE_SPCS_LOG_STREAMING` to control the rollout of the log streaming feature.
 
 # v3.2.2
 
diff --git a/src/snowflake/cli/_plugins/spcs/common.py b/src/snowflake/cli/_plugins/spcs/common.py
index 100bdf6d82..6855a34419 100644
--- a/src/snowflake/cli/_plugins/spcs/common.py
+++ b/src/snowflake/cli/_plugins/spcs/common.py
@@ -14,7 +14,9 @@
 
 from __future__ import annotations
 
+import json
 import sys
+from datetime import datetime
 from typing import TextIO
 
 from click import ClickException
@@ -23,6 +25,22 @@
 from snowflake.cli.api.project.util import unquote_identifier
 from snowflake.connector.errors import ProgrammingError
 
+EVENT_COLUMN_NAMES = [
+    "TIMESTAMP",
+    "START_TIMESTAMP",
+    "OBSERVED_TIMESTAMP",
+    "TRACE",
+    "RESOURCE",
+    "RESOURCE_ATTRIBUTES",
+    "SCOPE",
+    "SCOPE_ATTRIBUTES",
+    "RECORD_TYPE",
+    "RECORD",
+    "RECORD_ATTRIBUTES",
+    "VALUE",
+    "EXEMPLARS",
+]
+
 if not sys.stdout.closed and sys.stdout.isatty():
     GREEN = "\033[32m"
     BLUE = "\033[34m"
@@ -124,5 +142,116 @@ def new_logs_only(prev_log_records: list[str], new_log_records: list[str]) -> li
     return new_log_records_sorted
 
 
+def build_resource_clause(
+    service_name: str, instance_id: str, container_name: str
+) -> str:
+    resource_filters = []
+    if service_name:
+        resource_filters.append(
+            f"resource_attributes:\"snow.service.name\" = '{service_name}'"
+        )
+    if instance_id:
+        resource_filters.append(
+            f"(resource_attributes:\"snow.service.instance\" = '{instance_id}' "
+            f"OR resource_attributes:\"snow.service.container.instance\" = '{instance_id}')"
+        )
+    if container_name:
+        resource_filters.append(
+            f"resource_attributes:\"snow.service.container.name\" = '{container_name}'"
+        )
+    return " and ".join(resource_filters) if resource_filters else "1=1"
+
+
+def build_time_clauses(
+    since: str | datetime | None, until: str | datetime | None
+) -> tuple[str, str]:
+    since_clause = ""
+    until_clause = ""
+
+    if isinstance(since, datetime):
+        since_clause = f"and timestamp >= '{since}'"
+    elif isinstance(since, str) and since:
+        since_clause = f"and timestamp >= sysdate() - interval '{since}'"
+
+    if isinstance(until, datetime):
+        until_clause = f"and timestamp <= '{until}'"
+    elif isinstance(until, str) and until:
+        until_clause = f"and timestamp <= sysdate() - interval '{until}'"
+
+    return since_clause, until_clause
+
+
+def format_event_row(event_dict: dict) -> dict:
+    try:
+        resource_attributes = json.loads(event_dict.get("RESOURCE_ATTRIBUTES", "{}"))
+        record_attributes = json.loads(event_dict.get("RECORD_ATTRIBUTES", "{}"))
+        record = json.loads(event_dict.get("RECORD", "{}"))
+
+        database_name = resource_attributes.get("snow.database.name", "N/A")
+        schema_name = resource_attributes.get("snow.schema.name", "N/A")
+        service_name = resource_attributes.get("snow.service.name", "N/A")
+        instance_name = resource_attributes.get("snow.service.instance", "N/A")
+        container_name = resource_attributes.get("snow.service.container.name", "N/A")
+        event_name = record_attributes.get("event.name", "Unknown Event")
+        event_value = event_dict.get("VALUE", "Unknown Value")
+        severity = record.get("severity_text", "Unknown Severity")
+
+        return {
+            "TIMESTAMP": event_dict.get("TIMESTAMP", "N/A"),
+            "DATABASE NAME": database_name,
+            "SCHEMA NAME": schema_name,
+            "SERVICE NAME": service_name,
+            "INSTANCE ID": instance_name,
+            "CONTAINER NAME": container_name,
+            "SEVERITY": severity,
+            "EVENT NAME": event_name,
+            "EVENT VALUE": event_value,
+        }
+    except (json.JSONDecodeError, KeyError) as e:
+        raise RecordProcessingError(f"Error processing event row: {e}")
+
+
+def format_metric_row(metric_dict: dict) -> dict:
+    try:
+        resource_attributes = json.loads(metric_dict["RESOURCE_ATTRIBUTES"])
+        record = json.loads(metric_dict["RECORD"])
+
+        database_name = resource_attributes.get("snow.database.name", "N/A")
+        schema_name = resource_attributes.get("snow.schema.name", "N/A")
+        service_name = resource_attributes.get("snow.service.name", "N/A")
+        instance_name = resource_attributes.get(
+            "snow.service.container.instance", "N/A"
+        )
+        container_name = resource_attributes.get("snow.service.container.name", "N/A")
+
+        metric_name = record["metric"].get("name", "Unknown Metric")
+        metric_value = metric_dict.get("VALUE", "Unknown Value")
+
+        return {
+            "TIMESTAMP": metric_dict.get("TIMESTAMP", "N/A"),
+            "DATABASE NAME": database_name,
+            "SCHEMA NAME": schema_name,
+            "SERVICE NAME": service_name,
+            "INSTANCE ID": instance_name,
+            "CONTAINER NAME": container_name,
+            "METRIC NAME": metric_name,
+            "METRIC VALUE": metric_value,
+        }
+    except (json.JSONDecodeError, KeyError) as e:
+        raise RecordProcessingError(f"Error processing metric row: {e}")
+
+
+class RecordProcessingError(ClickException):
+    """Raised when processing an event or metric record fails due to invalid data."""
+
+    pass
+
+
+class SPCSEventTableError(ClickException):
+    """Raised when there is an issue related to the SPCS event table."""
+
+    pass
+
+
 class NoPropertiesProvidedError(ClickException):
     pass
diff --git a/src/snowflake/cli/_plugins/spcs/services/commands.py b/src/snowflake/cli/_plugins/spcs/services/commands.py
index a5d9fdcba0..b240df0dc1 100644
--- a/src/snowflake/cli/_plugins/spcs/services/commands.py
+++ b/src/snowflake/cli/_plugins/spcs/services/commands.py
@@ -38,12 +38,12 @@
 from snowflake.cli.api.commands.snow_typer import SnowTyperFactory
 from snowflake.cli.api.constants import ObjectType
 from snowflake.cli.api.exceptions import (
-    FeatureNotEnabledError,
     IncompatibleParametersError,
 )
 from snowflake.cli.api.feature_flags import FeatureFlag
 from snowflake.cli.api.identifiers import FQN
 from snowflake.cli.api.output.types import (
+    CollectionResult,
     CommandResult,
     MessageResult,
     QueryJsonValueResult,
@@ -59,6 +59,38 @@
     short_help="Manages services.",
 )
 
+# Define common options
+container_name_option = typer.Option(
+    ...,
+    "--container-name",
+    help="Name of the container.",
+    show_default=False,
+)
+
+instance_id_option = typer.Option(
+    ...,
+    "--instance-id",
+    help="ID of the service instance, starting with 0.",
+    show_default=False,
+)
+
+since_option = typer.Option(
+    default="",
+    help="Fetch events that are newer than this time ago, in Snowflake interval syntax.",
+)
+
+until_option = typer.Option(
+    default="",
+    help="Fetch events that are older than this time ago, in Snowflake interval syntax.",
+)
+
+show_all_columns_option = typer.Option(
+    False,
+    "--all",
+    is_flag=True,
+    help="Fetch all columns.",
+)
+
 
 def _service_name_callback(name: FQN) -> FQN:
     if not is_valid_object_name(name.identifier, max_depth=2, allow_quoted=False):
@@ -213,18 +245,8 @@ def status(name: FQN = ServiceNameArgument, **options) -> CommandResult:
 @app.command(requires_connection=True)
 def logs(
     name: FQN = ServiceNameArgument,
-    container_name: str = typer.Option(
-        ...,
-        "--container-name",
-        help="Name of the container.",
-        show_default=False,
-    ),
-    instance_id: str = typer.Option(
-        ...,
-        "--instance-id",
-        help="ID of the service instance, starting with 0.",
-        show_default=False,
-    ),
+    container_name: str = container_name_option,
+    instance_id: str = instance_id_option,
     num_lines: int = typer.Option(
         DEFAULT_NUM_LINES, "--num-lines", help="Number of lines to retrieve."
     ),
@@ -241,12 +263,17 @@ def logs(
         False, "--include-timestamps", help="Include timestamps in logs.", is_flag=True
     ),
     follow: bool = typer.Option(
-        False, "--follow", help="Stream logs in real-time.", is_flag=True
+        False,
+        "--follow",
+        help="Stream logs in real-time.",
+        is_flag=True,
+        hidden=True,
     ),
     follow_interval: int = typer.Option(
         2,
         "--follow-interval",
         help="Set custom polling intervals for log streaming (--follow flag) in seconds.",
+        hidden=True,
     ),
     **options,
 ):
@@ -254,11 +281,6 @@ def logs(
     Retrieves local logs from a service container.
     """
     if follow:
-        if FeatureFlag.ENABLE_SPCS_LOG_STREAMING.is_disabled():
-            raise FeatureNotEnabledError(
-                "ENABLE_SPCS_LOG_STREAMING",
-                "Streaming logs from spcs containers is disabled.",
-            )
         if num_lines != DEFAULT_NUM_LINES:
             raise IncompatibleParametersError(["--follow", "--num-lines"])
         if previous_logs:
@@ -297,6 +319,95 @@ def logs(
     return StreamResult(cast(Generator[CommandResult, None, None], stream))
 
 
+@app.command(
+    requires_connection=True,
+    is_enabled=FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_enabled,
+)
+def events(
+    name: FQN = ServiceNameArgument,
+    container_name: str = container_name_option,
+    instance_id: str = instance_id_option,
+    since: str = since_option,
+    until: str = until_option,
+    first: int = typer.Option(
+        default=None,
+        show_default=False,
+        help="Fetch only the first N events. Cannot be used with --last.",
+    ),
+    last: int = typer.Option(
+        default=None,
+        show_default=False,
+        help="Fetch only the last N events. Cannot be used with --first.",
+    ),
+    show_all_columns: bool = show_all_columns_option,
+    **options,
+):
+    """
+    Retrieve platform events for a service container.
+    """
+
+    if first is not None and last is not None:
+        raise IncompatibleParametersError(["--first", "--last"])
+
+    manager = ServiceManager()
+    events = manager.get_events(
+        service_name=name.identifier,
+        container_name=container_name,
+        instance_id=instance_id,
+        since=since,
+        until=until,
+        first=first,
+        last=last,
+        show_all_columns=show_all_columns,
+    )
+
+    if not events:
+        return MessageResult("No events found.")
+
+    return CollectionResult(events)
+
+
+@app.command(
+    requires_connection=True,
+    is_enabled=FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_enabled,
+)
+def metrics(
+    name: FQN = ServiceNameArgument,
+    container_name: str = container_name_option,
+    instance_id: str = instance_id_option,
+    since: str = since_option,
+    until: str = until_option,
+    show_all_columns: bool = show_all_columns_option,
+    **options,
+):
+    """
+    Retrieve platform metrics for a service container.
+    """
+
+    manager = ServiceManager()
+    if since or until:
+        metrics = manager.get_all_metrics(
+            service_name=name.identifier,
+            container_name=container_name,
+            instance_id=instance_id,
+            since=since,
+            until=until,
+            show_all_columns=show_all_columns,
+        )
+    else:
+        metrics = manager.get_latest_metrics(
+            service_name=name.identifier,
+            container_name=container_name,
+            instance_id=instance_id,
+            show_all_columns=show_all_columns,
+        )
+
+    if not metrics:
+        return MessageResult("No metrics found.")
+
+    return CollectionResult(metrics)
+
+
 @app.command(requires_connection=True)
 def upgrade(
     name: FQN = ServiceNameArgument,
diff --git a/src/snowflake/cli/_plugins/spcs/services/manager.py b/src/snowflake/cli/_plugins/spcs/services/manager.py
index 504fc4cc81..86d8dad9a5 100644
--- a/src/snowflake/cli/_plugins/spcs/services/manager.py
+++ b/src/snowflake/cli/_plugins/spcs/services/manager.py
@@ -16,14 +16,21 @@
 
 import json
 import time
+from datetime import datetime
 from pathlib import Path
 from typing import List, Optional
 
 import yaml
 from snowflake.cli._plugins.object.common import Tag
 from snowflake.cli._plugins.spcs.common import (
+    EVENT_COLUMN_NAMES,
     NoPropertiesProvidedError,
+    SPCSEventTableError,
+    build_resource_clause,
+    build_time_clauses,
     filter_log_timestamp,
+    format_event_row,
+    format_metric_row,
     handle_object_already_exists,
     new_logs_only,
     strip_empty_lines,
@@ -31,7 +38,7 @@
 from snowflake.cli.api.constants import DEFAULT_SIZE_LIMIT_MB, ObjectType
 from snowflake.cli.api.secure_path import SecurePath
 from snowflake.cli.api.sql_execution import SqlExecutionMixin
-from snowflake.connector.cursor import SnowflakeCursor
+from snowflake.connector.cursor import DictCursor, SnowflakeCursor
 from snowflake.connector.errors import ProgrammingError
 
 
@@ -199,6 +206,167 @@ def stream_logs(
         except KeyboardInterrupt:
             return
 
+    def get_account_event_table(self):
+        query = "show parameters like 'event_table' in account"
+        results = self.execute_query(query, cursor_class=DictCursor)
+        event_table = next(
+            (r["value"] for r in results if r["key"] == "EVENT_TABLE"), ""
+        )
+        if not event_table:
+            raise SPCSEventTableError("No SPCS event table configured in the account.")
+        return event_table
+
+    def get_events(
+        self,
+        service_name: str,
+        instance_id: str,
+        container_name: str,
+        since: str | datetime | None = None,
+        until: str | datetime | None = None,
+        first: Optional[int] = None,
+        last: Optional[int] = None,
+        show_all_columns: bool = False,
+    ):
+
+        account_event_table = self.get_account_event_table()
+        resource_clause = build_resource_clause(
+            service_name, instance_id, container_name
+        )
+        since_clause, until_clause = build_time_clauses(since, until)
+
+        first_clause = f"limit {first}" if first is not None else ""
+        last_clause = f"limit {last}" if last is not None else ""
+
+        query = f"""\
+            select *
+            from (
+                select *
+                from {account_event_table}
+                where (
+                    {resource_clause}
+                    {since_clause}
+                    {until_clause}
+                )
+                and record_type = 'LOG'
+                and scope['name'] = 'snow.spcs.platform'
+                order by timestamp desc
+                {last_clause}
+            )
+            order by timestamp asc
+            {first_clause}
+            """
+
+        cursor = self.execute_query(query)
+        raw_events = cursor.fetchall()
+        if not raw_events:
+            return []
+
+        if show_all_columns:
+            return [dict(zip(EVENT_COLUMN_NAMES, event)) for event in raw_events]
+
+        formatted_events = []
+        for raw_event in raw_events:
+            event_dict = dict(zip(EVENT_COLUMN_NAMES, raw_event))
+            formatted = format_event_row(event_dict)
+            formatted_events.append(formatted)
+
+        return formatted_events
+
+    def get_all_metrics(
+        self,
+        service_name: str,
+        instance_id: str,
+        container_name: str,
+        since: str | datetime | None = None,
+        until: str | datetime | None = None,
+        show_all_columns: bool = False,
+    ):
+
+        account_event_table = self.get_account_event_table()
+        resource_clause = build_resource_clause(
+            service_name, instance_id, container_name
+        )
+        since_clause, until_clause = build_time_clauses(since, until)
+
+        query = f"""\
+            select *
+            from {account_event_table}
+            where (
+                {resource_clause}
+                {since_clause}
+                {until_clause}
+            )
+            and record_type = 'METRIC'
+            and scope['name'] = 'snow.spcs.platform'
+            order by timestamp desc
+            """
+
+        cursor = self.execute_query(query)
+        raw_metrics = cursor.fetchall()
+        if not raw_metrics:
+            return []
+
+        if show_all_columns:
+            return [dict(zip(EVENT_COLUMN_NAMES, metric)) for metric in raw_metrics]
+
+        formatted_metrics = []
+        for raw_metric in raw_metrics:
+            metric_dict = dict(zip(EVENT_COLUMN_NAMES, raw_metric))
+            formatted = format_metric_row(metric_dict)
+            formatted_metrics.append(formatted)
+
+        return formatted_metrics
+
+    def get_latest_metrics(
+        self,
+        service_name: str,
+        instance_id: str,
+        container_name: str,
+        show_all_columns: bool = False,
+    ):
+
+        account_event_table = self.get_account_event_table()
+        resource_clause = build_resource_clause(
+            service_name, instance_id, container_name
+        )
+
+        query = f"""
+            with rankedmetrics as (
+                select 
+                    *,
+                    row_number() over (
+                        partition by record['metric']['name'] 
+                        order by timestamp desc
+                    ) as rank
+                from {account_event_table}
+                where 
+                    record_type = 'METRIC'
+                    and scope['name'] = 'snow.spcs.platform'
+                    and {resource_clause} 
+                    and timestamp > dateadd('hour', -1, current_timestamp) 
+            )
+            select *
+            from rankedmetrics
+            where rank = 1
+            order by timestamp desc;
+        """
+
+        cursor = self.execute_query(query)
+        raw_metrics = cursor.fetchall()
+        if not raw_metrics:
+            return []
+
+        if show_all_columns:
+            return [dict(zip(EVENT_COLUMN_NAMES, metric)) for metric in raw_metrics]
+
+        formatted_metrics = []
+        for raw_metric in raw_metrics:
+            metric_dict = dict(zip(EVENT_COLUMN_NAMES, raw_metric))
+            formatted = format_metric_row(metric_dict)
+            formatted_metrics.append(formatted)
+
+        return formatted_metrics
+
     def upgrade_spec(self, service_name: str, spec_path: Path):
         spec = self._read_yaml(spec_path)
         query = f"alter service {service_name} from specification $$ {spec} $$"
diff --git a/src/snowflake/cli/api/exceptions.py b/src/snowflake/cli/api/exceptions.py
index 2aac4e9608..fad1e97c10 100644
--- a/src/snowflake/cli/api/exceptions.py
+++ b/src/snowflake/cli/api/exceptions.py
@@ -229,13 +229,3 @@ def __init__(self, show_obj_query: str):
         super().__init__(
             f"Received multiple rows from result of SQL statement: {show_obj_query}. Usage of 'show_specific_object' may not be properly scoped."
         )
-
-
-class FeatureNotEnabledError(ClickException):
-    def __init__(self, feature_name: str, custom_message: Optional[str] = None):
-        base_message = f"To enable it, add '{feature_name} = true' to '[cli.features]' section of your configuration file."
-        if custom_message:
-            message = f"{custom_message} {base_message}"
-        else:
-            message = base_message
-        super().__init__(message)
diff --git a/src/snowflake/cli/api/feature_flags.py b/src/snowflake/cli/api/feature_flags.py
index 478768a318..a0b44344a4 100644
--- a/src/snowflake/cli/api/feature_flags.py
+++ b/src/snowflake/cli/api/feature_flags.py
@@ -63,7 +63,8 @@ class FeatureFlag(FeatureFlagMixin):
     ENABLE_STREAMLIT_VERSIONED_STAGE = BooleanFlag(
         "ENABLE_STREAMLIT_VERSIONED_STAGE", False
     )
-    ENABLE_SPCS_LOG_STREAMING = BooleanFlag("ENABLE_SPCS_LOG_STREAMING", False)
     ENABLE_SEPARATE_AUTHENTICATION_POLICY_ID = BooleanFlag(
         "ENABLE_SEPARATE_AUTHENTICATION_POLICY_ID", False
     )
+    ENABLE_SPCS_SERVICE_EVENTS = BooleanFlag("ENABLE_SPCS_SERVICE_EVENTS", False)
+    ENABLE_SPCS_SERVICE_METRICS = BooleanFlag("ENABLE_SPCS_SERVICE_METRICS", False)
diff --git a/tests/__snapshots__/test_help_messages.ambr b/tests/__snapshots__/test_help_messages.ambr
index 08f91eec3b..d7ed85a125 100644
--- a/tests/__snapshots__/test_help_messages.ambr
+++ b/tests/__snapshots__/test_help_messages.ambr
@@ -8400,11 +8400,6 @@
   | --since                  TEXT     Start log retrieval from a                |
   |                                   specified UTC timestamp.                  |
   | --include-timestamps              Include timestamps in logs.               |
 -| --follow                          Stream logs in real-time.                 |
 -| --follow-interval        INTEGER  Set custom polling intervals for          |
 -|                                   log streaming (--follow flag) in          |
 -|                                   seconds.                                  |
 -|                                   [default: 2]                              |
   | --help    -h                      Show this message and exit.              |
   +------------------------------------------------------------------------------+
   +- Connection configuration ---------------------------------------------------+
diff --git a/tests/spcs/__snapshots__/test_services.ambr b/tests/spcs/__snapshots__/test_services.ambr
new file mode 100644
index 0000000000..bc4ce8f624
--- /dev/null
+++ b/tests/spcs/__snapshots__/test_services.ambr
@@ -0,0 +1,16 @@
+# serializer version: 1
+# name: test_latest_metrics
+  '''
+  +------------------------------------------------------------------------------+
+  | TIMESTA | DATABAS | SCHEMA  | SERVICE | INSTANC | CONTAIN | METRIC  | METRIC |
+  | MP      | E NAME  | NAME    | NAME    | E ID    | ER NAME | NAME    | VALUE  |
+  |---------+---------+---------+---------+---------+---------+---------+--------|
+  | 2024-12 | N/A     | N/A     | LOG_EVE | N/A     | log-pri | contain | 0.0005 |
+  | -10     |         |         | NT      |         | nter    | er.cpu. | 007168 |
+  | 18:53:2 |         |         |         |         |         | usage   | 666666 |
+  | 1.80900 |         |         |         |         |         |         | 691    |
+  | 0       |         |         |         |         |         |         |        |
+  +------------------------------------------------------------------------------+
+  
+  '''
+# ---
diff --git a/tests/spcs/test_services.py b/tests/spcs/test_services.py
index b48dedd1ce..4e7f012a07 100644
--- a/tests/spcs/test_services.py
+++ b/tests/spcs/test_services.py
@@ -13,8 +13,10 @@
 # limitations under the License.
 import itertools
 import json
+import re
 from datetime import datetime
 from pathlib import Path
+from tempfile import TemporaryDirectory
 from textwrap import dedent
 from unittest.mock import Mock, call, patch
 
@@ -56,6 +58,18 @@
 }
 
 
+@pytest.fixture()
+def enable_events_and_metrics_config():
+    with TemporaryDirectory() as tempdir:
+        config_toml = Path(tempdir) / "config.toml"
+        config_toml.write_text(
+            "[cli.features]\n"
+            "enable_spcs_service_events = true\n"
+            "enable_spcs_service_metrics = true\n"
+        )
+        yield config_toml
+
+
 @patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query")
 def test_create_service(mock_execute_query, other_directory):
     service_name = "test_service"
@@ -605,12 +619,11 @@ def test_stream_logs_with_include_timestamps_true(mock_sleep, mock_logs):
 
 
 @patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query")
-@patch(
-    "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_LOG_STREAMING.is_disabled"
-)
-def test_logs_incompatible_flags(mock_is_disabled, mock_execute_query, runner):
-    mock_is_disabled.return_value = False
-    result = runner.invoke(
+def test_logs_incompatible_flags(
+    mock_execute_query, runner, enable_events_and_metrics_config
+):
+    result = runner.invoke_with_config_file(
+        enable_events_and_metrics_config,
         [
             "spcs",
             "service",
@@ -623,7 +636,7 @@ def test_logs_incompatible_flags(mock_is_disabled, mock_execute_query, runner):
             "--follow",
             "--num-lines",
             "100",
-        ]
+        ],
     )
     assert (
         result.exit_code != 0
@@ -632,13 +645,7 @@
 
 
 @patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query")
-@patch(
-    "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_LOG_STREAMING.is_disabled"
-)
-def test_logs_incompatible_flags_follow_previous_logs(
-    mock_is_disabled, mock_execute_query, runner
-):
-    mock_is_disabled.return_value = False
+def test_logs_incompatible_flags_follow_previous_logs(mock_execute_query, runner):
     result = runner.invoke(
         [
             "spcs",
@@ -663,35 +670,201 @@
 
 
-@patch(
-    "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_LOG_STREAMING.is_disabled"
-)
-def test_logs_streaming_disabled(mock_is_disabled, runner):
-    mock_is_disabled.return_value = True
-    result = runner.invoke(
+def test_logs_streaming_flag_is_hidden(runner):
+    result = runner.invoke(["spcs", "service", "logs", "--help"])
+    assert result.exit_code == 0
+    assert "--follow" not in result.output
+
+
+def test_service_events_disabled(runner, empty_snowcli_config):
+    result = runner.invoke_with_config_file(
+        empty_snowcli_config,
         [
             "spcs",
             "service",
-            "logs",
-            "test_service",
+            "events",
+            "LOG_EVENT",
             "--container-name",
-            "test_container",
+            "log-printer",
             "--instance-id",
             "0",
-            "--follow",
-            "--num-lines",
-            "100",
-        ]
+            "--since",
+            "1 minute",
+        ],
+    )
+    assert (
+        result.exit_code != 0
+    ), "Expected a non-zero exit code due to feature flag being disabled"
+
+    expected_output = (
+        "Usage: default spcs service [OPTIONS] COMMAND [ARGS]...\n"
+        "Try 'default spcs service --help' for help.\n"
+        "+- Error ----------------------------------------------------------------------+\n"
+        "| No such command 'events'.                                                    |\n"
+        "+------------------------------------------------------------------------------+\n"
+    )
+    assert (
+        result.output == expected_output
+    ), f"Expected formatted output not found: {result.output}"
+
+
+@patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query")
+def test_events_all_filters(
+    mock_execute_query, runner, enable_events_and_metrics_config
+):
+    mock_execute_query.side_effect = [
+        [
+            {
+                "key": "EVENT_TABLE",
+                "value": "event_table_db.data_schema.snowservices_logs",
+            }
+        ],
+        Mock(
+            fetchall=lambda: [
+                (
+                    "2024-12-14 22:27:25.420",
+                    None,
+                    "2024-12-14 22:27:25.420",
+                    None,
+                    None,
+                    json.dumps(
+                        {
+                            "snow.compute_pool.id": 230,
+                            "snow.compute_pool.name": "MY_POOL",
+                            "snow.database.id": 5,
+                            "snow.database.name": "TESTDB",
+                            "snow.schema.id": 5,
+                            "snow.schema.name": "PUBLIC",
+                            "snow.service.container.name": "log-printer",
+                            "snow.service.id": 1568,
+                            "snow.service.instance": "0",
+                            "snow.service.name": "LOG_EVENT",
+                            "snow.service.type": "SERVICE",
+                        }
+                    ),
+                    json.dumps({"name": "snow.spcs.platform"}),
+                    None,
+                    "LOG",
+                    json.dumps({"severity_text": "INFO"}),
+                    json.dumps({"event.name": "CONTAINER.STATUS_CHANGE"}),
+                    json.dumps({"message": "Running", "status": "READY"}),
+                    None,
+                )
+            ]
+        ),
+    ]
+
+    result = runner.invoke_with_config_file(
+        enable_events_and_metrics_config,
+        [
+            "spcs",
+            "service",
+            "events",
+            "LOG_EVENT",
+            "--container-name",
+            "log-printer",
+            "--instance-id",
+            "0",
+            "--since",
+            "2 hours",
+            "--until",
+            "1 hour",
+            "--last",
+            "10",
+            "--warehouse",
+            "XSMALL",
+            "--role",
+            "sysadmin",
+        ],
+    )
+
+    assert result.exit_code == 0, f"Command failed with output: {result.output}"
+
+    call_0 = mock_execute_query.mock_calls[0].args[0]
+    assert (
+        call_0 == "show parameters like 'event_table' in account"
+    ), f"Unexpected query in Call 0: {call_0}"
+
+    actual_query = mock_execute_query.mock_calls[1].args[0]
+    expected_query = (
+        "            select *\n"
+        "            from (\n"
+        "                select *\n"
+        "                from event_table_db.data_schema.snowservices_logs\n"
+        "                where (\n"
+        "                    resource_attributes:\"snow.service.name\" = 'LOG_EVENT' and (resource_attributes:\"snow.service.instance\" = '0' OR resource_attributes:\"snow.service.container.instance\" = '0') and resource_attributes:\"snow.service.container.name\" = 'log-printer'\n"
+        "                    and timestamp >= sysdate() - interval '2 hours'\n"
+        "                    and timestamp <= sysdate() - interval '1 hour'\n"
+        "                )\n"
+        "                and record_type = 'LOG'\n"
+        "                and scope['name'] = 'snow.spcs.platform'\n"
+        "                order by timestamp desc\n"
+        "                limit 10\n"
+        "            )\n"
+        "            order by timestamp asc\n"
+        "            \n"
+        "            "
+    )
+
+    assert (
+        actual_query == expected_query
+    ), f"Generated query does not match expected query.\n\nActual:\n{actual_query}\n\nExpected:\n{expected_query}"
+
+
+def test_events_first_last_incompatibility(runner, enable_events_and_metrics_config):
+    result = runner.invoke_with_config_file(
+        enable_events_and_metrics_config,
+        [
+            "spcs",
+            "service",
+            "events",
+            "LOG_EVENT",
+            "--container-name",
+            "log-printer",
+            "--instance-id",
+            "0",
+            "--first",
+            "10",
+            "--last",
+            "5",
+            "--warehouse",
+            "XSMALL",
+            "--role",
+            "sysadmin",
+        ],
+    )
+
+    assert result.exit_code != 0, result.output
+
+    expected_error = "Parameters '--first' and '--last' are incompatible"
+    assert expected_error in result.output
+
+
+def test_service_metrics_disabled(runner, empty_snowcli_config):
+    result = runner.invoke_with_config_file(
+        empty_snowcli_config,
+        [
+            "spcs",
+            "service",
+            "metrics",
+            "LOG_EVENT",
+            "--container-name",
+            "log-printer",
+            "--instance-id",
+            "0",
+            "--since",
+            "1 minute",
+        ],
     )
     assert (
         result.exit_code != 0
-    ), "Expected a non-zero exit code due to feature flag disabled"
+    ), "Expected a non-zero exit code due to feature flag being disabled"
 
     expected_output = (
+        "Usage: default spcs service [OPTIONS] COMMAND [ARGS]...\n"
+        "Try 'default spcs service --help' for help.\n"
         "+- Error ----------------------------------------------------------------------+\n"
-        "| Streaming logs from spcs containers is disabled. To enable it, add           |\n"
-        "| 'ENABLE_SPCS_LOG_STREAMING = true' to '[cli.features]' section of your       |\n"
-        "| configuration file.                                                          |\n"
+        "| No such command 'metrics'.                                                   |\n"
         "+------------------------------------------------------------------------------+\n"
     )
     assert (
@@ -699,6 +872,198 @@ def test_logs_streaming_disabled(mock_is_disabled, runner):
     ), f"Expected formatted output not found: {result.output}"
 
 
+@patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query")
+def test_latest_metrics(
+    mock_execute_query, runner, snapshot, enable_events_and_metrics_config
+):
+    mock_execute_query.side_effect = [
+        [
+            {
+                "key": "EVENT_TABLE",
+                "value": "event_table_db.data_schema.snowservices_logs",
+            }
+        ],
+        Mock(
+            fetchall=lambda: [
+                (
+                    datetime(2024, 12, 10, 18, 53, 21, 809000),
+                    datetime(2024, 12, 10, 18, 52, 51, 809000),
+                    None,
+                    None,
+                    None,
+                    json.dumps(
+                        {
+                            "snow.account.name": "XACCOUNTTEST1",
+                            "snow.compute_pool.id": 20641,
+                            "snow.compute_pool.name": "MY_POOL",
+                            "snow.service.container.name": "log-printer",
+                            "snow.service.name": "LOG_EVENT",
+                        }
+                    ),
+                    json.dumps({"name": "snow.spcs.platform"}),
+                    None,
+                    "METRIC",
+                    json.dumps(
+                        {"metric": {"name": "container.cpu.usage", "unit": "cpu"}}
+                    ),
+                    None,
+                    "0.0005007168666666691",
+                    None,
+                )
+            ]
+        ),
+    ]
+
+    result = runner.invoke_with_config_file(
+        enable_events_and_metrics_config,
+        [
+            "spcs",
+            "service",
+            "metrics",
+            "LOG_EVENT",
+            "--container-name",
+            "log-printer",
+            "--instance-id",
+            "0",
+            "--warehouse",
+            "XSMALL",
+            "--role",
+            "sysadmin",
+        ],
+    )
+
+    assert result.exit_code == 0, f"Command failed with output: {result.output}"
+    assert result.output == snapshot
+
+    call_0 = mock_execute_query.mock_calls[0].args[0]
+    assert (
+        call_0 == "show parameters like 'event_table' in account"
+    ), f"Unexpected query in Call 0: {call_0}"
+
+    actual_query = mock_execute_query.mock_calls[1].args[0]
+    expected_query = (
+        "\n"
+        "            with rankedmetrics as (\n"
+        "                select \n"
+        "                    *,\n"
+        "                    row_number() over (\n"
+        "                        partition by record['metric']['name'] \n"
+        "                        order by timestamp desc\n"
+        "                    ) as rank\n"
+        "                from event_table_db.data_schema.snowservices_logs\n"
+        "                where \n"
+        "                    record_type = 'METRIC'\n"
+        "                    and scope['name'] = 'snow.spcs.platform'\n"
+        "                    and resource_attributes:\"snow.service.name\" = 'LOG_EVENT' and (resource_attributes:\"snow.service.instance\" = '0' OR resource_attributes:\"snow.service.container.instance\" = '0') and resource_attributes:\"snow.service.container.name\" = 'log-printer' \n"
+        "                    and timestamp > dateadd('hour', -1, current_timestamp) \n"
+        "            )\n"
+        "            select *\n"
+        "            from rankedmetrics\n"
+        "            where rank = 1\n"
+        "            order by timestamp desc;\n"
+        "        "
+    )
+
+    actual_normalized = normalize_query(actual_query)
+    expected_normalized = normalize_query(expected_query)
+
+    assert actual_normalized == expected_normalized, (
+        f"Generated query does not match expected query.\n\n"
+        f"Actual:\n{actual_query}\n\nExpected:\n{expected_query}"
+    )
+
+
+@patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query")
+def test_metrics_all_filters(
+    mock_execute_query, runner, enable_events_and_metrics_config
+):
+    mock_execute_query.side_effect = [
+        [
+            {
+                "key": "EVENT_TABLE",
+                "value": "event_table_db.data_schema.snowservices_logs",
+            }
+        ],
+        Mock(
+            fetchall=lambda: [
+                (
+                    datetime(2024, 12, 10, 18, 53, 21, 809000),
+                    datetime(2024, 12, 10, 18, 52, 51, 809000),
+                    None,
+                    None,
+                    None,
+                    json.dumps(
+                        {
+                            "snow.account.name": "XACCOUNTTEST1",
+                            "snow.compute_pool.id": 20641,
+                            "snow.compute_pool.name": "MY_POOL",
+                            "snow.service.container.name": "log-printer",
+                            "snow.service.name": "LOG_EVENT",
+                        }
+                    ),
+                    json.dumps({"name": "snow.spcs.platform"}),
+                    None,
+                    "METRIC",
+                    json.dumps(
+                        {"metric": {"name": "container.cpu.usage", "unit": "cpu"}}
+                    ),
+                    None,
+                    "0.0005007168666666691",
+                    None,
+                )
+            ]
+        ),
+    ]
+
+    result = runner.invoke_with_config_file(
+        enable_events_and_metrics_config,
+        [
+            "spcs",
+            "service",
+            "metrics",
+            "LOG_EVENT",
+            "--container-name",
+            "log-printer",
+            "--instance-id",
+            "0",
+            "--since",
+            "2 hour",
+            "--until",
+            "1 hour",
+            "--warehouse",
+            "XSMALL",
+            "--role",
+            "sysadmin",
+        ],
+    )
+
+    assert result.exit_code == 0, f"Command failed with output: {result.output}"
+
+    call_0 = mock_execute_query.mock_calls[0].args[0]
+    assert (
+        call_0 == "show parameters like 'event_table' in account"
+    ), f"Unexpected query in Call 0: {call_0}"
+
+    actual_query = mock_execute_query.mock_calls[1].args[0]
+    expected_query = (
+        "            select *\n"
+        "            from event_table_db.data_schema.snowservices_logs\n"
+        "            where (\n"
+        "                resource_attributes:\"snow.service.name\" = 'LOG_EVENT' and (resource_attributes:\"snow.service.instance\" = '0' OR resource_attributes:\"snow.service.container.instance\" = '0') and resource_attributes:\"snow.service.container.name\" = 'log-printer'\n"
+        "                and timestamp >= sysdate() - interval '2 hour'\n"
+        "                and timestamp <= sysdate() - interval '1 hour'\n"
+        "            )\n"
+        "            and record_type = 'METRIC'\n"
+        "            and scope['name'] = 'snow.spcs.platform'\n"
+        "            order by timestamp desc\n"
+        "            "
+    )
+
+    assert (
+        actual_query == expected_query
+    ), f"Generated query does not match expected query.\n\nActual:\n{actual_query}\n\nExpected:\n{expected_query}"
+
+
 def test_read_yaml(other_directory):
     tmp_dir = Path(other_directory)
     spec_path = tmp_dir / "spec.yml"
@@ -1152,3 +1517,8 @@ def test_command_aliases(mock_connector, runner, mock_ctx, command, parameters):
 
     queries = ctx.get_queries()
     assert queries[0] == queries[1]
+
+
+def normalize_query(query):
+    """Normalize SQL query by stripping extra whitespace and formatting."""
+    return re.sub(r"\s+", " ", query.strip())
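For context on how the pieces above fit together: both new commands delegate to `ServiceManager`, which first resolves the account's event table via `show parameters like 'event_table' in account` and then assembles the `where` clause from the two helpers added to `common.py`. A minimal sketch of those helpers in use is below; the service, container, and instance values are hypothetical, and the expected strings follow directly from the `build_resource_clause` and `build_time_clauses` implementations in this patch.

```python
# Sketch only: exercises the query-clause helpers added in
# src/snowflake/cli/_plugins/spcs/common.py with hypothetical values.
from snowflake.cli._plugins.spcs.common import (
    build_resource_clause,
    build_time_clauses,
)

# Filters on service name, instance ID (matching either the legacy or the
# container-scoped attribute), and container name, joined with "and".
resource_clause = build_resource_clause(
    service_name="LOG_EVENT", instance_id="0", container_name="log-printer"
)
assert resource_clause == (
    "resource_attributes:\"snow.service.name\" = 'LOG_EVENT' "
    "and (resource_attributes:\"snow.service.instance\" = '0' "
    "OR resource_attributes:\"snow.service.container.instance\" = '0') "
    "and resource_attributes:\"snow.service.container.name\" = 'log-printer'"
)

# Interval strings become bounds relative to sysdate() in Snowflake interval
# syntax; datetime objects would instead become absolute timestamp bounds.
since_clause, until_clause = build_time_clauses(since="2 hours", until="1 hour")
assert since_clause == "and timestamp >= sysdate() - interval '2 hours'"
assert until_clause == "and timestamp <= sysdate() - interval '1 hour'"
```

With the feature flags enabled, the same filters surface on the CLI as in the tests above, e.g. `snow spcs service events LOG_EVENT --container-name log-printer --instance-id 0 --since "2 hours" --until "1 hour" --last 10`.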