zea.Pipeline¶
- class zea.Pipeline(operations, with_batch_dim=True, jit_options='ops', jit_kwargs=None, name='pipeline', validate=True, timed=False, device=None)[source]¶
Bases: object
Pipeline class for processing ultrasound data through a series of operations.
Initialize a pipeline.
- Parameters:
operations (List[Operation]) – A list of Operation instances representing the operations to be performed.
with_batch_dim (bool) – Whether operations should expect a batch dimension. Defaults to True.
jit_options (Optional[str]) – The JIT options to use. Must be “pipeline”, “ops”, or None.
- “pipeline”: compiles the entire pipeline as a single function. This may be faster but does not preserve Python control flow, such as caching.
- “ops”: compiles each operation separately. This preserves Python control flow and caching functionality while still speeding up the individual operations.
- None: disables JIT compilation.
Defaults to “ops”.
jit_kwargs (dict | None) – Additional keyword arguments for the JIT compiler.
name (str, optional) – The name of the pipeline. Defaults to “pipeline”.
validate (bool, optional) – Whether to validate the pipeline. Defaults to True.
timed (bool) – Whether to time each operation. Defaults to False.
device (Optional[str]) – Default device for all pipeline calls, e.g. 'cpu', 'gpu:0', 'cuda:1'. Can be overridden per call by passing device= to __call__. Uses zea.backend.func_on_device() under the hood, which moves input tensors to the device for the torch backend and wraps the call in a device context for JAX / TensorFlow. Defaults to None (no device placement).
- __call__(return_numpy=False, device=None, **inputs)[source]¶
Process input data through the pipeline.
- Parameters:
return_numpy (bool) – If True, convert output tensors to NumPy arrays before returning.
device (Optional[str]) – Device to run this call on, e.g. 'cpu', 'gpu:0', or 'cuda:1'. Overrides the pipeline-level device set at construction time for this single invocation. When None (default), the pipeline-level device attribute is used (which is also None by default, meaning no explicit device placement).
**inputs – Tensor inputs forwarded to the operations.
- Return type:
Dict[str,Any]
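The calling convention described above (keyword inputs in, a dictionary out, and a per-call device that takes precedence over the pipeline-level one) can be sketched in plain Python. This is only a toy model and does not use zea: ToyPipeline, scale, offset, and the device_used key are hypothetical names made up for the illustration, and the device string is merely recorded rather than used for placement.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-in for an operation: reads the running dict of
# inputs and returns the key/value pairs it wants to add or overwrite.
ToyOp = Callable[[Dict[str, Any]], Dict[str, Any]]


class ToyPipeline:
    """Toy model of the calling convention only; this is not zea.Pipeline."""

    def __init__(self, operations: List[ToyOp], device: Optional[str] = None):
        self.operations = operations
        self.device = device  # pipeline-level default device

    def __call__(self, device: Optional[str] = None, **inputs: Any) -> Dict[str, Any]:
        # A per-call device wins; None falls back to the pipeline-level value.
        effective_device = device if device is not None else self.device
        outputs = dict(inputs)
        for op in self.operations:
            outputs.update(op(outputs))
        outputs["device_used"] = effective_device
        return outputs


def scale(d: Dict[str, Any]) -> Dict[str, Any]:
    return {"data": [2 * x for x in d["data"]]}


def offset(d: Dict[str, Any]) -> Dict[str, Any]:
    return {"data": [x + d["bias"] for x in d["data"]]}


pipeline = ToyPipeline([scale, offset], device="cpu")
result = pipeline(data=[1, 2, 3], bias=1)
override = pipeline(data=[1, 2, 3], bias=1, device="gpu:0")
```

In this sketch the second call changes the device for that invocation only; the pipeline-level default stays 'cpu' for later calls.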
- classmethod from_config(config, **kwargs)[source]¶
Create a pipeline from a dictionary or zea.Config object.
- Parameters:
config (Dict) – Configuration dictionary or zea.Config object. Must have a pipeline key with a subkey operations.
**kwargs – Additional keyword arguments to be passed to the pipeline.
- Return type:
Example
>>> from zea import Config, Pipeline
>>> config = Config(
...     {
...         "pipeline": {
...             "operations": [
...                 "identity",
...             ],
...         }
...     }
... )
>>> pipeline = Pipeline.from_config(config)
- classmethod from_default(beamformer='delay_and_sum', num_patches=100, baseband=False, enable_pfield=False, timed=False, **kwargs)[source]¶
Create a default pipeline.
- Parameters:
beamformer (str) – Type of beamformer to use. Currently supported:
- “delay_and_sum”
- “delay_multiply_and_sum”
- “coherence_factor”
- “generalized_coherence_factor”
Defaults to “delay_and_sum”.
num_patches (int) – Number of patches for the PatchedGrid operation. Defaults to 100. If you get an out-of-memory error, try increasing this number.
baseband (bool) – If True, assume the input data is baseband (I/Q) data, which has 2 channels (last dim). Defaults to False, which assumes RF data, so the input signal has a single channel dimension and is still at the carrier frequency.
enable_pfield (bool) – If True, apply PfieldWeighting. Defaults to False. This calculates the pressure field and only beamforms the data to those locations.
timed (bool, optional) – Whether to time each operation. Defaults to False.
**kwargs – Additional keyword arguments to be passed to the Pipeline constructor.
- Return type:
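To make the num_patches advice concrete: splitting a grid into more patches means each patch holds fewer pixels, so less has to be processed at once. The sketch below shows only that arithmetic in plain Python. It is an assumption about patching in general, not a description of how PatchedGrid is implemented, and split_into_patches and the 512 x 512 grid size are hypothetical.

```python
import math
from typing import List


def split_into_patches(num_pixels: int, num_patches: int) -> List[range]:
    """Split pixel indices into at most num_patches contiguous chunks."""
    patch_size = math.ceil(num_pixels / num_patches)
    return [
        range(start, min(start + patch_size, num_pixels))
        for start in range(0, num_pixels, patch_size)
    ]


num_pixels = 512 * 512  # hypothetical beamforming grid
peak_sizes = {}
for num_patches in (1, 100, 400):
    patches = split_into_patches(num_pixels, num_patches)
    # The largest patch bounds how many pixels are handled at one time.
    peak_sizes[num_patches] = max(len(p) for p in patches)
    print(f"num_patches={num_patches} -> largest patch: {peak_sizes[num_patches]} pixels")
```

The trade-off in this toy model is more, smaller steps in exchange for a lower peak working set.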
- classmethod from_json(json_string, **kwargs)[source]¶
Create a pipeline from a JSON string.
- Parameters:
json_string (str) – JSON string representing the pipeline. Must have a pipeline key with a subkey operations.
**kwargs – Additional keyword arguments to be passed to the pipeline.
- Return type:
Example
>>> from zea import Pipeline
>>> json_string = '{"pipeline": {"operations": ["identity"]}}'
>>> pipeline = Pipeline.from_json(json_string)
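Rather than writing the JSON by hand, the string can be built from a dictionary with the standard library, which avoids quoting mistakes. The sketch below uses only the json module and the structure stated above (a pipeline key with an operations subkey); passing the result to Pipeline.from_json is left as a comment so that the snippet runs without zea installed.

```python
import json

# Required structure per the description above: a "pipeline" key with an
# "operations" subkey. "identity" is the operation name from the example.
spec = {"pipeline": {"operations": ["identity"]}}
json_string = json.dumps(spec)

# Check that the string round-trips before handing it on.
round_tripped = json.loads(json_string)

# With zea installed, the string would then be used as:
# pipeline = Pipeline.from_json(json_string)
```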
- classmethod from_path(file_path, revision=None, **kwargs)[source]¶
Create a pipeline from a YAML/config file path.
- Parameters:
file_path (str) – Path to the config file (local or hf:// URI). Must have a pipeline key with a subkey operations.
revision (str) – Revision of the config file (for Hugging Face hf:// URIs).
**kwargs – Additional keyword arguments to be passed to the pipeline.
- Return type:
Example
>>> from zea import Config, Pipeline
>>> config = Config(
...     {
...         "pipeline": {
...             "operations": [
...                 "identity",
...             ],
...         }
...     }
... )
>>> config.to_yaml("pipeline.yaml")
>>> pipeline = Pipeline.from_path("pipeline.yaml")
- classmethod from_yaml(cls, file_path, **kwargs)[source]¶
Deprecated. Use from_path() instead.
- Return type:
- get_dict(compact=True)[source]¶
Convert the pipeline to a dictionary.
- Parameters:
compact (bool) – If True (default), only include parameters that differ from their defaults. If False, include all parameters for full reproducibility.
- Return type:
dict
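The difference between the compact and full forms can be illustrated with a small plain-Python sketch. It is not zea's implementation: to_dict, the parameter names, and the default values are all hypothetical and only show the idea of dropping values that equal their defaults.

```python
from typing import Any, Dict


def to_dict(params: Dict[str, Any], defaults: Dict[str, Any], compact: bool = True) -> Dict[str, Any]:
    """Serialize params; in compact mode, skip values equal to their default."""
    if not compact:
        return dict(params)
    return {
        key: value
        for key, value in params.items()
        if key not in defaults or defaults[key] != value
    }


# Hypothetical operation parameters and their defaults.
defaults = {"dynamic_range": 60, "normalize": True}
params = {"dynamic_range": 50, "normalize": True}

compact_form = to_dict(params, defaults, compact=True)
full_form = to_dict(params, defaults, compact=False)
```

The compact form is shorter to read and store, while the full form does not depend on the defaults staying the same between versions.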
- get_params(per_operation=False)[source]¶
Get a snapshot of the current parameters of the operations in the pipeline.
- Parameters:
per_operation (bool) – If True, return a list of dictionaries, one per operation. If False, return a single dictionary with all parameters combined.
- property input_data_type¶
Get the input_data_type property of the pipeline.
- property jit_options¶
Get the jit_options property of the pipeline.
- property jittable¶
Check if all operations in the pipeline are jittable.
- property key: str¶
Input key of the pipeline.
- classmethod load(file_path, **kwargs)[source]¶
Load a pipeline from a JSON or YAML file.
- Return type:
- property needs_keys: set¶
Get a set of all input keys needed by the pipeline.
Will keep track of keys that are already provided by previous operations.
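One way to picture this bookkeeping is to walk the operations in order and only count a key as needed if no earlier operation has already produced it. The following is a minimal sketch under that assumption, not zea's code; needed_input_keys and all key names are hypothetical.

```python
from typing import List, Set, Tuple

# Each toy operation is described as (keys it reads, keys it writes).
ToyOpSpec = Tuple[Set[str], Set[str]]


def needed_input_keys(ops: List[ToyOpSpec]) -> Set[str]:
    """Collect keys the caller must supply, skipping keys produced upstream."""
    needed: Set[str] = set()
    provided: Set[str] = set()
    for reads, writes in ops:
        needed |= reads - provided  # only keys nobody has produced yet
        provided |= writes
    return needed


ops = [
    ({"data", "sampling_frequency"}, {"data"}),  # e.g. a filtering step
    ({"data", "grid"}, {"image"}),               # e.g. a beamforming step
    ({"image", "dynamic_range"}, {"image"}),     # e.g. a compression step
]
needs = needed_input_keys(ops)
```

Here image is read by the last step but is not reported as needed, because the second step already provides it.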
- property operations: List[Operation | Pipeline]¶
Alias for self.layers, to match the zea naming convention.
- property output_data_type¶
Get the output_data_type property of the pipeline.
- property output_key: str¶
Output key of the pipeline.
- property output_keys: set¶
All output keys the pipeline guarantees to produce.
- prepare_parameters(parameters=None, device=None, **overrides)[source]¶
Prepare a Parameters object for the pipeline.
Converts the (validated and derived) parameters needed by this pipeline’s operations into a dictionary of tensors, then overlays any manually supplied overrides (e.g. config.parameters or ad-hoc keyword arguments). Overrides take priority over the values in parameters.
- Parameters:
parameters (Parameters) – Parameters object. Only the keys this pipeline needs (and that are not overridden) are converted, so derivation is lazy and minimal.
device (Optional[str]) – Device to place the tensors on. Defaults to the pipeline device.
**overrides – Additional parameters to include in the inputs (converted to tensors). These overwrite values taken from parameters.
- Returns:
Dictionary of inputs with all values as tensors.
- Return type:
dict
Example
inputs = pipeline.prepare_parameters(parameters, **config.parameters)
outputs = pipeline(data=raw_data, **inputs)
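The overlay rule described above (derive only what is needed and not overridden, then let overrides win) can be sketched without zea. In this toy version, lazily derived parameters are modeled as zero-argument callables. prepare_inputs and the parameter names are hypothetical, and tensor conversion and device placement are left out.

```python
from typing import Any, Callable, Dict, List, Set, Tuple


def prepare_inputs(
    lazy_parameters: Dict[str, Callable[[], Any]],
    needs: Set[str],
    **overrides: Any,
) -> Tuple[Dict[str, Any], List[str]]:
    """Return (inputs, names of parameters that were actually derived)."""
    derived: List[str] = []
    inputs: Dict[str, Any] = {}
    for key in sorted(needs):
        # Skip keys the caller overrides, and keys this source cannot provide.
        if key in overrides or key not in lazy_parameters:
            continue
        inputs[key] = lazy_parameters[key]()  # derivation happens only here
        derived.append(key)
    inputs.update(overrides)  # overrides take priority
    return inputs, derived


lazy_parameters = {
    "sound_speed": lambda: 1540.0,
    "sampling_frequency": lambda: 40e6,
    "unused_expensive_value": lambda: sum(range(10**6)),
}
inputs, derived = prepare_inputs(
    lazy_parameters,
    needs={"sound_speed", "sampling_frequency"},
    sound_speed=1480.0,
)
```

In this sketch, sound_speed comes from the override and is never derived, and the parameter that the pipeline does not need is never computed.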
- set_params(**params)[source]¶
Set parameters for the operations in the pipeline by adding them to the cache.
- property static_params: List[str]¶
Get a list of static parameters for the pipeline.
- property unjitable_ops¶
Get a list of operations that are not jittable.
- property valid_keys: set¶
Get a set of valid keys for the pipeline.
This is all keys that can be passed to the pipeline as input.
- property with_batch_dim¶
Get the with_batch_dim property of the pipeline.