Source code for zea.scan

"""Structure containing the parameters defining an ultrasound acquisition.

This module provides the :class:`Parameters` class, a flexible structure
for managing all parameters related to an ultrasound acquisition (merged probe
and scan parameters).

Features
^^^^^^^^

- **Flexible initialization:** The :class:`Parameters` class supports lazy initialization,
  allowing you to specify any combination of supported parameters. You can pass only
  the parameters you have, and the rest will be computed or set to defaults as needed.

- **Automatic computation:** Many scan properties (such as
  grid, number of pixels, wavelength, etc.) are computed automatically from the
  provided parameters. This enables you to work with minimal input and still obtain
  all necessary scan configuration details.

- **Dependency tracking and lazy evaluation:** Derived properties are computed only
  when accessed, and are automatically invalidated and recomputed if their dependencies
  change. This ensures efficient memory usage and avoids unnecessary computations.

- **Parameter validation:** All parameters are type-checked and validated against
  a predefined schema, reducing errors and improving robustness.

- **Selection of transmits:** The scan supports flexible selection of transmit events,
  using the :meth:`set_transmits` method. You can select all, a specific number,
  or specific transmit indices. The selection is stored and can be accessed via
  the :attr:`selected_transmits` property.

Comparison to ``zea.Config`` and ``zea.Probe``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- :class:`zea.config.Config`: A general-purpose parameter dictionary for experiment and pipeline
  configuration. It is not specific to ultrasound acquisition and does not compute
  derived parameters.

- :class:`zea.probes.Probe`: Contains only probe-specific parameters (e.g., geometry, frequency).

- :class:`zea.Parameters`: Combines all parameters relevant to an ultrasound acquisition,
  including probe, acquisition, and scan region. It also provides automatic computation
  of derived properties and dependency management.

Example Usage
^^^^^^^^^^^^^

.. doctest::

    >>> from zea import Config, File, Probe, Parameters

    >>> # The usual entry point: load the merged probe + scan parameters from a file
    >>> path = (
    ...     "hf://zeahub/picmus/database/experiments/contrast_speckle/"
    ...     "contrast_speckle_expe_dataset_iq/contrast_speckle_expe_dataset_iq.hdf5"
    ... )
    >>> with File(path) as f:
    ...     parameters = f.load_parameters()
    >>> type(parameters).__name__
    'Parameters'

    >>> # You can also build one from a Probe's parameters ...
    >>> probe = Probe.from_name("verasonics_l11_4v")
    >>> parameters = Parameters(
    ...     probe_geometry=probe.probe_geometry,
    ...     center_frequency=probe.probe_center_frequency,
    ...     element_width=probe.element_width,
    ...     grid_size_z=256,
    ...     n_tx=11,
    ... )

    >>> # ... from a Config's parameters ...
    >>> config = Config.from_path("hf://zeahub/configs/config_picmus_rf.yaml")
    >>> parameters = Parameters(n_tx=11, **config.parameters)

    >>> # ... or fully manually
    >>> parameters = Parameters(
    ...     grid_size_x=128,
    ...     grid_size_z=256,
    ...     xlims=(-0.02, 0.02),
    ...     zlims=(0.0, 0.06),
    ...     ylims=(0.0, 0.0),
    ...     center_frequency=6.25e6,
    ...     sound_speed=1540.0,
    ...     sampling_frequency=25e6,
    ...     n_el=128,
    ...     n_tx=11,
    ...     probe_geometry=probe.probe_geometry,
    ... )

    >>> # Access a derived property (computed lazily)
    >>> grid = parameters.grid  # shape: (grid_size_z, grid_size_x, 3)

    >>> # Select a subset of transmit events
    >>> _ = parameters.set_transmits(3)  # Use 3 evenly spaced transmits
    >>> _ = parameters.set_transmits([0, 2, 4])  # Use specific transmit indices
    >>> _ = parameters.set_transmits("all")  # Use all transmits

"""

from copy import deepcopy

import numpy as np
from keras import ops

from zea import log
from zea.beamform.pfield import compute_pfield
from zea.beamform.pixelgrid import (
    cartesian_pixel_grid,
    check_for_aliasing,
    polar_pixel_grid,
)
from zea.data.spec import ScanSpec, ProbeSpec
from zea.display import compute_scan_convert_2d_coordinates
from zea.internal.parameters import BaseParameters, MissingDependencyError, cache_with_dependencies
from zea.internal.utils import deprecated
from zea.probes import Probe


