import json
import os
import urllib.request
import zipfile
from pathlib import Path
import numpy as np
from tqdm import tqdm
from zea import log
# Girder API base URL shared by CAMUS and CETUS collections
GIRDER_API = "https://humanheart-project.creatis.insa-lyon.fr/database/api/v1"
[docs]
def sitk_load(filepath: str | Path, squeeze: bool = False):
"""Load a NIfTI/medical image using SimpleITK and return the array and metadata.
Args:
filepath: Path to the image file.
squeeze: If True, squeeze singleton dimensions from the array.
Defaults to False.
Returns:
Tuple of:
- Image array. Shape depends on the input and the ``squeeze`` parameter.
- Dictionary of metadata: ``origin``, ``spacing``, ``direction``, ``size``,
``dimension``, and a ``metadata`` sub-dict with all image metadata keys.
"""
try:
import SimpleITK as sitk
except ImportError as exc:
raise ImportError(
"SimpleITK is not installed. "
"Please install it with `pip install SimpleITK` to use this function."
) from exc
image = sitk.ReadImage(str(filepath))
all_metadata = {}
for k in image.GetMetaDataKeys():
all_metadata[k] = image.GetMetaData(k)
metadata = {
"origin": image.GetOrigin(),
"spacing": image.GetSpacing(),
"direction": image.GetDirection(),
"size": image.GetSize(),
"dimension": image.GetDimension(),
"metadata": all_metadata,
}
im_array = sitk.GetArrayFromImage(image)
if squeeze:
im_array = np.squeeze(im_array)
return im_array, metadata
[docs]
def load_avi(file_path, mode="L"):
"""Load a .avi file and return a numpy array of frames.
Decoding and colour conversion are done with OpenCV, which releases the GIL,
so calling this from a thread pool actually parallelises across files (unlike a
per-frame PIL loop, which is GIL-bound). The "L"/"RGB" conversions use the same
ITU-R 601 luma coefficients as PIL, so results match to within rounding.
Args:
file_path (str | Path): The path to the video file.
mode (str, optional): Color mode: "L" (grayscale) or "RGB".
Defaults to "L".
Returns:
numpy.ndarray: Array of frames (num_frames, H, W) or (num_frames, H, W, C)
"""
try:
import cv2
except ImportError as exc:
raise ImportError(
"OpenCV is required for loading video files. "
"Please install it with 'pip install opencv-python' or "
"'pip install opencv-python-headless'."
) from exc
if mode not in ("L", "RGB"):
raise ValueError(f"Unsupported mode {mode!r}, expected 'L' or 'RGB'.")
cap = cv2.VideoCapture(str(file_path))
if not cap.isOpened():
raise OSError(f"Could not open video file {file_path}")
frames = []
try:
while True:
ok, frame = cap.read() # OpenCV decodes to BGR
if not ok:
break
if mode == "L":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
else:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frames.append(frame)
finally:
cap.release()
if not frames:
raise OSError(f"No frames decoded from video file {file_path}")
return np.stack(frames)
[docs]
def unzip(src: Path, dst: Path) -> Path:
"""Unzips a .zip file to a directory.
Will check if the unzip has already been fully completed by checking for a file named
".fully_unzipped" in the destination directory.
Args:
src (Path): Path to the .zip file to unzip.
dst (Path): Path to the directory where the files will be unzipped.
Returns:
Path: Path to the unzipped directory.
"""
assert src.suffix == ".zip", f"Source path {src} is not a .zip file."
already_unzipped_filepath = dst / ".fully_unzipped"
if already_unzipped_filepath.exists():
log.info("Files already fully unzipped. Skipping unzipping.")
return dst
if dst.exists() and dst.is_dir() and len(list(dst.iterdir())) > 0:
raise ValueError(
f"Destination directory {dst} is not empty, but the file {already_unzipped_filepath} "
"does not exist. Maybe the previous unzip attempt failed. Please remove the directory "
"and try again."
)
if not src.exists():
raise FileNotFoundError(f"Zip file {src} does not exist.")
log.info(f"Unzipping {src} to {dst}...")
dst_root = os.path.realpath(dst)
with zipfile.ZipFile(src, "r") as zip_ref:
for member in tqdm(zip_ref.namelist(), desc="Extracting files"):
target = os.path.realpath(os.path.join(dst_root, member))
if os.path.commonpath([dst_root, target]) != dst_root:
raise ValueError(f"Unsafe path in zip archive: {member}")
zip_ref.extract(member, dst)
log.info("Unzipping completed.")
# Create file to indicate all files have been unzipped
already_unzipped_filepath.touch()
return dst
[docs]
def download_file(url: str, destination: str | Path) -> Path: # pragma: no cover
"""Download a file from a URL to a local path.
Skips the download if the file already exists at *destination*.
Shows a :mod:`tqdm` progress bar based on the ``content-length``
header when available.
Uses the ``ZEA_DOWNLOAD_TIMEOUT`` environment variable (default 600 s)
as the socket timeout.
Args:
url: URL to download from.
destination: Full file path where the downloaded content will be saved.
The parent directory is created if it does not exist.
Returns:
Path to the (possibly pre-existing) downloaded file.
"""
destination = Path(destination)
if destination.exists():
log.info(f"File already exists: {destination.name}. Skipping download.")
return destination
destination.parent.mkdir(parents=True, exist_ok=True)
timeout = int(os.getenv("ZEA_DOWNLOAD_TIMEOUT", "600"))
filename = destination.name
temp_path = destination.with_name(f"{destination.name}.part")
if temp_path.exists():
temp_path.unlink()
log.info(f"Downloading {filename} ...")
try:
with urllib.request.urlopen(url, timeout=timeout) as response:
total_header = response.headers.get("content-length")
total = int(total_header) if total_header is not None else None
bytes_written = 0
with (
open(temp_path, "wb") as f,
tqdm(total=total or None, unit="B", unit_scale=True, desc=filename) as progress,
):
while chunk := response.read(8192):
f.write(chunk)
bytes_written += len(chunk)
progress.update(len(chunk))
f.flush()
os.fsync(f.fileno())
if total is not None and bytes_written != total:
raise IOError(
f"Downloaded size mismatch for {filename}: "
f"expected {total} bytes, got {bytes_written}."
)
temp_path.replace(destination)
finally:
if temp_path.exists() and not destination.exists():
temp_path.unlink(missing_ok=True)
log.info(f"Downloaded {filename} to {destination.parent}")
return destination
[docs]
def download_from_girder( # pragma: no cover
collection_id: str,
destination: str | Path,
dataset_name: str,
patients: list[int] | None = None,
top_folder_name: str = "dataset",
) -> Path:
"""Download a dataset from the Girder server.
Navigates the Girder collection to find patient folders and downloads
all files for each patient. Existing files are skipped.
Args:
collection_id: Girder collection ID for the dataset.
destination: Directory where the dataset will be downloaded.
dataset_name: Human-readable name used in log messages
(e.g. ``"CAMUS"`` or ``"CETUS"``).
patients: Optional list of patient IDs to download.
If None, all patients in the collection are downloaded.
top_folder_name: Name of the top-level folder inside the collection
that contains patient subfolders. Defaults to ``"dataset"``.
Returns:
Path to the downloaded dataset directory.
"""
destination = Path(destination)
destination.mkdir(parents=True, exist_ok=True)
timeout = int(os.getenv("ZEA_DOWNLOAD_TIMEOUT", "60"))
# Get top-level folders in the collection
url = f"{GIRDER_API}/folder?parentType=collection&parentId={collection_id}&limit=50"
with urllib.request.urlopen(url, timeout=timeout) as resp:
folders = json.loads(resp.read())
dataset_folder_id = None
for folder in folders:
if folder["name"] == top_folder_name:
dataset_folder_id = folder["_id"]
break
if dataset_folder_id is None:
raise RuntimeError(
f"Could not find '{top_folder_name}' folder in {dataset_name} collection."
)
# Get patient folders (paginated — some datasets have >50 patients)
patient_folders = []
offset = 0
page_size = 50
while True:
url = (
f"{GIRDER_API}/folder?parentType=folder&parentId={dataset_folder_id}"
f"&limit={page_size}&offset={offset}"
)
with urllib.request.urlopen(url, timeout=timeout) as resp:
page = json.loads(resp.read())
if not page:
break
patient_folders.extend(page)
if len(page) < page_size:
break
offset += page_size
if patients is not None:
patient_set = set(patients)
patient_folders = [
pf for pf in patient_folders if int(pf["name"].removeprefix("patient")) in patient_set
]
log.info(f"Downloading {len(patient_folders)} patients from {dataset_name} dataset...")
for pf in tqdm(patient_folders, desc="Downloading patients"):
patient_name = pf["name"]
patient_dir = destination / patient_name
patient_dir.mkdir(parents=True, exist_ok=True)
# Get items (files) in the patient folder
url = f"{GIRDER_API}/item?folderId={pf['_id']}&limit=50"
with urllib.request.urlopen(url, timeout=timeout) as resp:
items = json.loads(resp.read())
for item in items:
file_path = patient_dir / item["name"]
if file_path.exists():
log.debug(f"File {file_path} already exists when downloading. Skipping.")
continue
download_url = f"{GIRDER_API}/item/{item['_id']}/download"
log.debug(f"Downloading {item['name']}...")
with urllib.request.urlopen(download_url, timeout=timeout) as resp:
file_path.write_bytes(resp.read())
log.info(f"{dataset_name} dataset downloaded to {destination}")
return destination
# ---------------------------------------------------------------------------
# HuggingFace Hub helpers
# ---------------------------------------------------------------------------
[docs]
def check_output_dir_ownership(folder: "str | Path", repo_id: str) -> None:
"""Raise if *folder* already contains data from a different dataset.
The check is based on the ``zea_repo_id`` field written into the dataset
card (``README.md``) by each converter. A directory is considered *owned*
by a specific dataset when its README.md contains ``zea_repo_id: <repo_id>``.
* **Empty or non-existent directory** → passes (first-time run).
* **Directory with matching README.md** → passes (re-run of same dataset).
* **Directory with mismatched README.md** → raises :class:`FileExistsError`.
* **Directory with HDF5 files but no README.md** → raises :class:`FileExistsError`.
Args:
folder: Output directory to inspect.
repo_id: Expected dataset repository ID, e.g. ``"zeahub/picmus"``.
Raises:
FileExistsError: If the directory belongs to a different dataset.
"""
folder = Path(folder)
readme = folder / "README.md"
if not folder.exists():
return # fresh directory — OK
if readme.exists():
if f"zea_repo_id: {repo_id}" not in readme.read_text():
raise FileExistsError(
f"Output directory '{folder}' already contains data from a different dataset "
f"(README.md does not declare 'zea_repo_id: {repo_id}'). "
"Use a separate output directory for each dataset."
)
return # correct dataset — OK (re-run)
# No README.md yet — fail only if HDF5 files are present (stale/foreign data)
if any(folder.rglob("*.hdf5")):
raise FileExistsError(
f"Output directory '{folder}' already contains HDF5 files but no dataset "
"README.md. Use a separate, empty output directory for each dataset, "
"or delete this directory to start fresh."
)
[docs]
def require_output_dir_ownership(folder: "str | Path", repo_id: str) -> None:
"""Raise if *folder* does not contain a verified dataset card for *repo_id*.
Used as a pre-flight check before uploading to HuggingFace Hub to prevent
accidentally uploading files from a different dataset.
Args:
folder: Directory to check.
repo_id: Expected dataset repository ID, e.g. ``"zeahub/picmus"``.
Raises:
FileNotFoundError: If no README.md is found.
ValueError: If the README.md does not match *repo_id*.
"""
folder = Path(folder)
readme = folder / "README.md"
if not readme.exists():
raise FileNotFoundError(
f"No README.md found in '{folder}'. Run the conversion step before uploading."
)
if f"zea_repo_id: {repo_id}" not in readme.read_text():
raise ValueError(
f"'{folder}/README.md' does not declare 'zea_repo_id: {repo_id}'. "
f"This directory does not appear to contain the '{repo_id}' dataset. "
"Make sure you are uploading the correct directory."
)
[docs]
def write_dataset_card(folder: str | Path, card_content: str) -> Path: # pragma: no cover
"""Write a HuggingFace dataset card (``README.md``) into *folder*.
Args:
folder: Directory where ``README.md`` will be written.
card_content: Markdown content for the dataset card.
Returns:
Path to the written ``README.md`` file.
"""
folder = Path(folder)
card_path = folder / "README.md"
card_path.write_text(card_content)
return card_path
[docs]
def upload_dataset_to_hf( # pragma: no cover
folder: str | Path,
repo_id: str,
revision: str,
file_glob: str = "*.hdf5",
commit_message: str | None = None,
allow_patterns: "list[str] | None" = None,
) -> None:
"""Upload a converted dataset to a HuggingFace Hub revision branch.
Uses :meth:`huggingface_hub.HfApi.upload_large_folder`, the resumable,
chunked, multi-commit uploader meant for large datasets (many or large
files). Upload to the ``main`` branch is intentionally blocked. After
uploading to a named revision branch, verify the data manually and then
merge the branch into ``main`` on the Hugging Face Hub.
Args:
folder: Root folder containing the files to upload.
repo_id: Hugging Face Hub repository ID (e.g. ``"zeahub/picmus"``).
revision: Target branch name. Must not be ``"main"``.
file_glob: Glob pattern for files to include in the size summary.
Defaults to ``"*.hdf5"``.
commit_message: Message used only when creating the *revision* branch
(``upload_large_folder`` generates its own per-commit messages).
Defaults to ``"Upload <repo_id> (zea format) to <revision>"``.
allow_patterns: Optional list of glob patterns limiting which files in
*folder* are uploaded. When ``None`` (default) the whole folder is
uploaded. Use this to scope an upload to specific files.
Raises:
ValueError: If *revision* is ``"main"``.
FileNotFoundError: If no files matching *file_glob* are found
under *folder*.
"""
from huggingface_hub import HfApi, login
if revision == "main":
raise ValueError(
"Upload to 'main' is intentionally blocked. "
"Upload to a named revision branch instead, then merge into main "
"manually after verifying the upload on the Hub."
)
folder = Path(folder)
files = sorted(folder.rglob(file_glob))
if not files:
raise FileNotFoundError(f"No files matching '{file_glob}' found in {folder}")
total_size_mb = sum(f.stat().st_size for f in files) / 1e6
if commit_message is None:
commit_message = f"Upload {repo_id} (zea format) to {revision}"
log.info("")
log.info("=" * 60)
log.info(" HuggingFace upload summary")
log.info("=" * 60)
log.info(f" Repository : {repo_id}")
log.info(f" Branch : {revision}")
log.info(f" Source : {folder}")
log.info(f" Files : {len(files)}")
log.info(f" Total size : {total_size_mb:.1f} MB")
log.info("=" * 60)
log.info("")
answer = input("Proceed with upload? [y/N] ").strip().lower()
if answer != "y":
log.info("Upload cancelled.")
return
login()
api = HfApi()
# Check if the revision (branch) exists; if not, prompt to create it.
try:
refs = api.list_repo_refs(repo_id=repo_id, repo_type="dataset")
branch_names = {b.name for b in refs.branches}
if revision not in branch_names:
create = (
input(
f"Revision (branch) '{revision}' does not exist on {repo_id}. Create it? [y/N] "
)
.strip()
.lower()
)
if create != "y":
log.info("Upload cancelled — revision not created.")
return
api.create_branch(repo_id=repo_id, branch=revision, repo_type="dataset")
log.info("Created branch '%s' on %s.", revision, repo_id)
except Exception as exc:
log.warning("Could not verify revision existence: %s", exc)
# upload_large_folder is the resumable, chunked, multi-commit uploader meant
# for big datasets (many/large files). It manages its own commit messages, so
# commit_message only affects the branch-creation path above, not the upload.
api.upload_large_folder(
folder_path=str(folder),
repo_id=repo_id,
repo_type="dataset",
revision=revision,
allow_patterns=allow_patterns,
)
log.info(f"Uploaded to https://huggingface.co/datasets/{repo_id}/tree/{revision}")