import json
import os
import urllib.request
import zipfile
from pathlib import Path
import imageio
import numpy as np
from PIL import Image
from tqdm import tqdm
from zea import log
# Base URL of the Girder REST API (v1) that hosts both the CAMUS and the CETUS
# collections; every Girder request in this module is built on top of it.
GIRDER_API = "https://humanheart-project.creatis.insa-lyon.fr/database/api/v1"
[docs]
def sitk_load(filepath: str | Path, squeeze: bool = False):
"""Load a NIfTI/medical image using SimpleITK and return the array and metadata.
Args:
filepath: Path to the image file.
squeeze: If True, squeeze singleton dimensions from the array.
Defaults to False.
Returns:
Tuple of:
- Image array. Shape depends on the input and the ``squeeze`` parameter.
- Dictionary of metadata: ``origin``, ``spacing``, ``direction``, ``size``,
``dimension``, and a ``metadata`` sub-dict with all image metadata keys.
"""
try:
import SimpleITK as sitk
except ImportError as exc:
raise ImportError(
"SimpleITK is not installed. "
"Please install it with `pip install SimpleITK` to use this function."
) from exc
image = sitk.ReadImage(str(filepath))
all_metadata = {}
for k in image.GetMetaDataKeys():
all_metadata[k] = image.GetMetaData(k)
metadata = {
"origin": image.GetOrigin(),
"spacing": image.GetSpacing(),
"direction": image.GetDirection(),
"size": image.GetSize(),
"dimension": image.GetDimension(),
"metadata": all_metadata,
}
im_array = sitk.GetArrayFromImage(image)
if squeeze:
im_array = np.squeeze(im_array)
return im_array, metadata
[docs]
def load_avi(file_path, mode="L"):
    """Decode every frame of an .avi video into one stacked numpy array.

    Args:
        file_path (str): The path to the video file.
        mode (str, optional): PIL mode each frame is converted to:
            "L" (grayscale) or "RGB". Defaults to "L".

    Returns:
        numpy.ndarray: Array of frames (num_frames, H, W) or (num_frames, H, W, C)
    """
    with imageio.get_reader(file_path) as reader:
        # Round-trip through PIL so the colour conversion is handled by Pillow.
        converted = [np.array(Image.fromarray(raw).convert(mode)) for raw in reader]
    return np.stack(converted)
