Skip to main content

How to build a pipeline

Pipeline Object

Pipeline objects in our system represent ready-to-use AI pipelines that can be invoked for various tasks. Pipeline objects can be created using the aixplain SDK which provides powerful tools to construct pipelines or through Studio for easy editing and quick prototyping.

Examples

Here are quick examples to help you get started.

from aixplain.factories.pipeline_factory import PipelineFactory

TRANSLATION_ASSET_ID = '60ddefbe8d38c51c5885f98a'

pipeline = PipelineFactory.init('Translation Pipeline')
input_node = pipeline.input()
translation_node = pipeline.translation(asset_id=TRANSLATION_ASSET_ID)

input_node.link(translation_node, 'input', 'text')

output_node = translation_node.use_output('data')

pipeline.save()
outputs = pipeline.run('This is example text to translate')

print(outputs)
Show output

Instantiating Nodes

You can create a pipeline and instantiate nodes using the code below:

from aixplain.factories.pipeline_factory import PipelineFactory
from aixplain.modules.pipeline.designer import Input

pipeline = PipelineFactory.init("My Pipeline")
input_node = Input(*args, **kwargs)
input_node.attach_to(pipeline)

Alternatively, add nodes to the pipeline using add_node:

input_node = pipeline.add_node(Input(*args, **kwargs))

You can also pass the pipeline to the node constructor:

input_node = Input(*args, pipeline=pipeline, **kwargs)

Or directly instantiate the node within the pipeline:

input_node = pipeline.input(*args, **kwargs)

Adding Output Nodes

Each pipeline should have at least one input, asset, and output node. You can add output nodes like any other node:

translation_node = pipeline.translation(asset_id=TRANSLATION_ASSET_ID)
output_node = pipeline.output(*args, **kwargs)
translation_node.link(output_node, 'data', 'output')

For nodes implementing the Outputable mixin, use the shortcut syntax:

output_node = translation_node.use_output('parameter_name_we_are_interested_in')

Asset Nodes and Automatic Population

Asset nodes are used to run models and should have an asset_id. Once instantiated, an asset node contains all model information and parameters which is populated automatically by interacting with the platform.

translation_node = pipeline.translation(asset_id=TRANSLATION_ASSET_ID)
print(translation_node.inputs)
print(translation_node.outputs)

Handling Parameters

Parameters are accessed via the inputs and outputs attributes of the node, behaving as proxy objects to the parameters.

print(translation_node.inputs.text)
print(translation_node.outputs.data)

Add parameters to a node using create_param on corresponding inputs or outputs attributes:

translation_node.inputs.create_param('source_language', DataType.TEXT)
translation_node.outputs.create_param('source_audio', DataType.AUDIO)

Alternatively, instantiate parameters directly using InputParam or OutputParam classes:

from aixplain.modules.pipeline.designer import InputParam, OutputParam

source_language = InputParam(
data='source_language',
data_types=DataType.TEXT,
is_required=True,
pipeline=translation_node
)

Or add parameters explicitly:

source_audio = OutputParam(data_types=DataType.AUDIO, data='source_audio')
translation_node.outputs.add_param(source_audio)

In case of need, any parameter value can be set directly without requiring node linking:

translation_node.inputs.text = 'This is example text to translate'
translation_node.inputs.source_language = 'en'

This will implicity set the value attribute of the parameter object.

Linking Nodes

Link nodes to pass data between them using the link method. This method links the output of one node to the input of another on specified parameters.

input_node = pipeline.input()
translation_node = pipeline.translation(asset_id=TRANSLATION_ASSET_ID)
input_node.link(translation_node, 'input', 'text')

Specify parameters explicitly:

input_node.link(translation_node, from_param='input', to_param='text')

Or use parameter instances:

input_node.link(translation_node, from_param=input_node.outputs.input, to_param=translation_node.inputs.text)

You can also link parameters directly if you find it more convenient:

input_node.outputs.input.link(translation_node.inputs.text)

Validating the Pipeline

Use the validate method to ensure the pipeline is valid and ready to run. This method raises an exception if the pipeline has issues.

pipeline.validate()

This method will check the following:

  • Contains at least one input, asset, and output node
  • Input nodes link in, output nodes link out, and others link both ways.
  • All links pointing to the correct nodes and corresponding params.
  • All required params are either set or linked and have the correct data types.

  • Otherwise raises ValueError with a cause if the pipeline is not valid.

    Save and Run the Pipeline

    Save the pipeline before running it. The save method implicitly calls the validate method. Use the run method to execute the pipeline with input data. For more informtation on how to run pipelines, please refer to this guide.

    pipeline.save() # Raises an exception if there are semantic issues
    outputs = pipeline.run('This is example text to translate')
    print(outputs)

    Logic Nodes

    Router Nodes

    A Router node allows you to dynamically redirect the data flow within a pipeline based on conditional logic, allowing different branches of the pipeline to be executed. You can add a condition based on a logical expression that determines how the data is routed. You can have multiple routing paths based on the input data or results of previous nodes.

    router_node = pipeline.router(condition='data == "translate"')
    router_node.link(translation_node, 'true_branch', 'input')
    router_node.link(sentiment_node, 'false_branch', 'input')

    Decision Nodes

    Decision Nodes let you make decisons about the next steps in the pipeline based on input data.

    decision_node = pipeline.decision(condition='input_value > 0.5')
    decision_node.link(node_if_true, 'output', 'input_if_true')
    decision_node.link(node_if_false, 'output', 'input_if_false')

    Using Metric Models

    Metric models can be used to calculate performance metrics to evaluate node outputs.

    metric_node = pipeline.metric(metric_id='metric_asset_id')
    metric_node.link(model_node, 'output', 'input_metric')
    print(metric_node.outputs.score) # Access the computed metric score

    Script Nodes

    You can add a script node by adding a Python script to modify the output of the incoming nodes. Follow this guide on how to create and use script nodes.