[docs] class Parameters(BaseParameters): """Represents a full ultrasound acquisition configuration with computed properties. A :class:`Parameters` object holds **all** parameters relevant to an acquisition — merged probe and scan parameters — and computes derived quantities (grid, wavelength, pfield, scan-conversion coordinates, ...) lazily with dependency tracking and caching. Obtain one from a file via :meth:`zea.data.file.File.load_parameters`. The set of valid file-backed parameters is derived from :class:`~zea.data.spec.ScanSpec` and :class:`~zea.data.spec.ProbeSpec` (single source of truth), extended with recon/beamforming parameters that are not stored in the file. Arbitrary custom parameters may also be set; they are stored as-is and are not used to compute any derived parameters (e.g. the beamforming grid). They are simply passed through — for example to a pipeline call (see :class:`~zea.internal.parameters.BaseParameters`). Args: grid_size_x (int): Grid width in pixels. For a cartesian grid, this is the lateral (x) pixels in the grid, set to prevent aliasing if not provided. For a polar grid, this can be thought of as the number for rays in the polar direction. grid_size_z (int): Grid height in pixels. This is the number of axial (z) pixels in the grid, set to prevent aliasing if not provided. sound_speed (float, optional): Speed of sound in the medium in m/s. Defaults to 1540.0. sampling_frequency (float): Sampling frequency in Hz. center_frequency (float): Transmit center frequency in Hz. demodulation_frequency (float, optional): Demodulation frequency in Hz. n_el (int): Number of elements in the transducer array. n_tx (int): Number of transmit events in the dataset. n_ax (int): Number of axial samples in the received signal. n_ch (int, optional): Number of channels (1 for RF, 2 for IQ data). xlims (tuple of float): Lateral (x) limits of the imaging region in meters (min, max). ylims (tuple of float, optional): Elevation (y) limits of the imaging region in meters (min, max). zlims (tuple of float): Axial (z) limits of the imaging region in meters (min, max). probe_geometry (np.ndarray): Element positions as array of shape (n_el, 3). polar_angles (np.ndarray): Polar angles for each transmit event in radians of shape (n_tx,). These angles are often used in 2D imaging. azimuth_angles (np.ndarray): Azimuth angles for each transmit event in radians of shape (n_tx,). These angles are often used in 3D imaging. t0_delays (np.ndarray): Transmit delays in seconds of shape (n_tx, n_el), shifted such that the smallest delay is 0. tx_apodizations (np.ndarray): Transmit apodizations of shape (n_tx, n_el). focus_distances (np.ndarray): Distance from the origin point on the transducer to where the beam comes to focus for each transmit in meters of shape (n_tx,). transmit_origins (np.ndarray): Transmit origins of shape (n_tx, 3). initial_times (np.ndarray): Initial times in seconds for each event of shape (n_tx,). probe_bandwidth_percent (float, optional): Bandwidth as percentage of center frequency. Defaults to 200.0. time_to_next_transmit (np.ndarray): The time between subsequent transmit events of shape (n_frames, n_tx). tgc_gain_curve (np.ndarray): Time gain compensation (TGC) curve of shape (n_ax,). waveforms_one_way (np.ndarray): The one-way transmit waveforms of shape (n_waveforms, n_samples). waveforms_two_way (np.ndarray): The two-way transmit waveforms of shape (n_waveforms, n_samples). t_peak (np.ndarray, optional): The time of the peak of the pulse of every transmit waveform of shape (n_tx,). pixels_per_wavelength (int, optional): Number of pixels per wavelength. Defaults to 4. element_width (float, optional): Width of each transducer element in meters. resolution (float, optional): Resolution for scan conversion in mm / pixel. If None, it is calculated based on the input image. pfield_kwargs (dict, optional): Additional parameters for pressure field computation. See `zea.beamform.pfield.compute_pfield` for details. apply_lens_correction (bool, optional): Whether to apply lens correction to delays. Defaults to False. lens_thickness (float, optional): Thickness of the lens in meters. f_number (float, optional): F-number of the transducer. Defaults to 1.0. theta_range (tuple, optional): Range of theta angles for 3D imaging. phi_range (tuple, optional): Range of phi angles for 3D imaging. rho_range (tuple, optional): Range of rho (radial) distances for 3D imaging. fill_value (float, optional): Value to use for out-of-bounds pixels. attenuation_coef (float, optional): Attenuation coefficient in dB/(MHz*cm). Defaults to 0.0. selected_transmits (None, str, int, list, slice, or np.ndarray, optional): Specifies which transmit events to select. - None or "all": Use all transmits. - "center": Use only the center transmit. - int: Select this many evenly spaced transmits. - list/array: Use these specific transmit indices. - slice: Use transmits specified by the slice (e.g., slice(0, 10, 2)). grid_type (str, optional): Type of grid to use for beamforming. Can be "cartesian" or "polar". Defaults to "cartesian". dynamic_range (tuple, optional): Dynamic range for image display. Defined in dB as (min_dB, max_dB). distance_to_apex (float, optional): Distance from the transducer to the apex of the pixel grid. This property is used for polar grids. Will be computed automatically if not provided. """ scan_schema = deepcopy(ScanSpec.SCHEMA) probe_schema = deepcopy(Probe.SCHEMA) for key in Probe._NON_PARAMETERS: probe_schema.pop(key) # Valid parameters are derived from the scan and probe specs + a few # beamforming-only parameters. VALID_PARAMS = { **scan_schema, **probe_schema, "grid_size_x": {"dtype": np.int32}, "grid_size_y": {"dtype": np.int32}, "grid_size_z": {"dtype": np.int32}, "xlims": {"dtype": np.float32, "shape": (2,)}, "ylims": {"dtype": np.float32, "shape": (2,)}, "zlims": {"dtype": np.float32, "shape": (2,)}, "pixels_per_wavelength": {"dtype": np.int32, "default": 4}, "pfield_kwargs": {"dtype": dict, "default": {}}, "apply_lens_correction": {"dtype": bool, "default": False}, # native dtype on purpose "grid_type": {"dtype": str, "default": "cartesian"}, "polar_limits": {"dtype": np.float32, "shape": (2,)}, "dynamic_range": {"dtype": np.float32, "shape": (2,)}, "selected_transmits": { "dtype": (type(None), str, int, list, slice, np.ndarray), "default": None, }, "n_frames": {"dtype": np.int32}, "n_el": {"dtype": np.int32}, "n_tx": {"dtype": np.int32}, "n_ax": {"dtype": int}, # native dtype on purpose "n_ch": {"dtype": np.int32}, "attenuation_coef": {"dtype": np.float32, "default": 0.0}, "f_number": {"dtype": float, "default": 1.0}, # native dtype on purpose "t_peak": {"dtype": np.float32}, "theta_range": {"dtype": np.float32, "shape": (2,)}, "phi_range": {"dtype": np.float32, "shape": (2,)}, "rho_range": {"dtype": np.float32, "shape": (2,)}, "fill_value": {"dtype": float}, "resolution": {"dtype": (np.float32, type(None)), "default": None}, "distance_to_apex": {"dtype": np.float32, "default": 0.0}, } # Add some defaults that are not stored in a file VALID_PARAMS["sound_speed"]["default"] = 1540.0 VALID_PARAMS["probe_bandwidth_percent"]["default"] = 200.0 @cache_with_dependencies("probe_geometry") def aperture_size(self): """Calculate the aperture size (x,y,z) based on the probe geometry.""" if "probe_geometry" in self._params: x_coords = self.probe_geometry[:, 0] y_coords = self.probe_geometry[:, 1] z_coords = self.probe_geometry[:, 2] aperture_width = x_coords.max() - x_coords.min() aperture_height = y_coords.max() - y_coords.min() aperture_depth = z_coords.max() - z_coords.min() return np.array([aperture_width, aperture_height, aperture_depth]) return None @cache_with_dependencies("polar_limits", "aperture_size") def distance_to_apex(self): """Calculate the distance from the transducer to the apex of the pixel grid.""" if "distance_to_apex" in self._params: return self._params["distance_to_apex"] if self.aperture_size is not None: max_angle = np.max(np.abs(self.polar_limits)) t = np.tan(max_angle) if np.isclose(t, 0.0): return 0.0 distance_to_apex = (self.aperture_size[0] / 2) / t return distance_to_apex return 0.0 @cache_with_dependencies( "xlims", "ylims", "zlims", "grid_size_x", "grid_size_z", "grid_size_y", "grid_type", "is_3d", "polar_limits", "distance_to_apex", ) def grid(self): """The beamforming grid of shape (grid_size_z, grid_size_x, [grid_size_y], 3).""" if self.grid_type == "polar": if self.is_3d: raise NotImplementedError("3D polar grids are not yet supported.") return polar_pixel_grid( self.polar_limits, self.zlims, self.grid_size_z, self.grid_size_x, self.distance_to_apex, ) elif self.grid_type == "cartesian": return cartesian_pixel_grid( self.xlims, self.zlims, self.ylims, grid_size_z=self.grid_size_z, grid_size_x=self.grid_size_x, grid_size_y=self.grid_size_y, ) else: raise ValueError( f"Unsupported grid type: {self.grid_type}. Supported types are " "'cartesian' and 'polar'." ) @cache_with_dependencies("xlims", "wavelength", "pixels_per_wavelength") def grid_size_x(self): """Grid width in pixels. For a cartesian grid, this is the lateral (x) pixels in the grid, set to prevent aliasing if not provided. For a polar grid, this can be thought of as the number for rays in the polar direction. """ grid_size_x = self._params.get("grid_size_x") if grid_size_x is not None: return grid_size_x width = self.xlims[1] - self.xlims[0] min_grid_size_x = int(np.ceil(width / (self.wavelength / self.pixels_per_wavelength))) return max(min_grid_size_x, 1) @cache_with_dependencies( "ylims", "wavelength", "pixels_per_wavelength", ) def grid_size_y(self): """Grid height in pixels. For a cartesian grid, this is the vertical (y) pixels in the grid, set to prevent aliasing if not provided. For a polar grid, this can be thought of as the number for rays in the azimuthal direction. """ grid_size_y = self._params.get("grid_size_y") if grid_size_y is not None: return grid_size_y height = self.ylims[1] - self.ylims[0] min_grid_size_y = int(np.ceil(height / (self.wavelength / self.pixels_per_wavelength))) return max(min_grid_size_y, 1) @cache_with_dependencies( "zlims", "wavelength", "pixels_per_wavelength", ) def grid_size_z(self): """Grid depth in pixels. This is the number of axial (z) pixels in the grid, set to prevent aliasing if not provided.""" grid_size_z = self._params.get("grid_size_z") if grid_size_z is not None: return grid_size_z depth = self.zlims[1] - self.zlims[0] min_grid_size_z = int(np.ceil(depth / (self.wavelength / self.pixels_per_wavelength))) return max(min_grid_size_z, 1) @cache_with_dependencies("sound_speed", "center_frequency") def wavelength(self): """Calculate the wavelength based on sound speed and transmit center frequency.""" return self.sound_speed / self.center_frequency @cache_with_dependencies("zlims", "polar_limits", "probe_geometry") def xlims(self): """The x-limits of the beamforming grid [m]. If not explicitly set, it is computed based on the polar limits and probe geometry. """ xlims = self._params.get("xlims") if xlims is None: radius = max(self.zlims) xlims_polar = ( radius * np.cos(-np.pi / 2 + self.polar_limits[0]), radius * np.cos(-np.pi / 2 + self.polar_limits[1]), ) xlims_plane = ( min(self.probe_geometry[:, 0]), max(self.probe_geometry[:, 0]), ) xlims = ( min(xlims_polar[0], xlims_plane[0]), max(xlims_polar[1], xlims_plane[1]), ) return xlims @cache_with_dependencies("zlims", "grid_type", "azimuth_limits", "probe_geometry") def ylims(self): """The y-limits of the beamforming grid [m]. If not explicitly set, it is computed based on the azimuth limits and probe geometry. """ ylims = self._params.get("ylims") if ylims is not None: return ylims # If ylims not set, compute based on azimuth limits and probe geometry radius = max(self.zlims) ylims_azimuth = ( (0.0, 0.0) # avoid numerical imprecision with np.cos(np.pi/2) if self.azimuth_limits[0] == self.azimuth_limits[1] else ( radius * np.cos(-np.pi / 2 + self.azimuth_limits[0]), radius * np.cos(-np.pi / 2 + self.azimuth_limits[1]), ) ) ylims_plane = (min(self.probe_geometry[:, 1]), max(self.probe_geometry[:, 1])) ylims = ( min(ylims_azimuth[0], ylims_plane[0]), max(ylims_azimuth[1], ylims_plane[1]), ) return ylims @cache_with_dependencies("sound_speed", "sampling_frequency", "n_ax") def zlims(self): """The z-limits of the beamforming grid [m].""" zlims = self._params.get("zlims") if zlims is None: return [0, self.sound_speed * self.n_ax / self.sampling_frequency / 2] return zlims @cache_with_dependencies("grid", "grid_type", "distance_to_apex") def extent(self): """ The extent of the beamforming grid in the format: (xmin, xmax, ymin, ymax, zmin, zmax). """ xlims = (self.grid[..., 0].min(), self.grid[..., 0].max()) ylims = (self.grid[..., 1].min(), self.grid[..., 1].max()) zlims = (self.grid[..., 2].min(), self.grid[..., 2].max()) # For polar grids, adjust zlims to account for distance to apex if self.grid_type == "polar": zlims = (zlims[0] + self.distance_to_apex, zlims[1]) return np.array( [ xlims[0], xlims[1], ylims[0], ylims[1], zlims[0], zlims[1], ] ) @cache_with_dependencies("extent") def extent_imshow(self): """The extent of the beamforming grid in the format: (xmin, xmax, ymin, ymax, zmin, zmax). Returns: np.ndarray: The extent of the beamforming grid in the format (xmin, xmax, zmax, zmin). This format can be used directly in matplotlib's ``plt.imshow``. """ xlims_0, xlims_1, ylims_0, ylims_1, zlims_0, zlims_1 = self.extent if ylims_0 != ylims_1: log.warning("Are you sure you want to use 2D imshow extent for a 3D grid?") return np.array([xlims_0, xlims_1, zlims_1, zlims_0]) @cache_with_dependencies("grid") def flatgrid(self): """The beamforming grid of shape (grid_size_z*grid_size_x*grid_size_y, 3).""" return self.grid.reshape(-1, 3) @cache_with_dependencies("grid_size_x", "grid_size_y", "grid_size_z") def is_3d(self): """Whether the scan grid is 3D (True) or 2D (False).""" return self.grid_size_y > 1 and self.grid_size_x > 1 and self.grid_size_z > 1 @property def n_tx_total(self): """The total number of transmits in the full dataset.""" return self._params["n_tx"] @cache_with_dependencies("selected_transmits") def n_tx(self): """The number of currently selected transmits.""" return len(self.selected_transmits)
[docs] def set_transmits(self, selection): """Select which transmit events to use. This method provides flexible ways to select transmit events: Args: selection: Specifies which transmits to select: - None: Use all transmits - "all": Use all transmits - "center": Use only the center transmit - "focused": Use only focused transmits - "diverging": Use only diverging transmits - "plane": Use only plane wave transmits - int: Select this many evenly spaced transmits - list/array: Use these specific transmit indices - slice: Use transmits specified by the slice (e.g., slice(0, 10, 2)) Returns: The current instance for method chaining. Raises: ValueError: If the selection is invalid or incompatible with the scan. """ n_tx_total = self._params.get("n_tx") if n_tx_total is None: if selection is None: # n_tx not yet known (e.g. file with image-only data); store as-is. self._params["selected_transmits"] = None self._invalidate("selected_transmits") return self raise ValueError("n_tx must be set before calling set_transmits") # Handle array-like - convert to list of indices if isinstance(selection, np.ndarray): if len(selection.shape) == 0: # Handle scalar numpy array return self.set_transmits(int(selection)) elif len(selection.shape) == 1: selection = selection.tolist() else: raise ValueError(f"Invalid array shape: {selection.shape}") # Handle None and "all" - use all transmits if selection is None or selection == "all": self._params["selected_transmits"] = list(range(n_tx_total)) self._invalidate("selected_transmits") return self # Handle "center" - use center transmit if selection == "center": self._params["selected_transmits"] = [n_tx_total // 2] self._invalidate("selected_transmits") return self if selection == "focused": value = self._params.get("focus_distances") if value is None: raise ValueError("No focus distances provided, cannot select focused transmits") idx = np.where(value > 0)[0].tolist() if len(idx) == 0: raise ValueError("No focused transmits found.") self._params["selected_transmits"] = idx self._invalidate("selected_transmits") return self if selection == "diverging": value = self._params.get("focus_distances") if value is None: raise ValueError("No focus distances provided, cannot select diverging transmits") idx = np.where(value < 0)[0].tolist() if len(idx) == 0: raise ValueError("No diverging transmits found.") self._params["selected_transmits"] = idx self._invalidate("selected_transmits") return self if selection == "plane": value = self._params.get("focus_distances") if value is None: raise ValueError("No focus distances provided, cannot select plane wave transmits") idx = np.concatenate([np.where(value == 0)[0], np.where(np.isinf(value))[0]]).tolist() if len(idx) == 0: raise ValueError("No plane wave transmits found.") self._params["selected_transmits"] = idx self._invalidate("selected_transmits") return self # Handle integer - select evenly spaced transmits if isinstance(selection, (int, np.integer)): selection = int(selection) # Convert numpy integer to Python int if selection <= 0: raise ValueError(f"Number of transmits must be positive, got {selection}") if selection > n_tx_total: raise ValueError( f"Requested {selection} transmits exceeds available transmits ({n_tx_total})" ) if selection == 1: self._params["selected_transmits"] = [n_tx_total // 2] else: # Compute evenly spaced indices tx_indices = np.linspace(0, n_tx_total - 1, selection) self._params["selected_transmits"] = list(np.rint(tx_indices).astype(int)) self._invalidate("selected_transmits") return self # Handle slice - convert to list of indices if isinstance(selection, slice): selection = list(range(n_tx_total))[selection] # Handle list of indices if isinstance(selection, list): # Validate indices if not all(isinstance(i, (int, np.integer)) for i in selection): raise ValueError("All transmit indices must be integers") if any(i < 0 or i >= n_tx_total for i in selection): raise ValueError(f"Transmit indices must be between 0 and {n_tx_total - 1}") self._params["selected_transmits"] = [ int(i) for i in selection ] # Convert numpy integers to Python ints self._invalidate("selected_transmits") return self # Aliasing check check_for_aliasing(self) raise ValueError(f"Unsupported selection type: {type(selection)}")
@cache_with_dependencies("center_frequency") def demodulation_frequency(self): """The demodulation frequency in Hz.""" if self._params.get("demodulation_frequency") is not None: return self._params["demodulation_frequency"] return self.center_frequency @cache_with_dependencies("selected_transmits") def polar_angles(self): """Polar angles for each transmit event in radians of shape (n_tx,). These angles are often used in 2D imaging.""" value = self._params.get("polar_angles") if value is None: return None return value[self.selected_transmits] @cache_with_dependencies("polar_angles") def polar_limits(self): """The limits of the polar angles, used for polar grids.""" value = self._params.get("polar_limits") if value is None and self.polar_angles is not None: value = self.polar_angles.min(), self.polar_angles.max() diff = value[1] - value[0] # add 15% margin to the limits value = (value[0] - 0.15 * diff, value[1] + 0.15 * diff) return value @cache_with_dependencies("selected_transmits", "n_tx") def azimuth_angles(self): """Azimuth angles for each transmit event in radians of shape (n_tx,). These angles are often used in 3D imaging.""" value = self._params.get("azimuth_angles") if value is None: log.warning_once( "No ``azimuth_angles`` provided, using zeros", key=(id(self), "azimuth_angles"), ) return np.zeros(self.n_tx) return value[self.selected_transmits] @cache_with_dependencies("azimuth_angles") def azimuth_limits(self): """The limits of the azimuth angles.""" value = self._params.get("azimuth_limits") if value is None and self.azimuth_angles is not None: value = self.azimuth_angles.min(), self.azimuth_angles.max() diff = value[1] - value[0] # add 15% margin to the limits value = (value[0] - 0.15 * diff, value[1] + 0.15 * diff) return value @cache_with_dependencies("selected_transmits", "n_el", "n_tx") def t0_delays(self): """Transmit delays in seconds of shape (n_tx, n_el), shifted such that the smallest delay is 0.""" value = self._params.get("t0_delays") if value is None: log.warning_once( "No ``t0_delays`` provided, using zeros", key=(id(self), "t0_delays"), ) return np.zeros((self.n_tx, self.n_el)) return value[self.selected_transmits] @cache_with_dependencies("selected_transmits", "n_el", "n_tx") def tx_apodizations(self): """Transmit apodizations of shape (n_tx, n_el).""" value = self._params.get("tx_apodizations") if value is None: log.warning_once( "No ``tx_apodizations`` provided, using ones", key=(id(self), "tx_apodizations"), ) return np.ones((self.n_tx, self.n_el)) return value[self.selected_transmits] @cache_with_dependencies("selected_transmits", "n_tx") def focus_distances(self): """Focus distances in meters for each event of shape (n_tx,).""" value = self._params.get("focus_distances") if value is None: log.warning_once( "No ``focus_distances`` provided, using zeros", key=(id(self), "focus_distances"), ) return np.zeros(self.n_tx) return value[self.selected_transmits] @cache_with_dependencies("selected_transmits", "n_tx") def transmit_origins(self): """Transmit origins of shape (n_tx, 3).""" value = self._params.get("transmit_origins") if value is None: log.warning_once( "No ``transmit_origins`` provided, using zeros", key=(id(self), "transmit_origins"), ) return np.zeros((self.n_tx, 3)) return value[self.selected_transmits] @cache_with_dependencies("selected_transmits", "n_tx") def initial_times(self): """Initial times in seconds for each event of shape (n_tx,).""" value = self._params.get("initial_times") if value is None: log.warning_once( "No ``initial_times`` provided, using zeros", key=(id(self), "initial_times"), ) return np.zeros(self.n_tx) return value[self.selected_transmits] @cache_with_dependencies("waveforms_one_way", "waveforms_two_way") def n_waveforms(self): """The number of unique transmit waveforms.""" if self.waveforms_one_way is not None: return self.waveforms_one_way.shape[0] if self.waveforms_two_way is not None: return self.waveforms_two_way.shape[0] return 1 @cache_with_dependencies("center_frequency", "selected_transmits") def t_peak(self): """The time of the peak of the pulse in seconds of shape (n_tx,).""" t_peak = self._params.get("t_peak") if t_peak is None: t_peak = np.full(self.n_tx_total, 1 / self.center_frequency) return t_peak[self.selected_transmits] @cache_with_dependencies("selected_transmits") def time_to_next_transmit(self): """The time between subsequent transmit events of shape (n_frames, n_tx).""" value = self._params.get("time_to_next_transmit") if value is None: return None return value[:, self.selected_transmits] @cache_with_dependencies("n_ax") def tgc_gain_curve(self): """Time gain compensation (TGC) curve of shape (n_ax,).""" value = self._params.get("tgc_gain_curve") if value is None: log.warning_once( "No ``tgc_gain_curve`` provided, using ones", key=(id(self), "tgc_gain_curve"), ) return np.ones(self.n_ax) return value[: self.n_ax] @cache_with_dependencies( "sound_speed", "center_frequency", "probe_bandwidth_percent", "n_el", "probe_geometry", "tx_apodizations", "grid", "t0_delays", "pfield_kwargs", ) def pfield(self) -> np.ndarray: """Compute or return the pressure field (pfield) for weighting of shape (n_tx, grid_size_z, grid_size_x).""" if self.is_3d: raise NotImplementedError("Pfield computation is not yet implemented for 3D scans.") pfield = compute_pfield( sound_speed=self.sound_speed, center_frequency=self.center_frequency, n_el=self.n_el, probe_geometry=self.probe_geometry, tx_apodizations=self.tx_apodizations, grid=self.grid, t0_delays=self.t0_delays, probe_bandwidth_percent=self.probe_bandwidth_percent, **self.pfield_kwargs, ) return ops.convert_to_numpy(pfield) @cache_with_dependencies("pfield") def flat_pfield(self): """Flattened pfield for weighting of shape (n_pix, n_tx).""" return self.pfield.reshape(self.n_tx, -1).swapaxes(0, 1) @cache_with_dependencies("zlims", "distance_to_apex") def rho_range(self): """A tuple specifying the range of rho values (min_rho, max_rho). Defined in mm. Used for scan conversion.""" value = self._params.get("rho_range") if value is None: return (self.zlims[0], self.zlims[1] + self.distance_to_apex) return value @cache_with_dependencies("polar_limits") def theta_range(self): """A tuple specifying the range of theta values (min_theta, max_theta). Defined in radians. Used for scan conversion.""" value = self._params.get("theta_range") if value is None and self.polar_limits is not None: return self.polar_limits return value @cache_with_dependencies( "rho_range", "theta_range", "resolution", "grid_size_z", "grid_size_x", "distance_to_apex", ) def coordinates_2d(self): """The coordinates for scan conversion.""" coords, _ = compute_scan_convert_2d_coordinates( (self.grid_size_z, self.grid_size_x), self.rho_range, self.theta_range, self.resolution, distance_to_apex=self.distance_to_apex, ) return coords @cache_with_dependencies("is_3d", "coordinates_2d") def coordinates(self): """Get the coordinates for scan conversion.""" if self.is_3d: raise NotImplementedError return self.coordinates_2d @cache_with_dependencies("time_to_next_transmit") def pulse_repetition_frequency(self): """The pulse repetition frequency (PRF) [Hz]. Assumes a constant PRF.""" if self.time_to_next_transmit is None: raise ValueError( "Time to next transmit must be set to compute pulse repetition frequency" ) pulse_repetition_interval = np.mean(self.time_to_next_transmit) return 1 / pulse_repetition_interval @cache_with_dependencies("time_to_next_transmit") def frames_per_second(self): """The number of frames per second [Hz]. Assumes a constant frame rate. Frames per second computed based on time between transmits within a frame. Ignores time between frames (e.g. due to processing). Uses the time it took to do all transmits (per frame). So if you only use some portion of the transmits, the fps will still be calculated based on all. """ time_to_next_transmit = self._params.get("time_to_next_transmit") if time_to_next_transmit is None: raise ValueError("Time to next transmit must be set to compute frames per second") # Check if fps is constant uniq = np.unique(time_to_next_transmit, axis=0) # frame axis if uniq.shape[0] != 1: log.warning("Time to next transmit is not constant") # Compute fps time = np.mean(np.sum(time_to_next_transmit, axis=1)) fps = 1 / time return fps
[docs] def to_scan_dict(self) -> dict: """Return scan parameters as a plain dict. Suitable for passing directly to :meth:`~zea.File.create` as the ``scan`` argument, or to :func:`~zea.data.file_operations.save_file` alongside :meth:`to_probe_dict`. Only fields defined in :class:`~zea.data.spec.ScanSpec` that are currently stored on this object are included (``None`` values are omitted). Values are read through property access so that any active :attr:`selected_transmits` filtering is applied (e.g. after calling :meth:`set_transmits`). Returns: dict: Scan parameter dict keyed by :class:`~zea.data.spec.ScanSpec` field names. """ result = {} for field in ScanSpec.fields(): # Only include fields that were explicitly stored as a non-None value. # Skipping None here is critical: fields like ``tgc_gain_curve`` that # are absent in the source file are stored as None in ``_params``. # Calling ``getattr`` on them would trigger computed defaults (e.g. # ones), which would be written to the new file and break round-trips. if field not in self._params or self._params[field] is None: continue try: value = getattr(self, field) # applies selected_transmits filtering except MissingDependencyError: continue if value is not None: result[field] = value return result
[docs] def to_probe_dict(self) -> dict: """Return file-backed probe parameters as a plain dict. Suitable for passing directly to :meth:`~zea.File.create` as the ``probe`` argument, or to :func:`~zea.data.file_operations.save_file` alongside :meth:`to_scan_dict`. Only fields defined in :class:`~zea.data.spec.ProbeSpec` that are currently stored on this object are included (``None`` values are omitted). Returns: dict: Probe parameter dict keyed by :class:`~zea.data.spec.ProbeSpec` field names. """ return { field: self._params[field] for field in ProbeSpec.fields() if field in self._params and self._params[field] is not None }
def __setattr__(self, key, value): if key == "selected_transmits": # If setting selected_transmits, call set_transmits to handle logic self.set_transmits(value) else: return super().__setattr__(key, value)
[docs] class Scan(Parameters): """Deprecated alias for :class:`Parameters`. ``Scan`` was renamed to :class:`zea.Parameters` (which now holds the merged probe and scan parameters). This subclass is kept temporarily to ease the transition: instantiating it emits a :class:`DeprecationWarning` pointing to :class:`zea.Parameters`. It will be removed in a future release. """ @deprecated(replacement="zea.Parameters") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)