aixplain.modules.pipeline.asset
__author__
Copyright 2024 The aiXplain SDK authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Author: Thiago Castro Ferreira, Shreyas Sharma and Lucas Pavanelli
Date: November 25th 2024
Description: Pipeline Asset Class
Pipeline Objects
class Pipeline(Asset, DeployableMixin)
Represents a custom pipeline created on the aiXplain Platform.
Attributes:
id
Text - ID of the Pipeline
name
Text - Name of the Pipeline
api_key
Text - Team API Key to run the Pipeline.
url
Text, optional - running URL of platform. Defaults to config.BACKEND_URL.
supplier
Text, optional - Pipeline supplier. Defaults to "aiXplain".
version
Text, optional - version of the pipeline. Defaults to "1.0".
status
AssetStatus, optional - Pipeline status. Defaults to AssetStatus.DRAFT.
**additional_info
- Any additional Pipeline info to be saved
__init__
def __init__(id: Text,
name: Text,
api_key: Text,
url: Text = config.BACKEND_URL,
supplier: Text = "aiXplain",
version: Text = "1.0",
status: AssetStatus = AssetStatus.DRAFT,
**additional_info) -> None
Create a Pipeline with the necessary information
Arguments:
id
Text - ID of the Pipeline
name
Text - Name of the Pipeline
api_key
Text - Team API Key to run the Pipeline.
url
Text, optional - running URL of platform. Defaults to config.BACKEND_URL.
supplier
Text, optional - Pipeline supplier. Defaults to "aiXplain".
version
Text, optional - version of the pipeline. Defaults to "1.0".
status
AssetStatus, optional - Pipeline status. Defaults to AssetStatus.DRAFT.
**additional_info
- Any additional Pipeline info to be saved
poll
def poll(poll_url: Text,
name: Text = "pipeline_process",
response_version: Text = "v2") -> Union[Dict, PipelineResponse]
Poll the platform to check whether an asynchronous call is done.
Arguments:
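poll_url
Text - polling URL
name
Text, optional - ID given to a call. Defaults to "pipeline_process".
response_version
Text, optional - Version of the response. Defaults to "v2".
Returns:
Union[Dict, PipelineResponse]
- response obtained by the polling call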
run
def run(data: Union[Text, Dict],
data_asset: Optional[Union[Text, Dict]] = None,
name: Text = "pipeline_process",
timeout: float = 20000.0,
wait_time: float = 1.0,
version: Optional[Text] = None,
response_version: Text = "v2",
**kwargs) -> Union[Dict, PipelineResponse]
Run the pipeline synchronously and wait for results.
This method executes the pipeline with the provided input data and waits for completion. It handles both direct data input and data assets, with support for polling and timeout.
Arguments:
data
Union[Text, Dict] - The input data for the pipeline. Can be:
- A string (file path, URL, or raw data)
- A dictionary mapping node labels to input data
data_asset
Optional[Union[Text, Dict]], optional - Data asset(s) to process. Can be a single asset ID or a dict mapping node labels to asset IDs. Defaults to None.
name
Text, optional - Identifier for this pipeline run. Used for logging. Defaults to "pipeline_process".
timeout
float, optional - Maximum time in seconds to wait for completion. Defaults to 20000.0.
wait_time
float, optional - Initial time in seconds between polling attempts. May increase over time. Defaults to 1.0.
version
Optional[Text], optional - Specific pipeline version to run. Defaults to None.
response_version
Text, optional - Response format version ("v1" or "v2"). Defaults to "v2".
**kwargs
- Additional keyword arguments passed to the pipeline.
Returns:
Union[Dict, PipelineResponse]: If response_version is:
- "v1": Dictionary with status, error (if any), and elapsed time
- "v2": PipelineResponse object with structured response data
Raises:
Exception
- If the pipeline execution fails, times out, or encounters errors during polling.
Notes:
- The method starts with run_async and then polls for completion
- wait_time may increase up to 60 seconds between polling attempts
- For v2 responses, use PipelineResponse methods to access results
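The polling behaviour described in the notes above can be sketched roughly as follows. This is an illustration, not the SDK's implementation: the helper name `wait_for_completion`, the `growth` factor of 1.1, and the `"completed"` response key are assumptions made for the example. Only the default timeout, the initial wait time, and the 60-second ceiling come from the documentation. The `sleep` and `clock` parameters are injectable so the loop can be exercised without real delays.

```python
import time
from typing import Callable, Dict


def wait_for_completion(
    poll_once: Callable[[], Dict],
    timeout: float = 20000.0,
    wait_time: float = 1.0,
    max_wait: float = 60.0,
    growth: float = 1.1,  # assumed growth factor, not taken from the SDK
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict:
    """Call poll_once until it reports completion or the timeout elapses."""
    start = clock()
    while clock() - start < timeout:
        response = poll_once()
        if response.get("completed"):  # "completed" is an assumed key
            return response
        sleep(wait_time)
        # Grow the delay, but never beyond the documented 60-second ceiling.
        wait_time = min(wait_time * growth, max_wait)
    raise TimeoutError(f"no result after {timeout} seconds")
```

In this sketch a timeout surfaces as `TimeoutError`, which is a subclass of `Exception` and so is consistent with the Raises entry above. The exception type the SDK actually raises is not specified here.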
run_async
def run_async(data: Union[Text, Dict],
data_asset: Optional[Union[Text, Dict]] = None,
name: Text = "pipeline_process",
batch_mode: bool = True,
version: Optional[Text] = None,
response_version: Text = "v2",
**kwargs) -> Dict
Run a pipeline call asynchronously.
Arguments:
data
Union[Text, Dict] - link to the input data
data_asset
Optional[Union[Text, Dict]], optional - Data asset to be processed by the pipeline. Defaults to None.
name
Text, optional - ID given to a call. Defaults to "pipeline_process".
batch_mode
bool, optional - Whether to run the pipeline in batch mode or online. Defaults to True.
version
Optional[Text], optional - Version of the pipeline. Defaults to None.
response_version
Text, optional - Version of the response. Defaults to "v2".
kwargs
- A dictionary of keyword arguments. The keys are the argument names
Returns:
Dict
- polling URL in response
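To show how run_async and poll fit together, here is a minimal sketch that uses a stand-in object instead of a real Pipeline, so it runs without network access or an API key. The class `FakeAsyncPipeline`, the helper `start_and_collect`, the example URL, and the response keys `"url"`, `"status"`, `"completed"` and `"data"` are all assumptions for illustration. Check the actual response of your SDK version before relying on any of them.

```python
from typing import Dict


class FakeAsyncPipeline:
    """Stand-in with the same method names as Pipeline; makes no network calls."""

    def __init__(self) -> None:
        self._polls = 0

    def run_async(self, data, name: str = "pipeline_process") -> Dict:
        # Assumed response shape: a status plus the URL to poll.
        return {"status": "IN_PROGRESS", "url": "https://example.invalid/poll/123"}

    def poll(self, poll_url: str, name: str = "pipeline_process") -> Dict:
        # Pretend the call finishes on the second poll.
        self._polls += 1
        done = self._polls >= 2
        return {"completed": done, "data": "result" if done else None}


def start_and_collect(pipeline, data) -> Dict:
    """Start a call, then poll its URL until the call reports completion."""
    started = pipeline.run_async(data)
    poll_url = started["url"]
    response = pipeline.poll(poll_url)
    while not response["completed"]:
        response = pipeline.poll(poll_url)
    return response


print(start_and_collect(FakeAsyncPipeline(), "some input"))
```

Against a real platform the loop would normally pause between polls and give up after a timeout. Both are omitted here to keep the control flow visible.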
update
def update(pipeline: Union[Text, Dict],
save_as_asset: bool = False,
api_key: Optional[Text] = None,
name: Optional[Text] = None)
Update Pipeline
Arguments:
pipeline
Union[Text, Dict] - Pipeline as a Python dictionary or in a JSON file
save_as_asset
bool, optional - Save as asset (True) or draft (False). Defaults to False.
api_key
Optional[Text], optional - Team API Key to create the Pipeline. Defaults to None.
Raises:
Exception
- Make sure the pipeline to be saved is in a JSON file.
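The pipeline argument accepts either a dictionary or a JSON file. One plausible way to normalise the two forms is sketched below. The helper name `load_pipeline_spec` and the extension check are assumptions for illustration, not the SDK's actual logic.

```python
import json
import os
from typing import Dict, Union


def load_pipeline_spec(pipeline: Union[str, Dict]) -> Dict:
    """Return the pipeline definition as a dictionary."""
    if isinstance(pipeline, dict):
        return pipeline
    # A string is treated as a file path and must point to a .json file.
    _, ext = os.path.splitext(pipeline)
    if ext.lower() != ".json":
        raise Exception("Make sure the pipeline to be saved is in a JSON file.")
    with open(pipeline, encoding="utf-8") as f:
        return json.load(f)
```

A dictionary is returned unchanged, a path ending in `.json` is parsed, and any other string is rejected before the file is opened.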
delete
def delete() -> None
Delete this pipeline from the platform.
This method permanently removes the pipeline from the aiXplain platform. The operation cannot be undone.
Raises:
Exception
- If deletion fails, which can happen if:
- The pipeline doesn't exist
- The user doesn't have permission to delete it
- The API request fails
- The server returns a non-200 status code
Notes:
- This operation is permanent and cannot be undone
- Only the pipeline owner can delete it
- Uses the team API key for authentication
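Because delete signals every failure with a generic Exception, callers may want to wrap the call. The sketch below uses a stand-in class so it runs offline. `FailingPipeline`, `try_delete`, and the error message are hypothetical names and text introduced for the example.

```python
class FailingPipeline:
    """Stand-in for Pipeline whose delete() can be made to fail."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail

    def delete(self) -> None:
        if self.fail:
            # Hypothetical message; the SDK's wording may differ.
            raise Exception("Pipeline deletion failed")


def try_delete(pipeline) -> bool:
    """Return True if delete() succeeded, False if it raised."""
    try:
        pipeline.delete()
    except Exception as error:
        print(f"Could not delete pipeline: {error}")
        return False
    return True
```

Since deletion cannot be undone, a wrapper like this belongs only where the caller has already confirmed that the pipeline should be removed.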
save
def save(pipeline: Optional[Union[Text, Dict]] = None,
save_as_asset: bool = False,
api_key: Optional[Text] = None)
Update and Save Pipeline
Arguments:
pipeline
Optional[Union[Text, Dict]] - Pipeline as a Python dictionary or in a JSON file
save_as_asset
bool, optional - Save as asset (True) or draft (False). Defaults to False.
api_key
Optional[Text], optional - Team API Key to create the Pipeline. Defaults to None.
Raises:
Exception
- Make sure the pipeline to be saved is in a JSON file.
deploy
def deploy(api_key: Optional[Text] = None) -> None
Deploy the Pipeline.
This method overrides the deploy method in DeployableMixin to handle Pipeline-specific deployment functionality.
Arguments:
api_key
Optional[Text], optional - Team API Key to deploy the Pipeline. Defaults to None.
__repr__
def __repr__()
Return a string representation of the Pipeline instance.
Returns:
str
- A string in the format "Pipeline: <name> (id=<id>)".
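The documented format can be reproduced with a small stand-in class. `PipelineStub` is a hypothetical name and carries only the two fields the format string needs.

```python
class PipelineStub:
    """Minimal stand-in holding only the fields used by __repr__."""

    def __init__(self, id: str, name: str) -> None:
        self.id = id
        self.name = name

    def __repr__(self) -> str:
        return f"Pipeline: {self.name} (id={self.id})"


print(repr(PipelineStub("abc123", "My Pipeline")))  # Pipeline: My Pipeline (id=abc123)
```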