Skip to content

API

Import as:

import spstencil as sps

Submodules

You can import everything via the top-level too, e.g., sps.load_anndata, sps.subset_by_values, sps.aggregate_st_by_cell_class, etc.

IO

load_anndata

load_anndata(path: str | Path, backed: Optional[str] = None) -> ad.AnnData

Load an AnnData object from an .h5ad file.

  • path: path to .h5ad
  • backed: pass 'r' for backed mode, or None for in-memory
Source code in spstencil/io.py
def load_anndata(path: str | Path, backed: Optional[str] = None) -> ad.AnnData:
    """Load an AnnData object from an .h5ad file.

    - path: path to .h5ad
    - backed: pass 'r' for backed mode, or None for in-memory
    """
    file_path = Path(path)
    if not file_path.exists():
        raise FileNotFoundError(f"No such file: {file_path}")
    return ad.read_h5ad(str(file_path), backed=backed)

save_anndata

save_anndata(adata: AnnData, path: str | Path, compression: str | None = 'gzip') -> None

Save an AnnData object to .h5ad.

  • compression: None|'gzip'|'lzf' etc.
Source code in spstencil/io.py
def save_anndata(adata: ad.AnnData, path: str | Path, compression: str | None = "gzip") -> None:
    """Save an AnnData object to .h5ad.

    - compression: None|'gzip'|'lzf' etc.
    """
    out = Path(path)
    out.parent.mkdir(parents=True, exist_ok=True)
    adata.write_h5ad(str(out), compression=compression)
  • What: Read/write AnnData .h5ad.
  • Key functions:
  • load_anndata(path, backed=None)
  • save_anndata(adata, path, compression="gzip")
  • Example:
    import spstencil as sps
    adata = sps.load_anndata("data.h5ad")              # top-level alias
    sps.io.save_anndata(adata, "out.h5ad")             # via submodule
    

Generic ops

aggregate_cell_counts

aggregate_cell_counts(adata: AnnData, cell_type_key: str, group_key: str | None = None, normalize: bool = True) -> pd.DataFrame

Aggregate counts of cell types, optionally per group.

Returns a DataFrame of percentages if normalize else counts. - If group_key is None, returns one-row DataFrame across all cells. - Expects categorical/string columns in obs.

Source code in spstencil/ops.py
def aggregate_cell_counts(
    adata: ad.AnnData,
    cell_type_key: str,
    group_key: str | None = None,
    normalize: bool = True,
) -> pd.DataFrame:
    """Aggregate counts of cell types, optionally per group.

    Returns a DataFrame of percentages if normalize else counts.
    - If group_key is None, returns one-row DataFrame across all cells.
    - Expects categorical/string columns in obs.
    """
    if cell_type_key not in adata.obs:
        raise KeyError(f"obs lacks column '{cell_type_key}'")

    obs = adata.obs.copy()
    obs[cell_type_key] = obs[cell_type_key].astype("string")
    if group_key is not None:
        if group_key not in obs:
            raise KeyError(f"obs lacks column '{group_key}'")
        obs[group_key] = obs[group_key].astype("string")

    if group_key is None:
        counts = obs[cell_type_key].value_counts().rename_axis("cell_type").to_frame("count").T
        counts.index = ["all"]
    else:
        counts = (
            obs.groupby(group_key)[cell_type_key]
            .value_counts()
            .unstack(fill_value=0)
        )

    if normalize:
        counts = counts.div(counts.sum(axis=1), axis=0) * 100.0
    return counts

save_split_by_key

save_split_by_key(adata: AnnData, key: str, out_dir: str | Path, compression: str | None = 'gzip') -> Dict[str, Path]

Split by obs[key] and save each part to out_dir/.h5ad. Returns paths.

Source code in spstencil/ops.py
def save_split_by_key(
    adata: ad.AnnData,
    key: str,
    out_dir: str | Path,
    compression: str | None = "gzip",
) -> Dict[str, Path]:
    """Split by obs[key] and save each part to out_dir/<value>.h5ad. Returns paths."""
    from .io import save_anndata

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    parts = split_by_key(adata, key)
    result: Dict[str, Path] = {}
    for value, sub in parts.items():
        safe_value = str(value).replace("/", "_")
        out_path = out_dir / f"{safe_value}.h5ad"
        save_anndata(sub, out_path, compression=compression)
        result[value] = out_path
    return result

split_by_key

split_by_key(adata: AnnData, key: str) -> Dict[str, ad.AnnData]

Split AnnData into a dict keyed by unique values in obs[key].

Source code in spstencil/ops.py
def split_by_key(adata: ad.AnnData, key: str) -> Dict[str, ad.AnnData]:
    """Split AnnData into a dict keyed by unique values in obs[key]."""
    if key not in adata.obs:
        raise KeyError(f"obs lacks column '{key}'")
    groups = adata.obs[key].astype("string").fillna("NA").unique().tolist()
    return {val: adata[adata.obs[key].astype("string") == val].copy() for val in groups}

subset_by_values

subset_by_values(adata: AnnData, key: str, include_values: Sequence[str] | None = None, exclude_values: Sequence[str] | None = None) -> ad.AnnData

Return a new AnnData subset by obs[key] membership.

At least one of include_values or exclude_values must be provided.

Source code in spstencil/ops.py
def subset_by_values(
    adata: ad.AnnData,
    key: str,
    include_values: Sequence[str] | None = None,
    exclude_values: Sequence[str] | None = None,
) -> ad.AnnData:
    """Return a new AnnData subset by obs[key] membership.

    At least one of include_values or exclude_values must be provided.
    """
    if include_values is None and exclude_values is None:
        raise ValueError("Provide include_values and/or exclude_values")

    if key not in adata.obs:
        raise KeyError(f"obs lacks column '{key}'")

    obs_series = adata.obs[key].astype("string")
    mask = pd.Series(True, index=obs_series.index)

    if include_values is not None:
        include_set = pd.Index(include_values).astype("string")
        mask &= obs_series.isin(include_set)

    if exclude_values is not None:
        exclude_set = pd.Index(exclude_values).astype("string")
        mask &= ~obs_series.isin(exclude_set)

    return adata[mask.values].copy()
  • What: Generic AnnData operations on obs.
  • Key functions:
  • subset_by_values(adata, key, include_values=None, exclude_values=None)
  • split_by_key(adata, key) -> dict[str, AnnData]
  • aggregate_cell_counts(adata, cell_type_key, group_key=None, normalize=True)
  • save_split_by_key(adata, key, out_dir, compression='gzip') -> dict[str, Path]
  • Example:
    subset = sps.ops.subset_by_values(adata, "tissue_type", include_values=["Epithelium"])
    parts  = sps.ops.split_by_key(adata, "patient_id")
    counts = sps.ops.aggregate_cell_counts(adata, "cell_type", group_key="patient_id")
    written = sps.ops.save_split_by_key(adata, "patient_id", "split/")
    

Spatial tools

aggregate_hdf5_by_cell_class

aggregate_hdf5_by_cell_class(hdf5_path: str | Path, *, tissue_tsv: Optional[str | Path] = None, group_by_tissue: bool = False, normalize: bool = True, swap_axes: bool = False) -> pd.DataFrame

Aggregate counts/percentages of HDF5 cell classes overall or by tissue type.

If group_by_tissue is True, requires tissue_tsv to map cells to tissue classes.

