relevanceai.operations_new.ops_run#

Base class for base.py to inherit; contains all functions related to running operations on datasets.

The Pull Transform Push library is designed to consistently pull data from the Relevance AI Database, transform it, and then continuously push it back to the Relevance AI Database. Running these stages concurrently ensures that resources are utilised to their limits.
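
The sketch below illustrates the underlying producer/consumer idea in plain Python. It is an illustration of the pattern only, not the library's internal code: a pull thread fills a bounded queue while a worker drains it, so pulling and transforming/pushing overlap instead of running serially.

import queue
import threading

def pull(q: queue.Queue, chunks) -> None:
    # Stands in for paging chunks of documents out of the database
    for chunk in chunks:
        q.put(chunk)
    q.put(None)  # sentinel: no more chunks

def transform_and_push(q: queue.Queue, results: list) -> None:
    # Stands in for transforming a chunk and pushing it back
    while True:
        chunk = q.get()
        if chunk is None:
            break
        results.append([x * 2 for x in chunk])

q: queue.Queue = queue.Queue(maxsize=4)  # bounded buffer, akin to buffer_size
results: list = []
threads = [
    threading.Thread(target=pull, args=(q, [[1, 2], [3, 4]])),
    threading.Thread(target=transform_and_push, args=(q, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # [[2, 4], [6, 8]]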

Module Contents#

relevanceai.operations_new.ops_run.logger#
class relevanceai.operations_new.ops_run.PullTransformPush(dataset: Optional[relevanceai.dataset.Dataset] = None, pull_dataset: Optional[relevanceai.dataset.Dataset] = None, push_dataset: Optional[relevanceai.dataset.Dataset] = None, func: Optional[Callable] = None, func_args: Optional[Tuple[Any, ...]] = None, func_kwargs: Optional[Dict[str, Any]] = None, pull_chunksize: Optional[int] = None, push_chunksize: Optional[int] = None, transform_chunksize: Optional[int] = 128, warmup_chunksize: Optional[int] = None, filters: Optional[list] = None, select_fields: Optional[list] = None, transform_workers: Optional[int] = None, push_workers: Optional[int] = None, buffer_size: int = 0, show_progress_bar: bool = True, show_pull_progress_bar: bool = True, show_transform_progress_bar: bool = True, show_push_progress_bar: bool = True, ingest_in_background: bool = True, run_in_background: bool = False, ram_ratio: float = 0.8, batched: bool = False, retry_count: int = 3, after_id: Optional[List[str]] = None, pull_limit: Optional[int] = None, timeout: Optional[int] = None)#
pull_count :int#
transform_count :int#
push_count :int#
pull_bar :tqdm.auto.tqdm#
transform_bar :tqdm.auto.tqdm#
push_bar :tqdm.auto.tqdm#
pull_thread :threading.Thread#
transform_threads :List[threading.Thread]#
push_threads :List[threading.Thread]#
pull_dataset :relevanceai.dataset.Dataset#
push_dataset :relevanceai.dataset.Dataset#
run(self) Dict[str, Any]#

(Main method) Performs the pulling, the updating, and the pushing.

Returns the _ids of any failed documents.
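
A minimal usage sketch, assuming func receives a chunk of documents and returns the transformed chunk (the dataset id and the added field below are hypothetical):

from relevanceai import Client
from relevanceai.operations_new.ops_run import PullTransformPush

client = Client()
dataset = client.Dataset("my-dataset")  # hypothetical dataset id

def add_text_length(documents):
    # Hypothetical transform: annotate each document with a derived field
    for document in documents:
        document["text_length_"] = len(document.get("text", ""))
    return documents

ptp = PullTransformPush(
    dataset=dataset,
    func=add_text_length,
    select_fields=["text"],  # only pull the fields the transform needs
    transform_chunksize=128,
    show_progress_bar=True,
)
details = ptp.run()  # dict describing the run, including any failed _ids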

relevanceai.operations_new.ops_run.arguments(cls: Type[PullTransformPush])#
class relevanceai.operations_new.ops_run.OperationRun#

All functions related to running transforms as an operation on datasets.

is_chunk_valid(self, chunk)#
post_run(self, dataset, documents, updated_documents)#
run(self, dataset: relevanceai.dataset.Dataset, batched: bool = False, chunksize: Optional[int] = None, filters: Optional[list] = None, select_fields: Optional[list] = None, output_fields: Optional[list] = None, refresh: bool = False, **kwargs)#

Takes a dataset, retrieves all of its documents, transforms them, and then upserts the transformed documents back into the dataset (see the sketch after the parameter list).

Parameters
  • dataset (Dataset) – The dataset to run the operation on.

  • select_fields (list) – Used to determine which fields to retrieve when pulling documents.

  • output_fields (list) – Used to determine which output fields are missing, in order to decide whether to continue running the operation.

  • filters (list) – Filters to apply when retrieving documents from the dataset.
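
As a sketch of how these parameters fit together, an operation might be run as below; MyVectorizeOperation stands in for any OperationRun subclass, and the class and field names are hypothetical, not part of this module:

op = MyVectorizeOperation()  # hypothetical OperationRun subclass
op.run(
    dataset=dataset,
    batched=True,
    chunksize=256,
    select_fields=["text"],          # fields pulled for the transform
    output_fields=["text_vector_"],  # used to skip documents already processed
    filters=[],
)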

batch_transform_upsert(self, dataset: relevanceai.dataset.Dataset, func_args: Optional[Tuple[Any]] = None, func_kwargs: Optional[Dict[str, Any]] = None, select_fields: list = None, filters: list = None, chunksize: int = None, transform_workers: Optional[int] = None, push_workers: Optional[int] = None, buffer_size: int = 0, show_progress_bar: bool = True, warmup_chunksize: int = None, transform_chunksize: int = 128, batched: bool = False, ingest_in_background: bool = True, **kwargs)#
store_operation_metadata(self, dataset: relevanceai.dataset.Dataset, values: Optional[Dict[str, Any]] = None)#

This function stores metadata about operations run on the dataset.

Parameters
  • dataset (Dataset) – The dataset to store the metadata in.

  • values (Optional[Dict[str, Any]]) – The metadata values to store.

Returns

The dataset object with the metadata appended to it, for example:

{
    "_operationhistory_": {
        "1-1-1-17-2-3": {
            "operation": "vector",
            "model_name": "miniLm"
        },
    }
}
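
As an illustration, the entry above could be recorded with a call like the following; this is a sketch, with op assumed to be an operation instance:

# Sketch: the values mirror the example entry above
dataset = op.store_operation_metadata(
    dataset=dataset,
    values={"operation": "vector", "model_name": "miniLm"},
)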