Skip to content

Commit

Permalink
feat(preprocessing): adjust nextclade pipeline for reprocessing (#1510)
Browse files Browse the repository at this point in the history
* Adjust nextclade pipeline with version param
* Set pipeline version in values.yaml
  • Loading branch information
corneliusroemer authored and chaoran-chen committed Mar 31, 2024
1 parent a9e030e commit 39e6032
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
2 changes: 2 additions & 0 deletions kubernetes/loculus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ defaultOrganisms:
args:
- "prepro"
configFile:
version: 1
log_level: DEBUG
nextclade_dataset_name: nextstrain/mpox/all-clades
nextclade_dataset_tag: 2024-01-16--20-31-02Z
Expand Down Expand Up @@ -338,6 +339,7 @@ defaultOrganisms:
args:
- "prepro"
configFile:
version: 1
log_level: DEBUG
nextclade_dataset_name: nextstrain/ebola/zaire
nextclade_dataset_server: https://raw.githubusercontent.com/nextstrain/nextclade_data/ebola/data_output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Config:
reference_length: int = 197209
batch_size: int = 5
processing_spec: dict[str, dict[str, Any]] = dataclasses.field(default_factory=dict)
version: int = 1


def load_config_from_yaml(config_file: str, config: Config) -> Config:
Expand Down
5 changes: 3 additions & 2 deletions preprocessing/nextclade/src/loculus_preprocessing/prepro.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
def fetch_unprocessed_sequences(n: int, config: Config) -> Sequence[UnprocessedEntry]:
url = config.backend_host.rstrip("/") + "/extract-unprocessed-data"
logging.debug(f"Fetching {n} unprocessed sequences from {url}")
params = {"numberOfSequenceEntries": n}
params = {"numberOfSequenceEntries": n, "pipelineVersion": config.version}
headers = {"Authorization": "Bearer " + get_jwt(config)}
response = requests.post(url, data=params, headers=headers, timeout=10)
if not response.ok:
Expand Down Expand Up @@ -309,7 +309,8 @@ def submit_processed_sequences(processed: Sequence[ProcessedEntry], config: Conf
"Content-Type": "application/x-ndjson",
"Authorization": "Bearer " + get_jwt(config),
}
response = requests.post(url, data=ndjson_string, headers=headers, timeout=10)
params = {"pipelineVersion": config.version}
response = requests.post(url, data=ndjson_string, headers=headers, params=params, timeout=10)
if not response.ok:
with open("failed_submission.json", "w", encoding="utf-8") as f:
f.write(ndjson_string)
Expand Down

0 comments on commit 39e6032

Please sign in to comment.