Pipelines
A pipeline is a workflow that executes multiple tasks in a predefined sequence, giving you full control and flexibility. Pipelines are a powerful solution for deterministic automation flows.
Key Features
- Full control and flexibility: Manage every aspect of your pipeline, from the sequence of tasks and agents to the specifics of input processing. Implement conditional logic and branching to enhance reliability and adaptability.
- Parallel execution: Run multiple processes concurrently to maximize efficiency and reduce execution time.
- Modularity and extensibility: Independently develop and test each node within the pipeline, ensuring flexibility and ease of updates. Extend functionality seamlessly by adding tasks without altering the deployed endpoint.
- Asset swapping: Effortlessly replace assets within nodes to trial different models or utilities, maintaining the pipeline structure intact.
- Batching: Process inputs in batches to boost efficiency and throughput.
- Reusability: Saved pipelines can be reused across different agents, serving as a versatile tool for various applications.
Components
- Input Node: Accepts input data to the pipeline. The accepted data types and formats are determined by the node connected to the input node.
- Output Node: Provides pipeline output.
- Model Node: Processes data based on the populated asset; e.g., a translation asset translates the input text, while a scraper asset scrapes the input URL.
- Script Node: Executes custom Python code.
- Decision Node: Applies logical conditions to direct the flow within the pipeline.
- Router Node: Distributes incoming data to different paths based on predefined rules.
- Metric Node: Uses metric assets to evaluate input data based on reference data.
Create a pipeline
This example pipeline takes text input, translates it into another language, and performs sentiment analysis on the translated text. The pipeline has:
- 1 input node: Accepts text input.
- 2 processing nodes: Performs translation and sentiment analysis.
- 2 output nodes: Outputs the translated text and its sentiment analysis results.
from aixplain.factories.pipeline_factory import PipelineFactory
from aixplain.modules.pipeline.designer import Input

# Initialize an empty pipeline
pipeline = PipelineFactory.init("Multi Input-Output Pipeline")

# Input node that accepts text
text_input_node = Input(data="text_input", data_types=["TEXT"], pipeline=pipeline)

# Processing nodes: translation and sentiment analysis assets
TRANSLATION_ASSET_ID = '60ddefbe8d38c51c5885f98a'
translation_node = pipeline.translation(asset_id=TRANSLATION_ASSET_ID)
SENTIMENT_ASSET_ID = '61728750720b09325cbcdc36'
sentiment_node = pipeline.sentiment_analysis(asset_id=SENTIMENT_ASSET_ID)

# Link the nodes: input text -> translation -> sentiment analysis
text_input_node.link(translation_node, 'input', 'text')
translation_node.link(sentiment_node, 'data', 'text')

# Output nodes for the translated text and its sentiment
translated_output_node = translation_node.use_output('data')
sentiment_output_node = sentiment_node.use_output('data')

# Save the pipeline, then run it with input data
pipeline.save()
outputs = pipeline.run({
    'text_input': 'This is example text to translate.'
})
print(outputs)
Guide to Building the Example Pipeline
Initialize Pipeline
from aixplain.factories.pipeline_factory import PipelineFactory
pipeline = PipelineFactory.init("Multi Input-Output Pipeline")
Add Input Node
Define the input node for text input.
from aixplain.modules.pipeline.designer import Input
text_input_node = Input(data="text_input", data_types=["TEXT"], pipeline=pipeline)
Add Processing Nodes
Add translation and sentiment analysis nodes to perform the required tasks.
TRANSLATION_ASSET_ID = '60ddefbe8d38c51c5885f98a'
translation_node = pipeline.translation(asset_id=TRANSLATION_ASSET_ID)
SENTIMENT_ASSET_ID = '61728750720b09325cbcdc36'
sentiment_node = pipeline.sentiment_analysis(asset_id=SENTIMENT_ASSET_ID)
Add Output Nodes
Add nodes to output the translated text and the sentiment analysis results. These calls reference the processing nodes defined above.
translated_output_node = translation_node.use_output('data')
sentiment_output_node = sentiment_node.use_output('data')
Link Nodes
Link all nodes, ensuring that the data types of connected nodes are compatible. The specific input and output parameters of each node are determined by the asset ID linked to the node.
View Node Parameters
Inspect the parameters of the nodes for debugging or configuration.
print(translation_node.inputs)
print(translation_node.outputs)
Connect the nodes to pass data between them.
text_input_node.link(translation_node, 'input', 'text')
translation_node.link(sentiment_node, 'data', 'text')
Validate Pipeline
Ensure the pipeline is valid:
- Starts with at least one input node.
- Ends with at least one output node.
- All nodes are fully connected in sequence.
pipeline.validate()
Save and Run
Save the pipeline configuration and execute it with input data.
pipeline.save()
outputs = pipeline.run({
    'text_input': 'This is example text to translate.'
})
print(outputs)
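The exact structure of the response returned by pipeline.run depends on the platform, so inspect the printed outputs from your own run before relying on specific keys. The sketch below assumes a hypothetical response shape (a status field plus a list of labeled per-output-node results) purely to illustrate how you might collect each output node's result.

```python
# Hypothetical response shape for illustration only -- inspect your own
# `outputs = pipeline.run(...)` result to confirm the real structure.
outputs = {
    "status": "SUCCESS",
    "data": [
        {"label": "translated_text", "value": "Ceci est un exemple."},
        {"label": "sentiment", "value": "NEUTRAL"},
    ],
}

def collect_results(response):
    """Map each output node's label to its value, if the run succeeded."""
    if response.get("status") != "SUCCESS":
        raise RuntimeError(f"Pipeline run failed: {response.get('status')}")
    return {item["label"]: item["value"] for item in response.get("data", [])}

results = collect_results(outputs)
print(results["translated_text"])
print(results["sentiment"])
```

Checking the status before reading results lets the same handler cope with failed runs instead of raising a confusing KeyError.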
Debug Pipeline
To gain full visibility into the intermediate results within your pipeline, attach output nodes to key intermediate nodes (for example, by calling use_output on them before saving). This lets you inspect the data produced at each stage, making it easier to debug or analyze the pipeline's behavior.
Update Pipeline
Update an existing pipeline by passing a modified pipeline dictionary.
pipeline.update(
    pipeline=updated_pipeline_dict,  # Pass the updated pipeline as a Python dictionary
    save_as_asset=True,              # Save as asset
    name="Draft Pipeline Name"       # Optional: new name for the draft pipeline
)
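The updated_pipeline_dict passed to update is a plain Python dictionary describing the pipeline graph. Its exact schema is defined by the platform; the sketch below is a hypothetical minimal shape, based only on the nodes and links fields listed in the Pipeline Parameters table, to illustrate the kind of structure involved and a cheap sanity check before sending it.

```python
# Hypothetical minimal pipeline dictionary -- the real schema is defined by
# the aiXplain platform. 'nodes' and 'links' correspond to the fields listed
# in the Pipeline Parameters table; the per-node keys here are assumptions.
updated_pipeline_dict = {
    "nodes": [
        {"id": 0, "type": "INPUT"},
        {"id": 1, "type": "ASSET", "assetId": "60ddefbe8d38c51c5885f98a"},
        {"id": 2, "type": "OUTPUT"},
    ],
    "links": [
        {"from": 0, "to": 1},
        {"from": 1, "to": 2},
    ],
}

# Sanity-check the graph locally: every link must reference node ids that
# actually exist in the dictionary.
node_ids = {node["id"] for node in updated_pipeline_dict["nodes"]}
dangling = [
    link for link in updated_pipeline_dict["links"]
    if link["from"] not in node_ids or link["to"] not in node_ids
]
print(dangling)  # an empty list means no links point at missing nodes
```

Validating the dictionary locally catches broken references before the update call reaches the platform.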
Pipeline Parameters
Covers the parameters used in the create method to define a pipeline's identity, functionality, and configuration.
Parameter | Type | Default | Description |
---|---|---|---|
id | str | Auto-generated | The unique identifier for the pipeline. |
name | str | [required] | The name of the pipeline, serving as its primary identifier. |
description | str | None | A brief description of the pipeline's role and purpose. |
vendor | str | config.TEAM_API_KEY | Name of the aiXplain team that owns the pipeline, derived from the api_key. |
version | str | 1.0 | Specifies the version of the pipeline. |
license | str | None | License for using the pipeline, defining its usage rights and restrictions. |
privacy | enum | Private | Determines the visibility and accessibility of the pipeline. Learn more about asset privacy. |
cost | float | Calculated at runtime | The cost of executing the pipeline, calculated in credits based on usage (e.g., data volume). |
api_key | str | Derived from team configuration | The API key of the team owning the pipeline, required for authentication. |
url | str | Generated from backend configuration | The endpoint URL of the pipeline. |
additional_info | dict | {'nodes': [], 'links': [], 'instance': None} | Additional details about the pipeline, including nodes and their connections. |
nodes | list | [] | List of nodes representing individual steps or operations within the pipeline. |
links | list | [] | Connections between nodes, defining the data flow within the pipeline. |
Pipeline Methods
Covers the core methods for creating, configuring, and managing pipelines.
Function | Description |
---|---|
init | Creates a new pipeline object with a specified name. |
input | Adds an input node to the pipeline. |
output | Adds an output node to the pipeline. |
asset-function methods (e.g., translation, sentiment_analysis) | Add an asset node for a specific function, such as translation or speech recognition. |
script | Incorporates a custom script node into the pipeline using a local Python file. |
link | Links the outputs and inputs of two nodes within the pipeline. |
save | Saves the configured pipeline as an asset for future use. |
run | Executes the pipeline with specified input data. |
delete | Deletes the pipeline configuration from the platform. |