[docs]
def unzip(src: str | Path, dataset: str) -> Path:
"""
Checks if data folder exist in src.
Otherwise, unzip dataset.zip in src.
Args:
src (str | Path): The source directory containing the zip file or unzipped folder.
dataset (str): The name of the dataset to unzip.
Options are "picmus", "camus", "echonet", "echonetlvh".
Returns:
Path: The path to the unzipped dataset directory.
"""
src = Path(src)
if dataset == "picmus":
zip_name = "picmus.zip"
folder_name = "archive_to_download"
unzip_dir = src / folder_name
elif dataset == "camus":
zip_name = "CAMUS_public.zip"
folder_name = "CAMUS_public"
unzip_dir = src / folder_name
elif dataset == "echonet":
zip_name = "EchoNet-Dynamic.zip"
folder_name = "EchoNet-Dynamic"
unzip_dir = src / folder_name / "Videos"
elif dataset == "echonetlvh":
zip_name = "EchoNet-LVH.zip"
folder_name = "Batch1"
unzip_dir = src
else:
raise ValueError(f"Dataset {dataset} not recognized for unzip.")
if (src / folder_name).exists():
if dataset == "echonetlvh":
# EchoNetLVH dataset unzips into four folders. Check they all exist.
assert (src / "Batch2").exists(), f"Missing Batch2 folder in {src}."
assert (src / "Batch3").exists(), f"Missing Batch3 folder in {src}."
assert (src / "Batch4").exists(), f"Missing Batch4 folder in {src}."
assert (src / "MeasurementsList.csv").exists(), (
f"Missing MeasurementsList.csv in {src}."
)
log.info(f"Found Batch1, Batch2, Batch3, Batch4 and MeasurementsList.csv in {src}.")
return unzip_dir
# CAMUS special cases: Girder download produces a database_nifti sub-folder,
# or the user may have extracted patient* folders directly into src.
if dataset == "camus":
if (src / "database_nifti").exists():
log.info(f"Found database_nifti folder in {src}.")
return src / "database_nifti"
if any(src.glob("patient*")):
log.info(f"Found patient folders directly in {src}.")
return src
zip_path = src / zip_name
if not zip_path.exists():
raise FileNotFoundError(f"Could not find {zip_name} or {folder_name} folder in {src}.")
log.info(f"Unzipping {zip_path} to {src}...")
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(src)
log.info("Unzipping completed.")
log.info(f"Starting conversion from {src / folder_name}.")
return unzip_dir
[docs]
def download_file(url: str, destination: str | Path) -> Path:  # pragma: no cover
    """Fetch *url* and store it at *destination*.

    Nothing is downloaded when *destination* already exists. Otherwise the
    payload is streamed into a sibling ``<name>.part`` file, flushed to disk
    and renamed onto *destination* once the transfer is complete, so a
    partially downloaded file never appears under the final name.

    A :mod:`tqdm` progress bar is displayed; its total comes from the
    ``content-length`` header when the server sends one. When that header is
    present, the number of bytes received is checked against it.

    The socket timeout is read from the ``ZEA_DOWNLOAD_TIMEOUT`` environment
    variable (default 600 s).

    Args:
        url: URL to download from.
        destination: Full file path where the downloaded content will be
            saved. The parent directory is created if it does not exist.

    Returns:
        Path to the (possibly pre-existing) downloaded file.

    Raises:
        IOError: If the number of bytes received differs from the announced
            ``content-length``.
    """
    target = Path(destination)
    if target.exists():
        log.info(f"File already exists: {target.name}. Skipping download.")
        return target

    target.parent.mkdir(parents=True, exist_ok=True)
    timeout = int(os.getenv("ZEA_DOWNLOAD_TIMEOUT", "600"))
    name = target.name
    partial = target.with_name(f"{target.name}.part")

    # Discard the leftover of a previously interrupted attempt.
    if partial.exists():
        partial.unlink()

    log.info(f"Downloading {name} ...")
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            length_header = response.headers.get("content-length")
            expected = None if length_header is None else int(length_header)
            received = 0
            with open(partial, "wb") as sink:
                with tqdm(total=expected or None, unit="B", unit_scale=True, desc=name) as bar:
                    while True:
                        block = response.read(8192)
                        if not block:
                            break
                        sink.write(block)
                        received += len(block)
                        bar.update(len(block))
                    # Make sure the data reached the disk before the rename.
                    sink.flush()
                    os.fsync(sink.fileno())
        if expected is not None and received != expected:
            raise IOError(
                f"Downloaded size mismatch for {name}: "
                f"expected {expected} bytes, got {received}."
            )
        partial.replace(target)
    finally:
        # On any failure remove the incomplete temp file.
        if partial.exists() and not target.exists():
            partial.unlink(missing_ok=True)

    log.info(f"Downloaded {name} to {target.parent}")
    return target
[docs]
def _girder_list_all(query: str, timeout: int, page_size: int = 50) -> list:  # pragma: no cover
    """Return every record of a Girder listing endpoint, following pagination.

    Args:
        query: Endpoint and query string relative to ``GIRDER_API``, without
            ``limit``/``offset`` (e.g. ``"item?folderId=<id>"``).
        timeout: Socket timeout in seconds for each request.
        page_size: Number of records requested per page.

    Returns:
        List with the decoded JSON records of all pages.
    """
    records = []
    offset = 0
    while True:
        url = f"{GIRDER_API}/{query}&limit={page_size}&offset={offset}"
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            page = json.loads(resp.read())
        records.extend(page)
        # A short (or empty) page means the listing is exhausted.
        if len(page) < page_size:
            return records
        offset += page_size


