diff --git a/bin/gcs_cli.py b/bin/gcs_cli.py
index 5df3424a4b2..188f7c37a3c 100755
--- a/bin/gcs_cli.py
+++ b/bin/gcs_cli.py
@@ -9,6 +9,7 @@
 # Usage: ./bin/gcs_cli.py CMD
 
 import os
+from pathlib import Path
 
 import click
 
@@ -126,6 +127,47 @@ def list_objects(bucket_name, details):
         click.echo("No objects in bucket.")
 
 
+@gcs_group.command("upload")
+@click.argument("source")
+@click.argument("destination")
+def upload(source, destination):
+    """Upload files to a bucket"""
+
+    client = get_client()
+
+    # remove protocol from destination if present
+    destination = destination.split("://", 1)[-1]
+    bucket_name, _, prefix = destination.partition("/")
+
+    try:
+        bucket = client.get_bucket(bucket_name)
+    except NotFound:
+        click.echo(f"GCS bucket {bucket_name!r} does not exist.", err=True)
+        return
+
+    source_path = Path(source)
+    if not source_path.exists():
+        click.echo(f"local path {source!r} does not exist.", err=True)
+        return
+    prefix_path = Path(prefix)
+    if source_path.is_dir():
+        sources = [p for p in source_path.rglob("*") if not p.is_dir()]
+    else:
+        sources = [source_path]
+    if not sources:
+        click.echo(f"No files in directory {source!r}.")
+        return
+    for path in sources:
+        if path == source_path:
+            key_path = prefix_path
+        else:
+            key_path = prefix_path / path.relative_to(source_path)
+        key = "/".join(key_path.parts)
+        blob = bucket.blob(key)
+        blob.upload_from_filename(path)
+        click.echo(f"Uploaded gs://{bucket_name}/{key}")
+
+
 def main(argv=None):
     argv = argv or []
     gcs_group(argv)
diff --git a/bin/process_crashes.sh b/bin/process_crashes.sh
index fd169df8d35..975df02a9a7 100755
--- a/bin/process_crashes.sh
+++ b/bin/process_crashes.sh
@@ -47,9 +47,16 @@ mkdir "${DATADIR}" || echo "${DATADIR} already exists."
 ./socorro-cmd fetch_crash_data "${DATADIR}" $@
 
 # Make the bucket and sync contents
-./bin/socorro_aws_s3.sh mb s3://dev-bucket/
-./bin/socorro_aws_s3.sh cp --recursive "${DATADIR}" "s3://${CRASHSTORAGE_S3_BUCKET}/"
-./bin/socorro_aws_s3.sh ls --recursive "s3://${CRASHSTORAGE_S3_BUCKET}/"
+# ^^ returns CLOUD_PROVIDER value as uppercase
+if [[ "${CLOUD_PROVIDER^^}" == "GCP" ]]; then
+    ./socorro-cmd gcs create "${CRASHSTORAGE_GCS_BUCKET}"
+    ./socorro-cmd gcs upload "${DATADIR}" "${CRASHSTORAGE_GCS_BUCKET}"
+    ./socorro-cmd gcs list_objects "${CRASHSTORAGE_GCS_BUCKET}"
+else
+    ./bin/socorro_aws_s3.sh mb "s3://${CRASHSTORAGE_S3_BUCKET}/"
+    ./bin/socorro_aws_s3.sh cp --recursive "${DATADIR}" "s3://${CRASHSTORAGE_S3_BUCKET}/"
+    ./bin/socorro_aws_s3.sh ls --recursive "s3://${CRASHSTORAGE_S3_BUCKET}/"
+fi
 
 # Add crash ids to queue
 # ^^ returns CLOUD_PROVIDER value as uppercase
