Source code for pg_gpu._memory_warning

"""``MemoryLimitedWarning`` and the size check ``from_vcf`` uses to fire it.

VCF text parsing is single-threaded in htslib and the whole genotype
matrix gets pulled into host memory at once, so a very large VCF
either takes hours to load or exhausts RAM outright.
``HaplotypeMatrix.from_vcf`` and ``GenotypeMatrix.from_vcf`` call
``_maybe_memory_warn`` before parsing so users with too-large VCFs
get pointed at the one-time-conversion path --
``HaplotypeMatrix.vcf_to_zarr`` builds a VCZ store, and subsequent
reads via ``from_zarr`` are seconds to minutes instead of hours and
can stream a chromosome that doesn't fit in memory.
"""

import collections
import gzip
import os
import warnings


[docs] class MemoryLimitedWarning(UserWarning): """A load that is at risk of exhausting memory or taking far longer than the VCZ path would. Emitted before parsing by ``HaplotypeMatrix.from_vcf`` and ``GenotypeMatrix.from_vcf`` when the VCF on disk is large enough, or has enough samples, that VCF text parsing will be slow and the full-matrix host load may not even fit. The warning text includes a copy-pastable recipe for converting the VCF to VCZ via ``HaplotypeMatrix.vcf_to_zarr``. Silence with:: import warnings from pg_gpu import MemoryLimitedWarning warnings.filterwarnings("ignore", category=MemoryLimitedWarning) """
#: Trigger thresholds. Above these sizes / sample counts a VCF load is #: slow enough that converting to VCZ once pays for itself within a #: single re-read. The constants are kwargs on ``_maybe_memory_warn`` #: so tests can lower them without a global monkeypatch. VCF_WARN_BYTES = 10 * 1024 ** 3 # 10 GiB on disk VCF_WARN_SAMPLES = 5_000 VCF_WARN_REGION_BP = 5_000_000 # 5 Mb #: Maximum number of distinct VCF paths the in-process warn cache will #: track before it starts evicting the oldest entries. Caps memory in #: long-running processes (Jupyter, servers) that hit many unique VCFs. _WARN_CACHE_MAX = 1000 #: Cache of already-warned paths -- keys are canonicalized via #: ``os.path.realpath`` so ``./foo.vcf`` and ``/abs/foo.vcf`` collapse. #: Insertion-ordered: when full, the oldest entry is popped first. _warned_paths = collections.OrderedDict() def _vcf_header_sample_count(path): """Return the number of samples in a VCF's ``#CHROM`` header line, or ``None`` if the file cannot be opened or has no header. Reads the header only -- stops at the first non-``##`` line. On a very large VCF this finishes in milliseconds even when the file is hundreds of GB.""" opener = (gzip.open if path.endswith(".gz") or path.endswith(".bgz") else open) try: with opener(path, "rt") as f: for line in f: if line.startswith("#CHROM"): parts = line.rstrip("\n").rstrip("\r").split("\t") # columns 0-8 are CHROM, POS, ..., FORMAT; samples # start at column 9. return max(0, len(parts) - 9) if not line.startswith("##"): return None except OSError: return None return None def _region_span_bp(region): """Width of a ``'chrom:start-end'`` string in bp, or 0 on a parse failure. Used to decide whether a region argument is small enough that VCF loading is the right tool even on a large file.""" if region is None: return 0 from .zarr_io import _parse_region try: _, start, end = _parse_region(region) except (ValueError, AttributeError, TypeError): return 0 return max(0, end - start) def _format_warning_text(path, size_bytes, n_samples): return ( f"{path} is {size_bytes / 1e9:.1f} GB" f"{f' with {n_samples:,} samples' if n_samples is not None else ''}. " "Loading this VCF will be slow because VCF text parsing is " "single-threaded in htslib, and the full genotype matrix " "must fit in host memory. Convert once to VCZ and use " "HaplotypeMatrix.from_zarr() instead -- subsequent reads finish " "in seconds rather than hours, and the streaming path can " "walk a chromosome larger than memory chunk by chunk.\n\n" "One-time conversion:\n" " from pg_gpu import HaplotypeMatrix\n" f" HaplotypeMatrix.vcf_to_zarr({path!r},\n" f" {(path.rsplit('.', 1)[0] + '.vcz')!r})\n\n" "Then in pg_gpu:\n" f" hm = HaplotypeMatrix.from_zarr({(path.rsplit('.', 1)[0] + '.vcz')!r})\n\n" "To silence:\n" " import warnings\n" " from pg_gpu import MemoryLimitedWarning\n" " warnings.filterwarnings('ignore', category=MemoryLimitedWarning)\n" ) def _maybe_memory_warn(path, *, region=None, warn_bytes=VCF_WARN_BYTES, warn_samples=VCF_WARN_SAMPLES, warn_region_bp=VCF_WARN_REGION_BP, stacklevel=3): """Emit ``MemoryLimitedWarning`` if loading ``path`` will be slow. Triggers when either: * The on-disk file size exceeds ``warn_bytes`` AND either no ``region`` was passed or the region's span exceeds ``warn_region_bp``. A tabix-region read of a few Mb from a 50 GB VCF is fine and does not warn. * The ``#CHROM`` header lists more than ``warn_samples`` samples. Cached per path within a process so repeat loads of the same file only warn once. Used by both ``HaplotypeMatrix.from_vcf`` and ``GenotypeMatrix.from_vcf``. """ # Symlinks and relative paths to the same inode should only warn # once; canonicalize before consulting the cache. Stat is cheap and # only runs on a path we have not warned on yet. try: canonical = os.path.realpath(path) except OSError: canonical = path if canonical in _warned_paths: return try: size = os.path.getsize(canonical) except OSError: return big_file = size > warn_bytes region_big = (region is None or _region_span_bp(region) > warn_region_bp) # The sample-count check requires opening the VCF header, which on a # gzipped multi-GB file decompresses several KB of metadata # lines. Skip it when the size check already cannot trigger -- a # below-threshold file with a too-small region cannot warn on # sample count alone either, so the read is pure waste. if not big_file and (region is not None and not region_big): return n_samples = _vcf_header_sample_count(canonical) too_many_samples = (n_samples is not None and n_samples > warn_samples) if (big_file and region_big) or too_many_samples: warnings.warn( _format_warning_text(canonical, size, n_samples), MemoryLimitedWarning, stacklevel=stacklevel, ) _warned_paths[canonical] = None while len(_warned_paths) > _WARN_CACHE_MAX: _warned_paths.popitem(last=False)