def _girder_patient_id(folder_name: str) -> int | None:  # pragma: no cover
    """Return the numeric id of a ``patient<N>`` folder name, or None if it has none."""
    try:
        return int(folder_name.removeprefix("patient"))
    except ValueError:
        return None


def download_from_girder(  # pragma: no cover
    collection_id: str,
    destination: str | Path,
    dataset_name: str,
    patients: list[int] | None = None,
    top_folder_name: str = "dataset",
) -> Path:
    """Download a dataset from the Girder server.

    Navigates the Girder collection to find patient folders and downloads
    all files for each patient. Existing files are skipped. Each file is
    streamed into a ``<name>.part`` file and renamed once complete, so an
    interrupted run never leaves a truncated file that a later run would
    mistake for a finished download.

    Uses the ``ZEA_DOWNLOAD_TIMEOUT`` environment variable (default 60 s)
    as the socket timeout of every request.

    Args:
        collection_id: Girder collection ID for the dataset.
        destination: Directory where the dataset will be downloaded.
        dataset_name: Human-readable name used in log messages
            (e.g. ``"CAMUS"`` or ``"CETUS"``).
        patients: Optional list of patient IDs to download.
            If None, all patients in the collection are downloaded.
            Folders whose name is not ``patient<N>`` are ignored when this
            filter is given.
        top_folder_name: Name of the top-level folder inside the collection
            that contains patient subfolders. Defaults to ``"dataset"``.

    Returns:
        Path to the downloaded dataset directory.

    Raises:
        RuntimeError: If *top_folder_name* is not found in the collection.
    """
    destination = Path(destination)
    destination.mkdir(parents=True, exist_ok=True)
    timeout = int(os.getenv("ZEA_DOWNLOAD_TIMEOUT", "60"))

    # Get top-level folders in the collection
    folders = _girder_list_all(
        f"folder?parentType=collection&parentId={collection_id}", timeout
    )
    dataset_folder_id = next(
        (folder["_id"] for folder in folders if folder["name"] == top_folder_name), None
    )
    if dataset_folder_id is None:
        raise RuntimeError(
            f"Could not find '{top_folder_name}' folder in {dataset_name} collection."
        )

    # Get patient folders (paginated — some datasets have >50 patients)
    patient_folders = _girder_list_all(
        f"folder?parentType=folder&parentId={dataset_folder_id}", timeout
    )

    if patients is not None:
        patient_set = set(patients)
        patient_folders = [
            pf for pf in patient_folders if _girder_patient_id(pf["name"]) in patient_set
        ]

    log.info(f"Downloading {len(patient_folders)} patients from {dataset_name} dataset...")
    for pf in tqdm(patient_folders, desc="Downloading patients"):
        patient_dir = destination / pf["name"]
        patient_dir.mkdir(parents=True, exist_ok=True)

        # Get items (files) in the patient folder; paginated so that folders
        # with more than one page of items are downloaded completely.
        items = _girder_list_all(f"item?folderId={pf['_id']}", timeout)
        for item in items:
            file_path = patient_dir / item["name"]
            if file_path.exists():
                log.debug(f"File {file_path} already exists when downloading. Skipping.")
                continue

            download_url = f"{GIRDER_API}/item/{item['_id']}/download"
            log.debug(f"Downloading {item['name']}...")
            temp_path = file_path.with_name(f"{file_path.name}.part")
            try:
                with (
                    urllib.request.urlopen(download_url, timeout=timeout) as resp,
                    open(temp_path, "wb") as out,
                ):
                    # Stream in chunks instead of holding the whole file in memory.
                    while chunk := resp.read(1 << 20):
                        out.write(chunk)
                # Only a complete download is published under the final name.
                temp_path.replace(file_path)
            finally:
                temp_path.unlink(missing_ok=True)

    log.info(f"{dataset_name} dataset downloaded to {destination}")
    return destination