diff --git a/bin/recreate_gcs_buckets.sh b/bin/recreate_gcs_buckets.sh
new file mode 100755
index 00000000000..94711201e1b
--- /dev/null
+++ b/bin/recreate_gcs_buckets.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+# Usage: bin/recreate_gcs_buckets.sh
+#
+# Deletes and recreates the GCS buckets used for crash storage and telemetry
+#
+# Note: This should be called from inside a container.
+
+set -euo pipefail
+
+cd /app
+
+echo "Dropping and recreating GCS crash bucket..."
+/app/socorro-cmd gcs delete "${CRASHSTORAGE_GCS_BUCKET}"
+/app/socorro-cmd gcs create "${CRASHSTORAGE_GCS_BUCKET}"
+
+echo "Dropping and recreating GCS telemetry bucket..."
+/app/socorro-cmd gcs delete "${TELEMETRY_GCS_BUCKET}" +/app/socorro-cmd gcs create "${TELEMETRY_GCS_BUCKET}" diff --git a/bin/setup_services.sh b/bin/setup_services.sh index eae6b9d8e59..489d4957629 100755 --- a/bin/setup_services.sh +++ b/bin/setup_services.sh @@ -20,6 +20,9 @@ set -euo pipefail # Delete and create local S3 buckets /app/bin/recreate_s3_buckets.sh +# Delete and create local GCS buckets +/app/bin/recreate_gcs_buckets.sh + # Delete and create Elasticsearch indices /app/socorro-cmd es delete /app/socorro-cmd es create diff --git a/bin/test.sh b/bin/test.sh index fab0e3e93e3..ceda80a209b 100755 --- a/bin/test.sh +++ b/bin/test.sh @@ -54,6 +54,8 @@ echo ">>> run tests" # Run socorro tests "${PYTEST}" +# default cloud provider is aws, now configure gcp and run only impacted tests +CLOUD_PROVIDER=GCP "${PYTEST}" -m gcp # Collect static and then run pytest in the webapp pushd webapp diff --git a/docker/config/local_dev.env b/docker/config/local_dev.env index bf5f8265561..d06b9f3c792 100644 --- a/docker/config/local_dev.env +++ b/docker/config/local_dev.env @@ -145,3 +145,5 @@ PUBSUB_REPROCESSING_SUBSCRIPTION_NAME=local-reprocessing-sub # GCS # --- STORAGE_EMULATOR_HOST=http://gcs-emulator:8001 +CRASHSTORAGE_GCS_BUCKET=dev-bucket +TELEMETRY_GCS_BUCKET=telemetry-bucket diff --git a/docker/config/test.env b/docker/config/test.env index 7bfecf1d8fc..8e262c8f006 100644 --- a/docker/config/test.env +++ b/docker/config/test.env @@ -15,6 +15,10 @@ SQS_STANDARD_QUEUE=test-standard SQS_PRIORITY_QUEUE=test-priority SQS_REPROCESSING_QUEUE=test-reprocessing +# GCS +CRASHSTORAGE_GCS_BUCKET=crashstats-test +TELEMETRY_GCS_BUCKET=telemetry-test + # Pub/Sub PUBSUB_PROJECT_ID=test PUBSUB_STANDARD_TOPIC_NAME=test-standard diff --git a/docs/crashstorage.rst b/docs/crashstorage.rst index 3dc5675ca76..09363a9f793 100644 --- a/docs/crashstorage.rst +++ b/docs/crashstorage.rst @@ -252,8 +252,8 @@ Implements Radix Tree storage of crashes in a filesystem. Use cases: * For Mozilla use by the collectors. -* For other users, you can use this class as your primary storage instead of S3. - Be sure to implement this in collectors, crashmovers, processors and +* For other users, you can use this class as your primary storage instead of S3 + or GCS. Be sure to implement this in collectors, crashmovers, processors and middleware (depending on which components you use in your configuration). .. Note:: @@ -286,3 +286,20 @@ The "directory" hierarchy of that bucket looks like this: * ``{prefix}/v1/{name_of_thing}/{date}/{id}``: Raw crash data. * ``{prefix}/v1/{name_of_thing}/{id}``: Processed crash data, dumps, dump_names, and other things. + + +socorro.external.gcs: Google Cloud Storage +============================= + +The collector saves raw crash data to Google Cloud Storage. + +The processor loads raw crash data from Google Cloud Storage, processes it, and +then saves the processed crash data back to Google Cloud Storage. + +All of this is done in a single Google Cloud Storage bucket. + +The "directory" hierarchy of that bucket looks like this: + +* ``{prefix}/v1/{name_of_thing}/{date}/{id}``: Raw crash data. +* ``{prefix}/v1/{name_of_thing}/{id}``: Processed crash data, dumps, dump_names, + and other things. diff --git a/docs/dev.rst b/docs/dev.rst index a8c5777fbfd..fea718b9d0f 100644 --- a/docs/dev.rst +++ b/docs/dev.rst @@ -71,7 +71,7 @@ Setup quickstart That will build the app Docker image required for development. -5. Initialize Postgres, Elasticsearch, S3, and SQS. +5. 
Initialize Postgres, Elasticsearch, GCS, Pub/Sub, S3, and SQS. Then you need to set up services. To do that, run: @@ -92,7 +92,9 @@ Setup quickstart For Elasticsearch, it sets up Super Search fields and the index for processed crash data. - For S3, this creates the required buckets. + For S3 and GCS, this creates the required buckets. + + For Pub/Sub this creates topics and subscriptions. For SQS, this creates queues. @@ -651,8 +653,8 @@ Running the processor is pretty uninteresting since it'll just sit there until you give it something to process. In order to process something, you first need to acquire raw crash data, put the -data in the S3 container in the appropriate place, then you need to add the -crash id to the AWS SQS standard queue. +data in the cloud storage container in the appropriate place, then you need to +add the crash id to the standard queue. We have helper scripts for these steps. @@ -702,8 +704,8 @@ bin/process_crashes.sh ---------------------- You can use the ``bin/process_crashes.sh`` script which will fetch crash -data, sync it with the S3 bucket, and publish the crash ids to AWS SQS queue -for processing. If you have access to memory dumps and use a valid +data, sync it with the cloud storage emulator bucket, and publish the crash ids +to the queue for processing. If you have access to memory dumps and use a valid `API token`_, then memory dumps will be fetched for processing as well. It takes one or more crash ids as arguments. @@ -857,7 +859,7 @@ For help: .. Note:: - Processing will fail unless the crash data is in the S3 container first! + Processing will fail unless the crash data is in the cloud storage container first! Example using all the scripts @@ -878,15 +880,22 @@ Let's process crashes for Firefox from yesterday. We'd do this: # "crashdata" directory on the host app@socorro:/app$ cat crashids.txt | socorro-cmd fetch_crash_data ./crashdata + # if using CLOUD_PROVIDER=AWS (default) # Create a dev-bucket in localstack s3 app@socorro:/app$ bin/socorro_aws_s3.sh mb s3://dev-bucket/ - # Copy that data from the host into the localstack s3 container app@socorro:/app$ bin/socorro_aws_s3.sh sync ./crashdata s3://dev-bucket/ - - # Add all the crash ids to the queue + # Add all the crash ids to the sqs queue app@socorro:/app$ cat crashids.txt | socorro-cmd sqs publish local-dev-standard + # or if using CLOUD_PROVIDER=GCP + # Create a dev-bucket in the GCS emulator + app@socorro:/app$ socorro-cmd gcs create dev-bucket + # Copy that data from the host into the GCS emulator + app@socorro:/app$ socorro-cmd gcs upload ./crashdata gs://dev-bucket/ + # Add all the crash ids to the pubsub topic + app@socorro:/app$ cat crashids.txt | socorro-cmd pubsub publish local-standard-topic + # Then exit the container app@socorro:/app$ exit @@ -911,8 +920,8 @@ To run Antenna in the Socorro local dev environment, do:: It will listen on ``http://localhost:8888/`` for incoming crashes from a breakpad crash reporter. It will save crash data to the ``dev-bucket`` in the -local S3 which is where the processor looks for it. It will publish the crash -ids to the AWS SQS standard queue. +local cloud storage which is where the processor looks for it. It will publish +the crash ids to the standard queue. 
Connect to PostgreSQL database diff --git a/pytest.ini b/pytest.ini index 61ebc2ccd2f..819f76956d4 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,7 +2,8 @@ # -rsxX - show skipped, failed, and passed tests # --tb=native - print native traceback # -p no:django - disable the pytest-django plugin for Socorro tests -addopts = -rsxX --tb=native -p no:django +# -m 'not gcp' - skip gcp tests unless explicitly requested +addopts = -rsxX --tb=native -p no:django -m 'not gcp' norecursedirs = .git docs config docker __pycache__ testpaths = socorro/ @@ -21,3 +22,7 @@ filterwarnings = # pubsub deprecated the return_immediately flag because it negatively impacts performance, but # that performance cost is fine for our use case, especially in tests. ignore:The return_immediately flag is deprecated and should be set to False.:DeprecationWarning:google.pubsub_v1 + +markers = + aws: tests that require aws backends to be configured in the environment. this is the default. + gcp: tests that require gcp backends to be configured in the environment. skipped unless explicitly requested. diff --git a/socorro/external/gcs/__init__.py b/socorro/external/gcs/__init__.py new file mode 100644 index 00000000000..448bb8652d6 --- /dev/null +++ b/socorro/external/gcs/__init__.py @@ -0,0 +1,3 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. diff --git a/socorro/external/gcs/crashstorage.py b/socorro/external/gcs/crashstorage.py new file mode 100644 index 00000000000..e8aa730ff68 --- /dev/null +++ b/socorro/external/gcs/crashstorage.py @@ -0,0 +1,345 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+
+import json
+import os
+
+import markus
+from google.auth.credentials import AnonymousCredentials
+from google.api_core.exceptions import NotFound
+from google.cloud import storage
+from more_itertools import chunked
+
+from socorro.external.crashstorage_base import (
+    CrashStorageBase,
+    CrashIDNotFound,
+    MemoryDumpsMapping,
+)
+from socorro.external.crash_data_mixin import (
+    SimplifiedCrashDataMixin,
+    TelemetryCrashDataMixin,
+)
+from socorro.external.boto.crashstorage import (
+    build_keys,
+    dict_to_str,
+    list_to_str,
+    str_to_list,
+)
+from socorro.external.boto.connection_context import KeyNotFound
+from socorro.lib.libjsonschema import JsonSchemaReducer
+from socorro.lib.libsocorrodataschema import (
+    get_schema,
+    permissions_transform_function,
+    SocorroDataReducer,
+    transform_schema,
+)
+from socorro.schemas import TELEMETRY_SOCORRO_CRASH_SCHEMA
+
+
+class GcsCrashStorage(SimplifiedCrashDataMixin, CrashStorageBase):
+    """Saves and loads crash data to GCS"""
+
+    # Attach to class so it's easier to access without imports
+    KeyNotFound = KeyNotFound
+
+    def __init__(
+        self,
+        bucket="crashstats",
+        dump_file_suffix=".dump",
+        metrics_prefix="processor.gcs",
+    ):
+        """
+        :arg bucket: the GCS bucket to save to
+        :arg dump_file_suffix: the suffix used to identify a dump file (for use in temp
+            files)
+        :arg metrics_prefix: the prefix to use for metrics emitted by this class
+
+        """
+        super().__init__()
+
+        if emulator := os.environ.get("STORAGE_EMULATOR_HOST"):
+            self.logger.debug(
+                "STORAGE_EMULATOR_HOST detected, connecting to emulator: %s",
+                emulator,
+            )
+            self.client = storage.Client(
+                credentials=AnonymousCredentials(),
+                project="test",
+            )
+        else:
+            self.client = storage.Client()
+
+        self.bucket = bucket
+        self.dump_file_suffix = dump_file_suffix
+
+        self.metrics = markus.get_metrics(metrics_prefix)
+
+    def load_file(self, path):
+        try:
+            bucket = self.client.get_bucket(self.bucket)
+            blob = bucket.blob(path)
+            return blob.download_as_bytes()
+        except NotFound as exc:
+            raise KeyNotFound(
+                f"(bucket={self.bucket!r} key={path}) not found, no value returned"
+            ) from exc
+
+    def save_file(self, path, data):
+        bucket = self.client.get_bucket(self.bucket)
+        blob = bucket.blob(path)
+        blob.upload_from_string(data)
+
+    def delete_file(self, path):
+        try:
+            bucket = self.client.get_bucket(self.bucket)
+            blob = bucket.blob(path)
+            blob.delete()
+        except NotFound as exc:
+            raise KeyNotFound(
+                f"(bucket={self.bucket!r} key={path}) not found, no value returned"
+            ) from exc
+
+    def save_raw_crash(self, raw_crash, dumps, crash_id):
+        """Save raw crash data to GCS bucket.
+
+        A raw crash consists of the raw crash annotations and all the dumps that came in
+        the crash report. We need to save the raw crash file, a dump names file listing
+        the dumps that came in the crash report, and then each of the dumps.
+
+        """
+        if dumps is None:
+            dumps = MemoryDumpsMapping()
+
+        path = build_keys("raw_crash", crash_id)[0]
+        raw_crash_data = dict_to_str(raw_crash).encode("utf-8")
+        self.save_file(path, raw_crash_data)
+
+        path = build_keys("dump_names", crash_id)[0]
+        dump_names_data = list_to_str(dumps.keys()).encode("utf-8")
+        self.save_file(path, dump_names_data)
+
+        # We don't know what type of dumps mapping we have.
+        # We do know, however, that by calling as_memory_dumps_mapping() we
+        # will get a MemoryDumpsMapping, which is exactly what we need.
+        dumps = dumps.as_memory_dumps_mapping()
+        for dump_name, dump in dumps.items():
+            if dump_name in (None, "", "upload_file_minidump"):
+                dump_name = "dump"
+            path = build_keys(dump_name, crash_id)[0]
+            self.save_file(path, dump)
+
+    def save_processed_crash(self, raw_crash, processed_crash):
+        """Save the processed crash file."""
+        crash_id = processed_crash["uuid"]
+        data = dict_to_str(processed_crash).encode("utf-8")
+        path = build_keys("processed_crash", crash_id)[0]
+        self.save_file(path, data)
+
+    def list_objects_paginator(self, prefix, page_size=1000):
+        """Yield pages of object keys in the bucket that have a specified key prefix
+
+        :arg prefix: the prefix to look at
+        :arg page_size: the number of results to return per page
+
+        :returns: generator of pages (lists) of object keys
+
+        """
+        for page in chunked(
+            self.client.list_blobs(
+                bucket_or_name=self.bucket, prefix=prefix, page_size=page_size
+            ),
+            page_size,
+        ):
+            yield [blob.name for blob in page]
+
+    def exists_object(self, key):
+        """Returns whether the object exists in the bucket
+
+        :arg key: the key to check
+
+        :returns: bool
+
+        """
+        try:
+            bucket = self.client.get_bucket(self.bucket)
+            # get_blob returns None rather than raising NotFound for a missing object
+            return bucket.get_blob(key) is not None
+        except NotFound:
+            return False
+
+    def get_raw_crash(self, crash_id):
+        """Get the raw crash file for the given crash id
+
+        :returns: dict
+
+        :raises CrashIDNotFound: if the crash doesn't exist
+
+        """
+        for path in build_keys("raw_crash", crash_id):
+            try:
+                raw_crash_as_string = self.load_file(path)
+                data = json.loads(raw_crash_as_string)
+                return data
+            except KeyNotFound:
+                continue
+
+        raise CrashIDNotFound(f"{crash_id} not found")
+
+    def get_raw_dump(self, crash_id, name=None):
+        """Get a specified dump file for the given crash id.
+
+        :returns: dump as bytes
+
+        :raises CrashIDNotFound: if file does not exist
+
+        """
+        try:
+            if name in (None, "", "upload_file_minidump"):
+                name = "dump"
+            path = build_keys(name, crash_id)[0]
+            a_dump = self.load_file(path)
+            return a_dump
+        except KeyNotFound as exc:
+            raise CrashIDNotFound(f"{crash_id} not found: {exc}") from exc
+
+    def get_dumps(self, crash_id):
+        """Get all the dump files for a given crash id.
+
+        :returns MemoryDumpsMapping:
+
+        :raises CrashIDNotFound: if file does not exist
+
+        """
+        try:
+            path = build_keys("dump_names", crash_id)[0]
+            dump_names_as_string = self.load_file(path)
+            dump_names = str_to_list(dump_names_as_string)
+
+            dumps = MemoryDumpsMapping()
+            for dump_name in dump_names:
+                if dump_name in (None, "", "upload_file_minidump"):
+                    dump_name = "dump"
+                path = build_keys(dump_name, crash_id)[0]
+                dumps[dump_name] = self.load_file(path)
+            return dumps
+        except KeyNotFound as exc:
+            raise CrashIDNotFound(f"{crash_id} not found: {exc}") from exc
+
+    def get_dumps_as_files(self, crash_id, tmpdir):
+        """Get the dump files for given crash id and save them to tmp.
+
+        :returns: dict of dumpname -> file path
+
+        :raises CrashIDNotFound: if file does not exist
+
+        """
+        in_memory_dumps = self.get_dumps(crash_id)
+        # convert our native memory dump mapping into a file dump mapping.
+        return in_memory_dumps.as_file_dumps_mapping(
+            crash_id,
+            tmpdir,
+            self.dump_file_suffix,
+        )
+
+    def get_processed_crash(self, crash_id):
+        """Get the processed crash.
+ + :returns: dict + + :raises CrashIDNotFound: if file does not exist + + """ + path = build_keys("processed_crash", crash_id)[0] + try: + processed_crash_as_string = self.load_file(path) + except KeyNotFound as exc: + raise CrashIDNotFound(f"{crash_id} not found: {exc}") from exc + return json.loads(processed_crash_as_string) + + +class TelemetryGcsCrashStorage(TelemetryCrashDataMixin, GcsCrashStorage): + """Sends a subset of the processed crash to a GCS bucket + + The subset of the processed crash is based on the JSON Schema which is + derived from "socorro/external/es/super_search_fields.py". + + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Create a reducer that traverses documents and reduces them down to the + # structure of the specified schema + self.build_reducers() + + def build_reducers(self): + processed_crash_schema = get_schema("processed_crash.schema.yaml") + only_public = permissions_transform_function( + permissions_have=["public"], + default_permissions=processed_crash_schema["default_permissions"], + ) + public_processed_crash_schema = transform_schema( + schema=processed_crash_schema, + transform_function=only_public, + ) + self.processed_crash_reducer = SocorroDataReducer( + schema=public_processed_crash_schema + ) + + self.telemetry_reducer = JsonSchemaReducer( + schema=TELEMETRY_SOCORRO_CRASH_SCHEMA + ) + + # List of source -> target keys which have different names for historical reasons + HISTORICAL_MANUAL_KEYS = [ + # processed crash source key, crash report target key + ("build", "build_id"), + ("date_processed", "date"), + ("os_pretty_version", "platform_pretty_version"), + ("os_name", "platform"), + ("os_version", "platform_version"), + ] + + def save_processed_crash(self, raw_crash, processed_crash): + """Save processed crash data. + + For Telemetry, we reduce the processed crash into a crash report that matches + the telemetry_socorro_crash.json schema. + + For historical reasons, we then add some additional fields manually. + + """ + # Reduce processed crash to public-only fields + public_data = self.processed_crash_reducer.traverse(document=processed_crash) + + # Reduce public processed_crash to telemetry schema fields + telemetry_data = self.telemetry_reducer.traverse(document=public_data) + + # Add additional fields that have different names for historical reasons + for source_key, target_key in self.HISTORICAL_MANUAL_KEYS: + if source_key in public_data: + telemetry_data[target_key] = public_data[source_key] + + crash_id = telemetry_data["uuid"] + data = dict_to_str(telemetry_data).encode("utf-8") + path = build_keys("crash_report", crash_id)[0] + self.save_file(path, data) + + def get_processed_crash(self, crash_id): + """Get a crash report from the GCS bucket. 
+ + :returns: dict + + :raises CrashIDNotFound: if file does not exist + + """ + path = build_keys("crash_report", crash_id)[0] + try: + crash_report_as_str = self.load_file(path) + except KeyNotFound as exc: + raise CrashIDNotFound(f"{crash_id} not found: {exc}") from exc + return json.loads(crash_report_as_str) diff --git a/socorro/mozilla_settings.py b/socorro/mozilla_settings.py index d3f5d40711b..b13ab2b630a 100644 --- a/socorro/mozilla_settings.py +++ b/socorro/mozilla_settings.py @@ -210,28 +210,8 @@ def _or_none(val): }, } - -def cloud_provider_parser(val): - """Return 'AWS' or 'GCP'.""" - normalized = val.strip().upper() - if normalized in ("AWS", "GCP"): - return normalized - raise ValueError(f"cloud provider not supported, must be AWS or GCP: {val}") - - -# Cloud provider specific configuration -CLOUD_PROVIDER = _config( - "CLOUD_PROVIDER", - default="AWS", - parser=cloud_provider_parser, - doc="The cloud provider to use for queueing and blob storage. Must be AWS or GCP.", -) -if CLOUD_PROVIDER == "AWS": - QUEUE = QUEUE_SQS -elif CLOUD_PROVIDER == "GCP": - QUEUE = QUEUE_PUBSUB - -STORAGE = { +# Crash report storage configuration if CLOUD_PROVIDER == AWS +S3_STORAGE = { "class": "socorro.external.boto.crashstorage.BotoS3CrashStorage", "options": { "metrics_prefix": "processor.s3", @@ -258,6 +238,18 @@ def cloud_provider_parser(val): "endpoint_url": LOCAL_DEV_AWS_ENDPOINT_URL, }, } +# Crash report storage configuration if CLOUD_PROVIDER == GCP +GCS_STORAGE = { + "class": "socorro.external.gcs.crashstorage.GcsCrashStorage", + "options": { + "metrics_prefix": "processor.gcs", + "bucket": _config( + "CRASHSTORAGE_GCS_BUCKET", + default="", + doc="GCS bucket name for crash report data.", + ), + }, +} ES_STORAGE = { "class": "socorro.external.es.crashstorage.ESCrashStorage", @@ -277,7 +269,8 @@ def cloud_provider_parser(val): }, } -TELEMETRY_STORAGE = { +# Telemetry crash report storage configuration if CLOUD_PROVIDER == AWS +TELEMETRY_S3_STORAGE = { "class": "socorro.external.boto.crashstorage.TelemetryBotoS3CrashStorage", "options": { "metrics_prefix": "processor.telemetry", @@ -300,8 +293,45 @@ def cloud_provider_parser(val): "endpoint_url": LOCAL_DEV_AWS_ENDPOINT_URL, }, } +# Telemetry crash report storage configuration if CLOUD_PROVIDER == GCP +TELEMETRY_GCS_STORAGE = { + "class": "socorro.external.gcs.crashstorage.TelemetryGcsCrashStorage", + "options": { + "metrics_prefix": "processor.telemetry", + "bucket": _config( + "TELEMETRY_GCS_BUCKET", + default="", + doc="GCS bucket name for telemetry data export.", + ), + }, +} + + +def cloud_provider_parser(val): + """Return 'AWS' or 'GCP'.""" + normalized = val.strip().upper() + if normalized in ("AWS", "GCP"): + return normalized + raise ValueError(f"cloud provider not supported, must be AWS or GCP: {val}") + + +# Cloud provider specific configuration +CLOUD_PROVIDER = _config( + "CLOUD_PROVIDER", + default="AWS", + parser=cloud_provider_parser, + doc="The cloud provider to use for queueing and blob storage. 
Must be AWS or GCP.", +) +if CLOUD_PROVIDER == "AWS": + QUEUE = QUEUE_SQS + STORAGE = S3_STORAGE + TELEMETRY_STORAGE = TELEMETRY_S3_STORAGE +elif CLOUD_PROVIDER == "GCP": + QUEUE = QUEUE_PUBSUB + STORAGE = GCS_STORAGE + TELEMETRY_STORAGE = TELEMETRY_GCS_STORAGE -# Crash report storage source pulls from cloud storage +# Crash report storage source pulls from S3 or GCS CRASH_SOURCE = STORAGE # Each key in this list corresponds to a key in this dict containing a crash report data diff --git a/socorro/tests/conftest.py b/socorro/tests/conftest.py index fa8311cbdb9..fc25daad4a0 100644 --- a/socorro/tests/conftest.py +++ b/socorro/tests/conftest.py @@ -18,6 +18,8 @@ from botocore.client import ClientError, Config from elasticsearch_dsl import Search from google.api_core.exceptions import AlreadyExists, NotFound +from google.auth.credentials import AnonymousCredentials +from google.cloud import storage from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient from google.cloud.pubsub_v1.types import BatchSettings, PublisherOptions from markus.testing import MetricsMock @@ -188,6 +190,87 @@ def s3_helper(): yield s3_helper +class GcsHelper: + """GCS helper class. + + When used in a context, this will clean up any buckets created. + + """ + + def __init__(self): + self._buckets_seen = None + if os.environ.get("STORAGE_EMULATOR_HOST"): + self.client = storage.Client( + credentials=AnonymousCredentials(), + project="test", + ) + else: + self.client = storage.Client() + + def __enter__(self): + self._buckets_seen = set() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + for bucket_name in self._buckets_seen: + try: + bucket = self.client.get_bucket(bucket_or_name=bucket_name) + bucket.delete(force=True) + except NotFound: + pass + self._buckets_seen = None + + def get_crashstorage_bucket(self): + return os.environ["CRASHSTORAGE_GCS_BUCKET"] + + def get_telemetry_bucket(self): + return os.environ["TELEMETRY_GCS_BUCKET"] + + def create_bucket(self, bucket_name): + """Create specified bucket if it doesn't exist.""" + try: + bucket = self.client.get_bucket(bucket_or_name=bucket_name) + except NotFound: + bucket = self.client.create_bucket(bucket_or_name=bucket_name) + if self._buckets_seen is not None: + self._buckets_seen.add(bucket_name) + return bucket + + def upload(self, bucket_name, key, data): + """Puts an object into the specified bucket.""" + bucket = self.create_bucket(bucket_name) + bucket.blob(blob_name=key).upload_from_string(data) + + def download(self, bucket_name, key): + """Fetches an object from the specified bucket""" + bucket = self.create_bucket(bucket_name) + return bucket.blob(blob_name=key).download_as_bytes() + + def list(self, bucket_name): + """Return list of keys for objects in bucket.""" + self.create_bucket(bucket_name) + blobs = list(self.client.list_blobs(bucket_or_name=bucket_name)) + return [blob.name for blob in blobs] + + +@pytest.fixture +def gcs_helper(): + """Returns an GcsHelper for automating repetitive tasks in GCS setup. + + Provides: + + * ``get_crashstorage_bucket()`` + * ``get_telemetry_bucket()`` + * ``create_bucket(bucket_name)`` + * ``upload(bucket_name, key, data)`` + * ``download(bucket_name, key)`` + * ``list(bucket_name)`` + + """ + with GcsHelper() as gcs_helper: + yield gcs_helper + + class ElasticsearchHelper: """Elasticsearch helper class. 
@@ -551,6 +634,20 @@ def queue_helper(cloud_provider): @pytest.fixture -def storage_helper(s3_helper): - """Generate and return a cloud storage helper using env config.""" - yield s3_helper +def storage_helper(cloud_provider): + """Generate and return a queue helper using env config.""" + actual_backend = "s3" + if os.environ.get("CLOUD_PROVIDER", "").strip().upper() == "GCP": + actual_backend = "gcs" + + expect_backend = "gcs" if cloud_provider == "gcp" else "s3" + if actual_backend != expect_backend: + pytest.fail(f"test requires {expect_backend} but found {actual_backend}") + + if actual_backend == "gcs": + helper = GcsHelper() + else: + helper = S3Helper() + + with helper as _helper: + yield _helper diff --git a/socorro/tests/external/gcs/__init__.py b/socorro/tests/external/gcs/__init__.py new file mode 100644 index 00000000000..448bb8652d6 --- /dev/null +++ b/socorro/tests/external/gcs/__init__.py @@ -0,0 +1,3 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. diff --git a/socorro/tests/external/gcs/test_crash_data.py b/socorro/tests/external/gcs/test_crash_data.py new file mode 100644 index 00000000000..e465b2b9885 --- /dev/null +++ b/socorro/tests/external/gcs/test_crash_data.py @@ -0,0 +1,140 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +import json +import os + +import pytest + +from socorro.external.crashstorage_base import CrashIDNotFound +from socorro.lib import MissingArgumentError, BadArgumentError +from socorro.libclass import build_instance_from_settings +from socorro.lib.libooid import create_new_ooid + + +CRASHDATA_SETTINGS = { + "class": "socorro.external.gcs.crashstorage.GcsCrashStorage", + "options": { + "bucket": os.environ["CRASHSTORAGE_GCS_BUCKET"], + }, +} + +TELEMETRY_SETTINGS = { + "class": "socorro.external.gcs.crashstorage.TelemetryGcsCrashStorage", + "options": { + "bucket": os.environ["TELEMETRY_GCS_BUCKET"], + }, +} + + +class TestSimplifiedCrashData: + def test_get_processed(self, gcs_helper): + crashdata = build_instance_from_settings(CRASHDATA_SETTINGS) + + bucket = CRASHDATA_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/processed_crash/{crash_id}", + data=json.dumps({"foo": "bar"}).encode("utf-8"), + ) + + result = crashdata.get(uuid=crash_id, datatype="processed") + assert result == {"foo": "bar"} + + def test_get_processed_not_found(self, gcs_helper): + crashdata = build_instance_from_settings(CRASHDATA_SETTINGS) + + bucket = CRASHDATA_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + with pytest.raises(CrashIDNotFound): + crashdata.get(uuid=crash_id, datatype="processed") + + def test_get_raw_dump(self, gcs_helper): + crashdata = build_instance_from_settings(CRASHDATA_SETTINGS) + + bucket = CRASHDATA_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + data=b"\xa0", + ) + + result = crashdata.get(uuid=crash_id, datatype="raw") + assert result == b"\xa0" + + def test_get_raw_dump_not_found(self, gcs_helper): + crashdata = build_instance_from_settings(CRASHDATA_SETTINGS) 
+ + bucket = CRASHDATA_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + with pytest.raises(CrashIDNotFound): + crashdata.get(uuid=crash_id, datatype="raw") + + def test_get_raw_crash_not_found(self, gcs_helper): + crashdata = build_instance_from_settings(CRASHDATA_SETTINGS) + + bucket = CRASHDATA_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + with pytest.raises(CrashIDNotFound): + crashdata.get(uuid=crash_id, datatype="meta") + + def test_bad_arguments(self): + crashdata = build_instance_from_settings(CRASHDATA_SETTINGS) + + crash_id = create_new_ooid() + + with pytest.raises(MissingArgumentError): + crashdata.get() + + with pytest.raises(MissingArgumentError): + crashdata.get(uuid=crash_id) + + with pytest.raises(BadArgumentError): + crashdata.get(uuid=crash_id, datatype="junk") + + +class TestTelemetryCrashData: + def test_get_data(self, gcs_helper): + crashdata = build_instance_from_settings(TELEMETRY_SETTINGS) + + bucket = TELEMETRY_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/crash_report/20{crash_id[-6:]}/{crash_id}", + data=json.dumps({"foo": "bar"}).encode("utf-8"), + ) + + result = crashdata.get(uuid=crash_id) + assert result == {"foo": "bar"} + + def test_get_data_not_found(self, gcs_helper): + crashdata = build_instance_from_settings(TELEMETRY_SETTINGS) + + bucket = TELEMETRY_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + with pytest.raises(CrashIDNotFound): + crashdata.get(uuid=crash_id) + + def test_bad_arguments(self): + crashdata = build_instance_from_settings(TELEMETRY_SETTINGS) + + with pytest.raises(MissingArgumentError): + crashdata.get() diff --git a/socorro/tests/external/gcs/test_crashstorage.py b/socorro/tests/external/gcs/test_crashstorage.py new file mode 100644 index 00000000000..a2b3f190440 --- /dev/null +++ b/socorro/tests/external/gcs/test_crashstorage.py @@ -0,0 +1,473 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +import json +import os.path +import os + +import pytest + +from socorro.external.gcs.crashstorage import build_keys, dict_to_str +from socorro.external.crashstorage_base import CrashIDNotFound, MemoryDumpsMapping +from socorro.libclass import build_instance_from_settings +from socorro.lib.libdatetime import date_to_string, utc_now +from socorro.lib.libooid import create_new_ooid + + +CRASHSTORAGE_SETTINGS = { + "class": "socorro.external.gcs.crashstorage.GcsCrashStorage", + "options": { + "bucket": os.environ["CRASHSTORAGE_GCS_BUCKET"], + }, +} + + +TELEMETRY_SETTINGS = { + "class": "socorro.external.gcs.crashstorage.TelemetryGcsCrashStorage", + "options": { + "bucket": os.environ["TELEMETRY_GCS_BUCKET"], + }, +} + + +@pytest.mark.parametrize( + "kind, crashid, expected", + [ + ( + "raw_crash", + "0bba929f-8721-460c-dead-a43c20071027", + [ + "v1/raw_crash/20071027/0bba929f-8721-460c-dead-a43c20071027", + ], + ), + ( + "dump_names", + "0bba929f-8721-460c-dead-a43c20071027", + [ + "v1/dump_names/0bba929f-8721-460c-dead-a43c20071027", + ], + ), + ( + "processed_crash", + "0bba929f-8721-460c-dead-a43c20071027", + [ + "v1/processed_crash/0bba929f-8721-460c-dead-a43c20071027", + ], + ), + # For telemetry + ( + "crash_report", + "0bba929f-8721-460c-dead-a43c20071027", + [ + "v1/crash_report/20071027/0bba929f-8721-460c-dead-a43c20071027", + ], + ), + ], +) +def test_build_keys(kind, crashid, expected): + assert build_keys(kind, crashid) == expected + + +class TestGcsCrashStorage: + def test_save_raw_crash_no_dumps(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + now = utc_now() + crash_id = create_new_ooid(timestamp=now) + original_raw_crash = {"submitted_timestamp": date_to_string(now)} + gcs_helper.create_bucket(bucket) + + # Run save_raw_crash + crashstorage.save_raw_crash( + raw_crash=original_raw_crash, + # This is an empty set of dumps--no dumps! 
+ dumps=MemoryDumpsMapping(), + crash_id=crash_id, + ) + + # Verify the raw_crash made it to the right place and has the right + # contents + raw_crash = gcs_helper.download( + bucket_name=bucket, + key=f"v1/raw_crash/20{crash_id[-6:]}/{crash_id}", + ) + + assert json.loads(raw_crash) == original_raw_crash + + # Verify dump_names made it to the right place and has the right contents + dump_names = gcs_helper.download( + bucket_name=bucket, + key=f"v1/dump_names/{crash_id}", + ) + assert json.loads(dump_names) == [] + + def test_save_raw_crash_with_dumps(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + now = utc_now() + crash_id = create_new_ooid(timestamp=now) + original_raw_crash = {"submitted_timestamp": date_to_string(now)} + gcs_helper.create_bucket(bucket) + + # Run save_raw_crash + crashstorage.save_raw_crash( + raw_crash=original_raw_crash, + dumps=MemoryDumpsMapping( + {"dump": b"fake dump", "content_dump": b"fake content dump"} + ), + crash_id=crash_id, + ) + + # Verify the raw_crash made it to the right place and has the right contents + raw_crash = gcs_helper.download( + bucket_name=bucket, + key=f"v1/raw_crash/20{crash_id[-6:]}/{crash_id}", + ) + + assert json.loads(raw_crash) == original_raw_crash + + # Verify dump_names made it to the right place and has the right + # contents + dump_names = gcs_helper.download( + bucket_name=bucket, + key=f"v1/dump_names/{crash_id}", + ) + assert sorted(json.loads(dump_names)) == ["content_dump", "dump"] + + # Verify dumps + dump = gcs_helper.download( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + ) + assert dump == b"fake dump" + + content_dump = gcs_helper.download( + bucket_name=bucket, + key=f"v1/content_dump/{crash_id}", + ) + assert content_dump == b"fake content dump" + + def test_save_processed_crash(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + now = utc_now() + crash_id = create_new_ooid(timestamp=now) + original_raw_crash = {"submitted_timestamp": date_to_string(now)} + original_processed_crash = { + "uuid": crash_id, + "completed_datetime": date_to_string(now), + "signature": "now_this_is_a_signature", + } + + gcs_helper.create_bucket(bucket) + crashstorage.save_processed_crash( + raw_crash=original_raw_crash, + processed_crash=original_processed_crash, + ) + + # Verify processed crash is saved + processed_crash = gcs_helper.download( + bucket_name=bucket, + key=f"v1/processed_crash/{crash_id}", + ) + assert json.loads(processed_crash) == original_processed_crash + # Verify nothing else got saved + assert gcs_helper.list(bucket_name=bucket) == [f"v1/processed_crash/{crash_id}"] + + def test_get_raw_crash(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + now = utc_now() + crash_id = create_new_ooid(timestamp=now) + original_raw_crash = {"submitted_timestamp": date_to_string(now)} + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/raw_crash/20{crash_id[-6:]}/{crash_id}", + data=dict_to_str(original_raw_crash).encode("utf-8"), + ) + + result = crashstorage.get_raw_crash(crash_id) + expected = { + "submitted_timestamp": original_raw_crash["submitted_timestamp"], + } + assert result == expected + + def test_get_raw_crash_not_found(self, gcs_helper): + crashstorage = 
build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + with pytest.raises(CrashIDNotFound): + crashstorage.get_raw_crash(crash_id) + + def test_get_raw_dump(self, gcs_helper): + """test fetching the raw dump without naming it""" + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + data=b"this is a raw dump", + ) + + result = crashstorage.get_raw_dump(crash_id) + assert result == b"this is a raw dump" + + def test_get_raw_dump_not_found(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + with pytest.raises(CrashIDNotFound): + crashstorage.get_raw_dump(crash_id) + + def test_get_raw_dump_upload_file_minidump(self, gcs_helper): + """test fetching the raw dump, naming it 'upload_file_minidump'""" + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + data=b"this is a raw dump", + ) + + result = crashstorage.get_raw_dump(crash_id, name="upload_file_minidump") + assert result == b"this is a raw dump" + + def test_get_raw_dump_empty_string(self, gcs_helper): + """test fetching the raw dump, naming it with empty string""" + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + data=b"this is a raw dump", + ) + + result = crashstorage.get_raw_dump(crash_id, name="") + assert result == b"this is a raw dump" + + def test_get_dumps(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump_names/{crash_id}", + data=b'["dump", "content_dump", "city_dump"]', + ) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + data=b'this is "dump", the first one', + ) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/content_dump/{crash_id}", + data=b'this is "content_dump", the second one', + ) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/city_dump/{crash_id}", + data=b'this is "city_dump", the last one', + ) + + result = crashstorage.get_dumps(crash_id) + assert result == { + "dump": b'this is "dump", the first one', + "content_dump": b'this is "content_dump", the second one', + "city_dump": b'this is "city_dump", the last one', + } + + def test_get_dumps_not_found(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + with pytest.raises(CrashIDNotFound): + crashstorage.get_dumps(crash_id) + + def test_get_dumps_as_files(self, gcs_helper, tmp_path): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket 
= CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump_names/{crash_id}", + data=b'["dump", "content_dump", "city_dump"]', + ) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/dump/{crash_id}", + data=b'this is "dump", the first one', + ) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/content_dump/{crash_id}", + data=b'this is "content_dump", the second one', + ) + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/city_dump/{crash_id}", + data=b'this is "city_dump", the last one', + ) + + result = crashstorage.get_dumps_as_files( + crash_id=crash_id, tmpdir=str(tmp_path) + ) + + # We don't care much about the mocked internals as the bulk of that function is + # tested elsewhere. We just need to be concerned about the file writing worked. + expected = { + "content_dump": os.path.join( + str(tmp_path), + f"{crash_id}.content_dump.TEMPORARY.dump", + ), + "city_dump": os.path.join( + str(tmp_path), + f"{crash_id}.city_dump.TEMPORARY.dump", + ), + "upload_file_minidump": os.path.join( + str(tmp_path), + f"{crash_id}.upload_file_minidump.TEMPORARY.dump", + ), + } + assert result == expected + + def test_get_processed_crash(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + processed_crash = { + "a": {"b": {"c": 11}}, + "sensitive": {"x": 2}, + "not_url": "not a url", + # These keys do not survive redaction + "url": "http://example.com", + "json_dump": {"sensitive": 22}, + } + + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/processed_crash/{crash_id}", + data=dict_to_str(processed_crash).encode("utf-8"), + ) + + result = crashstorage.get_processed_crash(crash_id) + assert result == processed_crash + + def test_get_processed_not_found(self, gcs_helper): + crashstorage = build_instance_from_settings(CRASHSTORAGE_SETTINGS) + bucket = CRASHSTORAGE_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + + gcs_helper.create_bucket(bucket) + with pytest.raises(CrashIDNotFound): + crashstorage.get_processed_crash(crash_id) + + +class TestTelemetryGcsCrashStorage: + def test_save_processed_crash(self, gcs_helper): + crashstorage = build_instance_from_settings(TELEMETRY_SETTINGS) + bucket = TELEMETRY_SETTINGS["options"]["bucket"] + now = utc_now() + crash_id = create_new_ooid(timestamp=now) + original_raw_crash = {"submitted_timestamp": date_to_string(now)} + gcs_helper.create_bucket(bucket) + + # Run save_processed_crash + crashstorage.save_processed_crash( + raw_crash=original_raw_crash, + processed_crash={ + "uuid": crash_id, + "completed_datetime": date_to_string(now), + "signature": "now_this_is_a_signature", + "os_name": "Linux", + "some_random_key": "should not appear", + "json_dump": { + "crash_info": { + "address": "0x6357737b", + "some_random_key": "should not appear", + }, + "crashing_thread": { + "frames": [ + { + "frame": 0, + "module": "xul.dll", + "function": None, + "some_random_key": "should not appear", + }, + ], + }, + }, + }, + ) + + # Get the crash data we just saved from the bucket and verify it's contents + crash_data = gcs_helper.download( + bucket_name=bucket, + key=f"v1/crash_report/20{crash_id[-6:]}/{crash_id}", + ) + assert json.loads(crash_data) == { + "platform": "Linux", + "signature": "now_this_is_a_signature", + "uuid": crash_id, + "json_dump": { + "crash_info": { + 
"address": "0x6357737b", + }, + "crashing_thread": { + "frames": [ + { + "frame": 0, + "function": None, + "module": "xul.dll", + }, + ], + }, + }, + } + + def test_get_processed_crash(self, gcs_helper): + crashstorage = build_instance_from_settings(TELEMETRY_SETTINGS) + bucket = TELEMETRY_SETTINGS["options"]["bucket"] + crash_id = create_new_ooid() + gcs_helper.create_bucket(bucket) + + crash_data = { + "platform": "Linux", + "signature": "now_this_is_a_signature", + "uuid": crash_id, + } + + # Save the data to GCS so we have something to get + gcs_helper.upload( + bucket_name=bucket, + key=f"v1/crash_report/20{crash_id[-6:]}/{crash_id}", + data=json.dumps(crash_data).encode("utf-8"), + ) + + # Get the crash and assert it's the same data + data = crashstorage.get_processed_crash(crash_id=crash_id) + assert data == crash_data