Source code in spstencil/st.py
def aggregate_hdf5_by_cell_class(
    hdf5_path: str | Path,
    *,
    tissue_tsv: Optional[str | Path] = None,
    group_by_tissue: bool = False,
    normalize: bool = True,
    swap_axes: bool = False,
) -> pd.DataFrame:
    """Aggregate counts/percentages of HDF5 cell classes overall or by tissue type.

    If group_by_tissue is True, requires tissue_tsv to map cells to tissue classes.
    """
    preds, coords = load_cell_predictions(hdf5_path)
    if group_by_tissue:
        if tissue_tsv is None:
            raise ValueError("tissue_tsv is required when group_by_tissue=True")
        tissue_df = load_tissue_predictions(tissue_tsv)
        classes = map_cells_to_tissue_classes(coords, tissue_df, swap_axes=swap_axes)
        df = pd.DataFrame({"group": classes, "cell_id": preds.astype(int)})
        counts = df.groupby("group")["cell_id"].value_counts().unstack(fill_value=0)
    else:
        counts = pd.Series(preds.astype(int)).value_counts().rename_axis("cell_id").to_frame("count").T
        counts.index = ["all"]
    if normalize:
        counts = counts.div(counts.sum(axis=1), axis=0) * 100.0
    return counts

aggregate_st_by_cell_class

aggregate_st_by_cell_class(adata: AnnData, cell_hdf5_path: str | Path, *, normalize: bool = True, group_key: Optional[str] = None, tissue_df: Optional[DataFrame] = None, swap_axes: bool = False) -> ad.AnnData

Aggregate ST expression matrix by cell-class predictions from cell classifier HDF5.

Strategy: - Map ST spots to tile indices (t_x, t_y) using their pixel coords. - Map cell predictions to tile indices using their pixel coords from HDF5. - Join on tile indices to assign cell classes to spots. - Aggregate gene expression by summing across spots with same cell class. - Optionally normalize by total expression per cell class.

Returns AnnData with cell classes as obs and aggregated expression matrix.

Source code in spstencil/st.py
def aggregate_st_by_cell_class(
    adata: ad.AnnData,
    cell_hdf5_path: str | Path,
    *,
    normalize: bool = True,
    group_key: Optional[str] = None,
    tissue_df: Optional[pd.DataFrame] = None,
    swap_axes: bool = False,
) -> ad.AnnData:
    """Aggregate ST expression matrix by cell-class predictions from cell classifier HDF5.

    Strategy:
    - Map ST spots to tile indices (t_x, t_y) using their pixel coords.
    - Map cell predictions to tile indices using their pixel coords from HDF5.
    - Join on tile indices to assign cell classes to spots.
    - Aggregate gene expression by summing across spots with same cell class.
    - Optionally normalize by total expression per cell class.

    Returns AnnData with cell classes as obs and aggregated expression matrix.
    """
    preds, cell_coords = load_cell_predictions(cell_hdf5_path)
    # If a tissue grid is provided, use it for tile sizing; else infer from max cell tile grid via spot coords
    if tissue_df is None:
        # fabricate a grid using max tiles derived from spot coords to ensure consistent tiling
        # We set max_x,max_y by treating each spot tile as a grid from its coords
        # Using spot coords to compute tile indices relative grid of size based on max coords
        spot_coords = get_spatial_coords(adata)
        # Build a synthetic grid with max indices inferred from spot coords quantiles
        # Default to 100x100 grid if no tissue_df provided
        max_x = 100
        max_y = 100
        tissue_df = pd.DataFrame({
            "x": np.tile(np.arange(max_x + 1), max_y + 1), 
            "y": np.repeat(np.arange(max_y + 1), max_x + 1), 
            "class": "_"
        })

    # Assign tiles
    spot_coords = get_spatial_coords(adata)
    spot_tiles = assign_tiles_from_coords(spot_coords, tissue_df, swap_axes=swap_axes)
    cell_tiles = assign_tiles_from_coords(cell_coords, tissue_df, swap_axes=swap_axes)

    # Build mapping from tile -> cell classes in that tile
    tile_to_cell_classes: Dict[Tuple[int, int], List[int]] = {}
    for i, (tx, ty) in enumerate(cell_tiles):
        tile_to_cell_classes.setdefault((int(tx), int(ty)), []).append(int(preds[i]))

    # Assign cell classes to spots based on majority vote in each tile
    spot_cell_classes = []
    for tx, ty in spot_tiles:
        key = (int(tx), int(ty))
        cell_classes_in_tile = tile_to_cell_classes.get(key, [])
        if cell_classes_in_tile:
            # Use majority vote for cell class assignment
            from collections import Counter
            majority_class = Counter(cell_classes_in_tile).most_common(1)[0][0]
            spot_cell_classes.append(majority_class)
        else:
            # No cells mapped to this tile
            spot_cell_classes.append(-1)  # Use -1 for unmapped spots

    spot_cell_classes = np.array(spot_cell_classes)

    # Filter out unmapped spots
    mapped_mask = spot_cell_classes != -1
    mapped_spots = np.where(mapped_mask)[0]
    mapped_classes = spot_cell_classes[mapped_mask]

    if len(mapped_spots) == 0:
        # Return empty AnnData if no spots are mapped
        return ad.AnnData(np.zeros((0, adata.n_vars)), var=adata.var.copy())

    # Get unique cell classes and aggregate expression
    unique_classes = np.unique(mapped_classes)
    aggregated_X = []
    cell_class_names = []

    for cell_class in unique_classes:
        class_mask = mapped_classes == cell_class
        class_spots = mapped_spots[class_mask]

        # Sum expression across spots of this cell class
        if hasattr(adata.X, 'toarray'):  # Handle sparse matrices
            class_expression = adata.X[class_spots].sum(axis=0).A1
        else:
            class_expression = adata.X[class_spots].sum(axis=0)

        aggregated_X.append(class_expression)
        cell_class_names.append(f"cell_class_{cell_class}")

    aggregated_X = np.vstack(aggregated_X)

    # Create new AnnData with aggregated data
    adata_agg = ad.AnnData(
        X=aggregated_X,
        var=adata.var.copy(),
        obs=pd.DataFrame(
            index=cell_class_names,
            data={"cell_class": unique_classes, "n_spots_aggregated": [np.sum(mapped_classes == cc) for cc in unique_classes]}
        )
    )

    # Add grouping information if requested
    if group_key is not None:
        if group_key not in adata.obs:
            raise KeyError(f"obs lacks column '{group_key}'")
        # For aggregated data, we can't meaningfully preserve group info since we're aggregating across spots
        # Instead, we could create separate aggregations per group, but that's a different operation
        pass

    # Normalize if requested
    if normalize:
        # Normalize each cell class by total expression (TPM-like)
        totals = adata_agg.X.sum(axis=1, keepdims=True)
        totals[totals == 0] = 1  # Avoid division by zero
        adata_agg.X = (adata_agg.X / totals) * 1e6  # TPM normalization

    return adata_agg

annotate_tissue_to_spots

annotate_tissue_to_spots(adata: AnnData, tissue_df: DataFrame, *, swap_axes: bool = False, column_name: str = 'tissue_type', fill_value: str = 'Unlabelled') -> ad.AnnData

Add a column in obs with tissue class by mapping each spot/cell to tile (x,y).

The mapping uses obsm['spatial'] pixel coords when available.

