Source code for zea.data.legacy_file
import numpy as np
from keras.utils import pad_sequences
from zea import log
[docs]
def dict_to_sorted_list(dictionary: dict):
    """Return the values of a dictionary, ordered by their sorted keys.

    .. note::
        Only the top level of the dictionary is ordered. Values that are
        themselves dictionaries are returned as they are.

    Example:
        .. doctest::

            >>> from zea.data.legacy_file import dict_to_sorted_list
            >>> input_dict = {"number_000": 5, "number_001": 1, "number_002": 23}
            >>> dict_to_sorted_list(input_dict)
            [5, 1, 23]

    Args:
        dictionary (dict): The dictionary to convert. Its keys must be
            comparable with each other.

    Returns:
        list: The values, in ascending key order.
    """
    ordered_keys = sorted(dictionary)
    return [dictionary[key] for key in ordered_keys]
def _waveforms_dict_to_array(waveforms_dict: dict):
    """Turn a dictionary of waveforms into a single padded ``float32`` array.

    The waveforms are ordered by their dictionary keys and then handed to
    ``pad_sequences`` with ``padding="post"``, i.e. shorter waveforms are
    padded at their end.

    Args:
        waveforms_dict (dict): Waveforms keyed by sortable keys.

    Returns:
        The array returned by ``pad_sequences``.
    """
    ordered_waveforms = dict_to_sorted_list(waveforms_dict)
    padded = pad_sequences(ordered_waveforms, dtype=np.float32, padding="post")
    return padded
def _reformat_waveforms(scan_kwargs: dict) -> dict:
"""Reformat waveforms from dict to array if needed. This is for backwards compatibility and will
be removed in a future version of zea.
Args:
scan_kwargs (dict): The scan parameters.
Returns:
scan_kwargs (dict): The scan parameters with the keys waveforms_one_way and
waveforms_two_way reformatted to arrays if they were stored as dicts.
"""
if "waveforms_one_way" in scan_kwargs and isinstance(scan_kwargs["waveforms_one_way"], dict):
log.warning(
"The waveforms_one_way parameter is stored as a dictionary in the file. "
"Converting to array. This will be deprecated in future versions of zea. "
"Please update your files to store waveforms as arrays of shape `(n_tx, n_samples)`."
)
scan_kwargs["waveforms_one_way"] = _waveforms_dict_to_array(
scan_kwargs["waveforms_one_way"]
)
if "waveforms_two_way" in scan_kwargs and isinstance(scan_kwargs["waveforms_two_way"], dict):
log.warning(
"The waveforms_two_way parameter is stored as a dictionary in the file. "
"Converting to array. This will be deprecated in future versions of zea. "
"Please update your files to store waveforms as arrays of shape `(n_tx, n_samples)`."
)
scan_kwargs["waveforms_two_way"] = _waveforms_dict_to_array(
scan_kwargs["waveforms_two_way"]
)
return scan_kwargs
[docs]
def check_focus_distances(scan_parameters: dict) -> dict:
    """Warn and auto-convert focus distances stored in wavelengths to metres.

    Some older files store ``focus_distances`` in wavelengths rather than
    metres. This helper detects the pattern (any value ≥ 1 and ≠ ``inf``) and
    converts all values using ``sound_speed / center_frequency``.

    Args:
        scan_parameters: Raw scan parameter dict loaded from HDF5. It is
            modified in place when a conversion takes place.
            ``focus_distances`` may be a numpy array or any array-like
            (list, tuple, scalar).

    Returns:
        dict: The same dict, with ``focus_distances`` converted when needed.

    Raises:
        AssertionError: If a conversion is needed but ``sound_speed`` or
            ``center_frequency`` is missing from ``scan_parameters``.
    """
    if "focus_distances" not in scan_parameters:
        return scan_parameters

    # ``np.asarray`` makes the element-wise comparisons below work for plain
    # lists and scalars as well; an ndarray is passed through unchanged.
    focus_distances = np.asarray(scan_parameters["focus_distances"])
    looks_like_wavelengths = np.logical_and(focus_distances >= 1, focus_distances != np.inf)
    if not np.any(looks_like_wavelengths):
        return scan_parameters

    # Validate before warning so that we never announce a conversion that
    # cannot happen. An explicit raise (instead of ``assert``) keeps the check
    # alive under ``python -O``; the exception type is kept for compatibility.
    for required_key in ("sound_speed", "center_frequency"):
        if required_key not in scan_parameters:
            raise AssertionError(
                "Cannot convert focus distances from wavelengths to metres "
                f"because {required_key} is not defined in the scan parameters."
            )

    log.warning(
        "We have detected that focus distances are (probably) stored in "
        "wavelengths. Please update your file! "
        "Converting to metres automatically for now, but this assumes that "
        "`center_frequency` is the probe centre frequency which is not always "
        "the case!"
    )
    wavelength = scan_parameters["sound_speed"] / scan_parameters["center_frequency"]
    scan_parameters["focus_distances"] = focus_distances * wavelength
    return scan_parameters
