module aixplain.processes.data_onboarding.onboard_functions

Global Variables

  • FORBIDDEN_COLUMN_NAMES

function get_paths

get_paths(input_paths: List[Union[str, Path]]) → List[Path]

Recursively access all local paths. Check if file extensions are supported.

Args:

  • input_paths (List[Union[str, Path]]): list of input paths, including folders and files

Returns:

  • List[Path]: list of local file paths
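The traversal described above can be sketched with the standard library. This is an illustrative re-implementation, not the SDK's code; `SUPPORTED_EXTENSIONS` and `collect_paths` are hypothetical names chosen for the sketch.

```python
# Sketch of a recursive path walk that validates file extensions.
# SUPPORTED_EXTENSIONS is an assumption; the SDK defines its own set.
from pathlib import Path
from typing import List, Union

SUPPORTED_EXTENSIONS = {".csv", ".json"}  # assumed for illustration

def collect_paths(input_paths: List[Union[str, Path]]) -> List[Path]:
    paths: List[Path] = []
    for entry in input_paths:
        entry = Path(entry)
        if entry.is_dir():
            # recurse into sub-folders
            paths.extend(collect_paths(sorted(entry.iterdir())))
        elif entry.suffix.lower() in SUPPORTED_EXTENSIONS:
            paths.append(entry)
        else:
            raise ValueError(f"Unsupported file extension: {entry.suffix}")
    return paths
```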

function process_data_files

process_data_files(
data_asset_name: str,
metadata: MetaData,
paths: List,
folder: Optional[Union[str, Path]] = None
) → Tuple[List[File], int, int, int, int]

Process a list of local files, compressing them and uploading them to pre-signed S3 URLs.

Args:

  • data_asset_name (str): name of the data asset
  • metadata (MetaData): meta data of the asset
  • paths (List): list of paths to local files
  • folder (Union[str, Path], optional): local folder where compressed files are saved before uploading them to S3. Defaults to data_asset_name.

Returns:

  • Tuple[List[File], int, int, int, int]: list of S3 file links; data, start, and end column indexes; and number of rows
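The compress-and-count step can be illustrated with the standard library. This is a simplified sketch under assumed behavior (gzip compression, CSV row counting); `compress_and_count` is a hypothetical name, and the real function also handles the upload to pre-signed URLs.

```python
# Sketch: gzip each local file into a staging folder and count CSV rows.
# Names and behavior are illustrative, not the SDK's implementation.
import csv
import gzip
from pathlib import Path
from typing import List, Tuple

def compress_and_count(paths: List[Path], folder: Path) -> Tuple[List[Path], int]:
    folder.mkdir(parents=True, exist_ok=True)
    compressed: List[Path] = []
    nrows = 0
    for path in paths:
        # write a gzipped copy next to the others in the staging folder
        target = folder / (path.name + ".gz")
        with open(path, "rb") as src, gzip.open(target, "wb") as dst:
            dst.write(src.read())
        # count rows (header included) for the onboarding summary
        with open(path, newline="") as f:
            nrows += sum(1 for _ in csv.reader(f))
        compressed.append(target)
    return compressed, nrows
```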

function build_payload_data

build_payload_data(data: Data) → Dict

Create the data payload used to call coreengine during Corpus/Dataset onboarding

Args:

  • data (Data): data object

Returns:

  • Dict: payload
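A minimal sketch of what building such a payload looks like, assuming a simplified `Data` model; the field and key names here (`dataType`, `metadata`) are illustrative assumptions, not the core-engine schema.

```python
# Sketch of turning a data object into a JSON-serializable payload.
# The Data dataclass and payload keys are simplified stand-ins.
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Data:  # simplified stand-in for aixplain's Data model
    name: str
    dtype: str
    languages: List[str] = field(default_factory=list)

def build_payload_data(data: Data) -> Dict:
    return {
        "name": data.name,
        "dataType": data.dtype,  # assumed key name
        "metadata": {"languages": data.languages},
    }
```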

function build_payload_corpus

build_payload_corpus(
corpus: Corpus,
ref_data: List[str],
error_handler: ErrorHandler
) → Dict

Create the corpus payload used to call coreengine during the onboarding process

Args:

  • corpus (Corpus): corpus object
  • ref_data (List[str]): list of referenced data
  • error_handler (ErrorHandler): how to handle failed rows

Returns:

  • Dict: payload

function build_payload_dataset

build_payload_dataset(
dataset: Dataset,
input_ref_data: Dict[str, Any],
output_ref_data: Dict[str, List[Any]],
hypotheses_ref_data: Dict[str, Any],
meta_ref_data: Dict[str, Any],
tags: List[str],
error_handler: ErrorHandler
) → Dict

Generate the onboarding payload sent to coreengine

Args:

  • dataset (Dataset): dataset to be onboarded
  • input_ref_data (Dict[str, Any]): references to existing input data
  • output_ref_data (Dict[str, List[Any]]): references to existing output data
  • hypotheses_ref_data (Dict[str, Any]): references to existing hypotheses for the target data
  • meta_ref_data (Dict[str, Any]): references to existing metadata
  • tags (List[str]): description tags
  • error_handler (ErrorHandler): how to handle failed rows

Returns:

  • Dict: onboard payload
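One way to picture how the referenced (already-onboarded) data flows into the payload: existing columns are attached by ID rather than re-uploaded. The keys (`dataId`, `dataIds`) and structure below are assumptions for illustration, not the real schema.

```python
# Sketch of merging referenced data IDs into a dataset payload.
# Key names and payload shape are hypothetical.
from typing import Any, Dict, List

def build_dataset_payload(
    name: str,
    input_ref_data: Dict[str, Any],
    output_ref_data: Dict[str, List[Any]],
    tags: List[str],
) -> Dict:
    payload: Dict[str, Any] = {"name": name, "tags": tags, "input": [], "output": []}
    # existing data is referenced by ID instead of being uploaded again
    for col, data_id in input_ref_data.items():
        payload["input"].append({"name": col, "dataId": data_id})
    for col, data_ids in output_ref_data.items():
        payload["output"].append({"name": col, "dataIds": data_ids})
    return payload
```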

function create_data_asset

create_data_asset(
payload: Dict,
data_asset_type: str = 'corpus',
api_key: Optional[str] = None
) → Dict

Service call that starts the onboarding process in coreengine

Args:

  • payload (Dict): onboard payload
  • data_asset_type (str, optional): either "corpus" or "dataset". Defaults to "corpus".
  • api_key (Optional[str]): team API key. Defaults to None.

Returns:

  • Dict: onboard status
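A hedged sketch of how such a request could be assembled before sending. The URL, header name, and validation below are placeholders and do not reflect the actual aixplain backend contract.

```python
# Sketch of assembling an onboarding request.
# The endpoint URL and Authorization header format are placeholders.
import json
from typing import Dict, Optional

def build_onboard_request(
    payload: Dict, data_asset_type: str = "corpus", api_key: Optional[str] = None
) -> Dict:
    if data_asset_type not in ("corpus", "dataset"):
        raise ValueError("data_asset_type must be 'corpus' or 'dataset'")
    return {
        "url": f"https://api.example.com/sdk/{data_asset_type}/onboard",  # placeholder URL
        "headers": {
            "Authorization": f"Token {api_key}",  # assumed header scheme
            "Content-Type": "application/json",
        },
        "body": json.dumps(payload),
    }
```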

function is_data

is_data(data_id: str) → bool

Check whether reference data exists

Args:

  • data_id (str): ID of the data

Returns:

  • bool: True if it exists, False otherwise

function split_data

split_data(
paths: List,
split_rate: List[float],
split_labels: List[str]
) → MetaData

Split the data according to the given split labels and rates

Args:

  • paths (List): paths to data files
  • split_rate (List[float]): split rates
  • split_labels (List[str]): split labels

Returns:

  • MetaData: metadata of the new split
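The rate-based split can be sketched as assigning each row one of the labels so the label proportions follow `split_rate`. This is an illustrative helper, not the SDK's implementation; `assign_splits` is a hypothetical name.

```python
# Sketch: deterministically assign split labels to nrows rows
# so that label counts follow the given rates.
from typing import List

def assign_splits(nrows: int, split_rate: List[float], split_labels: List[str]) -> List[str]:
    assert abs(sum(split_rate) - 1.0) < 1e-9, "rates must sum to 1"
    labels: List[str] = []
    cumulative = 0.0
    for rate, label in zip(split_rate, split_labels):
        cumulative += rate
        # fill rows up to the cumulative share with this label
        labels.extend([label] * (round(cumulative * nrows) - len(labels)))
    return labels
```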