Source code in spstencil/st.py
def annotate_tissue_to_spots(
    adata: ad.AnnData,
    tissue_df: pd.DataFrame,
    *,
    swap_axes: bool = False,
    column_name: str = "tissue_type",
    fill_value: str = "Unlabelled",
) -> ad.AnnData:
    """Add a column in obs with tissue class by mapping each spot/cell to tile (x,y).

    The mapping uses obsm['spatial'] pixel coords when available.
    """
    coords = get_spatial_coords(adata)
    tiles = assign_tiles_from_coords(coords, tissue_df, swap_axes=swap_axes)
    # Build lookup from (x,y) -> class
    lut: Dict[Tuple[int, int], str] = {
        (int(row.x), int(row.y)): str(row["class"]) for _, row in tissue_df.iterrows()
    }
    classes = []
    for tx, ty in tiles:
        classes.append(lut.get((int(tx), int(ty)), fill_value))
    adata = adata.copy()
    adata.obs[column_name] = pd.Categorical(classes)
    return adata

assign_tiles_from_coords

assign_tiles_from_coords(coords: ndarray, tissue_df: DataFrame, swap_axes: bool = False) -> np.ndarray

Map pixel coords to tile (x, y) indices defined by tissue_preds.tsv grid.

coords should be in [x, y] format to match tissue_df columns. Returns an array of shape (N, 2) of int tile indices [tile_x, tile_y].

Source code in spstencil/st.py
def assign_tiles_from_coords(
    coords: np.ndarray,
    tissue_df: pd.DataFrame,
    swap_axes: bool = False,
) -> np.ndarray:
    """Map pixel coords to tile (x, y) indices defined by tissue_preds.tsv grid.

    coords should be in [x, y] format to match tissue_df columns.
    Returns an array of shape (N, 2) of int tile indices [tile_x, tile_y].
    """
    if len(coords) == 0:
        return np.zeros((0, 2), dtype=int)
    max_x = int(tissue_df["x"].max())
    max_y = int(tissue_df["y"].max())
    if swap_axes:
        max_x, max_y = max_y, max_x

    # coords are [x, y], so coords[:, 0] is x, coords[:, 1] is y
    W = int(coords[:, 0].max()) + 1  # x dimension
    H = int(coords[:, 1].max()) + 1  # y dimension
    tile_size_w = W / (max_x + 1) if max_x >= 0 else 1.0
    tile_size_h = H / (max_y + 1) if max_y >= 0 else 1.0
    tile_x = np.floor(coords[:, 0] / tile_size_w).astype(int)
    tile_y = np.floor(coords[:, 1] / tile_size_h).astype(int)
    return np.vstack([tile_x, tile_y]).T

compute_keep_mask

compute_keep_mask(cell_coords: ndarray, tissue_df: DataFrame, exclude_classes: Sequence[str], swap_axes: bool = False) -> np.ndarray

Compute a boolean mask of cells to keep by excluding tiles of given classes.

  • cell_coords: array shape (N, 2) with [x, y] pixel coordinates
  • tissue_df: DataFrame with columns [x, y, class]
  • exclude_classes: classes to exclude (e.g., ['Chorion'])
  • swap_axes: if True, swap (max_x, max_y) estimation to handle axis-swapped cases
Source code in spstencil/st.py
def compute_keep_mask(
    cell_coords: np.ndarray,
    tissue_df: pd.DataFrame,
    exclude_classes: Sequence[str],
    swap_axes: bool = False,
) -> np.ndarray:
    """Compute a boolean mask of cells to keep by excluding tiles of given classes.

    - cell_coords: array shape (N, 2) with [x, y] pixel coordinates
    - tissue_df: DataFrame with columns [x, y, class]
    - exclude_classes: classes to exclude (e.g., ['Chorion'])
    - swap_axes: if True, swap (max_x, max_y) estimation to handle axis-swapped cases
    """
    if len(cell_coords) == 0:
        return np.array([], dtype=bool)

    max_x = int(tissue_df["x"].max())
    max_y = int(tissue_df["y"].max())
    if swap_axes:
        max_x, max_y = max_y, max_x

    # Image dimensions from cell coords (coords are [x, y])
    W = int(cell_coords[:, 0].max()) + 1  # x dimension
    H = int(cell_coords[:, 1].max()) + 1  # y dimension
    if max_y <= 0 or max_x <= 0 or H <= 0 or W <= 0:
        return np.ones(len(cell_coords), dtype=bool)

    tile_size_w = W / (max_x + 1)
    tile_size_h = H / (max_y + 1)

    # Exclusion set of tile (x, y) pairs
    excl = set(map(tuple, tissue_df[tissue_df["class"].isin(exclude_classes)][["x", "y"]].values))
    if not excl:
        return np.ones(len(cell_coords), dtype=bool)

    # Map cell pixel coords to tile indices (coords are [x, y])
    tile_coords_x = np.floor(cell_coords[:, 0] / tile_size_w).astype(int)
    tile_coords_y = np.floor(cell_coords[:, 1] / tile_size_h).astype(int)
    tile_pairs = np.vstack((tile_coords_x, tile_coords_y)).T

    keep = np.ones(len(cell_coords), dtype=bool)
    for i, pair in enumerate(tile_pairs):
        if (int(pair[0]), int(pair[1])) in excl:
            keep[i] = False
    return keep

crop_cell_data

crop_cell_data(cell_hdf5_path: str | Path, bounds: Dict[str, Tuple[int, int]], output_path: str | Path) -> int

Crop cell HDF5 data based on coordinate bounds.

Parameters:

Name Type Description Default
cell_hdf5_path str | Path

Path to cell HDF5 file

required
bounds Dict[str, Tuple[int, int]]

Dict with 'x' and 'y' bounds as (min, max) tuples

required
output_path str | Path

Path for output cropped HDF5 file

required

Returns:

Type Description
int

Number of cells kept after cropping

Source code in spstencil/st.py
def crop_cell_data(
    cell_hdf5_path: str | Path,
    bounds: Dict[str, Tuple[int, int]],
    output_path: str | Path
) -> int:
    """Crop cell HDF5 data based on coordinate bounds.

    Args:
        cell_hdf5_path: Path to cell HDF5 file
        bounds: Dict with 'x' and 'y' bounds as (min, max) tuples  
        output_path: Path for output cropped HDF5 file

    Returns:
        Number of cells kept after cropping
    """
    preds, coords = load_cell_predictions(cell_hdf5_path)

    # Create mask for cells within bounds
    mask = np.ones(len(coords), dtype=bool)

    if 'x' in bounds:
        x_min, x_max = bounds['x']
        mask &= (coords[:, 0] >= x_min) & (coords[:, 0] <= x_max)

    if 'y' in bounds:
        y_min, y_max = bounds['y']
        mask &= (coords[:, 1] >= y_min) & (coords[:, 1] <= y_max)

    # Save cropped data
    with h5py.File(str(output_path), 'w') as f:
        f.create_dataset('predictions', data=preds[mask])
        f.create_dataset('coords', data=coords[mask])

    return int(mask.sum())

crop_spatial_data

crop_spatial_data(adata: AnnData, bounds: Dict[str, Tuple[int, int]]) -> ad.AnnData

Crop spatial data based on coordinate bounds.

Parameters:

Name Type Description Default
adata AnnData

AnnData object with spatial coordinates

required
bounds Dict[str, Tuple[int, int]]

Dict with 'x' and 'y' bounds as (min, max) tuples

required

Returns:

Type Description
AnnData

Cropped AnnData object