# ---------------------------------------------------------------------------
# HuggingFace Hub helpers
# ---------------------------------------------------------------------------
[docs]
def check_output_dir_ownership(folder: "str | Path", repo_id: str) -> None:
    """Raise if *folder* already contains data from a different dataset.

    The check is based on the ``zea_repo_id`` field written into the dataset
    card (``README.md``) by each converter. A directory is considered *owned*
    by a specific dataset when its README.md contains ``zea_repo_id: <repo_id>``.
    The repository ID must match in full: a card declaring
    ``zeahub/camus-sample`` does not make the directory owned by
    ``zeahub/camus``.

    * **Empty or non-existent directory** → passes (first-time run).
    * **Directory with matching README.md** → passes (re-run of same dataset).
    * **Directory with mismatched README.md** → raises :class:`FileExistsError`.
    * **Directory with HDF5 files but no README.md** → raises :class:`FileExistsError`.

    Args:
        folder: Output directory to inspect.
        repo_id: Expected dataset repository ID, e.g. ``"zeahub/picmus"``.

    Raises:
        FileExistsError: If the directory belongs to a different dataset.
    """
    import re  # local import: ``re`` is not imported at module level

    folder = Path(folder)
    readme = folder / "README.md"

    if not folder.exists():
        return  # fresh directory — OK

    if readme.exists():
        # The marker is plain ASCII, so decode leniently: an unusual byte
        # elsewhere in the card must not make the ownership check crash.
        card_text = readme.read_text(encoding="utf-8", errors="replace")
        # The lookahead rejects declarations where the ID continues
        # (e.g. "-sample", "_v2", "/sub", ".v2"), which a plain substring
        # test would wrongly accept.
        marker = rf"zea_repo_id: {re.escape(repo_id)}(?![\w/-]|\.\w)"
        if re.search(marker, card_text) is None:
            raise FileExistsError(
                f"Output directory '{folder}' already contains data from a different dataset "
                f"(README.md does not declare 'zea_repo_id: {repo_id}'). "
                "Use a separate output directory for each dataset."
            )
        return  # correct dataset — OK (re-run)

    # No README.md yet — fail only if HDF5 files are present (stale/foreign data)
    if any(folder.rglob("*.hdf5")):
        raise FileExistsError(
            f"Output directory '{folder}' already contains HDF5 files but no dataset "
            "README.md. Use a separate, empty output directory for each dataset, "
            "or delete this directory to start fresh."
        )
[docs]
def require_output_dir_ownership(folder: "str | Path", repo_id: str) -> None:
    """Raise if *folder* does not contain a verified dataset card for *repo_id*.

    Used as a pre-flight check before uploading to HuggingFace Hub to prevent
    accidentally uploading files from a different dataset. The README.md must
    declare ``zea_repo_id: <repo_id>`` with the full repository ID: a card
    declaring ``zeahub/camus-sample`` is not accepted for ``zeahub/camus``.

    Args:
        folder: Directory to check.
        repo_id: Expected dataset repository ID, e.g. ``"zeahub/picmus"``.

    Raises:
        FileNotFoundError: If no README.md is found.
        ValueError: If the README.md does not match *repo_id*.
    """
    import re  # local import: ``re`` is not imported at module level

    folder = Path(folder)
    readme = folder / "README.md"
    if not readme.exists():
        raise FileNotFoundError(
            f"No README.md found in '{folder}'. Run the conversion step before uploading."
        )

    # The marker is plain ASCII, so decode leniently: an unusual byte
    # elsewhere in the card must not make the pre-flight check crash.
    card_text = readme.read_text(encoding="utf-8", errors="replace")
    # The lookahead rejects declarations where the ID continues
    # (e.g. "-sample", "_v2", "/sub", ".v2"), which a plain substring test
    # would wrongly accept.
    marker = rf"zea_repo_id: {re.escape(repo_id)}(?![\w/-]|\.\w)"
    if re.search(marker, card_text) is None:
        raise ValueError(
            f"'{folder}/README.md' does not declare 'zea_repo_id: {repo_id}'. "
            f"This directory does not appear to contain the '{repo_id}' dataset. "
            "Make sure you are uploading the correct directory."
        )
