aixplain.modules.pipeline.asset
__author__
Copyright 2024 The aiXplain SDK authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Author: Thiago Castro Ferreira, Shreyas Sharma and Lucas Pavanelli
Date: November 25th 2024
Description: Pipeline Asset Class
Pipeline Objects
class Pipeline(Asset, DeployableMixin)
Represents a custom pipeline that was created on the aiXplain Platform.
Attributes:
- id (Text) - ID of the Pipeline.
- name (Text) - Name of the Pipeline.
- api_key (Text) - Team API Key to run the Pipeline.
- url (Text, optional) - Running URL of the platform. Defaults to config.BACKEND_URL.
- supplier (Text, optional) - Pipeline supplier. Defaults to "aiXplain".
- version (Text, optional) - Version of the pipeline. Defaults to "1.0".
- status (AssetStatus, optional) - Pipeline status. Defaults to AssetStatus.DRAFT.
- **additional_info - Any additional Pipeline info to be saved.
__init__
def __init__(id: Text,
name: Text,
api_key: Text,
url: Text = config.BACKEND_URL,
supplier: Text = "aiXplain",
version: Text = "1.0",
status: AssetStatus = AssetStatus.DRAFT,
**additional_info) -> None
Create a Pipeline with the necessary information
Arguments:
- id (Text) - ID of the Pipeline.
- name (Text) - Name of the Pipeline.
- api_key (Text) - Team API Key to run the Pipeline.
- url (Text, optional) - Running URL of the platform. Defaults to config.BACKEND_URL.
- supplier (Text, optional) - Pipeline supplier. Defaults to "aiXplain".
- version (Text, optional) - Version of the pipeline. Defaults to "1.0".
- status (AssetStatus, optional) - Pipeline status. Defaults to AssetStatus.DRAFT.
- **additional_info - Any additional Pipeline info to be saved.
poll
def poll(poll_url: Text,
name: Text = "pipeline_process",
response_version: Text = "v2") -> Union[Dict, PipelineResponse]
Poll the platform to check whether an asynchronous call is done.
Arguments:
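- poll_url (Text) - Polling URL.
- name (Text, optional) - ID given to a call. Defaults to "pipeline_process".
- response_version (Text, optional) - Response format version ("v1" or "v2"). Defaults to "v2".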
Returns:
Union[Dict, PipelineResponse] - Response obtained by the polling call.
run
def run(data: Union[Text, Dict],
data_asset: Optional[Union[Text, Dict]] = None,
name: Text = "pipeline_process",
timeout: float = 20000.0,
wait_time: float = 1.0,
version: Optional[Text] = None,
response_version: Text = "v2",
**kwargs) -> Union[Dict, PipelineResponse]
Run the pipeline synchronously and wait for results.
This method executes the pipeline with the provided input data and waits for completion. It handles both direct data input and data assets, with support for polling and timeout.
Arguments:
- data (Union[Text, Dict]) - The input data for the pipeline. Can be:
  - A string (file path, URL, or raw data)
  - A dictionary mapping node labels to input data
- data_asset (Optional[Union[Text, Dict]], optional) - Data asset(s) to process. Can be a single asset ID or a dict mapping node labels to asset IDs. Defaults to None.
- name (Text, optional) - Identifier for this pipeline run. Used for logging. Defaults to "pipeline_process".
- timeout (float, optional) - Maximum time in seconds to wait for completion. Defaults to 20000.0.
- wait_time (float, optional) - Initial time in seconds between polling attempts. May increase over time. Defaults to 1.0.
- version (Optional[Text], optional) - Specific pipeline version to run. Defaults to None.
- response_version (Text, optional) - Response format version ("v1" or "v2"). Defaults to "v2".
- **kwargs - Additional keyword arguments passed to the pipeline.
Returns:
Union[Dict, PipelineResponse]: If response_version is:
- "v1": Dictionary with status, error (if any), and elapsed time
- "v2": PipelineResponse object with structured response data
Raises:
Exception - If the pipeline execution fails, times out, or encounters errors during polling.
Notes:
- The method starts with run_async and then polls for completion
- wait_time may increase up to 60 seconds between polling attempts
- For v2 responses, use PipelineResponse methods to access results
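The polling behaviour described in these notes can be sketched roughly as follows. This is a minimal illustration and not the SDK's implementation: the helper name poll_until_done, the fetch_status callable, the "completed" key and the 1.1 growth factor are all assumptions made for the example. Only the timeout and wait_time defaults and the 60-second cap come from the text above.

```python
import time
from typing import Callable, Dict


def poll_until_done(
    fetch_status: Callable[[], Dict],
    timeout: float = 20000.0,
    wait_time: float = 1.0,
    max_wait: float = 60.0,
    growth: float = 1.1,  # assumed growth factor, not taken from the SDK
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_status until it reports completion or the timeout elapses."""
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        response = fetch_status()
        # "completed" is an assumed key for this sketch.
        if response.get("completed", False):
            return response
        sleep(wait_time)
        # Back off gradually, never waiting more than max_wait seconds.
        wait_time = min(wait_time * growth, max_wait)
    raise Exception(f"Polling timed out after {timeout} seconds")


# Stand-in for a real polling call: reports completion on the third attempt.
_attempts = iter(
    [{"completed": False}, {"completed": False}, {"completed": True, "data": "ok"}]
)
waits = []  # records the requested waits instead of actually sleeping
result = poll_until_done(lambda: next(_attempts), sleep=waits.append)
```

Passing the sleep function as a parameter keeps the sketch testable without real delays. In normal use it would default to time.sleep.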
run_async
def run_async(data: Union[Text, Dict],
data_asset: Optional[Union[Text, Dict]] = None,
name: Text = "pipeline_process",
batch_mode: bool = True,
version: Optional[Text] = None,
response_version: Text = "v2",
**kwargs) -> Dict
Run a pipeline call asynchronously.
Arguments:
- data (Union[Text, Dict]) - Link to the input data.
- data_asset (Optional[Union[Text, Dict]], optional) - Data asset to be processed by the pipeline. Defaults to None.
- name (Text, optional) - ID given to a call. Defaults to "pipeline_process".
- batch_mode (bool, optional) - Whether to run the pipeline in batch mode or online. Defaults to True.
- version (Optional[Text], optional) - Version of the pipeline. Defaults to None.
- response_version (Text, optional) - Version of the response. Defaults to "v2".
- **kwargs - A dictionary of keyword arguments. The keys are the argument names.
Returns:
Dict - Response containing the polling URL.
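The asynchronous flow (start a run with run_async, then hand the returned polling URL to poll) might look like the sketch below. FakePipeline is a stand-in written for this example, not the real aiXplain class, and the "url", "status" and "completed" keys are assumptions about the response shape. Only the method names and the "pipeline_process" default come from the documentation above.

```python
from typing import Any, Dict


class FakePipeline:
    """Stand-in exposing the documented method names; not the real SDK class."""

    def run_async(self, data: Any, name: str = "pipeline_process", **kwargs) -> Dict:
        # Assumed response shape: the polling URL is stored under "url".
        return {"status": "IN_PROGRESS", "url": "https://example.invalid/poll/123"}

    def poll(self, poll_url: str, name: str = "pipeline_process") -> Dict:
        # Echo the URL back so the caller can see what was polled.
        return {"status": "SUCCESS", "completed": True, "polled": poll_url}


def start_and_check(pipeline: Any, data: Any) -> Dict:
    """Start an asynchronous run, then poll its URL once."""
    started = pipeline.run_async(data)
    poll_url = started.get("url")
    if poll_url is None:
        raise Exception(f"No polling URL in response: {started}")
    return pipeline.poll(poll_url)


result = start_and_check(FakePipeline(), "https://example.invalid/input.txt")
```

A real caller would typically repeat the poll step until the response reports completion instead of polling once.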
update
def update(pipeline: Union[Text, Dict],
save_as_asset: bool = False,
api_key: Optional[Text] = None,
name: Optional[Text] = None)
Update Pipeline
Arguments:
- pipeline (Union[Text, Dict]) - Pipeline as a Python dictionary or in a JSON file.
- save_as_asset (bool, optional) - Save as asset (True) or draft (False). Defaults to False.
- api_key (Optional[Text], optional) - Team API Key to create the Pipeline. Defaults to None.
Raises:
Exception - Make sure the pipeline to be saved is in a JSON file.
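Accepting the pipeline either as a dictionary or as a path to a JSON file could be handled along these lines. This is a sketch under stated assumptions, not the SDK's code: load_pipeline_definition is a hypothetical helper, the extension check is one possible way to enforce the JSON requirement, and the "nodes" and "links" keys are placeholder content only.

```python
import json
import os
import tempfile
from typing import Dict, Union


def load_pipeline_definition(pipeline: Union[str, Dict]) -> Dict:
    """Return the pipeline as a dict, reading it from a JSON file if a path is given."""
    if isinstance(pipeline, dict):
        return pipeline
    # Assumed check: reject any path that does not end in ".json".
    _, ext = os.path.splitext(pipeline)
    if ext.lower() != ".json":
        raise Exception("Make sure the pipeline to be saved is in a JSON file.")
    with open(pipeline, encoding="utf-8") as f:
        return json.load(f)


# Placeholder definition; the keys are illustrative, not a documented schema.
definition = {"nodes": [], "links": []}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "pipeline.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(definition, f)
    from_file = load_pipeline_definition(path)

from_dict = load_pipeline_definition(definition)
```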
delete
def delete() -> None
Delete this pipeline from the platform.
This method permanently removes the pipeline from the aiXplain platform. The operation cannot be undone.
Raises:
Exception - If deletion fails, which can happen if:
- The pipeline doesn't exist
- The user doesn't have permission to delete it
- The API request fails
- The server returns a non-200 status code
Notes:
- This operation is permanent and cannot be undone
- Only the pipeline owner can delete it
- Uses the team API key for authentication
save
def save(pipeline: Optional[Union[Text, Dict]] = None,
save_as_asset: bool = False,
api_key: Optional[Text] = None)
Update and Save Pipeline
Arguments:
- pipeline (Optional[Union[Text, Dict]]) - Pipeline as a Python dictionary or in a JSON file.
- save_as_asset (bool, optional) - Save as asset (True) or draft (False). Defaults to False.
- api_key (Optional[Text], optional) - Team API Key to create the Pipeline. Defaults to None.
Raises:
Exception - Make sure the pipeline to be saved is in a JSON file.
deploy
def deploy(api_key: Optional[Text] = None) -> None
Deploy the Pipeline.
This method overrides the deploy method in DeployableMixin to handle Pipeline-specific deployment functionality.
Arguments:
- api_key (Optional[Text], optional) - Team API Key to deploy the Pipeline. Defaults to None.
__repr__
def __repr__()
Return a string representation of the Pipeline instance.
Returns:
str - A string in the format "Pipeline: <name> (id=<id>)".
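For illustration, the documented format can be reproduced with a small stand-in class. PipelineStub is hypothetical and mirrors only the string format stated above. It is not the SDK's Pipeline class.

```python
class PipelineStub:
    """Minimal stand-in that mirrors only the documented __repr__ format."""

    def __init__(self, id: str, name: str) -> None:
        self.id = id
        self.name = name

    def __repr__(self) -> str:
        return f"Pipeline: {self.name} (id={self.id})"


text = repr(PipelineStub(id="abc123", name="Demo"))
```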