Source code in spstencil/st.py
def crop_spatial_data(
    adata: ad.AnnData,
    bounds: Dict[str, Tuple[int, int]]
) -> ad.AnnData:
    """Crop spatial data based on coordinate bounds.

    Args:
        adata: AnnData object with spatial coordinates
        bounds: Dict with 'x' and 'y' bounds as (min, max) tuples

    Returns:
        Cropped AnnData object
    """
    coords = get_spatial_coords(adata)

    # Create mask for spots within bounds
    mask = np.ones(len(coords), dtype=bool)

    if 'x' in bounds:
        x_min, x_max = bounds['x']
        mask &= (coords[:, 0] >= x_min) & (coords[:, 0] <= x_max)

    if 'y' in bounds:
        y_min, y_max = bounds['y']
        mask &= (coords[:, 1] >= y_min) & (coords[:, 1] <= y_max)

    return adata[mask].copy()

crop_tissue_data

crop_tissue_data(tissue_tsv_path: str | Path, bounds: Dict[str, Tuple[int, int]], output_path: str | Path, bin_size: int = 100) -> int

Crop tissue TSV data based on coordinate bounds.

Parameters:

Name Type Description Default
tissue_tsv_path str | Path

Path to tissue TSV file

required
bounds Dict[str, Tuple[int, int]]

Dict with 'x' and 'y' bounds as (min, max) tuples

required
output_path str | Path

Path for output cropped TSV file

required
bin_size int

Bin size used for coordinate conversion

100

Returns:

Type Description
int

Number of tissue tiles kept after cropping

Source code in spstencil/st.py
def crop_tissue_data(
    tissue_tsv_path: str | Path,
    bounds: Dict[str, Tuple[int, int]],
    output_path: str | Path,
    bin_size: int = 100
) -> int:
    """Crop tissue TSV data based on coordinate bounds.

    Args:
        tissue_tsv_path: Path to tissue TSV file
        bounds: Dict with 'x' and 'y' bounds as (min, max) tuples
        output_path: Path for output cropped TSV file
        bin_size: Bin size used for coordinate conversion

    Returns:
        Number of tissue tiles kept after cropping
    """
    tissue_df = load_tissue_predictions(tissue_tsv_path)

    # Convert tile indices to pixel coordinates for comparison
    tissue_df = tissue_df.copy()
    tissue_df['x_pixel'] = tissue_df['x'] * bin_size
    tissue_df['y_pixel'] = tissue_df['y'] * bin_size

    # Create mask for tissue tiles within bounds
    mask = np.ones(len(tissue_df), dtype=bool)

    if 'x' in bounds:
        x_min, x_max = bounds['x']
        mask &= (tissue_df['x_pixel'] >= x_min) & (tissue_df['x_pixel'] <= x_max)

    if 'y' in bounds:
        y_min, y_max = bounds['y']
        mask &= (tissue_df['y_pixel'] >= y_min) & (tissue_df['y_pixel'] <= y_max)

    # Save cropped tissue data (original x,y tile indices)
    tissue_cropped = tissue_df[mask][['x', 'y', 'class']].copy()
    tissue_cropped.to_csv(output_path, sep='\t', index=False)

    return int(mask.sum())

determine_crop_bounds

determine_crop_bounds(spatial: Dict[str, ndarray], bin_size: int = 100, default_bounds: Optional[Dict[str, Tuple[int, int]]] = None) -> Dict[str, Tuple[int, int]]

Determine cropping bounds based on gaps in spatial data.

Parameters:

Name Type Description Default
spatial Dict[str, ndarray]

Dict with 'x' and 'y' coordinate arrays

required
bin_size int

Binning size for gap detection

100
default_bounds Optional[Dict[str, Tuple[int, int]]]

Default bounds to use if no gaps near edges

None

Returns:

Type Description
Dict[str, Tuple[int, int]]

Dict with 'x' and 'y' bounds as (min, max) tuples

Source code in spstencil/st.py
def determine_crop_bounds(
    spatial: Dict[str, np.ndarray], 
    bin_size: int = 100,
    default_bounds: Optional[Dict[str, Tuple[int, int]]] = None
) -> Dict[str, Tuple[int, int]]:
    """Determine cropping bounds based on gaps in spatial data.

    Args:
        spatial: Dict with 'x' and 'y' coordinate arrays
        bin_size: Binning size for gap detection
        default_bounds: Default bounds to use if no gaps near edges

    Returns:
        Dict with 'x' and 'y' bounds as (min, max) tuples
    """
    bounds = {}

    for axis in ['x', 'y']:
        coords = spatial[axis]
        binned = coords // bin_size

        if is_continuous(binned):
            # No gaps, use original bounds or defaults
            if default_bounds and axis in default_bounds:
                bounds[axis] = default_bounds[axis]
            else:
                bounds[axis] = (int(coords.min()), int(coords.max()))
        else:
            # Find gaps and determine crop bounds
            gaps = missing_ranges(binned)
            coord_min, coord_max = int(coords.min()), int(coords.max())
            max_bin = coord_max // bin_size

            # Check if gaps are near the maximum coordinate
            crop_min = coord_min
            crop_max = coord_max

            for gap_start, gap_end in gaps:
                # If gap is close to max coordinate, crop at gap start
                if gap_end >= max_bin * 0.8:  # Gap near end (80% threshold)
                    crop_max = gap_start * bin_size
                    break

            bounds[axis] = (crop_min, crop_max)

    return bounds

filter_cell_predictions_by_tissue

filter_cell_predictions_by_tissue(hdf5_path: str | Path, tissue_tsv: str | Path, exclude_classes: Sequence[str], swap_axes: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]

Load HDF5 and tissue TSV, compute keep mask, and return (preds_kept, coords_kept, keep_mask).