[docs]
def write_dataset_card(folder: str | Path, card_content: str) -> Path: # pragma: no cover
"""Write a HuggingFace dataset card (``README.md``) into *folder*.
Args:
folder: Directory where ``README.md`` will be written.
card_content: Markdown content for the dataset card.
Returns:
Path to the written ``README.md`` file.
"""
folder = Path(folder)
card_path = folder / "README.md"
card_path.write_text(card_content)
return card_path
[docs]
def upload_dataset_to_hf(  # pragma: no cover
    folder: str | Path,
    repo_id: str,
    revision: str,
    file_glob: str = "*.hdf5",
    commit_message: str | None = None,
) -> None:
    """Push a converted dataset folder to a revision branch on the HuggingFace Hub.

    Uploading to ``main`` is refused on purpose: upload to a named revision
    branch, verify the data on the Hub, then merge that branch into ``main``
    manually.

    The function is interactive. It logs a summary, asks for confirmation on
    standard input and, if the target branch does not exist yet, asks whether
    it should be created. If the branch check itself fails, a warning is
    logged and the upload is attempted anyway.

    Args:
        folder: Root folder containing the files to upload.
        repo_id: Hugging Face Hub repository ID (e.g. ``"zeahub/picmus"``).
        revision: Target branch name. Must not be ``"main"``.
        file_glob: Glob pattern for files to include in the size summary.
            Defaults to ``"*.hdf5"``.
        commit_message: Commit message. Defaults to
            ``"Upload <repo_id> (zea format) to <revision>"``.

    Raises:
        ValueError: If *revision* is ``"main"``.
        FileNotFoundError: If no files matching *file_glob* are found
            under *folder*.
    """
    from huggingface_hub import HfApi, login

    if revision == "main":
        raise ValueError(
            "Upload to 'main' is intentionally blocked. "
            "Upload to a named revision branch instead, then merge into main "
            "manually after verifying the upload on the Hub."
        )

    folder = Path(folder)
    matched = sorted(folder.rglob(file_glob))
    if not matched:
        raise FileNotFoundError(f"No files matching '{file_glob}' found in {folder}")
    size_mb = sum(path.stat().st_size for path in matched) / 1e6

    if commit_message is None:
        commit_message = f"Upload {repo_id} (zea format) to {revision}"

    # Summary shown to the user before asking for confirmation.
    rule = "=" * 60
    summary = (
        "",
        rule,
        " HuggingFace upload summary",
        rule,
        f" Repository : {repo_id}",
        f" Branch : {revision}",
        f" Source : {folder}",
        f" Files : {len(matched)}",
        f" Total size : {size_mb:.1f} MB",
        rule,
        "",
    )
    for line in summary:
        log.info(line)

    if input("Proceed with upload? [y/N] ").strip().lower() != "y":
        log.info("Upload cancelled.")
        return

    login(new_session=False)
    api = HfApi()

    # Check if the revision (branch) exists; if not, prompt to create it.
    try:
        repo_refs = api.list_repo_refs(repo_id=repo_id, repo_type="dataset")
        existing = {branch.name for branch in repo_refs.branches}
        if revision not in existing:
            prompt = f"Revision (branch) '{revision}' does not exist on {repo_id}. Create it? [y/N] "
            if input(prompt).strip().lower() != "y":
                log.info("Upload cancelled — revision not created.")
                return
            api.create_branch(repo_id=repo_id, branch=revision, repo_type="dataset")
            log.info("Created branch '%s' on %s.", revision, repo_id)
    except Exception as exc:
        # Best effort: a failed check must not block the upload attempt.
        log.warning("Could not verify revision existence: %s", exc)

    api.upload_folder(
        folder_path=str(folder),
        repo_id=repo_id,
        repo_type="dataset",
        revision=revision,
        commit_message=commit_message,
    )
    log.info(f"Uploaded to https://huggingface.co/datasets/{repo_id}/tree/{revision}")