def _if_exists_cast_to_float(key, parameters):
"""Cast a value to float if it exists."""
if key in parameters:
parameters[key] = np.float32(parameters[key])
[docs]
def infer_n_tx(scan_parameters: dict):
    """Determine the value of ``n_tx`` from the scan parameters.

    A stored ``n_tx`` entry is returned as is. Otherwise the size of the first
    axis of ``t0_delays``, ``focus_distances`` or ``polar_angles`` is used,
    whichever is found first in that order.

    Args:
        scan_parameters (dict): The scan parameters. The per-transmit entries
            must expose a ``shape`` attribute.

    Returns:
        The stored ``n_tx`` value, or ``shape[0]`` of the first per-transmit
        entry that is present.

    Raises:
        ValueError: If none of the entries above is present.
    """
    if "n_tx" in scan_parameters:
        return scan_parameters["n_tx"]

    # Fall back on entries whose first axis is used as the transmit count.
    for candidate in ("t0_delays", "focus_distances", "polar_angles"):
        if candidate in scan_parameters:
            return scan_parameters[candidate].shape[0]

    raise ValueError("Cannot infer 'n_tx' from scan parameters. ")
[docs]
def legacy_scan(scan_parameters: dict):
    """Format scan parameters for legacy file.

    The steps are, in order:

    1. Return an empty dict when the only keys are ``n_ax``, ``n_frames`` and
       ``n_tx``.
    2. Convert focus distances (see :func:`check_focus_distances`) and
       dictionary-style waveforms to their current format.
    3. Drop ``probe_geometry``, ``n_ax``, ``n_el``, ``n_tx``, ``n_ch``,
       ``n_frames`` and ``bandwidth_percent``.
    4. Default ``demodulation_frequency`` to ``center_frequency``.
    5. Default ``transmit_origins`` to zeros of shape ``(n_tx, 3)``.
    6. Squeeze the scalar-like entries and cast the float entries to
       ``np.float32``.

    .. note::
        ``scan_parameters`` is modified in place (keys are removed and
        replaced). Read ``probe_geometry`` from it, or pass a copy, before
        calling this function if it is still needed afterwards.

    Args:
        scan_parameters (dict): Raw scan parameters.

    Returns:
        dict: The formatted scan parameters (the same object as the input),
            or a new empty dict in case 1.

    Raises:
        ValueError: If neither ``demodulation_frequency`` nor
            ``center_frequency`` is present, or if ``transmit_origins`` is
            missing and the number of transmits cannot be determined.
    """
    if set(scan_parameters.keys()) == {"n_ax", "n_frames", "n_tx"}:
        return {}

    scan_parameters = check_focus_distances(scan_parameters)
    scan_parameters = _reformat_waveforms(scan_parameters)

    # Remember the stored transmit count before dropping it: it is the only
    # source for n_tx when none of the per-transmit arrays are present.
    stored_n_tx = scan_parameters.pop("n_tx", None)
    for dropped_key in (
        "probe_geometry",
        "n_ax",
        "n_el",
        "n_ch",
        "n_frames",
        "bandwidth_percent",
    ):
        scan_parameters.pop(dropped_key, None)

    if "demodulation_frequency" not in scan_parameters:
        if "center_frequency" not in scan_parameters:
            raise ValueError("No demodulation or center frequency found in scan parameters.")
        scan_parameters["demodulation_frequency"] = scan_parameters["center_frequency"]

    if "transmit_origins" not in scan_parameters:
        # ``n_tx`` is no longer in the dict at this point, so infer_n_tx alone
        # would ignore it. Prefer the stored value, like infer_n_tx does.
        if stored_n_tx is not None:
            n_tx = stored_n_tx
        else:
            n_tx = infer_n_tx(scan_parameters)
        # squeeze so that a count stored as a size-1 array converts cleanly.
        n_tx = int(np.squeeze(n_tx))
        scan_parameters["transmit_origins"] = np.zeros((n_tx, 3), dtype=np.float32)

    for key in ["sampling_frequency", "sound_speed", "center_frequency", "demodulation_frequency"]:
        if key in scan_parameters:
            scan_parameters[key] = np.squeeze(scan_parameters[key])

    for key in [
        "sampling_frequency",
        "demodulation_frequency",
        "center_frequency",
        "initial_times",
        "transmit_origins",
        "sound_speed",
    ]:
        _if_exists_cast_to_float(key, scan_parameters)

    return scan_parameters
[docs]
def legacy_probe(scan_parameters: dict):
    """Extract the probe parameters of a legacy file from its scan parameters.

    Args:
        scan_parameters (dict): The scan parameters. Not modified.

    Returns:
        dict: A new dict holding ``probe_geometry`` when that key is present
            in ``scan_parameters``, and an empty dict otherwise.
    """
    if "probe_geometry" not in scan_parameters:
        return {}
    return {"probe_geometry": scan_parameters["probe_geometry"]}