Source code in spstencil/st.py
def filter_cell_predictions_by_tissue(
    hdf5_path: str | Path,
    tissue_tsv: str | Path,
    exclude_classes: Sequence[str],
    swap_axes: bool = False,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load HDF5 and tissue TSV, compute keep mask, and return (preds_kept, coords_kept, keep_mask)."""
    preds, coords = load_cell_predictions(hdf5_path)
    tissue_df = load_tissue_predictions(tissue_tsv)
    keep = compute_keep_mask(coords, tissue_df, exclude_classes=exclude_classes, swap_axes=swap_axes)
    return preds[keep], coords[keep], keep

get_spatial_coords

get_spatial_coords(adata: AnnData) -> np.ndarray

Return Nx2 pixel coordinates for ST spots/cells in [x, y] format.

Prefers obsm['spatial'] (Scanpy/Visium convention). If absent, tries obs columns 'x', 'y' or 'px', 'py' or 'array_row', 'array_col' (converted to pixels via max range).

Source code in spstencil/st.py
def get_spatial_coords(adata: ad.AnnData) -> np.ndarray:
    """Return Nx2 pixel coordinates for ST spots/cells in [x, y] format.

    Prefers obsm['spatial'] (Scanpy/Visium convention). If absent, tries obs columns
    'x', 'y' or 'px', 'py' or 'array_row', 'array_col' (converted to pixels via max range).
    """
    if "spatial" in adata.obsm:
        coords = np.asarray(adata.obsm["spatial"])  # shape (N, 2), [x, y] format
        if coords.shape[1] != 2:
            raise ValueError("obsm['spatial'] must be of shape (N, 2)")
        return coords
    obs = adata.obs
    for xcol, ycol in (("x", "y"), ("px", "py")):
        if xcol in obs and ycol in obs:
            return np.vstack([obs[xcol].to_numpy(), obs[ycol].to_numpy()]).T
    # Fallback: Visium grid positions (array row/col), scaled to pixel-like space
    if "array_row" in obs and "array_col" in obs:
        rows = obs["array_row"].to_numpy()
        cols = obs["array_col"].to_numpy()
        # Scale grid to pseudo-pixels: treat max row/col as bounds
        x = cols.astype(float)
        y = rows.astype(float)
        return np.vstack([x, y]).T
    raise KeyError("Cannot find spatial coordinates. Provide obsm['spatial'] or obs['x','y'].")

is_continuous

is_continuous(seq: Iterable[int]) -> bool

Check if sequence has no gaps.

Source code in spstencil/st.py
def is_continuous(seq: Iterable[int]) -> bool:
    """Check if sequence has no gaps."""
    nums = _prep(seq)
    return all(b == a + 1 for a, b in zip(nums, nums[1:]))

load_cell_predictions

load_cell_predictions(hdf5_path: str | Path) -> Tuple[np.ndarray, np.ndarray]

Read predictions and coords arrays from an HDF5 produced by cell classifier.

Expects datasets named 'predictions' and 'coords'. Returns (predictions, coords).

Source code in spstencil/st.py
def load_cell_predictions(hdf5_path: str | Path) -> Tuple[np.ndarray, np.ndarray]:
    """Read predictions and coords arrays from an HDF5 produced by cell classifier.

    Expects datasets named 'predictions' and 'coords'. Returns (predictions, coords).
    """
    with h5py.File(str(hdf5_path), "r") as f:
        if "predictions" not in f or "coords" not in f:
            raise KeyError("HDF5 must contain 'predictions' and 'coords' datasets")
        preds = f["predictions"][()]
        coords = f["coords"][()]
    return preds, coords

load_tissue_predictions

load_tissue_predictions(tsv_path: str | Path) -> pd.DataFrame

Load tissue tile predictions with columns [x, y, class].

Source code in spstencil/st.py
def load_tissue_predictions(tsv_path: str | Path) -> pd.DataFrame:
    """Load tissue tile predictions with columns [x, y, class]."""
    df = pd.read_csv(tsv_path, sep="\t")
    required = {"x", "y", "class"}
    if not required.issubset(df.columns):
        raise KeyError(f"tissue_preds.tsv missing required columns {required}")
    return df

map_cells_to_tissue_classes

map_cells_to_tissue_classes(cell_coords: ndarray, tissue_df: DataFrame, *, swap_axes: bool = False, fill_value: str = 'Unlabelled') -> np.ndarray

Return an array of tissue class labels per cell by tiling coords with tissue_preds grid.

Source code in spstencil/st.py
def map_cells_to_tissue_classes(
    cell_coords: np.ndarray,
    tissue_df: pd.DataFrame,
    *,
    swap_axes: bool = False,
    fill_value: str = "Unlabelled",
) -> np.ndarray:
    """Return an array of tissue class labels per cell by tiling coords with tissue_preds grid."""
    cell_tiles = assign_tiles_from_coords(cell_coords, tissue_df, swap_axes=swap_axes)
    lut: Dict[Tuple[int, int], str] = {
        (int(row.x), int(row.y)): str(row["class"]) for _, row in tissue_df.iterrows()
    }
    classes = []
    for tx, ty in cell_tiles:
        classes.append(lut.get((int(tx), int(ty)), fill_value))
    return np.asarray(classes, dtype=object)

missing_ranges

missing_ranges(seq: Iterable[int]) -> List[Tuple[int, int]]

Find gaps in sequence as (start, end) tuples (exclusive end).

Source code in spstencil/st.py
def missing_ranges(seq: Iterable[int]) -> List[Tuple[int, int]]:
    """Find gaps in sequence as (start, end) tuples (exclusive end)."""
    nums = _prep(seq)
    gaps: List[Tuple[int, int]] = []
    for a, b in zip(nums, nums[1:]):
        if b > a + 1:
            gaps.append((a + 1, b))
    return gaps

split_hdf5_by_tissue

split_hdf5_by_tissue(hdf5_path: str | Path, tissue_tsv: str | Path, *, swap_axes: bool = False) -> Dict[str, np.ndarray]

Return dict mapping tissue class -> indices of cells in that tissue tile.

Source code in spstencil/st.py
def split_hdf5_by_tissue(
    hdf5_path: str | Path,
    tissue_tsv: str | Path,
    *,
    swap_axes: bool = False,
) -> Dict[str, np.ndarray]:
    """Return dict mapping tissue class -> indices of cells in that tissue tile."""
    _, coords = load_cell_predictions(hdf5_path)
    tissue_df = load_tissue_predictions(tissue_tsv)
    classes = map_cells_to_tissue_classes(coords, tissue_df, swap_axes=swap_axes)
    result: Dict[str, list] = {}
    for i, c in enumerate(classes):
        result.setdefault(str(c), []).append(i)
    return {k: np.asarray(v, dtype=int) for k, v in result.items()}

split_st_by_tissue

split_st_by_tissue(adata: AnnData, tissue_df: DataFrame, swap_axes: bool = False, column_name: str = 'tissue_type') -> Dict[str, ad.AnnData]

Split AnnData into dict keyed by tissue class.

Source code in spstencil/st.py
def split_st_by_tissue(
    adata: ad.AnnData,
    tissue_df: pd.DataFrame,
    swap_axes: bool = False,
    column_name: str = "tissue_type",
) -> Dict[str, ad.AnnData]:
    """Split AnnData into dict keyed by tissue class."""
    adata_annot = annotate_tissue_to_spots(adata, tissue_df, swap_axes=swap_axes, column_name=column_name)
    classes = adata_annot.obs[column_name].astype("string").fillna("NA").unique().tolist()
    return {c: adata_annot[adata_annot.obs[column_name].astype("string") == c].copy() for c in classes}

subset_st_by_tissue

subset_st_by_tissue(adata: AnnData, tissue_df: DataFrame, include: Optional[Sequence[str]] = None, exclude: Optional[Sequence[str]] = None, swap_axes: bool = False, column_name: str = 'tissue_type') -> ad.AnnData

Return AnnData subset where mapped tissue_type is included/excluded.

If include is provided, only those classes are kept. If exclude is provided, those are dropped.

Source code in spstencil/st.py
def subset_st_by_tissue(
    adata: ad.AnnData,
    tissue_df: pd.DataFrame,
    include: Optional[Sequence[str]] = None,
    exclude: Optional[Sequence[str]] = None,
    swap_axes: bool = False,
    column_name: str = "tissue_type",
) -> ad.AnnData:
    """Return AnnData subset where mapped tissue_type is included/excluded.

    If include is provided, only those classes are kept. If exclude is provided, those are dropped.
    """
    adata_annot = annotate_tissue_to_spots(adata, tissue_df, swap_axes=swap_axes, column_name=column_name)
    series = adata_annot.obs[column_name].astype("string")
    mask = pd.Series(True, index=series.index)
    if include:
        mask &= series.isin(pd.Index(include).astype("string"))
    if exclude:
        mask &= ~series.isin(pd.Index(exclude).astype("string"))
    return adata_annot[mask.values].copy()

validate_coordinate_compatibility

validate_coordinate_compatibility(h5ad_coords: ndarray, hdf5_coords: Optional[ndarray] = None, tissue_coords: Optional[ndarray] = None, adata: Optional[AnnData] = None, tolerance_factor: float = 10.0) -> Dict[str, any]

Validate coordinate system compatibility between files.

Parameters:

Name Type Description Default
h5ad_coords ndarray

Coordinates from H5AD file [N, 2] in [x, y] format

required
hdf5_coords Optional[ndarray]

Optional coordinates from HDF5 file [M, 2] in [x, y] format

None
tissue_coords Optional[ndarray]

Optional coordinates from tissue TSV [P, 2] in [x, y] format

None
adata Optional[AnnData]

Optional AnnData object for metadata extraction

None
tolerance_factor float

Maximum acceptable ratio difference between coordinate ranges

10.0

Returns:

Type Description
Dict[str, any]

Dict with validation results and warnings

Source code in spstencil/st.py
def validate_coordinate_compatibility(
    h5ad_coords: np.ndarray, 
    hdf5_coords: Optional[np.ndarray] = None,
    tissue_coords: Optional[np.ndarray] = None,
    adata: Optional[ad.AnnData] = None,
    tolerance_factor: float = 10.0
) -> Dict[str, any]:
    """Validate coordinate system compatibility between files.

    Args:
        h5ad_coords: Coordinates from H5AD file [N, 2] in [x, y] format
        hdf5_coords: Optional coordinates from HDF5 file [M, 2] in [x, y] format  
        tissue_coords: Optional coordinates from tissue TSV [P, 2] in [x, y] format
        adata: Optional AnnData object for metadata extraction
        tolerance_factor: Maximum acceptable ratio difference between coordinate ranges

    Returns:
        Dict with validation results and warnings
    """
    logger = logging.getLogger(__name__)
    results = {"compatible": True, "warnings": [], "info": {}}

    # Extract coordinate ranges
    h5ad_x_range = (h5ad_coords[:, 0].min(), h5ad_coords[:, 0].max())
    h5ad_y_range = (h5ad_coords[:, 1].min(), h5ad_coords[:, 1].max())
    h5ad_x_span = h5ad_x_range[1] - h5ad_x_range[0]
    h5ad_y_span = h5ad_y_range[1] - h5ad_y_range[0]

    results["info"]["h5ad_ranges"] = {"x": h5ad_x_range, "y": h5ad_y_range, "x_span": h5ad_x_span, "y_span": h5ad_y_span}

    # Check H5AD metadata for scale factors
    if adata is not None:
        if "spatial" in adata.uns:
            spatial_info = adata.uns["spatial"]
            if isinstance(spatial_info, dict) and len(spatial_info) > 0:
                sample_key = list(spatial_info.keys())[0]
                sample_data = spatial_info[sample_key]
                if "scalefactors" in sample_data:
                    scale_factors = sample_data["scalefactors"]
                    results["info"]["scale_factors"] = scale_factors
                    logger.info(f"H5AD scale factors: {scale_factors}")
                if "metadata" in sample_data:
                    metadata = sample_data["metadata"]
                    if "source_image_height" in metadata and "source_image_width" in metadata:
                        source_dims = (metadata["source_image_width"], metadata["source_image_height"])
                        results["info"]["source_image_dims"] = source_dims
                        logger.info(f"Source image dimensions: {source_dims[0]}x{source_dims[1]}")

        if "resolution" in adata.uns:
            results["info"]["resolution"] = adata.uns["resolution"]
            logger.info(f"H5AD resolution: {adata.uns['resolution']}")

    # Validate against HDF5 coordinates
    if hdf5_coords is not None:
        hdf5_x_range = (hdf5_coords[:, 0].min(), hdf5_coords[:, 0].max())
        hdf5_y_range = (hdf5_coords[:, 1].min(), hdf5_coords[:, 1].max())
        hdf5_x_span = hdf5_x_range[1] - hdf5_x_range[0]
        hdf5_y_span = hdf5_y_range[1] - hdf5_y_range[0]

        results["info"]["hdf5_ranges"] = {"x": hdf5_x_range, "y": hdf5_y_range, "x_span": hdf5_x_span, "y_span": hdf5_y_span}

        # Check span ratios
        x_ratio = hdf5_x_span / h5ad_x_span if h5ad_x_span > 0 else float('inf')
        y_ratio = hdf5_y_span / h5ad_y_span if h5ad_y_span > 0 else float('inf')

        results["info"]["hdf5_h5ad_ratios"] = {"x": x_ratio, "y": y_ratio}

        if x_ratio > tolerance_factor or x_ratio < 1/tolerance_factor:
            warning = f"Large X coordinate scale difference: HDF5/H5AD ratio = {x_ratio:.2f}"
            results["warnings"].append(warning)
            logger.warning(warning)

        if y_ratio > tolerance_factor or y_ratio < 1/tolerance_factor:
            warning = f"Large Y coordinate scale difference: HDF5/H5AD ratio = {y_ratio:.2f}"
            results["warnings"].append(warning)  
            logger.warning(warning)

        if abs(x_ratio - y_ratio) > 0.5:  # Different aspect ratio scaling
            warning = f"Inconsistent aspect ratio scaling: X ratio={x_ratio:.2f}, Y ratio={y_ratio:.2f}"
            results["warnings"].append(warning)
            logger.warning(warning)

        logger.info(f"HDF5 vs H5AD coordinate spans - X: {hdf5_x_span} vs {h5ad_x_span} (ratio: {x_ratio:.2f}), Y: {hdf5_y_span} vs {h5ad_y_span} (ratio: {y_ratio:.2f})")

    # Validate against tissue coordinates
    if tissue_coords is not None:
        tissue_x_range = (tissue_coords[:, 0].min(), tissue_coords[:, 0].max())
        tissue_y_range = (tissue_coords[:, 1].min(), tissue_coords[:, 1].max())
        tissue_x_span = tissue_x_range[1] - tissue_x_range[0]
        tissue_y_span = tissue_y_range[1] - tissue_y_range[0]

        results["info"]["tissue_ranges"] = {"x": tissue_x_range, "y": tissue_y_range, "x_span": tissue_x_span, "y_span": tissue_y_span}

        # Check span ratios
        x_ratio = tissue_x_span / h5ad_x_span if h5ad_x_span > 0 else float('inf')
        y_ratio = tissue_y_span / h5ad_y_span if h5ad_y_span > 0 else float('inf')

        results["info"]["tissue_h5ad_ratios"] = {"x": x_ratio, "y": y_ratio}

        if x_ratio > tolerance_factor or x_ratio < 1/tolerance_factor:
            warning = f"Large X coordinate scale difference: Tissue/H5AD ratio = {x_ratio:.2f}"
            results["warnings"].append(warning)
            logger.warning(warning)

        if y_ratio > tolerance_factor or y_ratio < 1/tolerance_factor:
            warning = f"Large Y coordinate scale difference: Tissue/H5AD ratio = {y_ratio:.2f}"
            results["warnings"].append(warning)
            logger.warning(warning)

        logger.info(f"Tissue vs H5AD coordinate spans - X: {tissue_x_span} vs {h5ad_x_span} (ratio: {x_ratio:.2f}), Y: {tissue_y_span} vs {h5ad_y_span} (ratio: {y_ratio:.2f})")

        # Check if HDF5 and tissue coordinates match (they should for the same sample)
        if hdf5_coords is not None:
            hdf5_x_span = results["info"]["hdf5_ranges"]["x_span"]
            hdf5_y_span = results["info"]["hdf5_ranges"]["y_span"]
            hdf5_tissue_x_ratio = abs(hdf5_x_span - tissue_x_span) / max(hdf5_x_span, tissue_x_span)
            hdf5_tissue_y_ratio = abs(hdf5_y_span - tissue_y_span) / max(hdf5_y_span, tissue_y_span)

            if hdf5_tissue_x_ratio > 0.01 or hdf5_tissue_y_ratio > 0.01:  # >1% difference
                warning = f"HDF5 and tissue coordinate ranges don't match well - may be different samples"
                results["warnings"].append(warning)
                logger.warning(warning)
            else:
                logger.info("HDF5 and tissue coordinates match well - likely same sample")

    if len(results["warnings"]) > 0:
        results["compatible"] = False
        logger.warning(f"Coordinate compatibility check found {len(results['warnings'])} issues")
    else:
        logger.info("Coordinate systems appear compatible")

    return results
  • What: Spatial tools: coordinate handling, tissue grid mapping, aggregation, cropping, HDF5 helpers.
  • Common workflow:
  • Load inputs:
    • load_tissue_predictions(tissue_tsv) # expects columns x,y,class
    • load_cell_predictions(cell_hdf5) -> (preds, coords)
    • get_spatial_coords(adata) -> Nx2 [x,y]
  • Map and subset/split:
    • annotate_tissue_to_spots(adata, tissue_df, swap_axes=False, column_name="tissue_type")`
    • subset_st_by_tissue(adata, tissue_df, include=..., exclude=...)
    • split_st_by_tissue(adata, tissue_df)
  • Aggregate by cell class:
    • aggregate_st_by_cell_class(adata, cell_hdf5, normalize=True, group_key=None, tissue_df=None, swap_axes=False) -> AnnData
  • Validate coordinates:
    • validate_coordinate_compatibility(h5ad_coords, hdf5_coords=None, tissue_coords=None, adata=None, tolerance_factor=10.0)
  • Cropping:
    • determine_crop_bounds({"x": xs, "y": ys}, bin_size=100) -> bounds
    • crop_spatial_data(adata, bounds) -> AnnData
    • crop_cell_data(cell_hdf5, bounds, out_hdf5) -> int
    • crop_tissue_data(tissue_tsv, bounds, out_tsv, bin_size=100) -> int
  • Utilities:
    • map_cells_to_tissue_classes(cell_coords, tissue_df)
    • split_hdf5_by_tissue(cell_hdf5, tissue_tsv)
    • aggregate_hdf5_by_cell_class(cell_hdf5, tissue_tsv=None, group_by_tissue=False, normalize=True)
    • is_continuous(seq), missing_ranges(seq)
  • Example:
    tissue = sps.st.load_tissue_predictions("tissue_preds.tsv")
    adata_t = sps.st.subset_st_by_tissue(adata, tissue, include=["Epithelium"])
    
    preds, coords = sps.st.load_cell_predictions("cells.hdf5")
    adata_agg = sps.st.aggregate_st_by_cell_class(adata, "cells.hdf5", normalize=True)
    
    xy = sps.st.get_spatial_coords(adata)
    spatial = {"x": xy[:,0], "y": xy[:,1]}
    bounds = sps.st.determine_crop_bounds(spatial, bin_size=100)
    adata_crop = sps.st.crop_spatial_data(adata, bounds)
    

Projection and Unroll

  • What: Project 2D points onto an arbitrary polyline/curve and aggregate expression along a normalized 0–1 axis.
  • Key functions:
  • project_points_onto_polyline(points, polyline, *, use_kdtree=True, kdtree_k=64, use_grid=False, grid_cell_size=None)
  • extract_polyline_from_image(path, threshold=64, invert=False, step=1)
  • unroll_adata_along_polyline(adata, polyline, *, coord_keys=("imagecol","imagerow"), features=None, layer=None, n_bins=50, agg="mean", distance_max=None)
  • unroll_sdata(sdata, points_key, line_key, *, table_key=None, features=None, layer=None, n_bins=50, agg="mean", distance_max=None)
    (optional integration with SpatialData; see SpatialData)

  • Examples:

    import numpy as np
    import spstencil as sps
    
    # 1) Project arbitrary points onto a sine-wave polyline
    x = np.linspace(-500, 500, 1000)
    polyline = np.stack([x, np.sin(x/20.0)], axis=1)
    pts = np.random.uniform([-500, polyline[:,1].min()], [500, polyline[:,1].max()], size=(2000, 2))
    res = sps.project_points_onto_polyline(pts, polyline, use_kdtree=True, kdtree_k=64)
    # res.normalized_positions in [0,1], res.projected_points are closest points on the polyline
    
    # 2) Extract a polyline from a thin-curve ROI image
    poly_from_img = sps.extract_polyline_from_image("testdata/roi.jpg", threshold=64, step=5)
    
    # 3) Unroll AnnData expression along a curve
    agg_df, mapping_df = sps.unroll_adata_along_polyline(
        adata,
        poly_from_img,
        coord_keys=("imagecol", "imagerow"),
        features=["Car1", "Retnlb", "Prdx6"],
        n_bins=50,
        agg="mean",
        distance_max=250.0,
    )
    
    # 4) SpatialData adapter (if using SpatialData objects)
    # agg_df, mapping_df = sps.unroll_sdata(
    #     sdata, points_key="spots", line_key="roi_line", table_key="table",
    #     features=["Car1", "Retnlb", "Prdx6"], n_bins=50
    # )
    

Utilities

convert_cyx_to_yxc_array

convert_cyx_to_yxc_array(img: ndarray) -> np.ndarray

Convert a 3D CYX numpy array to YXC by transposing (1, 2, 0).

Raises ValueError if the input is not 3D.

Source code in spstencil/_utils/axes.py
def convert_cyx_to_yxc_array(img: np.ndarray) -> np.ndarray:
    """Convert a 3D CYX numpy array to YXC by transposing (1, 2, 0).

    Raises ValueError if the input is not 3D.
    """
    if img.ndim != 3:
        raise ValueError(f"Expected 3D array (CYX), got shape {img.shape}")
    return img.transpose(1, 2, 0)

get_tiff_axes

get_tiff_axes(path: str | Path) -> Optional[str]

Return axes string from first series of a TIFF, if available.

Examples: "YXC", "CYX", "TCZYX". Returns None if not present.

Source code in spstencil/_utils/axes.py
def get_tiff_axes(path: str | Path) -> Optional[str]:
    """Return axes string from first series of a TIFF, if available.

    Examples: "YXC", "CYX", "TCZYX". Returns None if not present.
    """
    p = Path(path)
    try:
        with tfi.TiffFile(str(p)) as tf:
            series = tf.series[0] if tf.series else None
            axes = getattr(series, "axes", None)
            return str(axes) if axes is not None else None
    except Exception:
        return None

get_yxc_dims

get_yxc_dims(path: str | Path, *, on_missing: str = 'infer', channel_upper: int = 256, min_spatial: int = 64) -> tuple[int, int, int]

Return (Y, X, C) dimension sizes for a 3D TIFF.

  • Uses TIFF axes metadata when available (supports 'YXC' and 'CYX').
  • If metadata is missing or unsupported:
  • on_missing='error': raise ValueError
  • on_missing='force_cyx': assume CYX ordering
  • on_missing='infer': 1) if TIFF SamplesPerPixel matches a dimension, use that as C 2) else prefer a dimension <= channel_upper as C; others become Y/X (ideally both >= min_spatial)
Source code in spstencil/_utils/axes.py
def get_yxc_dims(
    path: str | Path,
    *,
    on_missing: str = "infer",
    channel_upper: int = 256,
    min_spatial: int = 64,
) -> tuple[int, int, int]:
    """Return (Y, X, C) dimension sizes for a 3D TIFF.

    - Uses TIFF axes metadata when available (supports 'YXC' and 'CYX').
    - If metadata is missing or unsupported:
      - on_missing='error': raise ValueError
      - on_missing='force_cyx': assume CYX ordering
      - on_missing='infer':
        1) if TIFF SamplesPerPixel matches a dimension, use that as C
        2) else prefer a dimension <= channel_upper as C; others become Y/X
           (ideally both >= min_spatial)
    """
    p = Path(path)
    with tfi.TiffFile(str(p)) as tf:
        series = tf.series[0] if tf.series else None
        if series is None or series.shape is None:
            raise ValueError(f"Cannot determine shape for {p}")
        shape = tuple(int(x) for x in series.shape)
        axes = getattr(series, "axes", None)

    if len(shape) != 3:
        raise ValueError(f"Expected 3D TIFF, got shape {shape} for {p}")

    if axes:
        up = str(axes).upper()
        if up == "YXC":
            return shape[0], shape[1], shape[2]
        if up == "CYX":
            return shape[1], shape[2], shape[0]
        # fallthrough to missing policy for unsupported axes

    if on_missing == "error":
        raise ValueError(
            f"No usable axes metadata for {p} (axes={axes}); specify on_missing or convert first"
        )
    if on_missing == "force_cyx":
        return shape[1], shape[2], shape[0]
    if on_missing != "infer":
        raise ValueError("on_missing must be one of {'error','infer','force_cyx'}")

    # Heuristic inference
    d0, d1, d2 = shape
    warnings.warn(
        f"Inferring axes for {p}: shape={shape}, on_missing={on_missing}, "
        f"channel_upper={channel_upper}, min_spatial={min_spatial}",
        RuntimeWarning,
    )
    # 1) Prefer TIFF SamplesPerPixel if it matches a dimension
    spp = _get_samples_per_pixel(p)
    if spp is not None and spp in (d0, d1, d2):
        if spp == d0:
            return d1, d2, d0  # assume CYX
        if spp == d2:
            return d0, d1, d2  # assume YXC
        if spp == d1:
            # middle-as-channels; pick larger as X
            y, x = (d0, d2) if d2 >= d0 else (d2, d0)
            return y, x, d1

    # 2) If a boundary-small dim looks like channels
    if d0 <= channel_upper and d1 >= min_spatial and d2 >= min_spatial:
        return d1, d2, d0  # assume CYX
    if d2 <= channel_upper and d0 >= min_spatial and d1 >= min_spatial:
        return d0, d1, d2  # assume YXC
    # Generic: smallest is channels
    dims = [d0, d1, d2]
    c_idx = int(np.argmin(dims))
    c = dims[c_idx]
    spatial = [dims[i] for i in range(3) if i != c_idx]
    if not (spatial[0] >= min_spatial and spatial[1] >= min_spatial):
        # Prefer YXC if last dim is the smallest and reasonably small
        if c_idx == 2 and c <= channel_upper:
            return d0, d1, d2
        # otherwise assume CYX
        return d1, d2, d0
    # Map back to Y,X depending on which index is channels
    if c_idx == 0:  # CYX
        return d1, d2, d0
    if c_idx == 2:  # YXC
        return d0, d1, d2
    # Middle-as-channels is unusual; pick larger as X
    y, x = sorted(spatial)
    return y, x, c

imread_yxc

imread_yxc(path: str | Path, *, force: bool = False) -> np.ndarray

Read a TIFF from disk and return a 3D YXC numpy array.

Uses TIFF metadata to determine axes: - If axes == 'YXC': returns as-is - If axes == 'CYX': converts to YXC - If axes missing or different: raises unless force=True (assumes CYX)

Source code in spstencil/_utils/axes.py
def imread_yxc(path: str | Path, *, force: bool = False) -> np.ndarray:
    """Read a TIFF from disk and return a 3D YXC numpy array.

    Uses TIFF metadata to determine axes:
    - If axes == 'YXC': returns as-is
    - If axes == 'CYX': converts to YXC
    - If axes missing or different: raises unless force=True (assumes CYX)
    """
    p = Path(path)
    img = tfi.imread(str(p))
    if img.ndim != 3:
        raise ValueError(f"Expected 3D TIFF, got shape {img.shape} for {p}")
    axes = get_tiff_axes(p)
    if axes is None:
        if not force:
            raise ValueError(f"No TIFF axes metadata for {p}; pass force=True to assume CYX")
        return convert_cyx_to_yxc_array(img)
    axes_up = axes.upper()
    if axes_up == "YXC":
        return img
    if axes_up == "CYX":
        return convert_cyx_to_yxc_array(img)
    if not force:
        raise ValueError(f"Unsupported axes '{axes}' for {p}; expected CYX or YXC")
    return convert_cyx_to_yxc_array(img)
  • What: TIFF axis handling and conversions for arrays/images.
  • Key functions:
  • get_tiff_axes(path) -> Optional[str]
  • imread_yxc(path, force=False) -> np.ndarray # returns YXC
  • convert_cyx_to_yxc_array(img) -> np.ndarray
  • get_yxc_dims(path, on_missing='infer', channel_upper=256, min_spatial=64) -> (Y, X, C)
  • Example:
    img_yxc = sps.axes.imread_yxc("he.tif", force=False)
    y, x, c = sps.axes.get_yxc_dims("he.tif", on_missing="infer")
    

convert_cyx_to_yxc.py ───────────────────────────────────────────────

Walk through a directory, find every *.tif(f) image assumed to be in CYX (channels–first) order, transpose it to YXC (channels–last) order, and save the result as a new TIFF.

The output keeps only the first three dimensions and adds an axes entry to the TIFF metadata so downstream tools (e.g. happy) recognise the layout.

Usage

$ python -m spstencil._utils.cyx2yxc /path/to/input_dir [/path/to/output_dir]

• If output_dir is omitted, files are written next to the originals with the suffix _yxc.tif. • Recurses into sub‑directories.

convert_cyx_dir_to_yxc

convert_cyx_dir_to_yxc(input_dir: Path, output_dir: Path | None = None, *, workers: int | None = None, force: bool = False) -> int

Convert all TIFFs under input_dir from CYX to YXC, respecting metadata.

Returns the number of successfully converted files. Skips files that are already YXC or have incompatible/missing axes unless force=True.

Source code in spstencil/_utils/cyx2yxc.py
def convert_cyx_dir_to_yxc(
    input_dir: pathlib.Path,
    output_dir: pathlib.Path | None = None,
    *,
    workers: int | None = None,
    force: bool = False,
) -> int:
    """Convert all TIFFs under input_dir from CYX to YXC, respecting metadata.

    Returns the number of successfully converted files. Skips files that are already
    YXC or have incompatible/missing axes unless force=True.
    """
    if not input_dir.is_dir():
        raise NotADirectoryError(f"Input path '{input_dir}' is not a directory")

    tiffs = _find_tiffs(input_dir)
    if not tiffs:
        return 0

    max_workers = workers if workers is not None else max(_mp.cpu_count() // 2, 1)
    print(f"Found {len(tiffs)} TIFF(s), starting conversion…\n")
    converted = 0
    with _fut.ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(_convert_one, p, output_dir, force=force) for p in tiffs]
        for f in futures:
            try:
                if f.result():
                    converted += 1
            except Exception as e:
                print(f"[ERROR] Worker failure: {e}", file=sys.stderr)
    print("\nDone.")
    return converted
  • What: Batch convert directories of TIFFs from CYX to YXC.
  • Key function:
  • convert_cyx_dir_to_yxc(input_dir, output_dir=None, workers=None, force=False) -> bool|int
  • Example:
    from pathlib import Path
    num = sps.cyx2yxc.convert_cyx_dir_to_yxc(Path("raw/"), Path("yxc/"), workers=8, force=False)