"""
GPU-accelerated dimensionality reduction and distance computation.
Provides PCA, randomized PCA, PCoA, pairwise genetic distance, and
Li & Ralph (2019) local PCA (lostruct) functions using CuPy.
"""
from dataclasses import dataclass
from typing import Union, Optional, Tuple
import numpy as np
import cupy as cp
import pandas as pd
from .haplotype_matrix import HaplotypeMatrix
from ._utils import get_population_matrix as _get_population_matrix
_GPU_MEM_BUDGET = 0.3
def _prepare_matrix(haplotype_matrix, scaler, population, missing_data='include'):
"""Center and scale haplotype matrix for PCA.
Uses chunked processing for large matrices to avoid OOM.
Returns the prepared matrix X on GPU.
"""
from ._memutil import dac_and_n, estimate_variant_chunk_size
if population is not None:
matrix = _get_population_matrix(haplotype_matrix, population)
else:
matrix = haplotype_matrix
if matrix.device == 'CPU':
matrix.transfer_to_gpu()
hap = matrix.haplotypes
n_samples, n_var = hap.shape
if missing_data == 'exclude':
dac, nv = dac_and_n(hap)
complete = nv == n_samples
if not cp.all(complete):
hap = hap[:, complete]
n_var = hap.shape[1]
# Compute per-site mean and scale from allele counts (memory-safe)
dac, nv = dac_and_n(hap)
site_mean = cp.where(nv > 0, dac.astype(cp.float64) / nv.astype(cp.float64), 0.0)
if scaler == 'patterson':
p = site_mean
scale = cp.sqrt(p * (1 - p))
scale = cp.where(scale > 0, scale, 1.0)
elif scaler == 'standard':
# Need variance -- compute via second pass or approximate
scale = None # handled below
else:
scale = None
# Check if full float64 matrix fits in memory
free = cp.cuda.Device().mem_info[0]
needed = n_samples * n_var * 8 # float64
if needed < free * 0.4:
# Fast path: materialize full matrix
has_missing = bool(cp.any(hap < 0).get())
if has_missing:
valid_mask = hap >= 0
X = cp.where(valid_mask, hap, 0).astype(cp.float64)
X = cp.where(valid_mask, X, site_mean[None, :])
else:
X = hap.astype(cp.float64)
X -= site_mean
if scaler == 'patterson':
X /= scale
elif scaler == 'standard':
std = cp.std(X, axis=0)
valid = std > 0
X[:, valid] /= std[valid]
X[:, ~valid] = 0
return X
# Memory-constrained path: return hap + metadata for chunked PCA
# Store scaling info so pca() can chunk the matmul
return _DeferredPCA(hap, site_mean, scale, scaler)
class _DeferredPCA:
"""Wrapper for memory-constrained PCA: stores hap + scaling metadata
so the caller can compute X @ X.T via chunked matmul without
materializing the full float64 matrix."""
def __init__(self, hap, site_mean, scale, scaler):
self.hap = hap
self.site_mean = site_mean
self.scale = scale
self.scaler = scaler
self.shape = hap.shape
def _scale_chunk(self, start, end):
"""Prepare a scaled float64 chunk of the matrix."""
chunk = self.hap[:, start:end]
# Check this chunk for missing (avoids full-matrix boolean allocation)
has_missing_chunk = bool(cp.any(chunk < 0).get())
if has_missing_chunk:
valid = chunk >= 0
x = cp.where(valid, chunk, 0).astype(cp.float64)
x = cp.where(valid, x, self.site_mean[start:end])
else:
x = chunk.astype(cp.float64)
x -= self.site_mean[start:end]
if self.scaler == 'patterson' and self.scale is not None:
x /= self.scale[start:end]
return x
@property
def _chunk_size(self):
from ._memutil import estimate_variant_chunk_size
return estimate_variant_chunk_size(self.shape[0], bytes_per_element=8,
n_intermediates=2)
def chunked_gram(self):
"""Compute X @ X.T via chunked processing."""
n_samples, n_var = self.shape
C = cp.zeros((n_samples, n_samples), dtype=cp.float64)
for start in range(0, n_var, self._chunk_size):
end = min(start + self._chunk_size, n_var)
x = self._scale_chunk(start, end)
C += x @ x.T
del x
return C
def __matmul__(self, other):
"""Compute X @ other via chunked processing."""
n_samples, n_var = self.shape
result = cp.zeros((n_samples, other.shape[1]), dtype=cp.float64)
for start in range(0, n_var, self._chunk_size):
end = min(start + self._chunk_size, n_var)
x = self._scale_chunk(start, end)
result += x @ other[start:end, :]
del x
return result
@property
def T(self):
"""Return a transposed view for X.T @ Y operations."""
return _DeferredPCATranspose(self)
class _DeferredPCATranspose:
"""Transpose view of _DeferredPCA for X.T @ Y operations."""
def __init__(self, parent):
self.parent = parent
self.shape = (parent.shape[1], parent.shape[0])
def __matmul__(self, other):
"""Compute X.T @ other via chunked processing."""
n_samples, n_var = self.parent.shape
result = cp.zeros((n_var, other.shape[1]), dtype=cp.float64)
for start in range(0, n_var, self.parent._chunk_size):
end = min(start + self.parent._chunk_size, n_var)
x = self.parent._scale_chunk(start, end)
result[start:end, :] = x.T @ other
del x
return result
def _pca_from_gram(C, n_samples, n_components):
"""Extract PCA coordinates from a Gram matrix (X @ X.T).
Eigendecomposes the n x n Gram matrix instead of running SVD on the
full n x m data matrix. Equivalent when n_samples <= n_variants.
"""
eigvals, eigvecs = cp.linalg.eigh(C)
idx = cp.argsort(eigvals)[::-1]
eigvals = eigvals[idx]
eigvecs = eigvecs[:, idx]
coords = eigvecs[:, :n_components] * cp.sqrt(cp.maximum(eigvals[:n_components], 0))
var = eigvals / n_samples
explained_variance_ratio = var[:n_components] / cp.sum(cp.maximum(var, 0))
return coords.get(), explained_variance_ratio.get()
[docs]
def pca(haplotype_matrix: HaplotypeMatrix,
n_components: int = 10,
scaler: Optional[str] = 'patterson',
population: Optional[Union[str, list]] = None,
missing_data: str = 'include'):
"""Principal Component Analysis on haplotype data.
Parameters
----------
haplotype_matrix : HaplotypeMatrix
Haplotype data. Rows are haplotypes, columns are variants.
n_components : int
Number of principal components to compute.
scaler : str or None
Scaling method before PCA:
'patterson' - scale by sqrt(p*(1-p)), Patterson et al. (2006)
'standard' - standardize to unit variance per variant
None - center only (subtract mean)
population : str or list, optional
Population subset to use.
missing_data : str
'include' - impute missing to per-site mean
'exclude' - filter sites with any missing
Returns
-------
coords : ndarray, float64, shape (n_samples, n_components)
Sample coordinates in PC space.
explained_variance_ratio : ndarray, float64, shape (n_components,)
Proportion of variance explained by each component.
"""
prepared = _prepare_matrix(haplotype_matrix, scaler, population, missing_data)
if isinstance(prepared, _DeferredPCA):
n_samples = prepared.shape[0]
n_components = min(n_components, n_samples)
C = prepared.chunked_gram()
return _pca_from_gram(C, n_samples, n_components)
X = prepared
n_samples, n_variants = X.shape
n_components = min(n_components, min(n_samples, n_variants))
# When n_samples <= n_variants (typical for popgen), eigendecompose
# the n x n Gram matrix X @ X.T instead of full SVD on n x m.
if n_samples <= n_variants:
C = X @ X.T
return _pca_from_gram(C, n_samples, n_components)
# Fallback: full SVD when n_samples > n_variants
try:
U, S, Vt = cp.linalg.svd(X, full_matrices=False)
except Exception as e:
if 'CUSOLVER' in str(type(e).__name__) or 'CUSOLVER' in str(e):
raise RuntimeError(
f"Full SVD failed on matrix of shape ({n_samples}, {n_variants}). "
f"This dataset is too large for exact PCA. "
f"Use randomized_pca() instead, which handles large datasets efficiently."
) from e
raise
coords = U[:, :n_components] * S[:n_components]
var = (S ** 2) / n_samples
explained_variance_ratio = var[:n_components] / cp.sum(var)
return coords.get(), explained_variance_ratio.get()
[docs]
def randomized_pca(haplotype_matrix: HaplotypeMatrix,
n_components: int = 10,
scaler: Optional[str] = 'patterson',
population: Optional[Union[str, list]] = None,
n_iter: int = 3,
random_state: Optional[int] = None,
missing_data: str = 'include'):
"""Randomized PCA using truncated SVD approximation.
Faster than full PCA for large datasets where only a few
components are needed.
Parameters
----------
haplotype_matrix : HaplotypeMatrix
Haplotype data.
n_components : int
Number of components.
scaler : str or None
'patterson', 'standard', or None.
population : str or list, optional
Population subset.
n_iter : int
Number of power iterations for accuracy.
random_state : int, optional
Random seed for reproducibility.
Returns
-------
coords : ndarray, float64, shape (n_samples, n_components)
explained_variance_ratio : ndarray, float64, shape (n_components,)
"""
X = _prepare_matrix(haplotype_matrix, scaler, population, missing_data)
n_samples, n_variants = X.shape
n_components = min(n_components, min(n_samples, n_variants))
# randomized SVD on GPU
if random_state is not None:
cp.random.seed(random_state)
# random projection
k = n_components + 10 # oversampling
k = min(k, min(n_samples, n_variants))
Omega = cp.random.randn(n_variants, k, dtype=cp.float64)
Y = X @ Omega
# power iteration for accuracy
for _ in range(n_iter):
Y = X @ (X.T @ Y)
# QR decomposition
Q, _ = cp.linalg.qr(Y)
# project and SVD in reduced space
if isinstance(X, _DeferredPCA):
# B = Q.T @ X has shape (k, n_var) — too wide for cuSOLVER SVD.
# Instead compute B @ B.T = Q.T @ (X @ X.T) @ Q = Q.T @ gram @ Q
# and eigendecompose the small (k, k) matrix.
gram = X.chunked_gram() # (n_samples, n_samples)
M = Q.T @ gram @ Q # (k, k)
eigvals, eigvecs = cp.linalg.eigh(M)
idx = cp.argsort(eigvals)[::-1]
eigvals = eigvals[idx]
eigvecs = eigvecs[:, idx]
S = cp.sqrt(cp.maximum(eigvals, 0))
Uhat = eigvecs
else:
B = Q.T @ X
Uhat, S, Vt = cp.linalg.svd(B, full_matrices=False)
U = Q @ Uhat
coords = U[:, :n_components] * S[:n_components]
# explained variance
if isinstance(X, _DeferredPCA):
# Reuse gram from above (already computed for eigendecomposition)
total_var = cp.trace(gram) / n_samples
else:
total_var = cp.sum(cp.var(X, axis=0))
var = (S[:n_components] ** 2) / n_samples
explained_variance_ratio = var / total_var
return coords.get(), explained_variance_ratio.get()
[docs]
def pairwise_distance(haplotype_matrix: HaplotypeMatrix,
metric: str = 'euclidean',
population: Optional[Union[str, list]] = None,
missing_data: str = 'include'):
"""Compute pairwise distances between samples.
Parameters
----------
haplotype_matrix : HaplotypeMatrix
Haplotype data.
metric : str
Distance metric. Supported on GPU: 'euclidean', 'cityblock',
'sqeuclidean'. Falls back to scipy for other metrics.
population : str or list, optional
Population subset.
missing_data : str
'include' - mask missing, normalize per pair
'exclude' - filter sites with any missing
Returns
-------
dist : ndarray, float64, shape (n_samples * (n_samples - 1) // 2,)
Condensed distance matrix.
"""
if population is not None:
matrix = _get_population_matrix(haplotype_matrix, population)
else:
matrix = haplotype_matrix
if matrix.device == 'CPU':
matrix.transfer_to_gpu()
hap = matrix.haplotypes
if missing_data == 'exclude':
missing_per_var = cp.sum(hap < 0, axis=0)
complete = missing_per_var == 0
hap = hap[:, complete]
X = cp.where(hap >= 0, hap, 0).astype(cp.float64)
valid_mask = (hap >= 0).astype(cp.float64)
has_missing = cp.any(hap < 0)
n = X.shape[0]
if metric in ('euclidean', 'sqeuclidean', 'cityblock'):
idx_i, idx_j = cp.triu_indices(n, k=1)
n_pairs = len(idx_i)
# Estimate batch size from available GPU memory
n_variants = X.shape[1]
free_mem = cp.cuda.Device().mem_info[0]
# Each pair needs ~3 float64 arrays of n_variants (diff, joint, result)
bytes_per_pair = n_variants * 8 * 3
batch_size = max(1, min(n_pairs, int(free_mem * 0.3 / bytes_per_pair)))
dist_parts = []
for start in range(0, n_pairs, batch_size):
end = min(start + batch_size, n_pairs)
bi = idx_i[start:end]
bj = idx_j[start:end]
if has_missing:
# only compare at jointly-valid sites
joint = valid_mask[bi] * valid_mask[bj]
n_joint = cp.sum(joint, axis=1)
else:
n_joint = cp.float64(X.shape[1])
if metric == 'cityblock':
raw = cp.sum(cp.abs(X[bi] - X[bj]) * (joint if has_missing else 1.0), axis=1)
else:
raw = cp.sum(((X[bi] - X[bj]) ** 2) * (joint if has_missing else 1.0), axis=1)
# normalize by jointly-valid sites
if has_missing:
d = cp.where(n_joint > 0, raw * X.shape[1] / n_joint, 0.0)
else:
d = raw
if metric == 'euclidean':
d = cp.sqrt(d)
dist_parts.append(d)
return cp.concatenate(dist_parts).get()
else:
from scipy.spatial.distance import pdist
X_cpu = X.get()
return pdist(X_cpu, metric=metric)
[docs]
def pcoa(dist, n_components: Optional[int] = None):
"""Principal Coordinate Analysis (classical MDS).
Parameters
----------
dist : array_like
Pairwise distances in condensed form (from pairwise_distance)
or as a square distance matrix.
n_components : int, optional
Number of dimensions to return. If None, returns all
dimensions with positive eigenvalues.
Returns
-------
coords : ndarray, float64, shape (n_samples, n_components)
Sample coordinates.
explained_variance_ratio : ndarray, float64, shape (n_components,)
Proportion of variance explained by each axis.
"""
from scipy.spatial.distance import squareform
dist = np.asarray(dist, dtype='f8')
# convert condensed to square if needed
if dist.ndim == 1:
D = squareform(dist)
else:
D = dist
n = D.shape[0]
# double-centering
D_sq = D ** 2
row_mean = D_sq.mean(axis=1, keepdims=True)
col_mean = D_sq.mean(axis=0, keepdims=True)
grand_mean = D_sq.mean()
F = -0.5 * (D_sq - row_mean - col_mean + grand_mean)
# eigendecomposition
eigvals, eigvecs = np.linalg.eigh(F)
# sort descending
idx = np.argsort(eigvals)[::-1]
eigvals = eigvals[idx]
eigvecs = eigvecs[:, idx]
# keep positive eigenvalues
pos = eigvals > 0
eigvals = eigvals[pos]
eigvecs = eigvecs[:, pos]
if n_components is not None:
eigvals = eigvals[:n_components]
eigvecs = eigvecs[:, :n_components]
# project
coords = eigvecs * np.sqrt(eigvals)
# explained variance
explained_variance_ratio = eigvals / np.sum(eigvals)
return coords, explained_variance_ratio
# ---------------------------------------------------------------------------
# Local PCA (lostruct; Li & Ralph 2019)
# ---------------------------------------------------------------------------
[docs]
@dataclass
class LocalPCAResult:
"""Per-window local PCA output.
Attributes
----------
windows : pandas.DataFrame
One row per window (chrom, start, end, center, n_variants, window_id).
eigvals : numpy.ndarray, float64, shape (n_windows, k)
Top-k eigenvalues of each window's sample-sample covariance matrix,
descending. NaN for windows with fewer than k variants.
eigvecs : numpy.ndarray, float64, shape (n_windows, k, n_samples)
Top-k eigenvectors. NaN for invalid windows.
sumsq : numpy.ndarray, float64, shape (n_windows,)
Sum of squared entries of each window's covariance matrix; used by
`pc_dist` for the proportion-of-variance denominator.
k : int
Number of PCs retained.
scaler : str or None
missing_data : str
jackknife_se : numpy.ndarray or None
Delete-1 block jackknife SE of the top-k eigenvectors. Shape
``(n_windows, k)`` with ``aggregate='mean'``, or
``(n_windows, k, n_samples)`` with ``aggregate=None``.
``None`` when jackknife was not computed.
"""
windows: "pd.DataFrame"
eigvals: np.ndarray
eigvecs: np.ndarray
sumsq: np.ndarray
k: int
scaler: Optional[str]
missing_data: str
jackknife_se: Optional[np.ndarray] = None
@property
def n_windows(self) -> int:
return self.eigvals.shape[0]
@property
def n_samples(self) -> int:
return self.eigvecs.shape[2]
[docs]
def to_lostruct_matrix(self) -> np.ndarray:
"""Flat `(n_windows, 1 + k + k*n_samples)` matrix matching
`lostruct::eigen_windows()` output layout.
Eigenvector block is column-major per R: PC_1 samples 1..N, then PC_2
samples 1..N, etc. That coincides with the C-order flatten of our
`(k, n_samples)` per-window array.
"""
nw = self.n_windows
out = np.empty((nw, 1 + self.k + self.k * self.n_samples), dtype=np.float64)
out[:, 0] = self.sumsq
out[:, 1:1 + self.k] = self.eigvals
out[:, 1 + self.k:] = self.eigvecs.reshape(nw, -1)
return out
def _window_gram(X: "cp.ndarray", n_var: int) -> Tuple["cp.ndarray", "cp.ndarray"]:
"""Return lostruct-equivalent sample-sample covariance and its sum-of-squares.
Replicates R's `cov(sweep(x, 1, rowMeans(x), "-"))`: already-variant-centered
`X` (rows=samples, cols=variants) gets an additional sample-centering, then
`C = (X @ X.T) / (n_var - 1)` and `sumsq = sum(C**2)`. Sum-of-squares is
returned as a 0-d GPU scalar so callers can defer the host sync.
"""
X = X - X.mean(axis=1, keepdims=True)
denom = max(n_var - 1, 1)
C = (X @ X.T) / denom
return C, (C ** 2).sum()
def _local_pca_window_dense(X: "cp.ndarray", n_var: int, k: int
) -> Tuple["cp.ndarray", "cp.ndarray", "cp.ndarray"]:
"""Per-window dense top-k eigenpairs and Frobenius-squared sumsq.
Forms the (n_hap x n_hap) Gram transiently, runs `cp.linalg.eigh`, and
returns top-k. Bit-identical numerics to ``_window_gram`` followed by
eigh + ``cp.stack`` extraction; designed so the streaming dispatcher
can drop this in per window without touching the legacy code path.
Parameters
----------
X : cupy.ndarray, shape (n_hap, n_var_w)
Variant-centred haplotype block (the caller has already
applied per-variant centering / scaling via ``_prepare_matrix``).
n_var : int
Span normaliser (matches ``_window_gram`` -- usually
``X.shape[1]`` but exposed so the caller can pass the original
window's variant count for thin-window handling).
k : int
Number of leading eigenpairs to retain.
Returns
-------
eigvals : cupy.ndarray, shape (k,), float64, descending
eigvecs : cupy.ndarray, shape (k, n_hap), float64, rows are eigenvectors
sumsq : cupy.ndarray, 0-d float64, equal to ``(C ** 2).sum()`` where
C is the same Gram used internally.
"""
C, sumsq = _window_gram(X, n_var)
evals, evecs = cp.linalg.eigh(C)
n = C.shape[0]
# eigh returns ascending; the top-k are the last k columns of evecs.
# Reverse to descending and transpose so rows are eigenvectors.
top_idx = cp.arange(n - 1, n - 1 - k, -1)
top_vals = evals[top_idx]
top_vecs = cp.transpose(evecs[:, top_idx], (1, 0))
return top_vals, top_vecs, sumsq
def _local_pca_window_rsvd(X: "cp.ndarray", n_var: int, k: int,
oversample: int = 8, n_iter: int = 1,
rng: Optional["cp.random.RandomState"] = None
) -> Tuple["cp.ndarray", "cp.ndarray", "cp.ndarray"]:
"""Per-window randomized top-k eigenpairs of ``C_w = X X^T / (n_var-1)``.
Halko-Martinsson-Tropp randomized SVD applied directly to the centred
haplotype block; never materialises the (n_hap x n_hap) Gram. Cost
is `O(n_hap * n_var_w * (k + oversample))` per window vs the dense
path's `O(n_hap^2 * n_var_w + n_hap^3)`.
The eigenvalues of ``C_w`` are exactly the squared singular values of
the sample-centred ``X`` divided by ``(n_var - 1)``; this routine
samples a Gaussian projection of the variant axis, performs ``n_iter``
subspace iterations on the resulting range estimate, and runs a small
SVD on the projected block.
The returned ``sumsq`` is ``sum_{i=1}^{k+oversample} sigma_i^4 /
(n_var-1)^2``, an approximation to ``(C ** 2).sum()`` that drops the
contribution of singular values beyond the oversample budget. With
``oversample >= 2k`` this is accurate to better than 5e-3 relative
on the regimes lostruct cares about (low-rank-plus-noise spectra).
Falls back to ``_local_pca_window_dense`` when
``n_var_w < k + oversample`` so very thin windows use the
deterministic path.
"""
n_hap, n_var_w = X.shape
r = int(k + oversample)
if n_var_w < r:
return _local_pca_window_dense(X, n_var, k)
if rng is None:
rng = cp.random.RandomState()
# 1. Sample-centre to match _window_gram semantics.
X_c = X - X.mean(axis=1, keepdims=True)
# 2. Random Gaussian projection of the variant axis.
Omega = rng.standard_normal((n_var_w, r), dtype=cp.float64)
Y = X_c @ Omega # (n_hap, r)
# 3. Subspace iteration: Y <- (X X^T) Y, with reorthonormalisation.
for _ in range(n_iter):
Q, _ = cp.linalg.qr(Y) # (n_hap, r)
Z = X_c.T @ Q # (n_var_w, r)
Y = X_c @ Z # (n_hap, r)
Q, _ = cp.linalg.qr(Y) # (n_hap, r)
# 4. Small SVD on the projected block.
B = Q.T @ X_c # (r, n_var_w)
U_small, sigma, _ = cp.linalg.svd(B, full_matrices=False)
# sigma is (r,) descending.
# 5. Lift the small left singular vectors back to the original space.
U = Q @ U_small # (n_hap, r)
denom = max(n_var - 1, 1)
eigvals_all = (sigma ** 2) / denom # (r,)
eigvals = eigvals_all[:k] # (k,)
eigvecs = cp.transpose(U[:, :k], (1, 0)) # (k, n_hap)
# sumsq approx: sum sigma_i^4 / d^2 over the computed budget.
sumsq = (eigvals_all ** 2).sum() * denom ** 2 / denom ** 2 # = (sigma^4).sum() / d^2
# Note: written so that the algebra is transparent; cupy will fold it.
return eigvals, eigvecs, sumsq
def _local_pca_streaming(matrix, iterator, k, scaler, missing_data,
engine, tile_size, oversample,
n_subspace_iter, random_state, window_params):
"""Streaming local PCA dispatcher (engine='streaming-dense' or
'streaming-rsvd').
Iterates the WindowIterator and processes each window's per-window
kernel through to host arrays before continuing, so peak GPU
memory is bounded by the per-window working set rather than
n_windows. Periodically calls ``cp.get_default_memory_pool().
free_all_blocks()`` to keep the cupy pool from fragmenting across
long whole-genome runs.
The ``tile_size`` argument is the period at which the cupy memory
pool is asked to release unused blocks (auto-sized to ~16 windows
if not given). It does not affect numerics; lowering it reduces
peak fragmentation at a small per-call cost.
"""
n_samples = matrix.num_haplotypes
if tile_size is None:
tile_size = 16
if engine == 'streaming-rsvd':
seed = 12345 if random_state is None else int(random_state)
rng = cp.random.RandomState(seed=seed)
else:
rng = None
window_meta = []
eigvals_gpu_chunks = []
eigvecs_gpu_chunks = []
sumsq_gpu_chunks = []
# Per-window kernel outputs accumulate as GPU tensors. They are tiny
# (k floats + (k, n_hap) floats + 1 float per window) so retaining
# them for the duration of the loop is cheap, and avoiding the
# per-window cp.asnumpy / float(...) sync chain is the whole point
# of issue #91. cp.stack at the end gives a single contiguous
# (n_windows, ...) tensor; the .get() that follows is the only
# D->H transfer the dispatcher does.
n_processed = 0
for window in iterator:
is_valid = window.n_variants >= max(k, 2)
window_meta.append((
window.chrom, window.start, window.end, window.center,
window.n_variants, window.window_id, is_valid,
))
if not is_valid:
eigvals_gpu_chunks.append(cp.full(k, cp.nan, dtype=cp.float64))
eigvecs_gpu_chunks.append(
cp.full((k, n_samples), cp.nan, dtype=cp.float64))
sumsq_gpu_chunks.append(cp.asarray(cp.nan, dtype=cp.float64))
del window
continue
X = _prepare_matrix(window.matrix, scaler, population=None,
missing_data=missing_data)
X = _materialize_prepared(X)
if engine == 'streaming-dense':
evals, evecs, sumsq = _local_pca_window_dense(
X, X.shape[1], k)
else: # streaming-rsvd
evals, evecs, sumsq = _local_pca_window_rsvd(
X, X.shape[1], k,
oversample=oversample,
n_iter=n_subspace_iter,
rng=rng,
)
eigvals_gpu_chunks.append(evals.astype(cp.float64, copy=False))
eigvecs_gpu_chunks.append(evecs.astype(cp.float64, copy=False))
sumsq_gpu_chunks.append(sumsq.astype(cp.float64, copy=False))
del X, evals, evecs, sumsq, window
n_processed += 1
if n_processed % tile_size == 0:
cp.get_default_memory_pool().free_all_blocks()
cp.get_default_memory_pool().free_all_blocks()
if len(window_meta) == 0:
raise ValueError("WindowIterator produced no windows.")
# Single bulk D->H transfer for all per-window outputs (issue #91).
eigvals_host = cp.stack(eigvals_gpu_chunks, axis=0).get()
eigvecs_host = cp.stack(eigvecs_gpu_chunks, axis=0).get()
sumsq_host = cp.stack(sumsq_gpu_chunks, axis=0).get()
del eigvals_gpu_chunks, eigvecs_gpu_chunks, sumsq_gpu_chunks
cp.get_default_memory_pool().free_all_blocks()
valid_mask = np.array(
[meta[6] for meta in window_meta], dtype=bool)
eigvals_host[~valid_mask] = np.nan
eigvecs_host[~valid_mask] = np.nan
sumsq_host[~valid_mask] = np.nan
chrom_col, start_col, end_col, center_col, nvar_col, wid_col, _ = zip(
*window_meta)
windows_df = pd.DataFrame({
'chrom': list(chrom_col),
'start': list(start_col),
'end': list(end_col),
'center': list(center_col),
'n_variants': list(nvar_col),
'window_id': list(wid_col),
})
return LocalPCAResult(
windows=windows_df,
eigvals=eigvals_host,
eigvecs=eigvecs_host,
sumsq=sumsq_host,
k=k,
scaler=scaler,
missing_data=missing_data,
)
def _batched_top_k_eigh(gram_stack: "cp.ndarray", k: int,
batch_size: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""Batched eigendecomposition on a stack of symmetric matrices.
Returns top-k eigvals (descending) and eigvecs as host arrays.
Parameters
----------
gram_stack : cupy.ndarray, float64, shape (n_windows, n, n)
k : int
batch_size : int, optional
Chunk size across the window axis. If None, uses all windows at once
unless memory budget is exceeded.
Returns
-------
eigvals : numpy.ndarray, shape (n_windows, k)
eigvecs : numpy.ndarray, shape (n_windows, k, n)
"""
n_windows, n, _ = gram_stack.shape
if batch_size is None:
free = cp.cuda.Device().mem_info[0]
per_window = n * n * 8 * 4 # gram + workspace + eigvecs + scratch
batch_size = max(1, min(n_windows,
int(free * _GPU_MEM_BUDGET) // max(per_window, 1)))
eigvals_out = np.empty((n_windows, k), dtype=np.float64)
eigvecs_out = np.empty((n_windows, k, n), dtype=np.float64)
for start in range(0, n_windows, batch_size):
end = min(start + batch_size, n_windows)
evals, evecs = cp.linalg.eigh(gram_stack[start:end])
# eigh returns ascending; top-k is the last k reversed.
top = cp.arange(n - 1, n - 1 - k, -1)
top_vals = evals[:, top]
top_vecs = cp.transpose(evecs[:, :, top], (0, 2, 1))
eigvals_out[start:end] = top_vals.get()
eigvecs_out[start:end] = top_vecs.get()
return eigvals_out, eigvecs_out
def _resolve_window_params(window_params, window_size, step_size, window_type,
regions, caller: str):
"""Return a WindowParams, either passed through or built from short-hand."""
from .windowed_analysis import WindowParams
if window_params is not None:
return window_params
if window_size is None:
raise ValueError(f"{caller} requires window_params or window_size.")
return WindowParams(
window_type=window_type,
window_size=window_size,
step_size=step_size if step_size is not None else window_size,
regions=regions,
)
def _materialize_prepared(X):
"""If `_prepare_matrix` returned a deferred chunked view, concatenate it.
Per-window matrices are small enough that a single contiguous copy is fine
and it lets us apply additional in-place centering downstream.
"""
if isinstance(X, _DeferredPCA):
n_var = X.shape[1]
return cp.concatenate(
[X._scale_chunk(s, min(s + X._chunk_size, n_var))
for s in range(0, n_var, X._chunk_size)], axis=1)
return X
def _estimate_n_windows(matrix, window_params) -> int:
"""Upper bound on the number of windows ``WindowIterator`` will yield.
Exact for ``window_type='snp'`` and ``'regions'``. For ``'bp'`` this is
an upper bound (some bp windows may be empty and skipped); overestimating
only biases the auto-selector toward streaming, which is safe.
"""
if window_params.window_type == 'regions':
return 0 if window_params.regions is None else len(window_params.regions)
if window_params.window_type == 'snp':
n_var = matrix.num_variants
if n_var < window_params.window_size:
return 0
return (n_var - window_params.window_size) // window_params.step_size + 1
if window_params.window_type == 'bp':
positions = matrix.positions
if isinstance(positions, cp.ndarray):
chrom_start = int(positions[0].get())
chrom_end = int(positions[-1].get())
else:
chrom_start = int(positions[0])
chrom_end = int(positions[-1])
span = max(0, chrom_end - chrom_start)
return span // window_params.step_size + 1
return 0
def _pick_dense_engine(matrix, window_params,
free_bytes: Optional[int] = None,
budget_fraction: float = 0.7) -> str:
"""Choose between 'dense-eigh' and 'streaming-dense' for engine='auto'.
The dense-eigh path holds a ``(n_windows, n_hap, n_hap)`` Gram stack on
the GPU and runs a single batched ``cp.linalg.eigh`` (~2x faster per
window than streaming when it fits). It falls over when the stack plus
the cuSOLVER eigh workspace exceed free GPU memory. Estimate the peak
as 3x the Gram-stack size (Gram + workspace + eigvecs) and pick
streaming when that exceeds ``budget_fraction`` of free memory.
Parameters
----------
matrix : HaplotypeMatrix
The (possibly population-filtered) matrix that will be windowed.
window_params : WindowParams
free_bytes : int, optional
Free GPU memory in bytes. Defaults to
``cp.cuda.Device().mem_info[0]`` (queried once at decision time).
budget_fraction : float
Fraction of free GPU memory to budget for the Gram-stack peak.
Default 0.7 leaves room for the haplotype matrix, the cupy
memory pool's existing allocations, and other concurrent GPU work.
"""
n_samples = matrix.num_haplotypes
n_windows_est = _estimate_n_windows(matrix, window_params)
if n_windows_est == 0:
return 'dense-eigh'
peak_bytes = 3 * n_windows_est * n_samples * n_samples * 8
if free_bytes is None:
free_bytes = cp.cuda.Device().mem_info[0]
if peak_bytes < budget_fraction * free_bytes:
return 'dense-eigh'
return 'streaming-dense'
[docs]
def local_pca(haplotype_matrix: "HaplotypeMatrix",
window_params=None,
k: int = 2,
scaler: Optional[str] = None,
missing_data: str = 'include',
population: Optional[Union[str, list]] = None,
batch_size: Optional[int] = None,
window_size: Optional[int] = None,
step_size: Optional[int] = None,
window_type: str = 'snp',
regions=None,
engine: str = 'dense-eigh',
tile_size: Optional[int] = None,
oversample: int = 8,
n_subspace_iter: int = 1,
random_state: Optional[int] = None) -> LocalPCAResult:
"""Local PCA along the genome (lostruct; Li & Ralph 2019).
For each genomic window, computes the top-k eigenvalues/eigenvectors of the
sample-sample covariance matrix. The covariance matches R's
`cov(sweep(x, 1, rowMeans(x), "-"))` so outputs are directly comparable to
`lostruct::eigen_windows()`.
Parameters
----------
haplotype_matrix : HaplotypeMatrix
window_params : WindowParams, optional
Pre-built window spec. Required when ``window_size`` is not given.
k : int
Number of PCs to retain per window.
scaler : str or None
None (lostruct default), 'patterson', or 'standard'. See `pca()`.
missing_data : str
'include' (impute to per-site mean) or 'exclude' (drop missing sites).
lostruct uses pairwise-complete; we approximate with mean imputation.
population : str or list, optional
batch_size : int, optional
Windows per batched eigh call. Auto-sized if None.
window_size, step_size, window_type, regions
Short-hand for constructing a `WindowParams` without importing it.
engine : str
``'dense-eigh'`` (default) holds the full ``(n_windows, n_hap, n_hap)``
Gram stack on the GPU and runs a single batched ``cp.linalg.eigh``;
~2x faster per window than streaming when the stack fits.
``'streaming-dense'`` processes each window independently with bounded
peak memory; required when the Gram stack would exceed GPU memory
(e.g. thousands of haplotypes x thousands of windows).
``'streaming-rsvd'`` is the streaming path with a randomized top-k
per window; chosen explicitly for very thin windows.
``'auto'`` estimates the Gram-stack peak and picks ``dense-eigh``
when it fits in 70% of free GPU memory, ``streaming-dense`` otherwise.
Returns
-------
LocalPCAResult
"""
from .windowed_analysis import WindowIterator
valid_engines = ('auto', 'dense-eigh', 'streaming-dense', 'streaming-rsvd')
if engine not in valid_engines:
raise ValueError(
f"engine must be one of {valid_engines}, got {engine!r}")
window_params = _resolve_window_params(
window_params, window_size, step_size, window_type, regions,
caller='local_pca')
if population is not None:
matrix = _get_population_matrix(haplotype_matrix, population)
else:
matrix = haplotype_matrix
if matrix.device == 'CPU':
matrix.transfer_to_gpu()
n_samples = matrix.num_haplotypes
iterator = WindowIterator(matrix, window_params)
if engine == 'auto':
engine = _pick_dense_engine(matrix, window_params)
if engine in ('streaming-dense', 'streaming-rsvd'):
return _local_pca_streaming(
matrix=matrix,
iterator=iterator,
k=k,
scaler=scaler,
missing_data=missing_data,
engine=engine,
tile_size=tile_size,
oversample=oversample,
n_subspace_iter=n_subspace_iter,
random_state=random_state,
window_params=window_params,
)
identity_placeholder = cp.eye(n_samples, dtype=cp.float64)
nan_scalar = cp.asarray(np.nan, dtype=cp.float64)
window_meta = []
gram_list = []
sumsq_list = []
valid_flags = []
for window in iterator:
window_meta.append(window)
if window.n_variants < max(k, 2):
gram_list.append(identity_placeholder)
sumsq_list.append(nan_scalar)
valid_flags.append(False)
continue
X = _prepare_matrix(window.matrix, scaler, population=None,
missing_data=missing_data)
X = _materialize_prepared(X)
C, sumsq = _window_gram(X, X.shape[1])
gram_list.append(C)
sumsq_list.append(sumsq)
valid_flags.append(True)
if len(window_meta) == 0:
raise ValueError("WindowIterator produced no windows.")
gram_stack = cp.stack(gram_list, axis=0)
eigvals_host, eigvecs_host = _batched_top_k_eigh(gram_stack, k, batch_size)
sumsq_host = cp.stack(sumsq_list).get()
valid_mask = np.array(valid_flags, dtype=bool)
eigvals_host[~valid_mask] = np.nan
eigvecs_host[~valid_mask] = np.nan
sumsq_host[~valid_mask] = np.nan
windows_df = pd.DataFrame({
'chrom': [w.chrom for w in window_meta],
'start': [w.start for w in window_meta],
'end': [w.end for w in window_meta],
'center': [w.center for w in window_meta],
'n_variants': [w.n_variants for w in window_meta],
'window_id': [w.window_id for w in window_meta],
})
return LocalPCAResult(
windows=windows_df,
eigvals=eigvals_host,
eigvecs=eigvecs_host,
sumsq=sumsq_host,
k=k,
scaler=scaler,
missing_data=missing_data,
)
def _local_pca_with_jackknife(
haplotype_matrix: "HaplotypeMatrix",
window_params=None,
k: int = 2,
n_blocks: int = 10,
scaler: Optional[str] = None,
missing_data: str = 'include',
population: Optional[Union[str, list]] = None,
aggregate: Optional[str] = 'mean',
batch_size: Optional[int] = None,
window_size: Optional[int] = None,
step_size: Optional[int] = None,
window_type: str = 'snp',
regions=None,
) -> LocalPCAResult:
"""Local PCA with fused jackknife SE, sharing per-window matrix preparation.
Iterates windows once; for each valid window calls ``_prepare_matrix`` once
and from the same materialized ``X`` computes both the full Gram matrix (for
eigvals/eigvecs) and the leave-one-block-out Gram matrices (for jackknife).
Two-tier validity: windows with enough variants for PCA (``>= max(k, 2)``)
but too few for jackknife (``< max(k, 2) * n_blocks``) get valid
eigvals/eigvecs but NaN ``jackknife_se``.
"""
from .windowed_analysis import WindowIterator
window_params = _resolve_window_params(
window_params, window_size, step_size, window_type, regions,
caller='_local_pca_with_jackknife')
if population is not None:
matrix = _get_population_matrix(haplotype_matrix, population)
else:
matrix = haplotype_matrix
if matrix.device == 'CPU':
matrix.transfer_to_gpu()
n_samples = matrix.num_haplotypes
iterator = WindowIterator(matrix, window_params)
identity_placeholder = cp.eye(n_samples, dtype=cp.float64)
nan_scalar = cp.asarray(np.nan, dtype=cp.float64)
identity_blocks = cp.broadcast_to(
identity_placeholder, (n_blocks, n_samples, n_samples))
min_pca = max(k, 2)
min_jk = min_pca * n_blocks
window_meta = []
gram_list = []
sumsq_list = []
jk_gram_list = []
valid_pca = []
valid_jk = []
for window in iterator:
window_meta.append(window)
if window.n_variants < min_pca:
# Too few variants for PCA or jackknife.
gram_list.append(identity_placeholder)
sumsq_list.append(nan_scalar)
jk_gram_list.append(identity_blocks)
valid_pca.append(False)
valid_jk.append(False)
continue
X = _prepare_matrix(window.matrix, scaler, population=None,
missing_data=missing_data)
X = _materialize_prepared(X)
n_var_win = X.shape[1]
# Full Gram for local_pca.
C, sumsq = _window_gram(X, n_var_win)
gram_list.append(C)
sumsq_list.append(sumsq)
valid_pca.append(True)
# Leave-one-block-out Grams for jackknife.
step = n_var_win // n_blocks
if n_var_win < min_jk or step < 1:
jk_gram_list.append(identity_blocks)
valid_jk.append(False)
continue
window_grams = cp.empty((n_blocks, n_samples, n_samples),
dtype=cp.float64)
for b in range(n_blocks):
lo = b * step
hi = (b + 1) * step
keep = cp.concatenate([cp.arange(0, lo), cp.arange(hi, n_var_win)])
Xk = X[:, keep]
Xk = Xk - Xk.mean(axis=1, keepdims=True)
denom = max(Xk.shape[1] - 1, 1)
window_grams[b] = (Xk @ Xk.T) / denom
jk_gram_list.append(window_grams)
valid_jk.append(True)
n_windows = len(window_meta)
if n_windows == 0:
raise ValueError("WindowIterator produced no windows.")
# --- Main eigendecomposition (local_pca) ---
gram_stack = cp.stack(gram_list, axis=0)
eigvals_host, eigvecs_host = _batched_top_k_eigh(gram_stack, k, batch_size)
sumsq_host = cp.stack(sumsq_list).get()
pca_mask = np.array(valid_pca, dtype=bool)
eigvals_host[~pca_mask] = np.nan
eigvecs_host[~pca_mask] = np.nan
sumsq_host[~pca_mask] = np.nan
# --- Jackknife eigendecomposition ---
jk_stack = cp.concatenate(jk_gram_list, axis=0)
_, jk_vecs = _batched_top_k_eigh(jk_stack, k, batch_size)
jk_vecs_4d = cp.asarray(jk_vecs).reshape(n_windows, n_blocks, k, n_samples)
aligned = _sign_align_replicates(jk_vecs_4d)
mean = aligned.mean(axis=1, keepdims=True)
jackknife_scale = (n_blocks - 1) / n_blocks
var = jackknife_scale * cp.sum((aligned - mean) ** 2, axis=1)
se_out = cp.sqrt(var).get() # (n_windows, k, n_samples)
jk_mask = np.array(valid_jk, dtype=bool)
se_out[~jk_mask] = np.nan
if aggregate == 'mean':
jackknife_se = np.nanmean(se_out, axis=2) # (n_windows, k)
elif aggregate is None:
jackknife_se = se_out
else:
raise ValueError(f"Unknown aggregate: {aggregate}")
windows_df = pd.DataFrame({
'chrom': [w.chrom for w in window_meta],
'start': [w.start for w in window_meta],
'end': [w.end for w in window_meta],
'center': [w.center for w in window_meta],
'n_variants': [w.n_variants for w in window_meta],
'window_id': [w.window_id for w in window_meta],
})
return LocalPCAResult(
windows=windows_df,
eigvals=eigvals_host,
eigvecs=eigvecs_host,
sumsq=sumsq_host,
k=k,
scaler=scaler,
missing_data=missing_data,
jackknife_se=jackknife_se,
)
[docs]
def pc_dist(source, npc: Optional[int] = None,
normalize: Optional[str] = 'L1',
w=1) -> np.ndarray:
"""Pairwise Frobenius distance between windows' low-rank covariance reps.
Implements the trace identity from ``lostruct::pc_dist``::
||A - B||^2 = sum(a^2) + sum(b^2) - 2 * tr(diag(a) X diag(b) X^T)
where ``X = U^T V`` for eigenvector blocks U, V.
Parameters
----------
source : LocalPCAResult or numpy.ndarray
Either a LocalPCAResult or a flat lostruct-style matrix of shape
(n_windows, 1 + k + k*n_samples).
npc : int, optional
Number of PCs to use. Defaults to ``source.k`` when ``source`` is a
LocalPCAResult; required when ``source`` is a flat matrix.
normalize : {'L1', 'L2', None}
Per-window eigenvalue normalization before distance computation.
w : array_like or scalar
Sample weights (default 1).
Returns
-------
numpy.ndarray, shape (n_windows, n_windows)
Symmetric, non-negative distance matrix. NaN for invalid windows.
"""
if isinstance(source, LocalPCAResult):
if npc is None:
npc = source.k
eigvals = source.eigvals[:, :npc].copy()
eigvecs = source.eigvecs[:, :npc, :].copy()
else:
arr = np.asarray(source, dtype=np.float64)
if npc is None:
raise ValueError("npc must be provided when source is a flat matrix.")
n_windows = arr.shape[0]
n_samples = (arr.shape[1] - 1 - npc) // npc
eigvals = arr[:, 1:1 + npc].copy()
eigvecs = arr[:, 1 + npc:].reshape(n_windows, npc, n_samples).copy()
n_windows, npc_actual, n_samples = eigvecs.shape
if npc_actual != npc:
raise ValueError(f"npc mismatch: requested {npc}, have {npc_actual}")
w_arr = np.broadcast_to(np.sqrt(np.asarray(w, dtype=np.float64)),
(n_samples,))
eigvecs = eigvecs * w_arr[None, None, :]
if normalize == 'L1':
denom = np.sum(np.abs(eigvals), axis=1, keepdims=True)
eigvals = np.divide(eigvals, denom,
out=np.full_like(eigvals, np.nan),
where=denom != 0)
elif normalize == 'L2':
denom = np.sqrt(np.sum(eigvals ** 2, axis=1, keepdims=True))
eigvals = np.divide(eigvals, denom,
out=np.full_like(eigvals, np.nan),
where=denom != 0)
elif normalize is not None:
raise ValueError(f"Unknown normalize: {normalize}")
vals = cp.asarray(eigvals) # (nw, k)
vecs = cp.asarray(eigvecs) # (nw, k, n_samples)
self_norm = cp.sum(vals ** 2, axis=1) # (nw,)
# Trace-identity cross term (pc_dist.R:57-61):
# X[i,j] = U_i^T V_j (k, k)
# tr(diag(a_i) X diag(b_j) X^T)
# = sum_{m,n} a_i[m] * b_j[n] * X[m,n]^2
# Full-matrix path materializes the (nw, nw, k, k) tensor; chunked path
# splits the outer window axis when that would exceed the memory budget.
nw = vals.shape[0]
free = cp.cuda.Device().mem_info[0]
k_pc = vals.shape[1]
full_bytes = nw * nw * k_pc * k_pc * 8
if full_bytes < free * _GPU_MEM_BUDGET:
X_all = cp.einsum('ims,jns->ijmn', vecs, vecs)
cross = cp.einsum('im,jn,ijmn->ij', vals, vals, X_all ** 2)
else:
cross = cp.zeros((nw, nw), dtype=cp.float64)
per_row_bytes = nw * k_pc * k_pc * 8 * 4
chunk = max(1, int(free * _GPU_MEM_BUDGET) // max(per_row_bytes, 1))
for start in range(0, nw, chunk):
end = min(start + chunk, nw)
X_chunk = cp.einsum('ims,jns->ijmn', vecs[start:end], vecs)
cross[start:end] = cp.einsum('im,jn,ijmn->ij',
vals[start:end], vals, X_chunk ** 2)
sq_dist = self_norm[:, None] + self_norm[None, :] - 2.0 * cross
# Symmetrize and clamp negative numerical error to 0
sq_dist = 0.5 * (sq_dist + sq_dist.T)
sq_dist = cp.maximum(sq_dist, 0.0)
dist = cp.sqrt(sq_dist).get()
# NaN-propagate: invalid windows (any NaN in eigvals) produce NaN rows/cols
if np.any(np.isnan(eigvals)):
bad = np.any(np.isnan(eigvals), axis=1)
dist[bad, :] = np.nan
dist[:, bad] = np.nan
return dist
# ---------------------------------------------------------------------------
# Minimum enclosing circle (Welzl) + corners post-processing
# ---------------------------------------------------------------------------
def _circle_from_2(p1, p2):
c = 0.5 * (p1 + p2)
r = float(np.linalg.norm(p1 - c))
return c, r
def _circle_from_3(p1, p2, p3):
# Numerically-stable circumcircle via perpendicular bisectors.
ax, ay = p1
bx, by = p2
cx, cy = p3
d = 2.0 * (ax * (by - cy) + bx * (cy - ay) + cx * (ay - by))
if abs(d) < 1e-12:
# Collinear: fall back to diameter of the farthest pair
pairs = [(p1, p2), (p1, p3), (p2, p3)]
pairs.sort(key=lambda pr: np.linalg.norm(pr[0] - pr[1]), reverse=True)
return _circle_from_2(*pairs[0])
ux = ((ax * ax + ay * ay) * (by - cy)
+ (bx * bx + by * by) * (cy - ay)
+ (cx * cx + cy * cy) * (ay - by)) / d
uy = ((ax * ax + ay * ay) * (cx - bx)
+ (bx * bx + by * by) * (ax - cx)
+ (cx * cx + cy * cy) * (bx - ax)) / d
c = np.array([ux, uy])
r = float(max(np.linalg.norm(p - c) for p in (p1, p2, p3)))
return c, r
def _in_circle(p, c, r, tol=1e-10):
return float(np.linalg.norm(p - c)) <= r + tol
def _welzl_mec(points: np.ndarray, rng: np.random.Generator):
"""Randomized Welzl minimum enclosing circle, iterative form."""
pts = points.copy()
rng.shuffle(pts)
n = len(pts)
c = np.array([0.0, 0.0])
r = 0.0
boundary = []
for i in range(n):
if r > 0 and _in_circle(pts[i], c, r):
continue
# Rebuild with pts[i] on boundary
c, r = pts[i], 0.0
for j in range(i):
if _in_circle(pts[j], c, r):
continue
c, r = _circle_from_2(pts[i], pts[j])
for k in range(j):
if _in_circle(pts[k], c, r):
continue
c, r = _circle_from_3(pts[i], pts[j], pts[k])
return c, r
def _mec_defining_points(xy: np.ndarray, rng: np.random.Generator,
tol: float = 1e-6):
"""Find up to 3 input points that lie on the MEC boundary.
Returns (center, radius, indices_on_boundary) using original xy indices
(NaN rows already filtered by caller).
"""
c, r = _welzl_mec(xy, rng)
if r <= 0:
return c, r, np.array([0], dtype=np.int64)
dists = np.linalg.norm(xy - c, axis=1)
on_circle = np.where(np.abs(dists - r) <= tol * max(r, 1.0))[0]
return c, r, on_circle
[docs]
def corners(xy: np.ndarray, prop: float, k: int = 3,
random_state: Optional[int] = None) -> np.ndarray:
"""Find `k` "corner" clusters in a 2D embedding (lostruct::corners).
Computes the minimum enclosing circle of `xy`, then for each of the `k`
defining points returns the `int(prop * n)` nearest `xy` points. If fewer
than `k` defining points exist on the MEC boundary, extra corners are
added greedily from points farthest from the MEC center and not already
in an existing corner's neighborhood (mirrors `corners.R:42-53`).
Parameters
----------
xy : numpy.ndarray, shape (n, 2)
Coordinates (e.g. MDS output). NaN rows are dropped before the MEC.
prop : float
Fraction of points per corner.
k : int
Number of corners (effective minimum 3, matching R).
random_state : int, optional
Returns
-------
numpy.ndarray, int64, shape (n_per_corner, k)
Column i = indices (into the original, NaN-inclusive xy) of points
closest to the i-th corner.
"""
xy = np.asarray(xy, dtype=np.float64)
if xy.ndim != 2 or xy.shape[1] != 2:
raise ValueError("xy must have shape (n, 2)")
k_eff = max(k, 3)
valid_mask = ~np.any(np.isnan(xy), axis=1)
valid_idx = np.where(valid_mask)[0]
pts = xy[valid_idx]
if len(pts) < k_eff:
raise ValueError(
f"Need at least {k_eff} non-NaN points; got {len(pts)}")
n_per = max(1, int(prop * len(pts)))
rng = np.random.default_rng(random_state)
center, radius, boundary_local = _mec_defining_points(pts, rng)
# Build corner index list (indices into `pts`)
corner_point_idx = list(boundary_local[:k_eff])
# Fallback: extend with farthest-from-center points not already covered
if len(corner_point_idx) < k_eff:
used = set()
for ci in corner_point_idx:
cdist = np.linalg.norm(pts - pts[ci], axis=1)
nearest = np.argsort(cdist)[:n_per]
used.update(nearest.tolist())
center_dist = np.linalg.norm(pts - center, axis=1)
order = np.argsort(center_dist)[::-1]
for cand in order:
if len(corner_point_idx) >= k_eff:
break
if int(cand) in used:
continue
corner_point_idx.append(int(cand))
cdist = np.linalg.norm(pts - pts[cand], axis=1)
used.update(np.argsort(cdist)[:n_per].tolist())
# For each corner, take n_per nearest (in the valid point set)
out = np.empty((n_per, k_eff), dtype=np.int64)
for i, ci in enumerate(corner_point_idx[:k_eff]):
d = np.linalg.norm(pts - pts[ci], axis=1)
nearest = np.argsort(d)[:n_per]
# Map back to original (NaN-inclusive) indices
out[:, i] = valid_idx[nearest]
return out
# ---------------------------------------------------------------------------
# End-to-end lostruct pipeline
# ---------------------------------------------------------------------------
[docs]
@dataclass
class LostructResult:
"""End-to-end lostruct pipeline output.
Bundles the four pipeline stages into one object so a caller can run
``lostruct(hm, ...)`` and pick off whichever intermediates they need.
Attributes
----------
local_pca : LocalPCAResult
Per-window local PCA output (eigvals, eigvecs, sumsq, windows).
distance : numpy.ndarray, shape (n_windows, n_windows)
Pairwise Frobenius distance between windows from ``pc_dist``.
mds : numpy.ndarray, shape (n_windows, n_components)
MDS coordinates from ``pcoa``.
explained_variance_ratio : numpy.ndarray, shape (n_components,)
Variance fraction explained by each MDS axis.
corner_indices : numpy.ndarray or None, shape (n_per_corner, n_corners)
Window indices of the ``n_corners`` outlier clusters from
``corners``. ``None`` when corner detection was skipped.
"""
local_pca: "LocalPCAResult"
distance: np.ndarray
mds: np.ndarray
explained_variance_ratio: np.ndarray
corner_indices: Optional[np.ndarray]
@property
def windows(self) -> "pd.DataFrame":
"""Convenience pointer at ``local_pca.windows``."""
return self.local_pca.windows
@property
def n_windows(self) -> int:
return self.local_pca.n_windows
@property
def jackknife_se(self) -> Optional[np.ndarray]:
"""Block-within-window jackknife standard error of the top-k
eigenvectors, or ``None`` if ``lostruct`` was called with
``jackknife=False``. Shortcut for ``self.local_pca.jackknife_se``.
"""
return self.local_pca.jackknife_se
[docs]
def lostruct(haplotype_matrix: "HaplotypeMatrix",
*,
# local_pca params
window_params=None,
k: int = 2,
scaler: Optional[str] = None,
missing_data: str = 'include',
population: Optional[Union[str, list]] = None,
batch_size: Optional[int] = None,
window_size: Optional[int] = None,
step_size: Optional[int] = None,
window_type: str = 'snp',
regions=None,
# jackknife
jackknife: bool = False,
n_blocks: int = 10,
jackknife_aggregate: Optional[str] = 'mean',
# pc_dist params
npc: Optional[int] = None,
normalize: Optional[str] = 'L1',
pc_dist_weights=1,
# pcoa params
n_components: int = 2,
# corners params
corner_prop: float = 0.05,
n_corners: int = 3,
random_state: Optional[int] = None,
# streaming engine controls (forwarded to local_pca)
engine: str = 'dense-eigh',
tile_size: Optional[int] = None,
oversample: int = 8,
n_subspace_iter: int = 1,
) -> "LostructResult":
"""End-to-end lostruct pipeline (Li & Ralph 2019).
Runs the four-step recipe in one call: per-window local PCA, pairwise
Frobenius distance between window covariance reps, classical MDS on
that distance, and corner / outlier detection in the MDS embedding.
Optionally also computes the block-within-window jackknife standard
error of the per-window eigenvectors (Li & Ralph 2019, Appendix);
when requested the SE is computed in the same window pass as the
base eigendecomposition (shared matrix prep), and is exposed via
``LostructResult.jackknife_se``.
Parameters
----------
haplotype_matrix : HaplotypeMatrix
window_params : WindowParams, optional
Pre-built window spec. Required when ``window_size`` is not given.
k : int
Number of PCs retained per window in the local PCA step.
scaler, missing_data, population, batch_size : see :func:`local_pca`.
window_size, step_size, window_type, regions
Short-hand for constructing a ``WindowParams``.
jackknife : bool
If True, also compute the delete-1 block jackknife standard
error of the per-window top-k eigenvectors and expose it via
``LostructResult.jackknife_se``. Per-window matrix preparation
is shared with the base eigendecomposition, so enabling this is
much cheaper than calling ``local_pca_jackknife`` separately.
n_blocks : int
Number of jackknife blocks per window when ``jackknife=True``.
Ignored otherwise.
jackknife_aggregate : {'mean', None}
``'mean'`` returns shape ``(n_windows, k)`` -- per-PC standard
error averaged across samples (matches the R reference).
``None`` returns the full ``(n_windows, k, n_samples)`` tensor.
Ignored when ``jackknife=False``.
npc : int, optional
Number of PCs used by ``pc_dist``. Defaults to ``k``.
normalize : {'L1', 'L2', None}
Per-window eigenvalue normalization before distance computation.
pc_dist_weights : array_like or scalar
Sample weights forwarded to ``pc_dist`` as ``w``.
n_components : int
Number of MDS dimensions to compute (default 2).
corner_prop : float
Fraction of points per corner cluster (forwarded to ``corners``).
n_corners : int
Number of corner clusters to detect. Set to 0 (or any non-positive
value) to skip corner detection and leave
``LostructResult.corner_indices`` as None.
random_state : int, optional
RNG seed for the MEC tie-breaks inside ``corners``.
Returns
-------
LostructResult
Bundle of the four pipeline outputs. Each piece is also reachable
via the underlying functions for users who want to recompute one
stage with different parameters.
See Also
--------
local_pca, local_pca_jackknife, pc_dist, pcoa, corners
References
----------
Li & Ralph (2019), Genetics. https://doi.org/10.1534/genetics.118.301747
"""
if jackknife:
pca = _local_pca_with_jackknife(
haplotype_matrix,
window_params=window_params, k=k, n_blocks=n_blocks,
scaler=scaler, missing_data=missing_data,
population=population, aggregate=jackknife_aggregate,
batch_size=batch_size, window_size=window_size,
step_size=step_size, window_type=window_type, regions=regions)
else:
pca = local_pca(haplotype_matrix,
window_params=window_params, k=k,
scaler=scaler, missing_data=missing_data,
population=population, batch_size=batch_size,
window_size=window_size, step_size=step_size,
window_type=window_type, regions=regions,
engine=engine, tile_size=tile_size,
oversample=oversample,
n_subspace_iter=n_subspace_iter,
random_state=random_state)
dist = pc_dist(pca, npc=npc, normalize=normalize, w=pc_dist_weights)
mds, evr = pcoa(dist, n_components=n_components)
if n_corners is None or n_corners <= 0:
corner_idx = None
else:
corner_idx = corners(mds, prop=corner_prop, k=n_corners,
random_state=random_state)
return LostructResult(
local_pca=pca,
distance=dist,
mds=mds,
explained_variance_ratio=evr,
corner_indices=corner_idx,
)
# ---------------------------------------------------------------------------
# Jackknife SE for local PCs
# ---------------------------------------------------------------------------
def _sign_align_replicates(vecs: "cp.ndarray") -> "cp.ndarray":
"""Sign-align replicate eigenvectors to replicate 0.
Accepts either ``(n_reps, k, n_samples)`` (single window) or
``(n_windows, n_reps, k, n_samples)``. For each (rep, pc) within its
window, flip sign if ``||v - v_0||^2 > ||v + v_0||^2``, equivalent to
``sign(<v, v_0>) < 0``.
"""
if vecs.ndim == 3:
ref = vecs[0] # (k, n_samples)
dot = cp.einsum('rks,ks->rk', vecs, ref) # (n_reps, k)
sign = cp.where(dot >= 0, 1.0, -1.0)[:, :, None]
elif vecs.ndim == 4:
ref = vecs[:, 0] # (n_windows, k, n_samples)
dot = cp.einsum('wrks,wks->wrk', vecs, ref) # (n_windows, n_reps, k)
sign = cp.where(dot >= 0, 1.0, -1.0)[:, :, :, None]
else:
raise ValueError(
f"vecs must have 3 or 4 dims, got {vecs.ndim}")
return vecs * sign
[docs]
def local_pca_jackknife(haplotype_matrix: "HaplotypeMatrix",
window_params=None,
k: int = 2,
n_blocks: int = 10,
scaler: Optional[str] = None,
missing_data: str = 'include',
population: Optional[Union[str, list]] = None,
aggregate: Optional[str] = 'mean',
batch_size: Optional[int] = None,
window_size: Optional[int] = None,
step_size: Optional[int] = None,
window_type: str = 'snp',
regions=None) -> np.ndarray:
"""Delete-1 block jackknife SE of local PCs (DPGP_jackknife_var.R port).
For each window, partitions variants into `n_blocks` contiguous blocks;
for each block, recomputes the sample-sample covariance with that block
removed, eigendecomposes it, takes the top-k eigenvectors, sign-aligns
across replicates, and computes the jackknife SE per sample.
Parameters
----------
n_blocks : int
Number of jackknife blocks per window.
aggregate : {'mean', None}
'mean' returns shape (n_windows, k) — per-PC SE averaged across
samples (matches R's `mean(b)`). None returns the full
(n_windows, k, n_samples) tensor.
Returns
-------
numpy.ndarray
NaN rows for windows with fewer than `max(k, n_blocks+1)` variants.
"""
from .windowed_analysis import WindowIterator
window_params = _resolve_window_params(
window_params, window_size, step_size, window_type, regions,
caller='local_pca_jackknife')
if population is not None:
matrix = _get_population_matrix(haplotype_matrix, population)
else:
matrix = haplotype_matrix
if matrix.device == 'CPU':
matrix.transfer_to_gpu()
n_samples = matrix.num_haplotypes
iterator = WindowIterator(matrix, window_params)
identity_blocks = cp.broadcast_to(
cp.eye(n_samples, dtype=cp.float64),
(n_blocks, n_samples, n_samples))
min_variants = max(k, 2) * n_blocks
all_grams = []
valid_flags = []
for window in iterator:
if window.n_variants < min_variants:
all_grams.append(identity_blocks)
valid_flags.append(False)
continue
X = _prepare_matrix(window.matrix, scaler, population=None,
missing_data=missing_data)
X = _materialize_prepared(X)
n_var_win = X.shape[1]
# Block width truncates (matches R's `round(nrow/10)` in DPGP_jackknife_var.R).
step = n_var_win // n_blocks
if step < 1:
all_grams.append(identity_blocks)
valid_flags.append(False)
continue
window_grams = cp.empty((n_blocks, n_samples, n_samples),
dtype=cp.float64)
for b in range(n_blocks):
lo = b * step
hi = (b + 1) * step
keep = cp.concatenate([cp.arange(0, lo), cp.arange(hi, n_var_win)])
Xk = X[:, keep]
Xk = Xk - Xk.mean(axis=1, keepdims=True)
denom = max(Xk.shape[1] - 1, 1)
window_grams[b] = (Xk @ Xk.T) / denom
all_grams.append(window_grams)
valid_flags.append(True)
n_windows = len(all_grams)
if n_windows == 0:
raise ValueError("WindowIterator produced no windows.")
gram_stack = cp.concatenate(all_grams, axis=0)
eigvals_host, eigvecs_host = _batched_top_k_eigh(gram_stack, k, batch_size)
eigvecs_4d_gpu = cp.asarray(eigvecs_host).reshape(
n_windows, n_blocks, k, n_samples)
aligned = _sign_align_replicates(eigvecs_4d_gpu)
mean = aligned.mean(axis=1, keepdims=True)
jackknife_scale = (n_blocks - 1) / n_blocks
var = jackknife_scale * cp.sum((aligned - mean) ** 2, axis=1)
se_out = cp.sqrt(var).get() # (n_windows, k, n_samples)
valid_mask = np.array(valid_flags, dtype=bool)
se_out[~valid_mask] = np.nan
if aggregate == 'mean':
return np.nanmean(se_out, axis=2) # (n_windows, k)
elif aggregate is None:
return se_out
else:
raise ValueError(f"Unknown aggregate: {aggregate}")