zea.Pipeline¶

class zea.Pipeline(operations, with_batch_dim=True, jit_options='ops', jit_kwargs=None, name='pipeline', validate=True, timed=False, device=None)[source]¶

Bases: object

Pipeline class for processing ultrasound data through a series of operations.

Initialize a pipeline.

Parameters:
  • operations (List[Operation]) – A list of Operation instances representing the operations to be performed.

  • with_batch_dim (bool) – Whether operations should expect a batch dimension. Defaults to True.

  • jit_options (Optional[str]) –

    The JIT options to use. Must be “pipeline”, “ops”, or None.

    • “pipeline”: compiles the entire pipeline as a single function. This may be faster but does not preserve Python control flow, such as caching.

    • “ops”: compiles each operation separately. This preserves Python control flow and caching while still speeding up the individual operations.

    • None: disables JIT compilation.

    Defaults to “ops”.

  • jit_kwargs (dict | None) – Additional keyword arguments for the JIT compiler.

  • name (str, optional) – The name of the pipeline. Defaults to “pipeline”.

  • validate (bool, optional) – Whether to validate the pipeline. Defaults to True.

  • timed (bool) – Whether to time each operation. Defaults to False.

  • device (Optional[str]) – Default device for all pipeline calls, e.g. 'cpu', 'gpu:0', 'cuda:1'. Can be overridden per-call by passing device= to __call__. Uses zea.backend.func_on_device() under the hood, which moves input tensors to the device for the torch backend and wraps the call in a device context for JAX / TensorFlow. Defaults to None (no device placement).

__call__(return_numpy=False, device=None, **inputs)[source]¶

Process input data through the pipeline.

Parameters:
  • return_numpy (bool) – If True, convert output tensors to NumPy arrays before returning.

  • device (Optional[str]) – Device to run this call on, e.g. 'cpu', 'gpu:0', or 'cuda:1'. Overrides the pipeline-level device set at construction time for this single invocation. When None (default), the pipeline-level device attribute is used (which is also None by default, meaning no explicit device placement).

  • **inputs – Tensor inputs forwarded to the operations.

Return type:

Dict[str, Any]
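
The precedence between the per-call device and the pipeline-level device described above can be sketched in plain Python. The helper name `resolve_device` is hypothetical and not part of zea; it only illustrates the documented fallback rule, not how zea actually places tensors.

```python
from typing import Optional


def resolve_device(call_device: Optional[str], pipeline_device: Optional[str]) -> Optional[str]:
    """Hypothetical helper: a per-call device wins, otherwise use the pipeline-level one."""
    return call_device if call_device is not None else pipeline_device


# No device anywhere: no explicit placement.
print(resolve_device(None, None))
# Only the pipeline-level device is set: it is used.
print(resolve_device(None, "gpu:0"))
# A per-call device overrides the pipeline-level device for this call only.
print(resolve_device("cpu", "gpu:0"))
```

Because the override applies to a single invocation, the pipeline-level setting is untouched for later calls.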

append(operation)[source]¶

Append an operation to the pipeline.

call(**inputs)[source]¶

Process input data through the pipeline.

Return type:

Dict[str, Any]

copy()[source]¶

Create a copy of the pipeline.

Return type:

Pipeline

classmethod from_config(config, **kwargs)[source]¶

Create a pipeline from a dictionary or zea.Config object.

Parameters:
  • config (Dict) – Configuration dictionary or zea.Config object. Must have a pipeline key with a subkey operations.

  • **kwargs – Additional keyword arguments to be passed to the pipeline.

Return type:

Pipeline

Example

>>> from zea import Config, Pipeline
>>> config = Config(
...     {
...         "pipeline": {
...             "operations": [
...                 "identity",
...             ],
...         }
...     }
... )
>>> pipeline = Pipeline.from_config(config)

classmethod from_default(beamformer='delay_and_sum', num_patches=100, baseband=False, enable_pfield=False, timed=False, **kwargs)[source]¶

Create a default pipeline.

Parameters:
  • beamformer (str) –

    Type of beamformer to use. Currently supported:

    • “delay_and_sum”

    • “delay_multiply_and_sum”

    • “coherence_factor”

    • “generalized_coherence_factor”

    Defaults to “delay_and_sum”.

  • num_patches (int) – Number of patches for the PatchedGrid operation. Defaults to 100. If you get an out-of-memory error, try increasing this number.

  • baseband (bool) – If True, assume the input data is baseband (I/Q) data, which has 2 channels (last dim). Defaults to False, which assumes RF data: the input signal has a single channel dimension and is still at the carrier frequency.

  • enable_pfield (bool) – If True, apply PfieldWeighting. Defaults to False. This calculates the pressure field and only beamforms the data to those locations.

  • timed (bool, optional) – Whether to time each operation. Defaults to False.

  • **kwargs – Additional keyword arguments to be passed to the Pipeline constructor.

Return type:

Pipeline
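
The advice to raise num_patches on out-of-memory errors can be made concrete with a little arithmetic. The sketch below assumes that the beamforming grid is split roughly evenly across patches, which this page does not state; the grid size and the helper `pixels_per_patch` are hypothetical.

```python
import math


def pixels_per_patch(n_pixels: int, num_patches: int) -> int:
    """Upper bound on pixels handled at once if the grid is split evenly (assumption)."""
    return math.ceil(n_pixels / num_patches)


n_pixels = 512 * 512  # hypothetical grid size
for num_patches in (100, 400):
    # More patches means fewer pixels are processed at the same time.
    print(num_patches, pixels_per_patch(n_pixels, num_patches))
# prints "100 2622" then "400 656"
```

Under that assumption, quadrupling num_patches cuts the per-patch workload to about a quarter, at the cost of more sequential passes.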

classmethod from_json(json_string, **kwargs)[source]¶

Create a pipeline from a JSON string.

Parameters:
  • json_string (str) – JSON string representing the pipeline. Must have a pipeline key with a subkey operations.

  • **kwargs – Additional keyword arguments to be passed to the pipeline.

Return type:

Pipeline

Example

>>> from zea import Pipeline
>>> json_string = '{"pipeline": {"operations": ["identity"]}}'
>>> pipeline = Pipeline.from_json(json_string)
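
The required layout (a pipeline key with an operations subkey) can be built and checked with the standard library before handing the string to from_json. This is a minimal sketch; the helper `has_pipeline_operations` is hypothetical and does not reproduce any validation zea performs itself.

```python
import json

# Build the JSON string from a dictionary instead of writing it by hand.
config = {"pipeline": {"operations": ["identity"]}}
json_string = json.dumps(config)


def has_pipeline_operations(text: str) -> bool:
    """Hypothetical pre-check: is there a "pipeline" object with an "operations" key?"""
    data = json.loads(text)
    pipeline = data.get("pipeline") if isinstance(data, dict) else None
    return isinstance(pipeline, dict) and "operations" in pipeline


print(json_string)
print(has_pipeline_operations(json_string))
```

A string that passes this check has the shape from_json expects; whether each operation name is valid is still decided by zea.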

classmethod from_path(file_path, revision=None, **kwargs)[source]¶

Create a pipeline from a YAML/config file path.

Parameters:
  • file_path (str) – Path to the config file (local or hf:// URI). Must have a pipeline key with a subkey operations.

  • revision (str) – Revision of the config file (for Hugging Face hf:// URIs).

  • **kwargs – Additional keyword arguments to be passed to the pipeline.

Return type:

Pipeline

Example

>>> from zea import Config, Pipeline
>>> config = Config(
...     {
...         "pipeline": {
...             "operations": [
...                 "identity",
...             ],
...         }
...     }
... )
>>> config.to_yaml("pipeline.yaml")
>>> pipeline = Pipeline.from_path("pipeline.yaml")

classmethod from_yaml(cls, file_path, **kwargs)[source]¶

Deprecated. Use from_path() instead.

Return type:

Pipeline

get_dict(compact=True)[source]¶

Convert the pipeline to a dictionary.

Parameters:

compact (bool) – If True (default), only include parameters that differ from their defaults. If False, include all parameters for full reproducibility.

Return type:

dict
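
The difference between compact and full output can be illustrated with a toy object. The sketch below is not zea's implementation: `ToyOp` and `describe` are hypothetical, and the sketch assumes each constructor argument is stored under an attribute of the same name.

```python
import inspect


class ToyOp:
    """Hypothetical operation with two parameters that both have defaults."""

    def __init__(self, gain=1.0, clip=False):
        self.gain = gain
        self.clip = clip


def describe(obj, compact=True):
    """Return constructor parameters; in compact mode skip values equal to their default."""
    params = {}
    for name, param in inspect.signature(type(obj).__init__).parameters.items():
        if name == "self":
            continue
        value = getattr(obj, name)
        has_default = param.default is not inspect.Parameter.empty
        if compact and has_default and value == param.default:
            continue
        params[name] = value
    return params


op = ToyOp(gain=2.0)
print(describe(op))                 # {'gain': 2.0}
print(describe(op, compact=False))  # {'gain': 2.0, 'clip': False}
```

The compact form is shorter to read and store, while the full form stays reproducible even if a default changes in a later release.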

get_params(per_operation=False)[source]¶

Get a snapshot of the current parameters of the operations in the pipeline.

Parameters:

per_operation (bool) – If True, return a list of dictionaries for each operation. If False, return a single dictionary with all parameters combined.
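
The two return shapes can be pictured with plain dictionaries. This is a hedged sketch: `collect_params` and the parameter names are hypothetical, and letting a later operation overwrite an earlier one on a name clash is an assumption of the sketch, not documented behavior.

```python
def collect_params(op_params, per_operation=False):
    """Toy version: one dict per operation, or a single merged dict."""
    if per_operation:
        return [dict(p) for p in op_params]
    merged = {}
    for p in op_params:
        merged.update(p)  # assumption: later operations win on duplicate names
    return merged


# Hypothetical parameter snapshots of two operations.
op_params = [{"sampling_frequency": 40e6}, {"dynamic_range": (-60, 0)}]

print(collect_params(op_params, per_operation=True))
print(collect_params(op_params))
```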

property input_data_type¶

Get the input_data_type property of the pipeline.

insert(index, operation)[source]¶

Insert an operation at a specific index in the pipeline.

property jit_options¶

Get the jit_options property of the pipeline.

property jittable¶

Check if all operations in the pipeline are jittable.

property key: str¶

Input key of the pipeline.

classmethod load(file_path, **kwargs)[source]¶

Load a pipeline from a JSON or YAML file.

Return type:

Pipeline

needs(key)[source]¶

Check if the pipeline needs a specific key at the input.

Return type:

bool

property needs_keys: set¶

Get a set of all input keys needed by the pipeline.

Will keep track of keys that are already provided by previous operations.
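
The bookkeeping described here, where keys produced by earlier operations no longer have to be supplied from outside, can be sketched as a single pass over the operations. Everything in the sketch is hypothetical: operations are modeled as (input keys, output keys) pairs and the key names are made up for illustration.

```python
def needed_keys(ops):
    """Collect input keys that no earlier operation has already produced."""
    provided = set()
    needed = set()
    for inputs, outputs in ops:
        needed |= set(inputs) - provided   # only keys not yet available upstream
        provided |= set(outputs)           # outputs become available downstream
    return needed


# Hypothetical three-step pipeline: (input keys, output keys) per operation.
ops = [
    ({"data", "sampling_frequency"}, {"data"}),
    ({"data", "envelope_gain"}, {"image"}),
    ({"image", "dynamic_range"}, {"image"}),
]

print(sorted(needed_keys(ops)))
# ['data', 'dynamic_range', 'envelope_gain', 'sampling_frequency']
```

Note that "image" is absent from the result because the second operation produces it before the third one consumes it.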

property operations: List[Operation | Pipeline]¶

Alias for self.layers, to match the zea naming convention.

property output_data_type¶

Get the output_data_type property of the pipeline.

property output_key: str¶

Output key of the pipeline.

property output_keys: set¶

All output keys the pipeline guarantees to produce.

prepare_parameters(parameters=None, device=None, **overrides)[source]¶

Prepare a Parameters object for the pipeline.

Converts the (validated and derived) parameters needed by this pipeline’s operations into a dictionary of tensors, then overlays any manually supplied overrides (e.g. config.parameters or ad-hoc keyword arguments). Overrides take priority over the values in parameters.

Parameters:
  • parameters (Parameters) – Parameters object. Only the keys this pipeline needs (and that are not overridden) are converted, so derivation is lazy and minimal.

  • device (Optional[str]) – Device to place the tensors on. Defaults to the pipeline device.

  • **overrides – Additional parameters to include in the inputs (converted to tensors). These overwrite values taken from parameters.

Returns:

Dictionary of inputs with all values as tensors.

Return type:

dict

Example

inputs = pipeline.prepare_parameters(parameters, **config.parameters)
outputs = pipeline(data=raw_data, **inputs)
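
The override priority and the "only the keys this pipeline needs" rule can be sketched with plain dictionaries. The function `prepare` and all parameter names are hypothetical, and the conversion to tensors that the real method performs is left out.

```python
def prepare(parameters, needed, **overrides):
    """Toy overlay: take needed, non-overridden keys from parameters, then apply overrides."""
    inputs = {
        key: parameters[key]
        for key in sorted(needed)
        if key in parameters and key not in overrides
    }
    inputs.update(overrides)  # overrides take priority over values from parameters
    return inputs


# Hypothetical parameter values; "unused" is not needed by the pipeline.
parameters = {"sound_speed": 1540.0, "center_frequency": 5e6, "unused": 0}
needed = {"sound_speed", "center_frequency"}

inputs = prepare(parameters, needed, sound_speed=1480.0)
print(inputs)
```

In this sketch the overridden key is never read from parameters at all, which mirrors the lazy, minimal derivation described above.
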

prepend(operation)[source]¶

Prepend an operation to the pipeline.

reinitialize()[source]¶

Reinitialize the pipeline in place.

reset_timer()[source]¶

Reset the timer for timed operations.

set_jit(value)[source]¶

Set the JIT compilation for the pipeline.

set_params(**params)[source]¶

Set parameters for the operations in the pipeline by adding them to the cache.

property static_params: List[str]¶

Get a list of static parameters for the pipeline.

to_config(compact=True)[source]¶

Convert the pipeline to a zea.Config object.

Return type:

Config

to_json(compact=True)[source]¶

Convert the pipeline to a JSON string.

Return type:

str

to_yaml(file_path, compact=True)[source]¶

Save the pipeline as a YAML file at file_path.

Return type:

None

property unjitable_ops¶

Get a list of operations that are not jittable.

property valid_keys: set¶

Get a set of valid keys for the pipeline.

This is all keys that can be passed to the pipeline as input.

validate()[source]¶

Validate the pipeline by checking the compatibility of the operations.

property with_batch_dim¶

Get the with_batch_dim property of the pipeline.