diff --git a/docs/api.md b/docs/api.md
index 9def0c1..5012b26 100755
--- a/docs/api.md
+++ b/docs/api.md
@@ -28,6 +28,30 @@ options:
   --unsorted            (Optional) set when file is unsorted with respect to tiles; might be slower
 ```
 
+## `flowcell_map`
+Convert basecalls from a whole flow cell into barcode-to-spatial-coordinate maps (tabular format) that can be used with `spacemake`
+for spatial mapping of transcriptomic libraries.
+
+Usage:
+```text
+openst flowcell_map [-h] --bcl-in BCL_IN --tiles-out TILES_OUT [--out-suffix OUT_SUFFIX] [--out-prefix OUT_PREFIX]
+                    [--crop-seq CROP_SEQ] [--rev-comp] [--parallel-processes PARALLEL_PROCESSES]
+
+options:
+  -h, --help            show this help message and exit
+  --bcl-in BCL_IN       Input directory containing BCL files
+  --tiles-out TILES_OUT
+                        Output directory for tile coordinate files
+  --out-suffix OUT_SUFFIX
+                        (Optional) Suffix for output files
+  --out-prefix OUT_PREFIX
+                        (Optional) Prefix for output files
+  --crop-seq CROP_SEQ   (Optional) Python slice format for cropping sequences
+  --rev-comp            (Optional) Reverse complement the sequences
+  --parallel-processes PARALLEL_PROCESSES
+                        (Optional) Number of parallel processes to use
+```
+
 ## `image_stitch`
 Stitch image fields of view into a single image. Currently, it only supports `--microscope keyence`,
 for the default microscopy setup used in our paper.
diff --git a/docs/computational/preprocessing_capture_area.md b/docs/computational/preprocessing_capture_area.md
index 0390ed1..0039f99 100644
--- a/docs/computational/preprocessing_capture_area.md
+++ b/docs/computational/preprocessing_capture_area.md
@@ -13,14 +13,14 @@ information:
 |GGTCCCGTCCAAGAAGTAAATCGAA|9272|1016|
 |...|...|...|
 
-Where `cell_bc` is the 32 nucleotide-long spatial barcode, and `x_pos`/`y_pos` are 2D spatial coordinates
+Where `cell_bc` is the 25 nucleotide-long spatial barcode, and `x_pos`/`y_pos` are 2D spatial coordinates
 of a specific **tile** in the capture area (see below).
 
 Before diving into the code, let's clarify some of the
 [terms](#flow-cell-related-terms) that are specific to using Illumina flow cells as capture areas.
 We quote from [Illumina's documentation](https://support-docs.illumina.com/IN/NextSeq_550-500/Content/IN/NextSeq/FlowCell_Tiles_fNS.htm).
 
-### Flow cell-related terms
+## Flow cell-related terms
 
 !!! info "Tiles"
     "Small imaging areas on the flow cell defined as the field of view by the camera. The total number of tiles depends on the number of lanes, swaths, and surfaces that are imaged on
@@ -36,67 +36,86 @@ Before diving into the code, let's clarify some of the
 !!! info "Swath"
     "A column of tiles in a lane."
 
-### Retrieve spatial barcodes coordinates for one tile
-The `x_pos` and `y_pos` coordinates from the table above are given for each tile, separately. This information is
-encoded in the `bcl` and `fastq` files. To obtain per-tile barcodes and coordinates, run the following code:
+## Generating the barcode-to-coordinate map
+For each flow cell, we generate plain text files with three columns: `cell_bc`, `x_pos`, and `y_pos`.
+These files are later used by `spacemake` to reconstruct the spatial coordinates of transcriptomic libraries.
+This process is performed only once per barcoded flow cell.
+
+!!! warning "Software dependencies"
+    Running `openst flowcell_map` below requires installing either `bcl2fastq` or `bcl-convert`.
+    You can find installation instructions for [`bcl2fastq`](https://emea.support.illumina.com/sequencing/sequencing_software/bcl2fastq-conversion-software.html)
+    and [`bcl-convert`](https://emea.support.illumina.com/sequencing/sequencing_software/bcl-convert.html).
+
+    Then, make sure the tool is added to the `PATH` environment variable.
+    For instance, in Linux:
+
+    ```bash
+    export PATH=/path/to/bcl2fastq:$PATH
+    # or
+    # export PATH=/path/to/bcl-convert:$PATH
+    ```
+
+    Make sure you use a version of these tools that is compatible with your sequencer.
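+    To quickly verify that one of them is reachable from your shell, you can run the
+    following check (standard shell tooling; adapt it to the tool you installed):
+
+    ```bash
+    command -v bcl2fastq || command -v bcl-convert
+    ```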
+Once dependencies have been installed, create the barcode-to-coordinate map for all tiles:
 
 ```sh
-openst barcode_preprocessing \
-    --fastq-in <fastq_in> \
-    --tilecoords-out <tilecoords_out> \
-    --out-suffix <out_suffix> \
-    --out-prefix <out_prefix> \
-    --crop-seq <crop_seq> \
-    --rev-comp \
-    --single-tile
+# the --crop-seq value below corresponds to the default Open-ST sequencing recipe
+openst flowcell_map \
+    --bcl-in /path/to/fc/bcl \
+    --tiles-out /path/to/fc_tiles \
+    --crop-seq 5:30 \
+    --rev-comp
 ```
 
-Make sure to replace the placeholders:
-`<fastq_in>` to the `fastq` file of a specific tile; `<tilecoords_out>` where the table-like
-files will be written; `<out_suffix>` and `<out_prefix>` are suffixes and prefixes that are added to the tile file names;
-`<crop_seq>` from the `--crop-seq` argument is a string in the [Python slice](https://docs.python.org/3/tutorial/datastructures.html) format
-(e.g., 2:32 will take nucleotides 2nd until 32th of the sequence in the `fastq` file); `--rev-comp` is provided whether the barcode sequences
-must be written into the `csv` as their reverse-complementary; `--single-tile` argument is provided when the `fastq` file only contains data for
-a single tile (**our recommendation**).
+Make sure to specify the arguments:
+
+- `--crop-seq`: A Python slice selecting the barcode within each read (e.g., `5:30` takes 25 nucleotides, the 6th through the 30th of each input read; see the snippet below)
+- `--rev-comp`: If provided, the reverse complement of each barcode is computed and stored, **after cropping**
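+
+To make the slice semantics concrete, here is how `5:30` is interpreted (a minimal sketch in plain
+Python, reusing the example barcode from the table above inside a made-up read):
+
+```python
+read = "NNNNNGGTCCCGTCCAAGAAGTAAATCGAANNN"  # hypothetical raw read from the fastq file
+barcode = read[5:30]                        # the slice "5:30": 0-based start, end-exclusive
+assert barcode == "GGTCCCGTCCAAGAAGTAAATCGAA" and len(barcode) == 25
+
+# with --rev-comp, the stored sequence would be the reverse complement (after cropping)
+rev_comp = barcode.translate(str.maketrans("ACTG", "TGAC"))[::-1]
+```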
-### Retrieve spatial barcodes coordinates for all tiles
-Above you generated a single tile coordinate. To process all tiles from a flow cell (in parallel), you can run the
-following snippets for Linux, assuming you have access to the basecalls folder.
+This command will create as many compressed barcode-to-coordinate text files as there are tiles in the flow cell, under the folder `/path/to/fc_tiles`.
 
-First create a `lanes_and_tiles.txt` file:
+### Workflow details
+The `openst flowcell_map` command executes a multi-step workflow to process the barcode data:
 
-```sh
-cat RunInfo.xml | grep "<Tile>" | sed 's/<Tile> *//' | sed 's/<\/Tile>//' | sed 's/^[ \t]*//;s/[ \t]*$//' > lanes_and_tiles.txt
-```
+1. **Tile Processing**: Each of the 3,744 tiles (for the S4 flow cell) is processed individually using `bcl2fastq` (or `bcl-convert`).
+2. **Barcode Preprocessing**: The `barcode_preprocessing` function from our openst tools is applied to each tile. This step:
+    - Trims barcodes according to the specified `--crop-seq` parameter
+    - Computes the reverse complement of the barcodes (if `--rev-comp` is specified)
+    - Adds spatial coordinates (`x_pos` and `y_pos`) to each barcode
+3. **Individual Tile Deduplication**: Each processed tile file is deduplicated to remove duplicate barcodes within the same tile.
+4. **Cross-tile Merging and Deduplication**: All deduplicated tile files are merged, and a second round of deduplication removes barcodes duplicated across different tiles.
+5. **File Splitting**: The merged and deduplicated data is split back into individual files, one for each original tile. This step facilitates faster processing with `spacemake` in subsequent analyses. The final tile files are compressed to save storage space.
 
-where `RunInfo.xml` is a file contained in the basecalls directory.
+!!! info "Coordinate system of tiles"
+    The spatial coordinates acquired with `bcl2fastq` (or `bcl-convert`) are in a tile-specific coordinate system. For samples spanning multiple tiles, mapping to a global coordinate system becomes necessary. This global mapping is typically done using the `puck_collection` functionality of `spacemake`.
 
-Then, run demultiplexing and conversion to `fastq`
-simultaneously to generating the barcode spatial coordinate file:
+## (Optional) Retrieve spatial barcode coordinates for one tile
+It is also possible to obtain barcodes and coordinates for a single tile:
 
 ```sh
-cat lanes_and_tiles.txt | xargs -n 1 -P <parallel_processes> -I {} \
-    sh -c 'bcl2fastq -R <bcl_in> --no-lane-splitting \
-        -o <bcl_out>/"{}" --tiles s_"{}"; \
-
-    openst barcode_preprocessing \
-        --fastq-in <bcl_out>/{}/Undetermined_S0_R1_001.fastq.gz \
-        --tilecoords-out <tilecoords_out> \
-        --out-suffix .txt \
-        --out-prefix "{}" \
-        --crop-seq <crop_seq> \
-        --rev-comp \
-        --single-tile'
+openst barcode_preprocessing \
+    --fastq-in /path/to/tile.fastq \
+    --tilecoords-out /path/to/fc_tiles \
+    --out-prefix fc_1_ \
+    --crop-seq 5:30 \
+    --rev-comp \
+    --single-tile
 ```
 
-Again, make sure to replace the placeholders: `<bcl_in>` and `<bcl_out>` are the directories where the
-basecall files are contained and where the converted output `fastq` files will be saved; The rest of
-arguments have the same meaning as above. If you generated full `fastq` yourself, you can adapt the command
-above to remove the call to `bcl2fastq`.
+!!! warning
+    These files do not undergo deduplication; therefore, some barcodes might be repeated. Run `openst flowcell_map` to make sure there
+    are no duplicated barcodes across the flow cell.
+
+Make sure to replace the placeholders:
+`/path/to/tile.fastq` is the `fastq` file of a specific tile, and `/path/to/fc_tiles` is where the table-like
+files will be written; `--out-prefix` (and `--out-suffix`) are a prefix and a suffix added to the tile file names;
+`--crop-seq 5:30` is a [Python slice](https://docs.python.org/3/tutorial/datastructures.html)
+(e.g., `5:30` takes nucleotides 6 through 30 of each sequence in the `fastq` file); `--rev-comp` is provided when the barcode sequences
+must be written to the output files as their reverse complement, **after cropping**; the `--single-tile` argument is provided when the `fastq` file only contains data for
+a single tile (**our recommendation**).
 
 ## Expected output
-After running all the steps of this section, you will have a folder with many `*.txt.gz` files
-containing the spatial coordinates of flow cell tiles (you will only need to generate this once per flow cell),
-in the tab format described above.
\ No newline at end of file
+After running all the steps of this section, you will have a folder `/path/to/fc_tiles` with `*.txt.gz` files
+containing the spatial coordinates of flow cell tiles.
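+As a quick sanity check, one of these files can be loaded with `pandas` (a minimal sketch; the exact
+tile file name below is hypothetical):
+
+```python
+import pandas as pd
+
+# one of the per-tile files created under /path/to/fc_tiles (illustrative name)
+df = pd.read_csv("/path/to/fc_tiles/fc_1_1_1101.txt.gz", sep="\t")
+print(df.columns.tolist())  # the barcode column plus the two spatial coordinates
+print(len(df))              # number of deduplicated barcodes in this tile
+```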
+**You only need to generate this once per flow cell.**
\ No newline at end of file
diff --git a/docs/computational/preprocessing_openst_library.md b/docs/computational/preprocessing_openst_library.md
index ec3e4e0..cb90e3f 100644
--- a/docs/computational/preprocessing_openst_library.md
+++ b/docs/computational/preprocessing_openst_library.md
@@ -42,8 +42,8 @@ The above will add a new Open-ST project with `barcode_flavor`, `run_mode`, `puc
     These are generated by the `openst` package as [previously described](preprocessing_capture_area.md).
 
-    All `puck_barcode_files` generated in the [previous step](preprocessing_capture_area.md) need to be specified after
-    `--puck_barcode_file`, e.g., with the wildcards `path_to_puck_files/*.txt.gz`.
+    All `puck_barcode_files` generated in the [previous step](preprocessing_capture_area.md) at the folder `/path/to/fc_tiles`
+    need to be specified after `--puck_barcode_file`, e.g., with the wildcard `/path/to/fc_tiles/*.txt.gz`.
 
     To generate output files and reports only for the relevant tiles per sample, you can configure the variable
     `spatial_barcode_min_matches` under `run_mode` (see
     [spacemake documentation](https://spacemake.readthedocs.io/en/latest/config.html#configure-run-modes)).
diff --git a/openst/cli.py b/openst/cli.py
index 817a86a..2819c96 100644
--- a/openst/cli.py
+++ b/openst/cli.py
@@ -506,7 +506,7 @@ def get_image_stitch_parser():
         "--microscope",
         type=str,
         help="microscope model or imaging strategy that was used for imaging",
-        choices=["keyence"],
+        choices=["keyence", "axio"],
         required=True,
     )
 
@@ -807,6 +807,41 @@ def cmd_run_barcode_preprocessing(args):
     _run_barcode_preprocessing(args)
 
 
+FLOWCELL_MAP_HELP = "Convert basecalls from a whole flow cell into a map of barcodes to spatial coordinates"
+def get_flowcell_map_parser():
+    parser = argparse.ArgumentParser(
+        allow_abbrev=False,
+        add_help=False,
+        description=FLOWCELL_MAP_HELP,
+    )
+    parser.add_argument("--bcl-in", type=str, required=True, help="Input directory containing BCL files")
+    parser.add_argument("--tiles-out", type=str, required=True, help="Output directory for tile coordinate files")
+    parser.add_argument("--out-suffix", type=str, default=".txt.gz", help="Suffix for output files")
+    parser.add_argument("--out-prefix", type=str, default="fc_1_", help="Prefix for output files")
+    parser.add_argument("--crop-seq", type=str, default=":", help="Python slice format for cropping sequences")
+    parser.add_argument("--rev-comp", action="store_true", help="Reverse complement the sequences")
+    parser.add_argument("--parallel-processes", type=int, default=1, help="Number of parallel processes to use")
+
+    return parser
+
+
+def setup_flowcell_map_parser(parent_parser):
+    parser = parent_parser.add_parser(
+        "flowcell_map",
+        help=FLOWCELL_MAP_HELP,
+        parents=[get_flowcell_map_parser()],
+    )
+    parser.set_defaults(func=cmd_run_flowcell_map)
+
+    return parser
+
+
+def cmd_run_flowcell_map(args):
+    from openst.preprocessing.flowcell_map import _run_flowcell_map
+
+    _run_flowcell_map(args)
+
+
 REPORT_HELP = "Generate HTML reports from metadata files (json)"
 def get_report_parser():
     parser = argparse.ArgumentParser(
@@ -1497,6 +1532,7 @@ def cmdline_args():
 
     # TODO: do this iteratively
     setup_barcode_preprocessing_parser(parent_parser_subparsers)
+    setup_flowcell_map_parser(parent_parser_subparsers)
     setup_image_stitch_parser(parent_parser_subparsers)
     setup_image_preprocess_parser(parent_parser_subparsers)
diff --git a/openst/preprocessing/barcode_preprocessing.py b/openst/preprocessing/barcode_preprocessing.py
index ff5882b..bc11e2f 100644
--- a/openst/preprocessing/barcode_preprocessing.py
+++ b/openst/preprocessing/barcode_preprocessing.py
@@ -5,11 +5,10 @@
 import pandas as pd
 
 import logging
-
+import dnaio
 
 tab = str.maketrans("ACTG", "TGAC")
 
-
 def reverse_complement_table(seq):
     return seq.translate(tab)[::-1]
@@ -23,35 +22,30 @@ def process_multiple_tiles(
     in_fastq: str, out_path: str, out_prefix: str, out_suffix: str, sequence_preprocessor: callable = None
 ):
     current_tile_number = None
-    sequences, xcoords, ycoords = [[], [], []]
-    idx = 0
-    with gzip.open(in_fastq, "rt") as f:
+    n_written = 0
+    with dnaio.open(in_fastq) as f:
         for line in f:
-            if idx % 4 == 0:
-                tile_number, xcoord, ycoord = get_tile_number_and_coordinates(line.strip())
-                if current_tile_number is None:
-                    current_tile_number = tile_number
-
-                if tile_number != current_tile_number:
-                    logging.info(f"Writing {len(sequences):,} barcodes of file {current_tile_number} to disk")
-                    df = pd.DataFrame({"cell_bc": sequences, "x_pos": xcoords, "y_pos": ycoords})
-                    df.to_csv(
-                        os.path.join(
-                            out_path,
-                            out_prefix + current_tile_number + out_suffix,
-                        ),
-                        index=False,
-                        sep="\t",
-                    )
-                    current_tile_number = tile_number
-                    sequences, xcoords, ycoords = [[], [], []]
-            elif idx % 4 == 1:
-                sequence = sequence_preprocessor(line) if sequence_preprocessor is not None else line
-                sequences.append(sequence)
-                xcoords.append(xcoord)
-                ycoords.append(ycoord)
-
-            idx += 1
+            tile_number, xcoord, ycoord = get_tile_number_and_coordinates(line.name)
+            if current_tile_number != tile_number:
+                if current_tile_number is not None:
+                    logging.info(f"Written {n_written} spatial barcodes to {current_tile_number}")
+                    tile_file.close()
+
+                current_tile_number = tile_number
+
+                tile_fname = os.path.join(
+                    out_path,
+                    out_prefix + current_tile_number + out_suffix,
+                )
+
+                tile_file = open(tile_fname, "w")
+                tile_file.write("\t".join(["cell_bc", "xcoord", "ycoord"]) + "\n")
+                n_written = 0
+
+            sequence = sequence_preprocessor(line.sequence) if sequence_preprocessor is not None else line.sequence
+            tile_file.write("\t".join([sequence, xcoord, ycoord]) + "\n")
+            n_written += 1
+
+        # close (and log) the last tile file, which the loop above never reaches
+        if current_tile_number is not None:
+            logging.info(f"Written {n_written} spatial barcodes to {current_tile_number}")
+            tile_file.close()
 
 
 def process_single_tile(in_fastq: str, sequence_preprocessor: callable = None) -> pd.DataFrame:
diff --git a/openst/preprocessing/flowcell_map.py b/openst/preprocessing/flowcell_map.py
new file mode 100644
index 0000000..6328b01
--- /dev/null
+++ b/openst/preprocessing/flowcell_map.py
@@ -0,0 +1,448 @@
+import argparse
+import logging
+import multiprocessing
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import xml.etree.ElementTree as ET
+from concurrent.futures import ProcessPoolExecutor
+from typing import List, Tuple
+
+import csv
+import gzip
+import resource
+
+from tqdm import tqdm
+
+# memory budget for the last merge sort, in gigabytes
+MEM_PER_CORE = 4
+CHUNK_SIZE = 10_000_000
+# cap on simultaneously open file handles during the split step
+MAX_FILES = 4_000
+
+log_lock = threading.Lock()
+
+def calculate_offsets(file_path, num_processes):
+    file_size = os.path.getsize(file_path)
+    chunk_size = file_size // num_processes
+    offsets = []
+
+    with open(file_path, 'rb') as f:
+        offset = 0
+        for _ in range(num_processes):
+            offsets.append(offset)
+            f.seek(chunk_size, 1)
+            f.readline()  # Move to the next full line
+            offset = f.tell()
+
+    return offsets
+
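+# run_command/log_output wrap the external tools invoked below (bcl2fastq/bcl-convert
+# and the per-tile `openst barcode_preprocessing` calls); log_output serializes logging
+# through log_lock so parallel workers do not interleave their messages.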
+def run_command(cmd: List[str], description: str) -> Tuple[int, str, str]:
+    """Run a command and return its exit code, stdout, and stderr."""
+    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+    stdout, stderr = process.communicate()
+    return process.returncode, stdout, stderr
+
+def log_output(description: str, returncode: int, stdout: str, stderr: str):
+    """Log the output of a command in a synchronized manner."""
+    with log_lock:
+        logging.debug(f"\n{'=' * 40}\n{description}\n{'=' * 40}")
+        if stdout:
+            logging.debug("STDOUT:\n%s", stdout)
+        if stderr and returncode:
+            logging.warning("STDERR:\n%s", stderr)
+        logging.debug("Return code: %d\n", returncode)
+
+def parse_run_info_xml(run_info_path: str) -> List[str]:
+    """Parse RunInfo.xml and extract tile information."""
+    tree = ET.parse(run_info_path)
+    root = tree.getroot()
+
+    tiles = []
+    for tile_elem in root.findall(".//Tile"):
+        tiles.append(tile_elem.text.strip())
+
+    return tiles
+
+
+def process_tile(tile: str, args: argparse.Namespace):
+    bcl_out_dir = os.path.join(args.bcl_out, tile)
+    fastq_file = os.path.join(bcl_out_dir, "Undetermined_S0_R1_001.fastq.gz")
+
+    os.makedirs(bcl_out_dir, exist_ok=True)
+
+    if args.demux_tool == "bcl2fastq":
+        demux_cmd = ["bcl2fastq", "-R", args.bcl_in, "--no-lane-splitting", "-o", bcl_out_dir, "--tiles", f"s_{tile}"]
+    elif args.demux_tool == "bcl-convert":
+        demux_cmd = ["bcl-convert", "--bcl-input-directory", args.bcl_in, "--no-lane-splitting", "true", "--force", "--no-sample-sheet", "true", "--output-directory", bcl_out_dir, "--tiles", f"s_{tile}"]
+    else:
+        logging.error("The demultiplexing tool has to be 'bcl2fastq' or 'bcl-convert'")
+        raise ValueError("The demultiplexing tool has to be 'bcl2fastq' or 'bcl-convert'")
+
+    returncode, stdout, stderr = run_command(demux_cmd, f"{args.demux_tool} --tile {tile}")
+    log_output(f"{args.demux_tool} for tile {tile}", returncode, stdout, stderr)
+    if returncode != 0:
+        logging.warning(f"return code {returncode}, at command '{demux_cmd}'")
+
+    os.makedirs(args.tilecoords_out, exist_ok=True)
+
+    openst_cmd = [
+        "openst",
+        "barcode_preprocessing",
+        "--fastq-in",
+        fastq_file,
+        "--tilecoords-out",
+        args.tilecoords_out,
+        "--out-suffix",
+        args.out_suffix,
+        "--out-prefix",
+        f"{args.out_prefix}{tile}",
+        "--crop-seq",
+        args.crop_seq,
+        "--single-tile",
+    ]
+    if args.rev_comp:
+        openst_cmd.append("--rev-comp")
+
+    returncode, stdout, stderr = run_command(openst_cmd, f"openst barcode_preprocessing --tile {tile}")
+    log_output(f"openst barcode_preprocessing for tile {tile}", returncode, stdout, stderr)
+    if returncode != 0:
+        logging.warning(f"return code {returncode}, at command '{openst_cmd}'")
+
+def process_deduplication_file(file: str, input_folder: str, output_folder: str):
+    input_file = os.path.join(input_folder, file)
+    output_file = os.path.join(output_folder, f".uncompressed.sorted.{file}")
+    # skip the header (NR>1), append the source file name as an extra column,
+    # then sort -u on the barcode column to deduplicate within the tile
+    cmd = f"""
+    LC_ALL=C zcat "{input_file}" |
+    awk -F'\\t' 'NR>1{{print $0"\\t{file}"}}' |
+    sort -u -k1,1 -t$'\\t' > "{output_file}"
+    """
+    returncode, stdout, stderr = run_command(["bash", "-c", cmd], f"deduplicate --tile {file}")
+    log_output(f"deduplicate --tile {file}", returncode, stdout, stderr)
+    if returncode != 0:
+        raise subprocess.CalledProcessError(returncode, cmd)
+
+def deduplicate_tiles(input_folder: str, output_folder: str):
+    os.makedirs(output_folder, exist_ok=True)
+    files = [f for f in os.listdir(input_folder) if f.endswith('.txt.gz')]
+
+    with ProcessPoolExecutor() as executor:
+        futures = [executor.submit(process_deduplication_file, file, input_folder, output_folder) for file in files]
+        for _ in tqdm(futures, total=len(futures), desc="deduplicate"):
+            _.result()
+
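+# Cross-tile merge strategy: every per-tile file is already sorted and deduplicated
+# (see process_deduplication_file), so `sort -m -u` can merge pre-sorted inputs.
+# Files are first merged in chunks in parallel, then the chunk outputs are merged
+# into a single flow cell-wide file.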
+def sort_chunk(chunk_files, output_file, temp_dir):
+    cmd = f"LC_ALL=C sort -m -u -k1,1 -t $'\\t' -T {temp_dir} {' '.join(chunk_files)} > {output_file}"
+    subprocess.run(["bash", "-c", cmd], check=True)
+    return output_file
+
+def merge_tiles(input_folder: str, output_file: str, threads: int, chunk_size: int = 100):
+    os.makedirs(os.path.dirname(output_file), exist_ok=True)
+    files = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.startswith(".uncompressed.sorted.")]
+
+    # Create a temporary directory for intermediate files
+    with tempfile.TemporaryDirectory(dir=input_folder) as temp_dir:
+        # Stage 1: Sort chunks of files in parallel
+        chunk_outputs = []
+        with ProcessPoolExecutor(max_workers=threads) as executor:
+            futures = []
+            for i in range(0, len(files), chunk_size):
+                chunk = files[i:i+chunk_size]
+                chunk_output = os.path.join(temp_dir, f"chunk_sorted_{i}.txt")
+                futures.append(executor.submit(sort_chunk, chunk, chunk_output, temp_dir))
+
+            for future in tqdm(futures, desc="Sorting chunks"):
+                chunk_outputs.append(future.result())
+
+        # Stage 2: Final merge of all chunk outputs
+        final_merge_cmd = f"LC_ALL=C sort --parallel={threads} -m -u -k1,1 -t $'\\t' -T {temp_dir} {' '.join(chunk_outputs)} > {output_file}"
+
+        try:
+            subprocess.run(["bash", "-c", final_merge_cmd], check=True)
+            logging.info(f"Final merge completed successfully. Output: {output_file}")
+        except subprocess.CalledProcessError as e:
+            logging.error(f"Error during final merge: {str(e)}")
+            raise
+
+
+def process_chunk(file_path, start_offset, end_offset, output_folder, chunk_id, num_processes):
+    file_handles = {}
+    csvw_handles = {}
+    temp_folder = os.path.join(output_folder, f"temp_{chunk_id}")
+    os.makedirs(temp_folder, exist_ok=True)
+
+    flush_interval = 10_000_000
+    lines_processed = 0
+
+    with open(file_path, 'r') as f:
+        f.seek(start_offset)
+        current_position = start_offset
+
+        # Skip the first line if not starting from the beginning (it might be partial)
+        if start_offset > 0:
+            f.readline()
+            current_position = f.tell()
+
+        pbar = None
+        if start_offset == 0:
+            total_size = end_offset - start_offset
+            pbar = tqdm(total=total_size*num_processes, unit='B', unit_scale=True, desc=f'Processing in {num_processes} chunks')
+
+        while current_position < end_offset:
+            line = f.readline()
+            if not line:  # End of file
+                break
+
+            new_position = f.tell()
+            if pbar:
+                pbar.update((new_position - current_position)*num_processes)
+            current_position = new_position
+
+            row = next(csv.reader([line], delimiter='\t'))
+
+            # the last column (added during deduplication) is the source tile file name
+            tile_id = row[-1]
+            if tile_id not in file_handles:
+                file_handles[tile_id] = open(os.path.join(temp_folder, f"{tile_id}"), 'w', newline='')
+                csvw_handles[tile_id] = csv.writer(file_handles[tile_id], delimiter='\t')
+                csvw_handles[tile_id].writerow(["cell_bc", "xcoord", "ycoord"])
+            csvw_handles[tile_id].writerow(row[:-1])
+
+            lines_processed += 1
+            if lines_processed % flush_interval == 0:
+                for handle in file_handles.values():
+                    handle.flush()
+                    os.fsync(handle.fileno())
+
+        if pbar:
+            pbar.close()
+
+    for handle in file_handles.values():
+        handle.flush()
+        os.fsync(handle.fileno())
+        handle.close()
+
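+# merge_file concatenates the per-process shards of one tile (written by process_chunk
+# into the temp_<chunk_id> folders) into the final gzip-compressed tile file, writing
+# the header exactly once.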
+def merge_file(base_name, temp_folders, output_folder):
+    final_file = os.path.join(output_folder, f"{base_name}.txt.gz")
+    temp_files = [os.path.join(folder, f"{base_name}.txt.gz")
+                  for folder in temp_folders
+                  if os.path.exists(os.path.join(folder, f"{base_name}.txt.gz"))]
+
+    with gzip.open(final_file, 'wt', newline='') as f_out:
+        writer = csv.writer(f_out, delimiter='\t')
+        writer.writerow(["cell_bc", "xcoord", "ycoord"])  # Write header once
+        for temp_file in temp_files:
+            with open(temp_file, 'r') as f_in:
+                reader = csv.reader(f_in, delimiter='\t')
+                next(reader)  # Skip header
+                for row in reader:
+                    writer.writerow(row)
+            os.remove(temp_file)
+
+    return base_name
+
+def merge_intermediate_files(output_folder, num_processes):
+    temp_folders = [os.path.join(output_folder, f"temp_{i}") for i in range(num_processes)]
+
+    # Get unique base names across all temp folders
+    base_names = set()
+    for folder in temp_folders:
+        if not os.path.exists(folder):
+            logging.error(f"The temporary folder '{folder}' does not exist")
+            continue
+        base_names.update(name.rsplit('.', 2)[0] for name in os.listdir(folder))
+
+    with ProcessPoolExecutor(max_workers=num_processes) as executor:
+        futures = [executor.submit(merge_file, base_name, temp_folders, output_folder)
+                   for base_name in base_names]
+
+        for future in tqdm(futures, total=len(futures), desc="Merging files"):
+            future.result()
+
+    # Clean up temp folders
+    for folder in temp_folders:
+        if os.path.exists(folder):
+            shutil.rmtree(folder)
+
+def distribute_per_file(input_file: str, output_folder: str, num_processes: int = None):
+    os.makedirs(output_folder, exist_ok=True)
+
+    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+    resource.setrlimit(resource.RLIMIT_NOFILE, (min(MAX_FILES, hard), hard))
+
+    if num_processes is None:
+        num_processes = os.cpu_count()
+
+    try:
+        offsets = calculate_offsets(input_file, num_processes)
+        offsets.append(os.path.getsize(input_file))  # Add end of file offset
+
+        with ProcessPoolExecutor(max_workers=num_processes) as executor:
+            futures = []
+            for i in range(num_processes):
+                start_offset = offsets[i]
+                end_offset = offsets[i+1]
+                futures.append(executor.submit(process_chunk, input_file, start_offset, end_offset, output_folder, i, num_processes))
+
+            for future in tqdm(futures, total=num_processes, desc="Processing chunks"):
+                future.result()
+
+        logging.info("All chunks processed. Merging intermediate files...")
+        merge_intermediate_files(output_folder, num_processes)
+
+    except Exception as e:
+        logging.error(f"Error during file distribution: {str(e)}")
+        raise
+
+    logging.info(f"File distribution completed. Output in {output_folder}")
+
+
+def process_compression_file(file: str, input_folder: str, output_folder: str):
+    input_file = os.path.join(input_folder, file)
+    output_file = os.path.join(output_folder, f"{file}.gz")
+    cmd = f"gzip -c {input_file} > {output_file}"
+    returncode, stdout, stderr = run_command(["bash", "-c", cmd], f"gzip {file}")
+    log_output(f"gzip {file}", returncode, stdout, stderr)
+    if returncode != 0:
+        logging.warning(f"return code {returncode}, at command '{cmd}'")
+
+def compress_files(input_folder: str, output_folder: str):
+    os.makedirs(output_folder, exist_ok=True)
+    files = [f for f in os.listdir(input_folder) if not f.startswith(".")]
+
+    with ProcessPoolExecutor() as executor:
+        futures = [executor.submit(process_compression_file, file, input_folder, output_folder) for file in files]
+        for _ in tqdm(futures, total=len(files), desc="gzip"):
+            _.result()  # This will raise any exceptions that occurred during execution
+
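+# Demultiplexing tool detection: bcl2fastq takes precedence if both tools are
+# installed; _run_flowcell_map aborts early when neither is found on PATH.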
+def check_bcl2fastq():
+    """Return the name of the available demultiplexing tool, or None if there is none."""
+    if shutil.which("bcl2fastq") is not None:
+        return "bcl2fastq"
+    elif shutil.which("bcl-convert") is not None:
+        return "bcl-convert"
+    else:
+        logging.error(
+            "Neither bcl2fastq nor bcl-convert was found in the system PATH. "
+            "Please install one of them and make sure it is in your PATH."
+        )
+
+    return None
+
+
+def check_os():
+    """Check if the operating system is Linux."""
+    if sys.platform != "linux":
+        logging.warning(
+            "This script is designed to run on Linux. You may encounter issues on other operating systems."
+        )
+        return False
+    return True
+
+def create_lanes_tiles_S4():
+    # S4 flow cell layout: 4 lanes x 2 surfaces x 6 swaths x 78 tiles = 3,744 tiles
+    lanes_and_tiles = []
+    for lane in range(1, 5):
+        for side in range(1, 3):
+            for column in range(1, 7):
+                for row in range(1, 79):
+                    lanes_and_tiles.append(f"{lane}_{side}{column}{row:02d}")
+
+    return lanes_and_tiles
+
+def _run_flowcell_map(args: argparse.Namespace):
+    if not check_os():
+        return
+
+    demux_tool = check_bcl2fastq()
+    if demux_tool is None:
+        return
+
+    args.demux_tool = demux_tool
+
+    max_cores = multiprocessing.cpu_count()
+    if args.parallel_processes > max_cores:
+        logging.warning(
+            f"Requested {args.parallel_processes} processes, but only {max_cores} cores are available. Limiting to {max_cores} processes."
+        )
+        args.parallel_processes = max_cores
+
+    if not os.path.exists(args.tiles_out):
+        os.makedirs(args.tiles_out)
+        logging.warning(f"Created output directory: {args.tiles_out}")
+
+    run_info_path = os.path.join(args.bcl_in, "RunInfo.xml")
+    if os.path.exists(run_info_path):
+        lanes_and_tiles = parse_run_info_xml(run_info_path)
+    else:
+        logging.warning(f"RunInfo.xml not found at {run_info_path}. Generating lanes and tiles programmatically.")
+        lanes_and_tiles = create_lanes_tiles_S4()
+
+    # NOTE: intermediate results are written under args.tiles_out; the temporary
+    # directory itself is currently unused
+    with tempfile.TemporaryDirectory(dir=args.tiles_out) as temp_dir:
+        args.bcl_out = os.path.join(args.tiles_out, "bcl_out")
+        args.tilecoords_out = os.path.join(args.tiles_out, "tilecoords_out")
+        args.dedup_out = os.path.join(args.tiles_out, "dedup_out")
+        args.merge_out = os.path.join(args.tiles_out, "merge_out")
+        args.distribute_out = os.path.join(args.tiles_out, "distribute_out")
+        merged_file = os.path.join(args.merge_out, "merged_deduplicated.txt")
+
+        for dir_path in [args.bcl_out, args.tilecoords_out, args.dedup_out, args.merge_out, args.distribute_out]:
+            os.makedirs(dir_path, exist_ok=True)
+
+        lanes_and_tiles_path = os.path.join(args.bcl_in, "lanes_and_tiles.txt")
+        with open(lanes_and_tiles_path, "w") as f:
+            for tile in lanes_and_tiles:
+                f.write(f"{tile}\n")
+
+        if len(os.listdir(args.dedup_out)) == 0:
+            # process tiles in parallel
+            with ProcessPoolExecutor(max_workers=args.parallel_processes) as executor:
+                futures = [executor.submit(process_tile, tile, args) for tile in lanes_and_tiles]
+                for _ in tqdm(futures, total=len(futures), desc="Processing tiles"):
+                    _.result()
+
+        if not os.path.exists(args.tilecoords_out) or not os.listdir(args.tilecoords_out):
+            logging.error(f"Tile coordinates output directory {args.tilecoords_out} does not exist or is empty")
+            return
+
+        logging.info("Deduplicating individual tiles")
+        if not os.path.exists(merged_file):
+            deduplicate_tiles(args.tilecoords_out, args.dedup_out)
+
+        if not os.path.exists(args.dedup_out) or not os.listdir(args.dedup_out):
+            logging.error(f"Deduplicated tiles directory {args.dedup_out} does not exist or is empty")
+            return
+
+        logging.info("Merging and deduplicating all tiles")
+        if not os.path.exists(os.path.join(args.merge_out, "done")):
+            merge_tiles(args.dedup_out, merged_file, args.parallel_processes)
+
+        open(os.path.join(args.merge_out, "done"), 'a').close()
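+        # The "done" marker files and the emptiness checks above make this workflow
+        # resumable: stages that already completed are skipped when the command is
+        # re-run with the same --tiles-out directory.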
+        if not os.path.exists(merged_file):
+            logging.error(f"Merged file {merged_file} does not exist")
+            return
+
+        logging.info("Writing compressed tiles")
+        if not os.path.exists(os.path.join(args.tiles_out, "done")):
+            distribute_per_file(merged_file, args.tiles_out, args.parallel_processes)
+
+        open(os.path.join(args.tiles_out, "done"), 'a').close()
+
+        if not os.path.exists(args.tiles_out) or not os.listdir(args.tiles_out):
+            logging.error(f"Final puck file directory {args.tiles_out} does not exist or is empty")
+            return
+
+        # TODO: create a file with statistics (how many tiles are created, how many barcodes, how many deduplicated)
+        logging.info("Flowcell mapping completed successfully")
+
+
+if __name__ == "__main__":
+    from openst.cli import get_flowcell_map_parser
+
+    args = get_flowcell_map_parser().parse_args()
+    _run_flowcell_map(args)
diff --git a/openst/preprocessing/image_stitch.py b/openst/preprocessing/image_stitch.py
index 84492ff..b210587 100644
--- a/openst/preprocessing/image_stitch.py
+++ b/openst/preprocessing/image_stitch.py
@@ -24,8 +24,6 @@ def generate_zstacks_from_tiles(input_dir, join_zstack_regex):
 
     zstack_files = defaultdict(list)
-
-    # Compile regex pattern
     pattern = re.compile(join_zstack_regex)
 
     for filename in os.listdir(input_dir):
@@ -82,7 +80,48 @@ def _image_stitch_imagej_keyence(
     ]
 
 
-SUPPORTED_MICROSCOPES = {"keyence": _image_stitch_imagej_keyence}
+def _image_stitch_imagej_axio(
+    imagej_bin: str,
+    input_dir: str,
+    image_out: str,
+):
+    import xml.etree.ElementTree as ET
+
+    xml_file_path = os.path.join(input_dir, "_meta.xml")
+    macro_axio_path = get_absolute_package_path("preprocessing/imagej_macros/axio_stitch.ijm")
+
+    # Check whether the xml metadata file exists
+    check_file_exists(xml_file_path)
+
+    # Load the xml metadata for gridx and gridy
+    tree = ET.parse(xml_file_path)
+    root = tree.getroot()
+
+    GRIDX = root.find('./Tags/V148')
+    GRIDY = root.find('./Tags/V149')
+    # we assume names like 02_PCW06.tif
+    filename_base, extension_base = root.find('./Tags/V5').text.split(".")
+
+    if GRIDX is not None and GRIDY is not None:
+        GRIDX = int(GRIDX.text)
+        GRIDY = int(GRIDY.text)
+    else:
+        raise ValueError("Could not parse the grid size for the current tiles")
+
+    logging.info(f"Grid size: X = {GRIDX}, Y = {GRIDY}")
+    logging.info(f"Running macro {macro_axio_path} with ImageJ at {imagej_bin}")
+
+    return [
+        f"{imagej_bin}",
+        "--headless",
+        "--console",
+        "-macro",
+        f"{macro_axio_path}",
+        f"{GRIDX};{GRIDY};{input_dir};{image_out};{filename_base}_p{{ii}}.png",
+    ]
+
+
+SUPPORTED_MICROSCOPES = {"keyence": _image_stitch_imagej_keyence, "axio": _image_stitch_imagej_axio}
 
 
 def image_stitch_imagej(imagej_bin: str, microscope: str, input_dir: str, image_out: str):
diff --git a/openst/preprocessing/imagej_macros/axio_stitch.ijm b/openst/preprocessing/imagej_macros/axio_stitch.ijm
new file mode 100644
index 0000000..25308c0
--- /dev/null
+++ b/openst/preprocessing/imagej_macros/axio_stitch.ijm
@@ -0,0 +1,12 @@
+arglist = getArgument();
+args = split(arglist,';');
+var gridx = args[0];
+var gridy = args[1];
+var indir = args[2];
+var outfile = args[3];
+var filename_template = args[4];
+
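+// the argument string is assembled by _image_stitch_imagej_axio as:
+//   gridx;gridy;input_dir;output_file;filename_template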
+run("Grid/Collection stitching", "type=[Grid: snake by rows] order=[Right & Down] grid_size_x=" + gridx + " grid_size_y=" + gridy + " tile_overlap=20 first_file_index_i=0 directory=[" + indir + "] file_names=" + filename_template + " fusion_method=[Linear Blending] regression_threshold=0.05 max/avg_displacement_threshold=1.00 absolute_displacement_threshold=1.00 compute_overlap subpixel_accuracy computation_parameters=[Save computation time (but use more RAM)] image_output=[Fuse and display]");
+run("RGB Color");
+saveAs("Tiff", outfile);
+close();
\ No newline at end of file
diff --git a/openst/preprocessing/imagej_macros/keyence_stitch.ijm b/openst/preprocessing/imagej_macros/keyence_stitch.ijm
index 0626042..1e59a77 100644
--- a/openst/preprocessing/imagej_macros/keyence_stitch.ijm
+++ b/openst/preprocessing/imagej_macros/keyence_stitch.ijm
@@ -6,7 +6,7 @@ var indir = args[2];
 var outfile = args[3];
 
 run("Grid/Collection stitching", "type=[Grid: snake by rows] order=[Right & Down] grid_size_x=" + gridx + " grid_size_y=" + gridy + " tile_overlap=20 first_file_index_i=1 directory=[" + indir + "] file_names=Image_{iiiii}_CH4.tif fusion_method=[Linear Blending] regression_threshold=0.05 max/avg_displacement_threshold=1.00 absolute_displacement_threshold=1.00 compute_overlap subpixel_accuracy computation_parameters=[Save computation time (but use more RAM)] image_output=[Fuse and display]");
-
+run("RGB Color");
 run("Stack to RGB");
 saveAs("Tiff", outfile);
 close();
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 1069366..247d0c6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "openst"
-version = "0.1.23"
+version = "0.2.0"
 description = "The computational pipeline for the Open-ST method."
 license = "GPL-2.0"
 authors = [
@@ -58,6 +58,7 @@ pyqt5-qt5 = "==5.15.2"
 pyqt5-sip = "==12.12.2"
 ome-zarr = ">=0.8.2"
 pyqtgraph = ">=0.13.3"
+dnaio = ">=1.2.0"
 
 [tool.poetry.urls]
 "Documentation" = "https://rajewsky-lab.github.io/openst/"