From 16db6144fbe2a696668367cd4f41a757c3ffa4e5 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 24 Sep 2024 18:02:50 -0400 Subject: [PATCH 01/12] check new samples job checks all successful runs --- .../check_for_new_samples_from_pipeline.py | 88 ++++++++++++------- 1 file changed, 58 insertions(+), 30 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index ffa517cba3..c5f1c780e4 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -5,11 +5,12 @@ from django.db.models.functions import JSONObject import json import logging +import re from reference_data.models import GENOME_VERSION_LOOKUP from seqr.models import Family, Sample, SavedVariant from seqr.utils.communication_utils import safe_post_to_slack -from seqr.utils.file_utils import file_iter, does_file_exist +from seqr.utils.file_utils import file_iter, list_files from seqr.utils.search.add_data_utils import notify_search_data_loaded from seqr.utils.search.utils import parse_valid_variant_id from seqr.utils.search.hail_search_utils import hail_variant_multi_lookup, search_data_type @@ -22,7 +23,10 @@ logger = logging.getLogger(__name__) -GS_PATH_TEMPLATE = 'gs://seqr-hail-search-data/v3.1/{path}/runs/{version}/' +GS_PATH_TEMPLATE = 'gs://seqr-hail-search-data/v3.1/{genome_version}/{dataset_type}/runs/{version}/_SUCCESS' +GS_PATH_FIELDS = ['genome_version', 'dataset_type', 'version'] +GS_PATH_REGEX = GS_PATH_TEMPLATE.format(**{field: f'(?P<{field}>[^/]+)' for field in GS_PATH_FIELDS}) + DATASET_TYPE_MAP = {'GCNV': Sample.DATASET_TYPE_SV_CALLS} USER_EMAIL = 'manage_command' MAX_LOOKUP_VARIANTS = 5000 @@ -39,32 +43,61 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument('path') - parser.add_argument('version') - parser.add_argument('--allow-failed', action='store_true') + parser.add_argument('--genome_version') + parser.add_argument('--dataset_type') + parser.add_argument('--version') def handle(self, *args, **options): - path = options['path'] - version = options['version'] - genome_version, dataset_type = path.split('/') - dataset_type = DATASET_TYPE_MAP.get(dataset_type, dataset_type) - - if Sample.objects.filter(data_source=version, is_active=True).exists(): - logger.info(f'Data already loaded for {path}: {version}') + gs_path = GS_PATH_TEMPLATE.format(**{field: options[field] or '*' for field in GS_PATH_FIELDS}) + success_runs = {path: re.match(GS_PATH_REGEX, path).groupdict() for path in list_files(gs_path, user=None)} + if not success_runs: + user_args = {f'{k}={v}' for k, v in options.items() if v} + raise CommandError(f'No successful runs found for {", ".join(user_args)}') + + loaded_runs = set(Sample.objects.filter(data_source__isnull=False, is_active=True).values_list('data_source', flat=True)) + new_runs = {path: run for path, run in success_runs.items() if run['version'] not in loaded_runs} + if not new_runs: + logger.info(f'Data already loaded for all {len(success_runs)} runs') return - logger.info(f'Loading new samples from {path}: {version}') - gs_path = GS_PATH_TEMPLATE.format(path=path, version=version) - if not does_file_exist(gs_path + '_SUCCESS'): - if options['allow_failed']: - logger.warning(f'Loading for failed run {path}: {version}') - else: - raise CommandError(f'Run failed for {path}: {version}, unable to load data') + logger.info(f'Loading new samples from {len(success_runs)} 
run(s)') + updated_families_by_data_type = defaultdict(set) + updated_variants_by_data_type = defaultdict(list) + errors = [] + for path, run in new_runs.items(): + try: + metadata_path = path.replace('_SUCCESS', 'metadata.json') + data_type, updated_families, updated_variants_by_id = self._load_new_samples(metadata_path, **run) + data_type_key = (data_type, run['genome_version']) + updated_families_by_data_type[data_type_key].update(updated_families) + updated_variants_by_data_type[data_type_key].update(updated_variants_by_id) + except CommandError as e: + errors.append(f'Error loading {run["version"]}: {e}') + + # Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added + reset_cached_search_results(project=None) + + for data_type_key, updated_families in updated_families_by_data_type.items(): + self._reload_shared_variant_annotations( + *data_type_key, updated_variants_by_data_type[data_type_key], exclude_families=updated_families, + ) + + for error in errors: + logger.error(error) - metadata = json.loads(next(line for line in file_iter(gs_path + 'metadata.json'))) + logger.info('DONE') + + @classmethod + def _load_new_samples(cls, metadata_path, genome_version, dataset_type, version): + dataset_type = DATASET_TYPE_MAP.get(dataset_type, dataset_type) + + logger.info(f'Loading new samples from {genome_version}/{dataset_type}: {version}') + + metadata = json.loads(next(line for line in file_iter(metadata_path))) families = Family.objects.filter(guid__in=metadata['family_samples'].keys()) if len(families) < len(metadata['family_samples']): invalid = metadata['family_samples'].keys() - set(families.values_list('guid', flat=True)) - raise CommandError(f'Invalid families in run metadata {path}: {version} - {", ".join(invalid)}') + raise CommandError(f'Invalid families in run metadata {genome_version}/{dataset_type}: {version} - {", ".join(invalid)}') family_project_map = {f.guid: f.project for f in families.select_related('project')} samples_by_project = defaultdict(list) @@ -96,9 +129,6 @@ def handle(self, *args, **options): user=None, ) - # Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added - reset_cached_search_results(project=None) - # Send loading notifications and update Airtable PDOs update_sample_data_by_project = { s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate( @@ -121,7 +151,7 @@ def handle(self, *args, **options): updated_families.update(project_families) updated_project_families.append((project.id, project.name, project.genome_version, project_families)) if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS: - split_project_pdos[project.name] = self._update_pdos(session, project.guid, sample_ids) + split_project_pdos[project.name] = cls._update_pdos(session, project.guid, sample_ids) # Send failure notifications failed_family_samples = metadata.get('failed_family_samples', {}) @@ -149,10 +179,7 @@ def handle(self, *args, **options): updated_variants_by_id = update_projects_saved_variant_json( updated_project_families, user_email=USER_EMAIL, dataset_type=dataset_type) - self._reload_shared_variant_annotations( - search_data_type(dataset_type, sample_type), genome_version, updated_variants_by_id, exclude_families=updated_families) - - logger.info('DONE') + return search_data_type(dataset_type, sample_type), updated_families, updated_variants_by_id @staticmethod def _update_pdos(session, 
project_guid, sample_ids): @@ -228,7 +255,8 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian for v in variant_models: variants_by_id[v.variant_id].append(v) - logger.info(f'Reloading shared annotations for {len(variant_models)} {data_type} {genome_version} saved variants ({len(variants_by_id)} unique)') + variant_type_summary = f'{data_type} {genome_version} saved variants' + logger.info(f'Reloading shared annotations for {len(variant_models)} {variant_type_summary} ({len(variants_by_id)} unique)') updated_variants_by_id = { variant_id: {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes', 'genotypeFilters'}} @@ -250,7 +278,7 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian updated_variant_models.append(variant_model) SavedVariant.objects.bulk_update(updated_variant_models, ['saved_variant_json'], batch_size=10000) - logger.info(f'Updated {len(updated_variant_models)} saved variants') + logger.info(f'Updated {len(updated_variant_models)} {variant_type_summary}') reload_shared_variant_annotations = Command._reload_shared_variant_annotations From 5434b5dca473586ec0db113bb53bed5e28ecefaf Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 24 Sep 2024 18:04:23 -0400 Subject: [PATCH 02/12] do not reload old runs --- seqr/management/commands/check_for_new_samples_from_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index c5f1c780e4..4e47261df2 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -54,7 +54,7 @@ def handle(self, *args, **options): user_args = {f'{k}={v}' for k, v in options.items() if v} raise CommandError(f'No successful runs found for {", ".join(user_args)}') - loaded_runs = set(Sample.objects.filter(data_source__isnull=False, is_active=True).values_list('data_source', flat=True)) + loaded_runs = set(Sample.objects.filter(data_source__isnull=False).values_list('data_source', flat=True)) new_runs = {path: run for path, run in success_runs.items() if run['version'] not in loaded_runs} if not new_runs: logger.info(f'Data already loaded for all {len(success_runs)} runs') From 0e94df4b097779cfdf1cea2b54826302b0135157 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Wed, 25 Sep 2024 14:01:38 -0400 Subject: [PATCH 03/12] test failures --- .../check_for_new_samples_from_pipeline.py | 23 +- ...eck_for_new_samples_from_pipeline_tests.py | 252 ++++++++++-------- 2 files changed, 149 insertions(+), 126 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 4e47261df2..66c0e0f523 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -23,8 +23,8 @@ logger = logging.getLogger(__name__) -GS_PATH_TEMPLATE = 'gs://seqr-hail-search-data/v3.1/{genome_version}/{dataset_type}/runs/{version}/_SUCCESS' -GS_PATH_FIELDS = ['genome_version', 'dataset_type', 'version'] +GS_PATH_TEMPLATE = 'gs://seqr-hail-search-data/v3.1/{genome_version}/{dataset_type}/runs/{run_version}/_SUCCESS' +GS_PATH_FIELDS = ['genome_version', 'dataset_type', 'run_version'] GS_PATH_REGEX = GS_PATH_TEMPLATE.format(**{field: f'(?P<{field}>[^/]+)' for field in GS_PATH_FIELDS}) DATASET_TYPE_MAP = 
{'GCNV': Sample.DATASET_TYPE_SV_CALLS} @@ -42,20 +42,19 @@ class Command(BaseCommand): help = 'Check for newly loaded seqr samples' def add_arguments(self, parser): - parser.add_argument('path') parser.add_argument('--genome_version') parser.add_argument('--dataset_type') - parser.add_argument('--version') + parser.add_argument('--run-version') def handle(self, *args, **options): gs_path = GS_PATH_TEMPLATE.format(**{field: options[field] or '*' for field in GS_PATH_FIELDS}) success_runs = {path: re.match(GS_PATH_REGEX, path).groupdict() for path in list_files(gs_path, user=None)} if not success_runs: - user_args = {f'{k}={v}' for k, v in options.items() if v} + user_args = [f'{k}={options[k]}' for k in GS_PATH_FIELDS if options[k]] raise CommandError(f'No successful runs found for {", ".join(user_args)}') loaded_runs = set(Sample.objects.filter(data_source__isnull=False).values_list('data_source', flat=True)) - new_runs = {path: run for path, run in success_runs.items() if run['version'] not in loaded_runs} + new_runs = {path: run for path, run in success_runs.items() if run['run_version'] not in loaded_runs} if not new_runs: logger.info(f'Data already loaded for all {len(success_runs)} runs') return @@ -71,8 +70,8 @@ def handle(self, *args, **options): data_type_key = (data_type, run['genome_version']) updated_families_by_data_type[data_type_key].update(updated_families) updated_variants_by_data_type[data_type_key].update(updated_variants_by_id) - except CommandError as e: - errors.append(f'Error loading {run["version"]}: {e}') + except Exception as e: + errors.append(f'Error loading {run["run_version"]}: {e}') # Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added reset_cached_search_results(project=None) @@ -88,16 +87,16 @@ def handle(self, *args, **options): logger.info('DONE') @classmethod - def _load_new_samples(cls, metadata_path, genome_version, dataset_type, version): + def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_version): dataset_type = DATASET_TYPE_MAP.get(dataset_type, dataset_type) - logger.info(f'Loading new samples from {genome_version}/{dataset_type}: {version}') + logger.info(f'Loading new samples from {genome_version}/{dataset_type}: {run_version}') metadata = json.loads(next(line for line in file_iter(metadata_path))) families = Family.objects.filter(guid__in=metadata['family_samples'].keys()) if len(families) < len(metadata['family_samples']): invalid = metadata['family_samples'].keys() - set(families.values_list('guid', flat=True)) - raise CommandError(f'Invalid families in run metadata {genome_version}/{dataset_type}: {version} - {", ".join(invalid)}') + raise CommandError(f'Invalid families in run metadata {genome_version}/{dataset_type}: {run_version} - {", ".join(invalid)}') family_project_map = {f.guid: f.project for f in families.select_related('project')} samples_by_project = defaultdict(list) @@ -123,7 +122,7 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, version) updated_samples, inactivated_sample_guids, *args = match_and_update_search_samples( projects=samples_by_project.keys(), sample_project_tuples=sample_project_tuples, - sample_data={'data_source': version, 'elasticsearch_index': ';'.join(metadata['callsets'])}, + sample_data={'data_source': run_version, 'elasticsearch_index': ';'.join(metadata['callsets'])}, sample_type=sample_type, dataset_type=dataset_type, user=None, diff --git 
a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 7a59b27460..bd55fa8b26 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -113,6 +113,56 @@ ] } +RUN_PATHS = [ + b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/_SUCCESS', + b'gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/_SUCCESS', + b'gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/auto__2024-08-12/_SUCCESS', + b'gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/_SUCCESS', +] +METADATA_FILES = [{ + 'callsets': ['1kg.vcf.gz'], + 'sample_type': 'WES', + 'family_samples': { + 'F000011_11': ['NA20885'], + 'F000012_12': ['NA20888', 'NA20889'], + 'F000014_14': ['NA21234'], + }, + 'failed_family_samples': { + 'relatedness_check': { + 'F000001_1': {'reasons': [ + 'Sample NA19679 has expected relation "parent" to NA19675 but has coefficients [0.0, 0.8505002045292791, 0.14949979547072176, 0.5747498977353613]', + 'Sample NA19678 has expected relation "sibling" to NA19675 but has coefficients [0.17424888135104177, 0.6041745754450025, 0.22157654320395614, 0.5236638309264574]', + ]}, + }, + 'sex_check': { + 'F000001_1': {'reasons': ['Sample NA19679 has pedigree sex F but imputed sex M']}, + 'F000014_14': {'reasons': ['Sample NA21987 has pedigree sex M but imputed sex F']}, + }, + 'missing_samples': { + 'F000002_2': {'reasons': ["Missing samples: {'HG00732', 'HG00733'}"]}, + 'F000003_3': {'reasons': ["Missing samples: {'NA20870'}"]}, + }, + } +}, { + 'callsets': ['invalid_family.vcf'], + 'sample_type': 'WGS', + 'family_samples': {'F0000123_ABC': ['NA22882', 'NA20885']}, +}, { + 'callsets': ['invalid_sample.vcf'], + 'sample_type': 'WGS', + 'family_samples': {'F000003_3': ['NA22882', 'NA20885']}, +}, { + 'callsets': ['gcnv.bed.gz'], + 'sample_type': 'WES', + 'family_samples': {'F000004_4': ['NA20872'], 'F000012_12': ['NA20889']}, +}] + + +def mock_metadata_file(index): + m = mock.MagicMock() + m.stdout = [json.dumps(METADATA_FILES[index]).encode()] + return m + @mock.patch('seqr.utils.search.hail_search_utils.HAIL_BACKEND_SERVICE_HOSTNAME', MOCK_HAIL_HOST) @mock.patch('seqr.models.random.randint', lambda *args: GUID_ID) @@ -142,30 +192,47 @@ def setUp(self): self.addCleanup(patcher.stop) super().setUp() - def _test_success(self, path, metadata, dataset_type, sample_guids, reload_calls, reload_annotations_logs, has_additional_requests=False): - self.mock_subprocess.return_value.stdout = [json.dumps(metadata).encode()] - self.mock_subprocess.return_value.wait.return_value = 0 + def _test_call(self, reload_calls, reload_annotations_logs, has_additional_requests=False, additional_errors=None): + self.mock_subprocess.reset_mock() + mock_ls_process = mock.MagicMock() + mock_ls_process.communicate.return_value = b'\n'.join(RUN_PATHS), b'' + self.mock_subprocess.side_effect = [mock_ls_process] + [mock_metadata_file(i) for i in range(len(RUN_PATHS))] - call_command('check_for_new_samples_from_pipeline', path, 'auto__2023-08-08') + call_command('check_for_new_samples_from_pipeline') - self.mock_subprocess.assert_has_calls([mock.call(command, stdout=-1, stderr=-2, shell=True) for command in [ - f'gsutil ls gs://seqr-hail-search-data/v3.1/{path}/runs/auto__2023-08-08/_SUCCESS', - f'gsutil cat gs://seqr-hail-search-data/v3.1/{path}/runs/auto__2023-08-08/metadata.json', - ]], any_order=True) + 
self.mock_subprocess.assert_has_calls([mock.call(command, stdout=-1, stderr=stderr, shell=True) for (command, stderr) in [ + ('gsutil ls gs://seqr-hail-search-data/v3.1/*/*/runs/*/_SUCCESS', -1), + ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2), + ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2), + ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/auto__2024-08-12/metadata.json', -2), + ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/metadata.json', -2), + ]]) self.mock_logger.info.assert_has_calls([ - mock.call(f'Loading new samples from {path}: auto__2023-08-08'), - mock.call(f'Loading {len(sample_guids)} WES {dataset_type} samples in 2 projects'), + mock.call('Loading new samples from 4 run(s)'), + mock.call('Loading new samples from GRCh38/SNV_INDEL: auto__2023-08-09'), + mock.call('Loading new samples from GRCh37/SNV_INDEL: manual__2023-11-02'), + mock.call('Loading new samples from GRCh37/MITO: auto__2024-08-12'), + mock.call('Loading 2 WGS MITO samples in 1 projects'), + mock.call('Loading new samples from GRCh38/SV: auto__2024-09-14'), ] + [mock.call(log) for log in reload_annotations_logs] + [ mock.call('DONE'), ]) - self.mock_logger.warining.assert_not_called() + self.mock_logger.warning.assert_not_called() + error_logs = [ + mock.call('Error loading manual__2023-11-02: Invalid families in run metadata GRCh37/SNV_INDEL: manual__2023-11-02 - F0000123_ABC'), + mock.call('Error loading auto__2024-08-12: Matches not found for sample ids: NA20885, NA22882'), + ] + if additional_errors: + additional_error_logs = [mock.call(error) for error in additional_errors] + error_logs = [additional_error_logs[0]] + error_logs + additional_error_logs[1:] + self.mock_logger.error.assert_has_calls(error_logs) self.mock_redis.return_value.delete.assert_called_with('search_results__*', 'variant_lookup_results__*') - self.mock_utils_logger.info.assert_has_calls([ - mock.call('Reset 2 cached results'), - mock.call('Reloading saved variants in 2 projects'), - ]) + util_info_logs = [mock.call('Reset 2 cached results')] + if reload_calls: + util_info_logs.append(mock.call('Reloading saved variants in 2 projects')) + self.mock_utils_logger.info.assert_has_calls(util_info_logs) # Test reload saved variants self.assertEqual(len(responses.calls), len(reload_calls) + (9 if has_additional_requests else 0)) @@ -175,19 +242,6 @@ def _test_success(self, path, metadata, dataset_type, sample_guids, reload_calls self.assertEqual(resp.request.headers.get('From'), 'manage_command') self.assertDictEqual(json.loads(resp.request.body), call) - # Tests Sample models created/updated - updated_sample_models = Sample.objects.filter(guid__in=sample_guids) - self.assertEqual(len(updated_sample_models), len(sample_guids)) - self.assertSetEqual({'WES'}, set(updated_sample_models.values_list('sample_type', flat=True))) - self.assertSetEqual({dataset_type}, set(updated_sample_models.values_list('dataset_type', flat=True))) - self.assertSetEqual({True}, set(updated_sample_models.values_list('is_active', flat=True))) - self.assertSetEqual({'1kg.vcf.gz'}, set(updated_sample_models.values_list('elasticsearch_index', flat=True))) - self.assertSetEqual({'auto__2023-08-08'}, set(updated_sample_models.values_list('data_source', flat=True))) - self.assertSetEqual( - {datetime.now().strftime('%Y-%m-%d')}, - {date.strftime('%Y-%m-%d') for date in 
updated_sample_models.values_list('loaded_date', flat=True)} - ) - @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.MAX_LOOKUP_VARIANTS', 1) @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.BASE_URL', 'https://test-seqr.org/') @mock.patch('seqr.views.utils.airtable_utils.MAX_UPDATE_RECORDS', 2) @@ -223,56 +277,18 @@ def test_command(self, mock_email, mock_airtable_utils): }) # Test errors + self.mock_subprocess.return_value.communicate.return_value = b'', b'One or more URLs matched no objects' with self.assertRaises(CommandError) as ce: - call_command('check_for_new_samples_from_pipeline') - self.assertEqual(str(ce.exception), 'Error: the following arguments are required: path, version') - - with self.assertRaises(CommandError) as ce: - call_command('check_for_new_samples_from_pipeline', 'GRCh38/SNV_INDEL', 'auto__2023-08-08') - self.assertEqual(str(ce.exception), 'Run failed for GRCh38/SNV_INDEL: auto__2023-08-08, unable to load data') - - metadata = { - 'callsets': ['1kg.vcf.gz'], - 'sample_type': 'WES', - 'family_samples': { - 'F0000123_ABC': ['NA22882', 'NA20885'], - 'F000012_12': ['NA20888', 'NA20889'], - 'F000014_14': ['NA21234'], - }, - 'failed_family_samples': { - 'relatedness_check': { - 'F000001_1': {'reasons': [ - 'Sample NA19679 has expected relation "parent" to NA19675 but has coefficients [0.0, 0.8505002045292791, 0.14949979547072176, 0.5747498977353613]', - 'Sample NA19678 has expected relation "sibling" to NA19675 but has coefficients [0.17424888135104177, 0.6041745754450025, 0.22157654320395614, 0.5236638309264574]', - ]}, - }, - 'sex_check': { - 'F000001_1': {'reasons': ['Sample NA19679 has pedigree sex F but imputed sex M']}, - 'F000014_14': {'reasons': ['Sample NA21987 has pedigree sex M but imputed sex F']}, - }, - 'missing_samples': { - 'F000002_2': {'reasons': ["Missing samples: {'HG00732', 'HG00733'}"]}, - 'F000003_3': {'reasons': ["Missing samples: {'NA20870'}"]}, - }, - } - } - self.mock_subprocess.return_value.wait.return_value = 1 - self.mock_subprocess.return_value.stdout = [json.dumps(metadata).encode()] - - with self.assertRaises(CommandError) as ce: - call_command('check_for_new_samples_from_pipeline', 'GRCh38/SNV_INDEL', 'auto__2023-08-08', '--allow-failed') - self.assertEqual( - str(ce.exception), 'Invalid families in run metadata GRCh38/SNV_INDEL: auto__2023-08-08 - F0000123_ABC') - self.mock_logger.warning.assert_called_with('Loading for failed run GRCh38/SNV_INDEL: auto__2023-08-08') + call_command('check_for_new_samples_from_pipeline', '--genome_version=GRCh37', '--dataset_type=MITO') + self.assertEqual(str(ce.exception), 'No successful runs found for genome_version=GRCh37, dataset_type=MITO') + self.mock_subprocess.assert_called_with( + f'gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/*/_SUCCESS', stdout=-1, stderr=-1, shell=True + ) - metadata['family_samples']['F000011_11'] = metadata['family_samples'].pop('F0000123_ABC') - self.mock_subprocess.return_value.stdout = [json.dumps(metadata).encode()] - self.mock_subprocess.return_value.wait.return_value = 0 - with self.assertRaises(CommandError) as ce: - call_command('check_for_new_samples_from_pipeline', 'GRCh38/SNV_INDEL', 'auto__2023-08-08') - self.assertEqual( - str(ce.exception), - 'Data has genome version GRCh38 but the following projects have conflicting versions: R0003_test (GRCh37)') + self._test_call([], [], additional_errors=[ + 'Error loading auto__2023-08-09: Data has genome version GRCh38 but the following projects have 
conflicting versions: R0003_test (GRCh37)', + 'Error loading auto__2024-09-14: Data has genome version GRCh38 but the following projects have conflicting versions: R0001_1kg (GRCh37), R0003_test (GRCh37)', + ]) # Update fixture data to allow testing edge cases Project.objects.filter(id__in=[1, 3]).update(genome_version=38) @@ -281,21 +297,12 @@ def test_command(self, mock_email, mock_airtable_utils): sv.saved_variant_json['genomeVersion'] = '38' sv.save() - with self.assertRaises(ValueError) as ce: - call_command('check_for_new_samples_from_pipeline', 'GRCh38/SNV_INDEL', 'auto__2023-08-08') - self.assertEqual(str(ce.exception), 'Matches not found for sample ids: NA22882') - - metadata['family_samples']['F000011_11'] = metadata['family_samples']['F000011_11'][1:] - # Test success self.mock_logger.reset_mock() - self.mock_subprocess.reset_mock() search_body = { 'genome_version': 'GRCh38', 'num_results': 1, 'variant_ids': [['1', 248367227, 'TC', 'T']], 'variant_keys': [], } - self._test_success('GRCh38/SNV_INDEL', metadata, dataset_type='SNV_INDEL', sample_guids={ - EXISTING_SAMPLE_GUID, REPLACED_SAMPLE_GUID, NEW_SAMPLE_GUID_P3, NEW_SAMPLE_GUID_P4, - }, has_additional_requests=True, reload_calls=[ + self._test_call(has_additional_requests=True, reload_calls=[ {**search_body, 'sample_data': {'SNV_INDEL': [ {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}, {'individual_guid': 'I000016_na20888', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20888', 'sample_type': 'WES'}, @@ -307,6 +314,23 @@ def test_command(self, mock_email, mock_airtable_utils): 'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 saved variants', ]) + # Tests Sample models created/updated + # TODO test GCNV + sample_guids = { + EXISTING_SAMPLE_GUID, REPLACED_SAMPLE_GUID, NEW_SAMPLE_GUID_P3, NEW_SAMPLE_GUID_P4, + } + dataset_type = 'SNV_INDEL' + updated_sample_models = Sample.objects.filter(guid__in=sample_guids) + self.assertEqual(len(updated_sample_models), len(sample_guids)) + self.assertSetEqual({'WES'}, set(updated_sample_models.values_list('sample_type', flat=True))) + self.assertSetEqual({dataset_type}, set(updated_sample_models.values_list('dataset_type', flat=True))) + self.assertSetEqual({True}, set(updated_sample_models.values_list('is_active', flat=True))) + self.assertSetEqual({'1kg.vcf.gz'}, set(updated_sample_models.values_list('elasticsearch_index', flat=True))) + self.assertSetEqual({'auto__2023-08-08'}, set(updated_sample_models.values_list('data_source', flat=True))) + self.assertSetEqual( + {datetime.now().strftime('%Y-%m-%d')}, + {date.strftime('%Y-%m-%d') for date in updated_sample_models.values_list('loaded_date', flat=True)} + ) old_data_sample_guid = 'S000143_na20885' self.assertFalse(Sample.objects.get(guid=old_data_sample_guid).is_active) @@ -492,31 +516,31 @@ def test_command(self, mock_email, mock_airtable_utils): self.mock_send_slack.assert_not_called() self.assertFalse(Sample.objects.filter(last_modified_date__gt=sample_last_modified).exists()) - @responses.activate - def test_gcnv_command(self): - responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=400) - metadata = { - 'callsets': ['1kg.vcf.gz'], - 'sample_type': 'WES', - 'family_samples': {'F000004_4': ['NA20872'], 'F000012_12': ['NA20889']}, - } - 
self._test_success('GRCh37/GCNV', metadata, dataset_type='SV', sample_guids={f'S00000{GUID_ID}_na20872', f'S00000{GUID_ID}_na20889'}, reload_calls=[{ - 'genome_version': 'GRCh37', 'num_results': 1, 'variant_ids': [], 'variant_keys': ['prefix_19107_DEL'], - 'sample_data': {'SV_WES': [{'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}]}, - }], reload_annotations_logs=['No additional saved variants to update']) - - self.mock_send_slack.assert_has_calls([ - mock.call( - 'seqr-data-loading', f'1 new WES SV samples are loaded in {SEQR_URL}project/R0001_1kg/project_page\n```NA20872```', - ), mock.call( - 'seqr-data-loading', f'1 new WES SV samples are loaded in {SEQR_URL}project/{PROJECT_GUID}/project_page\n```NA20889```', - ), - ]) - - self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') - self.mock_utils_logger.info.assert_has_calls([ - mock.call('Reload Summary: '), - mock.call('Skipped the following 1 project with no saved variants: 1kg project nåme with uniçøde'), - mock.call('1 failed projects'), - mock.call(' Test Reprocessed Project: Bad Request'), - ]) + # @responses.activate + # def test_gcnv_command(self): + # responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=400) + # metadata = { + # 'callsets': ['1kg.vcf.gz'], + # 'sample_type': 'WES', + # 'family_samples': {'F000004_4': ['NA20872'], 'F000012_12': ['NA20889']}, + # } + # self._test_call('GRCh37/GCNV', metadata, dataset_type='SV', sample_guids={f'S00000{GUID_ID}_na20872', f'S00000{GUID_ID}_na20889'}, reload_calls=[{ + # 'genome_version': 'GRCh37', 'num_results': 1, 'variant_ids': [], 'variant_keys': ['prefix_19107_DEL'], + # 'sample_data': {'SV_WES': [{'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}]}, + # }], reload_annotations_logs=['No additional saved variants to update']) + # + # self.mock_send_slack.assert_has_calls([ + # mock.call( + # 'seqr-data-loading', f'1 new WES SV samples are loaded in {SEQR_URL}project/R0001_1kg/project_page\n```NA20872```', + # ), mock.call( + # 'seqr-data-loading', f'1 new WES SV samples are loaded in {SEQR_URL}project/{PROJECT_GUID}/project_page\n```NA20889```', + # ), + # ]) + # + # self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') + # self.mock_utils_logger.info.assert_has_calls([ + # mock.call('Reload Summary: '), + # mock.call('Skipped the following 1 project with no saved variants: 1kg project nåme with uniçøde'), + # mock.call('1 failed projects'), + # mock.call(' Test Reprocessed Project: Bad Request'), + # ]) From e2b1f6bd91f1fcaa21a67f4555b2304b08e499f5 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Thu, 26 Sep 2024 12:56:28 -0400 Subject: [PATCH 04/12] fix test --- .../check_for_new_samples_from_pipeline.py | 12 +- ...eck_for_new_samples_from_pipeline_tests.py | 129 ++++++++++-------- seqr/utils/search/hail_search_utils.py | 2 +- 3 files changed, 80 insertions(+), 63 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 66c0e0f523..fa935b69fe 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -61,8 +61,7 @@ def handle(self, *args, **options): 
logger.info(f'Loading new samples from {len(success_runs)} run(s)') updated_families_by_data_type = defaultdict(set) - updated_variants_by_data_type = defaultdict(list) - errors = [] + updated_variants_by_data_type = defaultdict(dict) for path, run in new_runs.items(): try: metadata_path = path.replace('_SUCCESS', 'metadata.json') @@ -71,7 +70,7 @@ def handle(self, *args, **options): updated_families_by_data_type[data_type_key].update(updated_families) updated_variants_by_data_type[data_type_key].update(updated_variants_by_id) except Exception as e: - errors.append(f'Error loading {run["run_version"]}: {e}') + logger.error(f'Error loading {run["run_version"]}: {e}') # Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added reset_cached_search_results(project=None) @@ -81,9 +80,6 @@ def handle(self, *args, **options): *data_type_key, updated_variants_by_data_type[data_type_key], exclude_families=updated_families, ) - for error in errors: - logger.error(error) - logger.info('DONE') @classmethod @@ -246,15 +242,15 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian family_guids=updated_annotation_samples.values_list('individual__family__guid', flat=True).distinct(), ) + variant_type_summary = f'{data_type} {genome_version} saved variants' if not variant_models: - logger.info('No additional saved variants to update') + logger.info(f'No additional {variant_type_summary} to update') return variants_by_id = defaultdict(list) for v in variant_models: variants_by_id[v.variant_id].append(v) - variant_type_summary = f'{data_type} {genome_version} saved variants' logger.info(f'Reloading shared annotations for {len(variant_models)} {variant_type_summary} ({len(variants_by_id)} unique)') updated_variants_by_id = { diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index bd55fa8b26..9b939be6c3 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -14,12 +14,17 @@ MOCK_HAIL_HOST = 'http://test-hail-host' GUID_ID = 54321 +GCNV_GUID_ID = 12345 NEW_SAMPLE_GUID_P3 = f'S00000{GUID_ID}_na20888' NEW_SAMPLE_GUID_P4 = f'S00000{GUID_ID}_na21234' REPLACED_SAMPLE_GUID = f'S00000{GUID_ID}_na20885' -EXISTING_SAMPLE_GUID = 'S000154_na20889' +EXISTING_INACTIVE_SAMPLE_GUID = 'S000154_na20889' +ACTIVE_SAMPLE_GUID = f'S00000{GUID_ID}_na20889' EXISTING_WGS_SAMPLE_GUID = 'S000144_na20888' EXISTING_SV_SAMPLE_GUID = 'S000147_na21234' +SAMPLE_GUIDS = [ACTIVE_SAMPLE_GUID, REPLACED_SAMPLE_GUID, NEW_SAMPLE_GUID_P3, NEW_SAMPLE_GUID_P4] +GCNV_SAMPLE_GUID = f'S00000{GCNV_GUID_ID}_na20889' +GCNV_SAMPLE_GUIDS = [f'S00000{GCNV_GUID_ID}_na20872', GCNV_SAMPLE_GUID] namespace_path = 'ext-data/anvil-non-analyst-project 1000 Genomes Demo' anvil_link = f'{namespace_path}' @@ -116,11 +121,11 @@ RUN_PATHS = [ b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/_SUCCESS', b'gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/_SUCCESS', - b'gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/auto__2024-08-12/_SUCCESS', + b'gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/_SUCCESS', b'gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/_SUCCESS', ] METADATA_FILES = [{ - 'callsets': ['1kg.vcf.gz'], + 'callsets': ['1kg.vcf.gz', 'new_samples.vcf.gz'], 'sample_type': 'WES', 'family_samples': { 
'F000011_11': ['NA20885'], @@ -165,7 +170,6 @@ def mock_metadata_file(index): @mock.patch('seqr.utils.search.hail_search_utils.HAIL_BACKEND_SERVICE_HOSTNAME', MOCK_HAIL_HOST) -@mock.patch('seqr.models.random.randint', lambda *args: GUID_ID) @mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_URL', 'http://testairtable') @mock.patch('seqr.utils.search.add_data_utils.BASE_URL', SEQR_URL) @mock.patch('seqr.utils.search.add_data_utils.SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL', 'anvil-data-loading') @@ -190,13 +194,17 @@ def setUp(self): self.mock_redis = patcher.start() self.mock_redis.return_value.keys.side_effect = lambda pattern: [pattern] self.addCleanup(patcher.stop) + patcher = mock.patch('seqr.models.random.randint') + mock_rand_int = patcher.start() + mock_rand_int.side_effect = [GUID_ID, GUID_ID, GUID_ID, GUID_ID, GCNV_GUID_ID, GCNV_GUID_ID] + self.addCleanup(patcher.stop) + self.mock_ls_process = mock.MagicMock() + self.mock_ls_process.communicate.return_value = b'\n'.join(RUN_PATHS), b'' super().setUp() - def _test_call(self, reload_calls, reload_annotations_logs, has_additional_requests=False, additional_errors=None): + def _test_call(self, error_logs, reload_annotations_logs=None, run_loading_logs=None, reload_calls=None): self.mock_subprocess.reset_mock() - mock_ls_process = mock.MagicMock() - mock_ls_process.communicate.return_value = b'\n'.join(RUN_PATHS), b'' - self.mock_subprocess.side_effect = [mock_ls_process] + [mock_metadata_file(i) for i in range(len(RUN_PATHS))] + self.mock_subprocess.side_effect = [self.mock_ls_process] + [mock_metadata_file(i) for i in range(len(RUN_PATHS))] call_command('check_for_new_samples_from_pipeline') @@ -204,29 +212,25 @@ def _test_call(self, reload_calls, reload_annotations_logs, has_additional_reque ('gsutil ls gs://seqr-hail-search-data/v3.1/*/*/runs/*/_SUCCESS', -1), ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2), ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2), - ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/auto__2024-08-12/metadata.json', -2), + ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/metadata.json', -2), ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/metadata.json', -2), ]]) + loading_logs = [] + for data_type, version in [ + ('GRCh38/SNV_INDEL', 'auto__2023-08-09'), ('GRCh37/SNV_INDEL', 'manual__2023-11-02'), + ('GRCh38/MITO', 'auto__2024-08-12'), ('GRCh38/SV', 'auto__2024-09-14'), + ]: + loading_logs.append(mock.call(f'Loading new samples from {data_type}: {version}')) + if (run_loading_logs or {}).get(data_type): + loading_logs.append(mock.call(run_loading_logs[data_type])) self.mock_logger.info.assert_has_calls([ mock.call('Loading new samples from 4 run(s)'), - mock.call('Loading new samples from GRCh38/SNV_INDEL: auto__2023-08-09'), - mock.call('Loading new samples from GRCh37/SNV_INDEL: manual__2023-11-02'), - mock.call('Loading new samples from GRCh37/MITO: auto__2024-08-12'), - mock.call('Loading 2 WGS MITO samples in 1 projects'), - mock.call('Loading new samples from GRCh38/SV: auto__2024-09-14'), - ] + [mock.call(log) for log in reload_annotations_logs] + [ + ] + loading_logs + [mock.call(log) for log in reload_annotations_logs or []] + [ mock.call('DONE'), ]) self.mock_logger.warning.assert_not_called() - error_logs = [ - mock.call('Error loading manual__2023-11-02: Invalid families in run metadata GRCh37/SNV_INDEL: 
manual__2023-11-02 - F0000123_ABC'), - mock.call('Error loading auto__2024-08-12: Matches not found for sample ids: NA20885, NA22882'), - ] - if additional_errors: - additional_error_logs = [mock.call(error) for error in additional_errors] - error_logs = [additional_error_logs[0]] + error_logs + additional_error_logs[1:] - self.mock_logger.error.assert_has_calls(error_logs) + self.mock_logger.error.assert_has_calls([mock.call(error) for error in error_logs]) self.mock_redis.return_value.delete.assert_called_with('search_results__*', 'variant_lookup_results__*') util_info_logs = [mock.call('Reset 2 cached results')] @@ -235,9 +239,9 @@ def _test_call(self, reload_calls, reload_annotations_logs, has_additional_reque self.mock_utils_logger.info.assert_has_calls(util_info_logs) # Test reload saved variants - self.assertEqual(len(responses.calls), len(reload_calls) + (9 if has_additional_requests else 0)) - for i, call in enumerate(reload_calls): - resp = responses.calls[i+(7 if has_additional_requests else 0)] + self.assertEqual(len(responses.calls), len(reload_calls) + 9 if reload_calls else 0) + for i, call in enumerate(reload_calls or []): + resp = responses.calls[i+7] self.assertEqual(resp.request.url, f'{MOCK_HAIL_HOST}:5000/search') self.assertEqual(resp.request.headers.get('From'), 'manage_command') self.assertDictEqual(json.loads(resp.request.body), call) @@ -285,10 +289,13 @@ def test_command(self, mock_email, mock_airtable_utils): f'gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/*/_SUCCESS', stdout=-1, stderr=-1, shell=True ) - self._test_call([], [], additional_errors=[ + self._test_call(error_logs=[ 'Error loading auto__2023-08-09: Data has genome version GRCh38 but the following projects have conflicting versions: R0003_test (GRCh37)', + 'Error loading manual__2023-11-02: Invalid families in run metadata GRCh37/SNV_INDEL: manual__2023-11-02 - F0000123_ABC', + 'Error loading auto__2024-08-12: Data has genome version GRCh38 but the following projects have conflicting versions: R0001_1kg (GRCh37)', 'Error loading auto__2024-09-14: Data has genome version GRCh38 but the following projects have conflicting versions: R0001_1kg (GRCh37), R0003_test (GRCh37)', ]) + self.assertEqual(Sample.objects.filter(guid__in=SAMPLE_GUIDS + GCNV_SAMPLE_GUIDS).count(), 0) # Update fixture data to allow testing edge cases Project.objects.filter(id__in=[1, 3]).update(genome_version=38) @@ -302,35 +309,46 @@ def test_command(self, mock_email, mock_airtable_utils): search_body = { 'genome_version': 'GRCh38', 'num_results': 1, 'variant_ids': [['1', 248367227, 'TC', 'T']], 'variant_keys': [], } - self._test_call(has_additional_requests=True, reload_calls=[ + self._test_call(reload_calls=[ {**search_body, 'sample_data': {'SNV_INDEL': [ - {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}, {'individual_guid': 'I000016_na20888', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20888', 'sample_type': 'WES'}, + {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}, ]}}, {**search_body, 'sample_data': {'SNV_INDEL': [ {'individual_guid': 'I000018_na21234', 'family_guid': 'F000014_14', 'project_guid': 'R0004_non_analyst_project', 'affected': 'A', 'sample_id': 'NA21234', 'sample_type': 'WES'}, ]}}, ], reload_annotations_logs=[ - 'Reloading shared 
annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 saved variants', + 'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 SNV_INDEL GRCh38 saved variants', + 'No additional SV_WES GRCh38 saved variants to update', + ], run_loading_logs={ + 'GRCh38/SNV_INDEL': 'Loading 4 WES SNV_INDEL samples in 2 projects', + 'GRCh38/MITO': 'Loading 2 WGS MITO samples in 1 projects', + 'GRCh38/SV': 'Loading 2 WES SV samples in 2 projects', + }, error_logs=[ + 'Error loading manual__2023-11-02: Invalid families in run metadata GRCh37/SNV_INDEL: manual__2023-11-02 - F0000123_ABC', + 'Error loading auto__2024-08-12: Matches not found for sample ids: NA20885, NA22882', ]) # Tests Sample models created/updated - # TODO test GCNV - sample_guids = { - EXISTING_SAMPLE_GUID, REPLACED_SAMPLE_GUID, NEW_SAMPLE_GUID_P3, NEW_SAMPLE_GUID_P4, - } - dataset_type = 'SNV_INDEL' - updated_sample_models = Sample.objects.filter(guid__in=sample_guids) - self.assertEqual(len(updated_sample_models), len(sample_guids)) + updated_sample_models = Sample.objects.filter(guid__in=SAMPLE_GUIDS+GCNV_SAMPLE_GUIDS) + self.assertEqual(len(updated_sample_models), len(SAMPLE_GUIDS+GCNV_SAMPLE_GUIDS)) self.assertSetEqual({'WES'}, set(updated_sample_models.values_list('sample_type', flat=True))) - self.assertSetEqual({dataset_type}, set(updated_sample_models.values_list('dataset_type', flat=True))) self.assertSetEqual({True}, set(updated_sample_models.values_list('is_active', flat=True))) - self.assertSetEqual({'1kg.vcf.gz'}, set(updated_sample_models.values_list('elasticsearch_index', flat=True))) - self.assertSetEqual({'auto__2023-08-08'}, set(updated_sample_models.values_list('data_source', flat=True))) self.assertSetEqual( {datetime.now().strftime('%Y-%m-%d')}, {date.strftime('%Y-%m-%d') for date in updated_sample_models.values_list('loaded_date', flat=True)} ) + snv_indel_samples = updated_sample_models.filter(guid__in=SAMPLE_GUIDS) + self.assertEqual(len(snv_indel_samples), len(SAMPLE_GUIDS)) + self.assertSetEqual({'SNV_INDEL'}, set(snv_indel_samples.values_list('dataset_type', flat=True))) + self.assertSetEqual({'1kg.vcf.gz;new_samples.vcf.gz'}, set(snv_indel_samples.values_list('elasticsearch_index', flat=True))) + self.assertSetEqual({'auto__2023-08-09'}, set(snv_indel_samples.values_list('data_source', flat=True))) + gcnv_samples = updated_sample_models.filter(guid__in=GCNV_SAMPLE_GUIDS) + self.assertEqual(len(gcnv_samples), len(GCNV_SAMPLE_GUIDS)) + self.assertSetEqual({'SV'}, set(gcnv_samples.values_list('dataset_type', flat=True))) + self.assertSetEqual({'gcnv.bed.gz'}, set(gcnv_samples.values_list('elasticsearch_index', flat=True))) + self.assertSetEqual({'auto__2024-09-14'}, set(gcnv_samples.values_list('data_source', flat=True))) + old_data_sample_guid = 'S000143_na20885' self.assertFalse(Sample.objects.get(guid=old_data_sample_guid).is_active) @@ -354,7 +372,7 @@ def test_command(self, mock_email, mock_airtable_utils): ) self.assertSetEqual( set(Individual.objects.get(guid='I000017_na20889').sample_set.values_list('guid', flat=True)), - {EXISTING_SAMPLE_GUID} + {EXISTING_INACTIVE_SAMPLE_GUID, ACTIVE_SAMPLE_GUID, GCNV_SAMPLE_GUID} ) self.assertSetEqual( set(Individual.objects.get(guid='I000018_na21234').sample_set.values_list('guid', flat=True)), @@ -441,7 +459,7 @@ def test_command(self, mock_email, mock_airtable_utils): ]) # Test 
notifications - self.assertEqual(self.mock_send_slack.call_count, 6) + self.assertEqual(self.mock_send_slack.call_count, 8) self.mock_send_slack.assert_has_calls([ mock.call( 'seqr-data-loading', @@ -472,9 +490,16 @@ def test_command(self, mock_email, mock_airtable_utils): - 2: Missing samples: {'HG00732', 'HG00733'} - 3: Missing samples: {'NA20870'}""", ), + mock.call( + 'seqr-data-loading', + f'1 new WES SV samples are loaded in {SEQR_URL}project/R0001_1kg/project_page\n```NA20872```', + ), mock.call( + 'seqr-data-loading', + f'1 new WES SV samples are loaded in {SEQR_URL}project/{PROJECT_GUID}/project_page\n```NA20889```', + ), ]) - self.assertEqual(mock_email.call_count, 2) + self.assertEqual(mock_email.call_count, 4) mock_email.assert_has_calls([ mock.call(body=INTERNAL_TEXT_EMAIL, subject='New data available in seqr', to=['test_user_manager@test.com']), mock.call().attach_alternative(INTERNAL_HTML_EMAIL, 'text/html'), @@ -496,22 +521,24 @@ def test_command(self, mock_email, mock_airtable_utils): 'and_filters': {'AnVIL Project URL': 'https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page'}, 'update': {'Status': 'Available in Seqr'}})]) - self.assertEqual(self.manager_user.notifications.count(), 3) + self.assertEqual(self.manager_user.notifications.count(), 5) self.assertEqual( - str(self.manager_user.notifications.first()), 'Test Reprocessed Project Loaded 2 new WES samples 0 minutes ago') + str(self.manager_user.notifications.first()), 'Test Reprocessed Project Loaded 1 new WES SV samples 0 minutes ago') self.assertEqual(self.collaborator_user.notifications.count(), 2) self.assertEqual( str(self.collaborator_user.notifications.first()), 'Non-Analyst Project Loaded 1 new WES samples 0 minutes ago') # Test reloading has no effect + self.mock_ls_process.communicate.return_value = b'\n'.join([RUN_PATHS[0], RUN_PATHS[3]]), b'' + self.mock_subprocess.side_effect = [self.mock_ls_process] self.mock_logger.reset_mock() mock_email.reset_mock() self.mock_send_slack.reset_mock() sample_last_modified = Sample.objects.filter( last_modified_date__isnull=False).values_list('last_modified_date', flat=True).order_by('-last_modified_date')[0] - call_command('check_for_new_samples_from_pipeline', 'GRCh38/SNV_INDEL', 'auto__2023-08-08') - self.mock_logger.info.assert_called_with(f'Data already loaded for GRCh38/SNV_INDEL: auto__2023-08-08') + call_command('check_for_new_samples_from_pipeline') + self.mock_logger.info.assert_called_with('Data already loaded for all 2 runs') mock_email.assert_not_called() self.mock_send_slack.assert_not_called() self.assertFalse(Sample.objects.filter(last_modified_date__gt=sample_last_modified).exists()) @@ -529,13 +556,7 @@ def test_command(self, mock_email, mock_airtable_utils): # 'sample_data': {'SV_WES': [{'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}]}, # }], reload_annotations_logs=['No additional saved variants to update']) # - # self.mock_send_slack.assert_has_calls([ - # mock.call( - # 'seqr-data-loading', f'1 new WES SV samples are loaded in {SEQR_URL}project/R0001_1kg/project_page\n```NA20872```', - # ), mock.call( - # 'seqr-data-loading', f'1 new WES SV samples are loaded in {SEQR_URL}project/{PROJECT_GUID}/project_page\n```NA20889```', - # ), - # ]) + # # self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') # self.mock_utils_logger.info.assert_has_calls([ diff --git 
a/seqr/utils/search/hail_search_utils.py b/seqr/utils/search/hail_search_utils.py index 774e21ee9b..02c33274db 100644 --- a/seqr/utils/search/hail_search_utils.py +++ b/seqr/utils/search/hail_search_utils.py @@ -143,7 +143,7 @@ def _get_sample_data(samples, inheritance_filter=None, inheritance_mode=None, ** ) if inheritance_mode == X_LINKED_RECESSIVE: sample_values['sex'] = F('individual__sex') - sample_data = samples.order_by('id').values('individual__individual_id', 'dataset_type', 'sample_type', **sample_values) + sample_data = samples.order_by('guid').values('individual__individual_id', 'dataset_type', 'sample_type', **sample_values) custom_affected = (inheritance_filter or {}).pop('affected', None) if custom_affected: From 0f16edd341c4b7979c1671317800d744133a79bb Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Thu, 26 Sep 2024 14:52:31 -0400 Subject: [PATCH 05/12] clean up gcnv tests --- ...eck_for_new_samples_from_pipeline_tests.py | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 9b939be6c3..8480fd6486 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -279,6 +279,11 @@ def test_command(self, mock_email, mock_airtable_utils): responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/multi_lookup', status=200, json={ 'results': [{'variantId': '1-46859832-G-A', 'updated_new_field': 'updated_value', 'rsid': 'rs123'}], }) + responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=200, json={ + 'results': [{'variantId': '1-248367227-TC-T', 'familyGuids': ['F000014_14'], 'updated_field': 'updated_value'}], + 'total': 1, + }) + responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=400) # Test errors self.mock_subprocess.return_value.communicate.return_value = b'', b'One or more URLs matched no objects' @@ -299,7 +304,7 @@ def test_command(self, mock_email, mock_airtable_utils): # Update fixture data to allow testing edge cases Project.objects.filter(id__in=[1, 3]).update(genome_version=38) - svs = SavedVariant.objects.filter(guid__in=['SV0000002_1248367227_r0390_100', 'SV0000006_1248367227_r0003_tes']) + svs = SavedVariant.objects.filter(guid__in=['SV0000002_1248367227_r0390_100', 'SV0000006_1248367227_r0003_tes', 'SV0000007_prefix_19107_DEL_r00']) for sv in svs: sv.saved_variant_json['genomeVersion'] = '38' sv.save() @@ -317,6 +322,9 @@ def test_command(self, mock_email, mock_airtable_utils): {**search_body, 'sample_data': {'SNV_INDEL': [ {'individual_guid': 'I000018_na21234', 'family_guid': 'F000014_14', 'project_guid': 'R0004_non_analyst_project', 'affected': 'A', 'sample_id': 'NA21234', 'sample_type': 'WES'}, ]}}, + {'genome_version': 'GRCh38', 'num_results': 1, 'variant_ids': [], 'variant_keys': ['prefix_19107_DEL'], 'sample_data': {'SV_WES': [ + {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}, + ]}}, ], reload_annotations_logs=[ 'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 SNV_INDEL GRCh38 saved variants', 'No additional SV_WES GRCh38 saved variants to update', @@ -421,7 +429,7 @@ def test_command(self, mock_email, mock_airtable_utils): # Test SavedVariant 
model updated for i, variant_id in enumerate([['1', 1562437, 'G', 'CA'], ['1', 46859832, 'G', 'A']]): - multi_lookup_request = responses.calls[9+i].request + multi_lookup_request = responses.calls[10+i].request self.assertEqual(multi_lookup_request.url, f'{MOCK_HAIL_HOST}:5000/multi_lookup') self.assertEqual(multi_lookup_request.headers.get('From'), 'manage_command') self.assertDictEqual(json.loads(multi_lookup_request.body), { @@ -450,12 +458,19 @@ def test_command(self, mock_email, mock_airtable_utils): self.assertEqual(annotation_updated_json['mainTranscriptId'], 'ENST00000505820') self.assertEqual(len(annotation_updated_json['genotypes']), 3) - self.mock_utils_logger.error.assert_not_called() + self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') self.mock_utils_logger.info.assert_has_calls([ + mock.call('Reset 2 cached results'), + mock.call('Reloading saved variants in 2 projects'), mock.call('Updated 0 variants for project Test Reprocessed Project'), mock.call('Updated 1 variants for project Non-Analyst Project'), mock.call('Reload Summary: '), mock.call(' Non-Analyst Project: Updated 1 variants'), + mock.call('Reloading saved variants in 2 projects'), + mock.call('Reload Summary: '), + mock.call('Skipped the following 1 project with no saved variants: 1kg project nåme with uniçøde'), + mock.call('1 failed projects'), + mock.call(' Test Reprocessed Project: Bad Request'), ]) # Test notifications @@ -542,26 +557,3 @@ def test_command(self, mock_email, mock_airtable_utils): mock_email.assert_not_called() self.mock_send_slack.assert_not_called() self.assertFalse(Sample.objects.filter(last_modified_date__gt=sample_last_modified).exists()) - - # @responses.activate - # def test_gcnv_command(self): - # responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=400) - # metadata = { - # 'callsets': ['1kg.vcf.gz'], - # 'sample_type': 'WES', - # 'family_samples': {'F000004_4': ['NA20872'], 'F000012_12': ['NA20889']}, - # } - # self._test_call('GRCh37/GCNV', metadata, dataset_type='SV', sample_guids={f'S00000{GUID_ID}_na20872', f'S00000{GUID_ID}_na20889'}, reload_calls=[{ - # 'genome_version': 'GRCh37', 'num_results': 1, 'variant_ids': [], 'variant_keys': ['prefix_19107_DEL'], - # 'sample_data': {'SV_WES': [{'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}]}, - # }], reload_annotations_logs=['No additional saved variants to update']) - # - - # - # self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') - # self.mock_utils_logger.info.assert_has_calls([ - # mock.call('Reload Summary: '), - # mock.call('Skipped the following 1 project with no saved variants: 1kg project nåme with uniçøde'), - # mock.call('1 failed projects'), - # mock.call(' Test Reprocessed Project: Bad Request'), - # ]) From 4e130f4719e5c3bcf1cf167a513cc47a45f0d890 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Thu, 26 Sep 2024 15:05:56 -0400 Subject: [PATCH 06/12] fix unit test --- .../tests/check_for_new_samples_from_pipeline_tests.py | 2 -- seqr/management/tests/reload_saved_variant_annotations_tests.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 8480fd6486..ac64e98af5 100644 --- 
a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -460,8 +460,6 @@ def test_command(self, mock_email, mock_airtable_utils): self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') self.mock_utils_logger.info.assert_has_calls([ - mock.call('Reset 2 cached results'), - mock.call('Reloading saved variants in 2 projects'), mock.call('Updated 0 variants for project Test Reprocessed Project'), mock.call('Updated 1 variants for project Non-Analyst Project'), mock.call('Reload Summary: '), diff --git a/seqr/management/tests/reload_saved_variant_annotations_tests.py b/seqr/management/tests/reload_saved_variant_annotations_tests.py index 81cdb7ae5c..e61814ba9b 100644 --- a/seqr/management/tests/reload_saved_variant_annotations_tests.py +++ b/seqr/management/tests/reload_saved_variant_annotations_tests.py @@ -39,7 +39,7 @@ def test_command(self, mock_logger): mock_logger.info.assert_has_calls([mock.call(log) for log in [ 'Reloading shared annotations for 3 SNV_INDEL GRCh37 saved variants (3 unique)', 'Fetched 2 additional variants', - 'Updated 2 saved variants', + 'Updated 2 SNV_INDEL GRCh37 saved variants', ]]) self.assertEqual(len(responses.calls), 1) From e596902efd949e3795a161ff414857274779f45f Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Mon, 30 Sep 2024 14:50:06 -0400 Subject: [PATCH 07/12] correctly filter for samples in multiple pdos --- .../check_for_new_samples_from_pipeline.py | 11 +++--- seqr/views/utils/airtable_utils.py | 36 ++++++++++++++++--- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index fa935b69fe..10fb38aa6e 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -178,18 +178,15 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers @staticmethod def _update_pdos(session, project_guid, sample_ids): - airtable_samples = session.fetch_records( - 'Samples', fields=['CollaboratorSampleID', 'SeqrCollaboratorSampleID', 'PDOID'], - or_filters={'PDOStatus': LOADABLE_PDO_STATUSES}, - and_filters={'SeqrProject': f'{BASE_URL}project/{project_guid}/project_page'} + airtable_samples = session.get_samples_for_matched_pdos( + LOADABLE_PDO_STATUSES, pdo_fields=['PDOID'], project_guid=project_guid, ) pdo_ids = set() skipped_pdo_samples = defaultdict(list) for record_id, sample in airtable_samples.items(): - pdo_id = sample['PDOID'][0] - sample_id = sample.get('SeqrCollaboratorSampleID') or sample['CollaboratorSampleID'] - if sample_id in sample_ids: + pdo_id = sample['pdos'][0]['PDOID'] + if sample['sample_id'] in sample_ids: pdo_ids.add(pdo_id) else: skipped_pdo_samples[pdo_id].append(record_id) diff --git a/seqr/views/utils/airtable_utils.py b/seqr/views/utils/airtable_utils.py index 5ee56a3002..ee03afe233 100644 --- a/seqr/views/utils/airtable_utils.py +++ b/seqr/views/utils/airtable_utils.py @@ -1,3 +1,4 @@ +import re import requests from collections import defaultdict from django.core.exceptions import PermissionDenied @@ -5,7 +6,7 @@ from seqr.utils.logging_utils import SeqrLogger from seqr.views.utils.terra_api_utils import is_cloud_authenticated -from settings import AIRTABLE_API_KEY, AIRTABLE_URL +from settings import AIRTABLE_API_KEY, AIRTABLE_URL, BASE_URL logger = 
SeqrLogger(__name__) @@ -102,12 +103,14 @@ def _safe_bulk_update_records(self, update_type, record_type, records, error_det return updated_records - def fetch_records(self, record_type, fields, or_filters, and_filters=None, page_size=PAGE_SIZE): + def fetch_records(self, record_type, fields, or_filters, and_filters=None, page_size=PAGE_SIZE, filter_query_template="{key}='{value}'"): self._session.params.update({'fields[]': fields, 'pageSize': page_size}) filter_formulas = [] for key, values in or_filters.items(): - filter_formulas += [f"{key}='{value}'" for value in sorted(values)] - and_filter_formulas = ','.join([f"{{{key}}}='{value}'" for key, value in (and_filters or {}).items()]) + filter_formulas += [filter_query_template.format(key=key, value=value) for value in sorted(values)] + and_filter_formulas = ','.join([ + filter_query_template.format(key=f'{{{key}}}', value=value) for key, value in (and_filters or {}).items() + ]) records = {} for i in range(0, len(filter_formulas), MAX_OR_FILTERS): filter_formula_group = filter_formulas[i:i + MAX_OR_FILTERS] @@ -149,3 +152,28 @@ def get_samples_for_sample_ids(self, sample_ids, fields): if missing: records_by_id.update(self._get_samples_for_id_field(missing, 'SeqrCollaboratorSampleID', fields)) return records_by_id + + def get_samples_for_matched_pdos(self, pdo_statuses, pdo_fields, project_guid=None): + sample_records = self.fetch_records( + 'Samples', fields=[ + 'CollaboratorSampleID', 'SeqrCollaboratorSampleID', 'PDOStatus', 'SeqrProject', *pdo_fields, + ], + or_filters={'PDOStatus': pdo_statuses}, + and_filters={'SeqrProject': f'{BASE_URL}project/{project_guid}/project_page'}, + # Filter for array contains value instead of exact match + filter_query_template="SEARCH('{value}', ARRAYJOIN({key}, ';'))", + ) + + for sample in sample_records.values(): + pdos = [{ + 'project_guid': re.match(f'{BASE_URL}project/([^/]+)/project_page', sample['SeqrProject'][i]).group(1), + **{field: sample[field][i] for field in pdo_fields} + } for i, status in enumerate(sample['PDOStatus']) if status in pdo_statuses] + if project_guid: + pdos = [pdo for pdo in pdos if pdo['project_guid'] == project_guid] + sample.update({ + 'pdos': pdos, + 'sample_id': sample.get('SeqrCollaboratorSampleID') or sample['CollaboratorSampleID'], + }) + + return {record_id: sample for record_id, sample in sample_records.items() if sample['pdos']} \ No newline at end of file From 8c185ee1d7725a4e031f1cd576b4366ab236e7d2 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Mon, 30 Sep 2024 15:35:58 -0400 Subject: [PATCH 08/12] update tests --- .../check_for_new_samples_from_pipeline.py | 2 +- ...eck_for_new_samples_from_pipeline_tests.py | 27 ++++++++++++++++--- seqr/views/utils/airtable_utils.py | 2 +- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 10fb38aa6e..3e578a684f 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -19,7 +19,7 @@ from seqr.views.utils.permissions_utils import is_internal_anvil_project, project_has_anvil from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json, \ get_saved_variants -from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL +from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL logger = logging.getLogger(__name__) diff --git 
a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index ac64e98af5..ac4dbbf931 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -63,7 +63,9 @@ 'id': 'rec2B6OGmQpAkQW3s', 'fields': { 'CollaboratorSampleID': 'NA19675_1', - 'PDOID': ['recW24C2CJW5lT64K'], + 'PDOID': ['rec2B67GmXpAkQW8z', 'recW24C2CJW5lT64K'], + 'SeqrProject': ['https://test-seqr.org/project/R0002_empty/project_page', 'https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Historic', 'Methods (Loading)'], }, }, { @@ -71,6 +73,8 @@ 'fields': { 'CollaboratorSampleID': 'NA19678', 'PDOID': ['recW24C2CJW5lT64K'], + 'SeqrProject': ['https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Methods (Loading)'], }, }, { @@ -78,6 +82,8 @@ 'fields': { 'CollaboratorSampleID': 'NA19679', 'PDOID': ['rec2Nkg10N1KssPc3'], + 'SeqrProject': ['https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Methods (Loading)'], }, }, { @@ -86,6 +92,8 @@ 'SeqrCollaboratorSampleID': 'HG00731', 'CollaboratorSampleID': 'VCGS_FAM203_621_D2', 'PDOID': ['recW24C2CJW5lT64K'], + 'SeqrProject': ['https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Methods (Loading)'], }, }, { @@ -94,6 +102,8 @@ 'SeqrCollaboratorSampleID': 'NA20888', 'CollaboratorSampleID': 'NA20888_D1', 'PDOID': ['recW24C2CJW5lT64K'], + 'SeqrProject': ['https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Methods (Loading)'], }, }, { @@ -101,6 +111,17 @@ 'fields': { 'CollaboratorSampleID': 'NA20889', 'PDOID': ['rec0RWBVfDVbtlBSL'], + 'SeqrProject': ['https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Methods (Loading)'], + }, + }, + { + 'id': 'rec2gRFoDBeHJc7', + 'fields': { + 'CollaboratorSampleID': 'NA20887', + 'PDOID': ['rec0RWBVfDVbtlBSL', 'rec2Nkg1fKgsJc7'], + 'SeqrProject': ['https://test-seqr.org/project/R0002_empty/project_page', 'https://test-seqr.org/project/R0003_test/project_page'], + 'PDOStatus': ['Methods (Loading)', 'Historic'], }, }, ]} @@ -247,7 +268,7 @@ def _test_call(self, error_logs, reload_annotations_logs=None, run_loading_logs= self.assertDictEqual(json.loads(resp.request.body), call) @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.MAX_LOOKUP_VARIANTS', 1) - @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.BASE_URL', 'https://test-seqr.org/') + @mock.patch('seqr.views.utils.airtable_utils.BASE_URL', 'https://test-seqr.org/') @mock.patch('seqr.views.utils.airtable_utils.MAX_UPDATE_RECORDS', 2) @mock.patch('seqr.views.utils.airtable_utils.logger') @mock.patch('seqr.utils.communication_utils.EmailMultiAlternatives') @@ -261,7 +282,7 @@ def test_command(self, mock_email, mock_airtable_utils): airtable_pdo_url = 'http://testairtable/app3Y97xtbbaOopVR/PDO' responses.add( responses.GET, - f"{airtable_samples_url}?fields[]=CollaboratorSampleID&fields[]=SeqrCollaboratorSampleID&fields[]=PDOID&pageSize=100&filterByFormula=AND({{SeqrProject}}='https://test-seqr.org/project/R0003_test/project_page',OR(PDOStatus='Methods (Loading)',PDOStatus='On hold for phenotips, but ready to load'))", + 
f"{airtable_samples_url}?fields[]=CollaboratorSampleID&fields[]=SeqrCollaboratorSampleID&fields[]=PDOStatus&fields[]=SeqrProject&fields[]=PDOID&pageSize=100&filterByFormula=AND(SEARCH('https://test-seqr.org/project/R0003_test/project_page',ARRAYJOIN({{SeqrProject}},';')),OR(SEARCH('Methods (Loading)',ARRAYJOIN(PDOStatus,';')),SEARCH('On hold for phenotips, but ready to load',ARRAYJOIN(PDOStatus,';'))))", json=AIRTABLE_SAMPLE_RECORDS) responses.add( responses.GET, diff --git a/seqr/views/utils/airtable_utils.py b/seqr/views/utils/airtable_utils.py index ee03afe233..f604298e01 100644 --- a/seqr/views/utils/airtable_utils.py +++ b/seqr/views/utils/airtable_utils.py @@ -161,7 +161,7 @@ def get_samples_for_matched_pdos(self, pdo_statuses, pdo_fields, project_guid=No or_filters={'PDOStatus': pdo_statuses}, and_filters={'SeqrProject': f'{BASE_URL}project/{project_guid}/project_page'}, # Filter for array contains value instead of exact match - filter_query_template="SEARCH('{value}', ARRAYJOIN({key}, ';'))", + filter_query_template="SEARCH('{value}',ARRAYJOIN({key},';'))", ) for sample in sample_records.values(): From c78cdbea6decd791cfb4b80beb4220a1b6255fc9 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Mon, 30 Sep 2024 15:41:16 -0400 Subject: [PATCH 09/12] clean up --- seqr/views/utils/airtable_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seqr/views/utils/airtable_utils.py b/seqr/views/utils/airtable_utils.py index f604298e01..6f4c4ee28b 100644 --- a/seqr/views/utils/airtable_utils.py +++ b/seqr/views/utils/airtable_utils.py @@ -176,4 +176,4 @@ def get_samples_for_matched_pdos(self, pdo_statuses, pdo_fields, project_guid=No 'sample_id': sample.get('SeqrCollaboratorSampleID') or sample['CollaboratorSampleID'], }) - return {record_id: sample for record_id, sample in sample_records.items() if sample['pdos']} \ No newline at end of file + return {record_id: sample for record_id, sample in sample_records.items() if sample['pdos']} From 219bf9a98c6698a418c9bda4681a3cd7f43d9161 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 1 Oct 2024 15:28:10 -0400 Subject: [PATCH 10/12] configurable run directory --- .../check_for_new_samples_from_pipeline.py | 21 ++++++++++++------- ...eck_for_new_samples_from_pipeline_tests.py | 1 + settings.py | 1 + 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 1f50bbd0fd..7149bd62c0 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -19,13 +19,12 @@ from seqr.views.utils.permissions_utils import is_internal_anvil_project, project_has_anvil from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json, \ get_saved_variants -from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL +from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL, HAIL_SEARCH_DATA_DIR logger = logging.getLogger(__name__) -GS_PATH_TEMPLATE = 'gs://seqr-hail-search-data/v3.1/{genome_version}/{dataset_type}/runs/{run_version}/_SUCCESS' -GS_PATH_FIELDS = ['genome_version', 'dataset_type', 'run_version'] -GS_PATH_REGEX = GS_PATH_TEMPLATE.format(**{field: f'(?P<{field}>[^/]+)' for field in GS_PATH_FIELDS}) +RUN_SUCCESS_PATH_TEMPLATE = '{data_dir}/{genome_version}/{dataset_type}/runs/{run_version}/_SUCCESS' +RUN_PATH_FIELDS = ['genome_version', 'dataset_type', 
'run_version'] DATASET_TYPE_MAP = {'GCNV': Sample.DATASET_TYPE_SV_CALLS} USER_EMAIL = 'manage_command' @@ -47,10 +46,11 @@ def add_arguments(self, parser): parser.add_argument('--run-version') def handle(self, *args, **options): - gs_path = GS_PATH_TEMPLATE.format(**{field: options[field] or '*' for field in GS_PATH_FIELDS}) - success_runs = {path: re.match(GS_PATH_REGEX, path).groupdict() for path in list_files(gs_path, user=None)} + path = self._run_success_path(lambda field: options[field] or '*') + path_regex = self._run_success_path(lambda field: f'(?P<{field}>[^/]+)') + success_runs = {path: re.match(path_regex, path).groupdict() for path in list_files(path, user=None)} if not success_runs: - user_args = [f'{k}={options[k]}' for k in GS_PATH_FIELDS if options[k]] + user_args = [f'{k}={options[k]}' for k in RUN_PATH_FIELDS if options[k]] raise CommandError(f'No successful runs found for {", ".join(user_args)}') loaded_runs = set(Sample.objects.filter(data_source__isnull=False).values_list('data_source', flat=True)) @@ -82,6 +82,13 @@ def handle(self, *args, **options): logger.info('DONE') + @staticmethod + def _run_success_path(get_field_format): + return RUN_SUCCESS_PATH_TEMPLATE.format( + data_dir=HAIL_SEARCH_DATA_DIR, + **{field: get_field_format(field) for field in RUN_PATH_FIELDS} + ) + @classmethod def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_version): dataset_type = DATASET_TYPE_MAP.get(dataset_type, dataset_type) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index ac64e98af5..4fa9c4ff4c 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -169,6 +169,7 @@ def mock_metadata_file(index): return m +@mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.HAIL_SEARCH_DATA_DIR', 'gs://seqr-hail-search-data/v3.1') @mock.patch('seqr.utils.search.hail_search_utils.HAIL_BACKEND_SERVICE_HOSTNAME', MOCK_HAIL_HOST) @mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_URL', 'http://testairtable') @mock.patch('seqr.utils.search.add_data_utils.BASE_URL', SEQR_URL) diff --git a/settings.py b/settings.py index 97a14cb8e1..a0212a458a 100644 --- a/settings.py +++ b/settings.py @@ -156,6 +156,7 @@ MEDIA_URL = '/media/' LOADING_DATASETS_DIR = os.environ.get('LOADING_DATASETS_DIR') +HAIL_SEARCH_DATA_DIR = os.environ.get('HAIL_SEARCH_DATA_DIR') LOGGING = { 'version': 1, From 9bdffbad13a4ea62faee072ec90a167a0654089b Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 1 Oct 2024 15:29:52 -0400 Subject: [PATCH 11/12] test no cache reset when no new data --- .../tests/check_for_new_samples_from_pipeline_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 4fa9c4ff4c..b0821b17a5 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -548,6 +548,7 @@ def test_command(self, mock_email, mock_airtable_utils): self.mock_logger.reset_mock() mock_email.reset_mock() self.mock_send_slack.reset_mock() + self.mock_redis.reset_mock() sample_last_modified = Sample.objects.filter( last_modified_date__isnull=False).values_list('last_modified_date', flat=True).order_by('-last_modified_date')[0] @@ -556,3 +557,4 @@ def 
test_command(self, mock_email, mock_airtable_utils): mock_email.assert_not_called() self.mock_send_slack.assert_not_called() self.assertFalse(Sample.objects.filter(last_modified_date__gt=sample_last_modified).exists()) + self.mock_redis.return_value.delete.assert_not_called() From e932afc56f214987580c4fcf967baf63541caaa9 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 1 Oct 2024 17:07:51 -0400 Subject: [PATCH 12/12] test local run dirs --- ...eck_for_new_samples_from_pipeline_tests.py | 54 ++++++++++++++++--- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index b0821b17a5..04e7194f6b 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -169,7 +169,7 @@ def mock_metadata_file(index): return m -@mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.HAIL_SEARCH_DATA_DIR', 'gs://seqr-hail-search-data/v3.1') +@mock.patch('seqr.utils.file_utils.os.path.isfile', lambda *args: True) @mock.patch('seqr.utils.search.hail_search_utils.HAIL_BACKEND_SERVICE_HOSTNAME', MOCK_HAIL_HOST) @mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_URL', 'http://testairtable') @mock.patch('seqr.utils.search.add_data_utils.BASE_URL', SEQR_URL) @@ -191,6 +191,12 @@ def setUp(self): patcher = mock.patch('seqr.utils.file_utils.subprocess.Popen') self.mock_subprocess = patcher.start() self.addCleanup(patcher.stop) + patcher = mock.patch('seqr.utils.file_utils.glob.glob') + self.mock_glob = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('seqr.utils.file_utils.open') + self.mock_open = patcher.start() + self.addCleanup(patcher.stop) patcher = mock.patch('seqr.views.utils.variant_utils.redis.StrictRedis') self.mock_redis = patcher.start() self.mock_redis.return_value.keys.side_effect = lambda pattern: [pattern] @@ -201,6 +207,9 @@ def setUp(self): self.addCleanup(patcher.stop) self.mock_ls_process = mock.MagicMock() self.mock_ls_process.communicate.return_value = b'\n'.join(RUN_PATHS), b'' + patcher = mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.HAIL_SEARCH_DATA_DIR') + self.mock_data_dir = patcher.start() + self.addCleanup(patcher.stop) super().setUp() def _test_call(self, error_logs, reload_annotations_logs=None, run_loading_logs=None, reload_calls=None): @@ -287,20 +296,49 @@ def test_command(self, mock_email, mock_airtable_utils): responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=400) # Test errors - self.mock_subprocess.return_value.communicate.return_value = b'', b'One or more URLs matched no objects' + self.mock_data_dir.__str__.return_value = '/seqr/seqr-hail-search-data' + self.mock_glob.return_value = [] with self.assertRaises(CommandError) as ce: call_command('check_for_new_samples_from_pipeline', '--genome_version=GRCh37', '--dataset_type=MITO') self.assertEqual(str(ce.exception), 'No successful runs found for genome_version=GRCh37, dataset_type=MITO') - self.mock_subprocess.assert_called_with( - f'gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/*/_SUCCESS', stdout=-1, stderr=-1, shell=True - ) - - self._test_call(error_logs=[ + self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/GRCh37/MITO/runs/*/_SUCCESS') + self.mock_subprocess.assert_not_called() + + local_files = [ + 
'/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL/runs/auto__2023-08-09/_SUCCESS', + '/seqr/seqr-hail-search-data/GRCh37/SNV_INDEL/runs/manual__2023-11-02/_SUCCESS', + '/seqr/seqr-hail-search-data/GRCh38/MITO/runs/auto__2024-08-12/_SUCCESS', + '/seqr/seqr-hail-search-data/GRCh38/GCNV/runs/auto__2024-09-14/_SUCCESS', + ] + self.mock_glob.return_value = local_files + self.mock_open.return_value.__enter__.return_value.__iter__.side_effect = [ + iter([json.dumps(METADATA_FILES[i])]) for i in range(len(local_files)) + ] + call_command('check_for_new_samples_from_pipeline') + self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/*/*/runs/*/_SUCCESS') + self.mock_open.assert_has_calls( + [mock.call(path.replace('_SUCCESS', 'metadata.json'), 'r') for path in local_files], any_order=True) + self.mock_subprocess.assert_not_called() + error_logs = [ 'Error loading auto__2023-08-09: Data has genome version GRCh38 but the following projects have conflicting versions: R0003_test (GRCh37)', 'Error loading manual__2023-11-02: Invalid families in run metadata GRCh37/SNV_INDEL: manual__2023-11-02 - F0000123_ABC', 'Error loading auto__2024-08-12: Data has genome version GRCh38 but the following projects have conflicting versions: R0001_1kg (GRCh37)', 'Error loading auto__2024-09-14: Data has genome version GRCh38 but the following projects have conflicting versions: R0001_1kg (GRCh37), R0003_test (GRCh37)', - ]) + ] + self.mock_logger.error.assert_has_calls([mock.call(error) for error in error_logs]) + + self.mock_glob.reset_mock() + self.mock_subprocess.return_value.communicate.return_value = b'', b'One or more URLs matched no objects' + self.mock_data_dir.__str__.return_value = 'gs://seqr-hail-search-data/v3.1' + with self.assertRaises(CommandError) as ce: + call_command('check_for_new_samples_from_pipeline', '--genome_version=GRCh37', '--dataset_type=MITO') + self.assertEqual(str(ce.exception), 'No successful runs found for genome_version=GRCh37, dataset_type=MITO') + self.mock_subprocess.assert_called_with( + 'gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/*/_SUCCESS', stdout=-1, stderr=-1, shell=True + ) + self.mock_glob.assert_not_called() + + self._test_call(error_logs=error_logs) self.assertEqual(Sample.objects.filter(guid__in=SAMPLE_GUIDS + GCNV_SAMPLE_GUIDS).count(), 0) # Update fixture data to allow testing edge cases
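The patches above make the pipeline run directory configurable through
HAIL_SEARCH_DATA_DIR, and the test mocks show seqr.utils.file_utils.list_files
dispatching on the path scheme: gs:// values shell out to `gsutil ls` via
subprocess.Popen (stdout=-1 in the assertion is subprocess.PIPE), while local
values resolve wildcards with glob.glob. A minimal sketch of that dispatch,
inferred only from the mocked calls above; the helper's real signature and
error handling are assumptions:

    import glob
    import subprocess

    def list_files(path, user=None):
        # Hypothetical sketch of the behavior the tests above mock; the real
        # seqr.utils.file_utils.list_files may differ in its details.
        if path.startswith('gs://'):
            # Cloud paths: matches the asserted call
            # 'gsutil ls gs://.../_SUCCESS', stdout=-1, stderr=-1, shell=True
            process = subprocess.Popen(
                f'gsutil ls {path}', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            stdout, stderr = process.communicate()
            if b'matched no objects' in stderr:
                # e.g. gsutil's 'One or more URLs matched no objects'
                return []
            return [line for line in stdout.decode().split('\n') if line]
        # Local paths: resolve wildcards directly, matching the mock_glob
        # assertion on '/seqr/seqr-hail-search-data/*/*/runs/*/_SUCCESS'
        return glob.glob(path)

Under either branch an empty result leaves success_runs empty, which is what
drives the 'No successful runs found' CommandError exercised in both error
cases above.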