relevanceai.dataset.write.write#
Pandas-like dataset API
Module Contents#
- class relevanceai.dataset.write.write.Write(*args, **kw)#
A Pandas-like dataset API for interacting with the RelevanceAI Python package
- insert_df#
- concat#
- host_media_documents#
- insert_documents(self, documents: list, max_workers: Optional[int] = None, media_workers: Optional[int] = None, show_progress_bar: bool = True, chunksize: Optional[int] = None, overwrite: bool = True, ingest_in_background: bool = True, media_fields: Optional[List[str]] = None) Dict #
Insert a list of documents with multi-threading automatically enabled.
When inserting a document you can optionally specify your own id for it by using the field name “_id”; if not specified, a random id is assigned.
When inserting or specifying vectors in a document use the suffix (ends with) “_vector_” for the field name. e.g. “product_description_vector_”.
When inserting or specifying chunks in a document, use the suffix (ends with) “_chunk_” for the field name. e.g. “products_chunk_”.
When inserting or specifying chunk vectors in a document’s chunks use the suffix (ends with) “_chunkvector_” for the field name. e.g. “products_chunk_.product_description_chunkvector_”.
Documentation can be found here: https://ingest-api-dev-aueast.tryrelevance.com/latest/documentation#operation/InsertEncode
- Parameters
documents (list) – A list of documents. A document is JSON-like data in which we store our metadata and vectors. To specify the id of a document use the field ‘_id’; to specify a vector field use the suffix ‘_vector_’
bulk_fn (callable) – Function to apply to documents before uploading
max_workers (int) – Number of workers active for multi-threading
retry_chunk_mult (int) – Multiplier to apply to chunksize if upload fails
chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb
use_json_encoder (bool) – Whether to automatically convert documents to json encodable format
media_fields (List[str]) – specifies which fields are local medias and need to be upserted to S3. These should be given in absolute path format
Example
from relevanceai import Client

client = Client()

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)

documents = [
    {"_id": "10", "value": 5},
    {"_id": "332", "value": 10}
]

df.insert_documents(documents)
- insert_csv(self, filepath_or_buffer, chunksize: int = 10000, max_workers: int = 2, retry_chunk_mult: float = 0.5, show_progress_bar: bool = False, index_col: int = None, csv_args: Optional[dict] = None, col_for_id: str = None, auto_generate_id: bool = True) Dict #
Insert data from csv file
- Parameters
filepath_or_buffer – Any valid string path is acceptable. The string could be a URL. Valid URL schemes include http, ftp, s3, gs, and file.
chunksize (int) – Number of lines to read from csv per iteration
max_workers (int) – Number of workers active for multi-threading
retry_chunk_mult (int) – Multiplier to apply to chunksize if upload fails
csv_args (dict) – Optional arguments to use when reading in csv. For more info, see https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
index_col (int, optional) – Optional argument to specify if there is an index column to be skipped (e.g. index_col = 0)
col_for_id (str) – Optional argument to use when a specific field is supposed to be used as the unique identifier (‘_id’)
auto_generate_id (bool = True) – Automatically generates a UUID if auto_generate_id is True and the ‘_id’ field does not exist
Example
from relevanceai import Client

client = Client()

df = client.Dataset("sample_dataset_id")

csv_filename = "temp.csv"
df.insert_csv(csv_filename)
- insert_pandas_dataframe(self, df: pandas.DataFrame, col_for_id=None, *args, **kwargs)#
Insert a dataframe into the dataset. Takes additional args and kwargs based on insert_documents.
Example

import pandas as pd

from relevanceai import Client

client = Client()

df = client.Dataset("sample_dataset_id")

pandas_df = pd.DataFrame({"value": [3, 2, 1], "_id": ["10", "11", "12"]})
df.insert_pandas_dataframe(pandas_df)
- insert_media_folder(self, path: Union[pathlib.Path, str], field: str = 'medias', recurse: bool = True, *args, **kwargs)#
Given a path to a directory, this method loads all media-related files into a Dataset.
- Parameters
field (str) – A text field of a dataset.
path (Union[Path, str]) – The path to the directory containing medias.
recurse (bool) – Indicator that determines whether to recursively insert medias from subdirectories in the directory.
- Return type
dict
Example
from relevanceai import Client

client = Client()

ds = client.Dataset("dataset_id")

from pathlib import Path

path = Path("medias/")
# list(path.iterdir()) returns
# [
#     PosixPath('media.jpg'),
#     PosixPath('more-medias'),  # a directory
# ]

get_all_medias: bool = True

if get_all_medias:
    # Inserts all medias, even those in the more-medias directory
    ds.insert_media_folder(
        field="medias", path=path, recurse=True
    )
else:
    # Only inserts media.jpg
    ds.insert_media_folder(
        field="medias", path=path, recurse=False
    )
- upsert_documents(self, documents: list, max_workers: Optional[int] = 2, media_workers: Optional[int] = None, show_progress_bar: bool = False, chunksize: Optional[int] = None, ingest_in_background: bool = True, media_fields: Optional[List[str]] = None) Dict #
Update a list of documents with multi-threading automatically enabled. Edit documents by providing key-value pairs for the fields you are adding or changing; make sure to include the “_id” field in each document.
- Parameters
documents (list) – A list of documents. A document is JSON-like data in which we store our metadata and vectors. To specify the id of a document use the field ‘_id’; to specify a vector field use the suffix ‘_vector_’
bulk_fn (callable) – Function to apply to documents before uploading
max_workers (int) – Number of workers active for multi-threading
retry_chunk_mult (int) – Multiplier to apply to chunksize if upload fails
chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb
use_json_encoder (bool) – Whether to automatically convert documents to json encodable format
create_id (bool) – If True, an ID is created for each document automatically
Example
from relevanceai import Client

client = Client()

documents = [
    {"_id": "321", "value": 10},
    {"_id": "4243", "value": 100}
]

dataset_id = "sample_dataset_id"
ds = client.Dataset(dataset_id)
ds.upsert_documents(documents)
- apply(self, func: Callable, retrieve_chunksize: int = 100, filters: Optional[list] = None, select_fields: Optional[list] = None, show_progress_bar: bool = True, use_json_encoder: bool = True, axis: int = 0, log_to_file: bool = True, log_file: Optional[str] = None, **apply_args)#
Apply a function along an axis of the DataFrame.
Objects passed to the function are Series objects whose index is either the DataFrame’s index (axis=0) or the DataFrame’s columns (axis=1). By default (result_type=None), the final return type is inferred from the return type of the applied function. Otherwise, it depends on the result_type argument.
- Parameters
func (function) – Function to apply to each document
retrieve_chunksize (int) – The number of documents that are received from the original collection with each loop iteration.
max_workers (int) – The number of processors you want to parallelize with
max_error (int) – How many failed uploads before the function breaks
json_encoder (bool) – Whether to automatically convert documents to json encodable format
axis (int) – Axis along which the function is applied. 0 or ‘index’: apply function to each column; 1 or ‘columns’: apply function to each row
Example
from relevanceai import Client
from relevanceai.package_utils.datasets import mock_documents

client = Client()

ds = client.Dataset("sample_dataset_id")
ds.upsert_documents(mock_documents(100))

def update_doc(doc):
    doc["value"] = 2
    return doc

ds.apply(update_doc)

def update_doc_wargs(doc, value1, value2):
    doc["value"] += value1
    doc["value"] *= value2
    return doc

ds.apply(func=update_doc_wargs, value1=3, value2=2)
- bulk_apply(self, bulk_func: Callable, bulk_func_args: Optional[Tuple[Any]] = None, bulk_func_kwargs: Optional[Dict[str, Any]] = None, chunksize: Optional[int] = None, filters: Optional[list] = None, select_fields: Optional[list] = None, transform_workers: int = 2, push_workers: int = 2, timeout: Optional[int] = None, buffer_size: int = 0, show_progress_bar: bool = True, transform_chunksize: int = 32, multithreaded_update: bool = True, ingest_in_background: bool = True, **kwargs)#
Apply a bulk function along an axis of the DataFrame.
- Parameters
bulk_func (function) – Function to apply to a bunch of documents at a time
retrieve_chunksize (int) – The number of documents that are received from the original collection with each loop iteration.
max_workers (int) – The number of processors you want to parallelize with
max_error (int) – How many failed uploads before the function breaks
json_encoder (bool) – Whether to automatically convert documents to json encodable format
axis (int) – Axis along which the function is applied. 0 or ‘index’: apply function to each column; 1 or ‘columns’: apply function to each row
Example
from relevanceai import Client

client = Client()

df = client.Dataset("sample_dataset_id")

def update_documents(documents):
    for d in documents:
        d["value"] = 10
    return documents

df.bulk_apply(update_documents)
- cat(self, vector_name: Union[str, None] = None, fields: Optional[List] = None)#
Concatenates numerical fields along an axis and reuploads this vector for other operations
- Parameters
vector_name (str, default None) – name of the new concatenated vector field
fields (List) – the fields to concatenate into the new vector
Example
from relevanceai import Client

client = Client()

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)

fields = [
    "numeric_field1",
    "numeric_field2",
    "numeric_field3"
]

df.concat(fields)

concat_vector_field_name = "concat_vector_"
df.concat(vector_name=concat_vector_field_name, fields=fields)
- set_cluster_labels(self, vector_fields, alias, labels)#
- create(self, schema: Optional[dict] = None) Dict #
A dataset can store documents to be searched, retrieved, filtered and aggregated (similar to Collections in MongoDB, Tables in SQL, Indexes in ElasticSearch). A powerful and core feature of Relevance is that you can store both your metadata and vectors in the same document. When specifying the schema of a dataset and inserting your own vector use the suffix (ends with) “_vector_” for the field name, and specify the length of the vector in dataset_schema.
For example:
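A sketch of what such a schema entry could look like (the field names and vector lengths below are illustrative, not required values):

{
    "product_text_description_vector_": 128,
    "product_image_vector_": 1024
}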
These are the field types supported in our datasets: [“text”, “numeric”, “date”, “dict”, “chunks”, “vector”, “chunkvector”].
For example:
{ "product_text_description" : "text", "price" : "numeric", "created_date" : "date", "product_texts_chunk_": "chunks", "product_text_chunkvector_" : 1024 }
You don’t have to specify the schema of every single field when creating a dataset, as Relevance will automatically detect the appropriate data type for each field (vectors will be automatically identified by their “_vector_” suffix). In fact, you also don’t always have to use this endpoint to create a dataset, as /datasets/bulk_insert will infer and create the dataset and schema as you insert new documents.
Note
A dataset name/id can only contain lowercase letters, dashes, underscores and numbers.
“_id” is reserved as the key and id of a document.
Once a schema is set for a dataset it cannot be altered. If it has to be altered, utilise the copy dataset endpoint.
For more information about vectors check out the ‘Vectorizing’ section, services.search.vector or our blog at https://tryrelevance.com/blog. For more information about chunks and chunk vectors check out datasets.search.chunk.
- Parameters
schema (dict) – Schema for specifying the fields that are vectors and their lengths
Example
from relevanceai import Client

client = Client()

documents = [
    {"_id": "321", "value": 10},
    {"_id": "4243", "value": 100}
]

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)
df.create()
df.insert_documents(documents)
- delete(self)#
Delete a dataset
Example
from relevanceai import Client

client = Client()

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)
df.delete()
- insert_media_bytes(self, bytes: bytes, filename: str, verbose: bool = True)#
Insert a single media file from raw bytes
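A minimal usage sketch, assuming a local file named "photo.jpg" exists (the dataset id and filename are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Read the raw bytes of a local file and insert it under the given filename
with open("photo.jpg", "rb") as f:
    ds.insert_media_bytes(bytes=f.read(), filename="photo.jpg")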
- insert_media_url(self, media_url: str, verbose: bool = True)#
Insert a single media URL
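A minimal usage sketch (the URL and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Insert one media file by its URL
ds.insert_media_url("https://example.com/images/photo.jpg")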
- insert_media_urls(self, media_urls: List[str], verbose: bool = True, file_log: str = 'insert_media_urls.log', logging: bool = True)#
Insert a list of media URLs
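A minimal usage sketch (the URLs and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Insert several media files by URL; progress is recorded in the file log
ds.insert_media_urls(
    ["https://example.com/a.jpg", "https://example.com/b.jpg"],
    file_log="insert_media_urls.log",
)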
- insert_local_media(self, media_fn: str, verbose: bool = True)#
Insert local media
- Parameters
media_fn (str) – A local media to upload
verbose (bool) – If True, prints a statement after uploading each media
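A minimal usage sketch, assuming a local file at "medias/photo.jpg" (the path and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Upload a single local media file
ds.insert_local_media("medias/photo.jpg")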
- insert_local_medias(self, media_fns: List[str], verbose: bool = False, file_log='local_media_upload.log', logging: bool = True)#
Insert a list of local medias.
- Parameters
media_fns (List[str]) – A list of local medias
verbose (bool) – If True, this will print after each successful upload.
file_log (str) – The log to write
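A minimal usage sketch (the paths and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Upload several local media files; results are recorded in the file log
ds.insert_local_medias(
    ["medias/a.jpg", "medias/b.jpg"],
    file_log="local_media_upload.log",
)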
- get_media_documents(self, media_fns: List[str], verbose: bool = False, file_log: str = 'media_upload.log', logging: bool = True) dict #
Bulk insert medias. Returns a link to each media once it has been hosted
- Parameters
media_fns (List[str]) – List of medias to upload
verbose (bool) – If True, prints statements after uploading
file_log (str) – The file log to write
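A minimal usage sketch (the paths and dataset id are illustrative); the return value holds the hosted links described above:

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Host the local medias and inspect the returned links
results = ds.get_media_documents(["medias/a.jpg", "medias/b.jpg"])
print(results)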
- upsert_media(self, media_fns: List[str], verbose: bool = False, file_log: str = 'media_upload.log', logging: bool = True, **kw)#
Insert medias into a dataset.
- Parameters
media_fns (List[str]) – A list of medias to upsert
verbose (bool) – If True, prints statements after uploading
file_log (str) – The file log to write
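A minimal usage sketch (the paths and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Upsert a batch of local medias into the dataset
ds.upsert_media(["medias/a.jpg", "medias/b.jpg"])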
- delete_documents(self, document_ids: List[str])#
Delete documents in a dataset
- Parameters
document_ids (List[str]) – A list of document IDs to delete
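A minimal usage sketch (the document ids and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Delete two documents by their "_id" values
ds.delete_documents(["10", "332"])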
- update_where(self, update: dict, filters)#
Updates documents matched by filters. The update is applied to the documents returned by the filter.
For more information about filters refer to datasets.documents.get_where.
Example
from relevanceai import Client

client = Client()

ds = client.Dataset()

ds.update_where(
    {"value": 3},
    filters=ds['value'] != 10  # apply a simple filter
)
- insert_list(self, labels: list, label_field: str = 'label', **kwargs)#
It takes a list of labels, and inserts them into the database as documents
- Parameters
labels (list) – list of labels to insert
label_field (str, optional) – The field in the document that contains the label.
- Return type
A list of the ids of the documents that were inserted.
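A minimal usage sketch (the labels and dataset id are illustrative); each label becomes its own document stored under label_field:

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Inserts three documents, each with a "label" field
ds.insert_list(["cat", "dog", "bird"])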
- batched_upsert_media(self, images: List[str], show_progress_bar: bool = False, n_workers: Optional[int] = None) List[str] #
It takes a list of images, splits it into batches, and then uses a thread pool to upsert the images in parallel
- Parameters
images (List[str]) – A list of media src paths to upload
show_progress_bar (bool) – Show the progress bar
n_workers (Optional[int]) – The number of workers to use. If None, this is set to the max number in ThreadPoolExecutor
- Returns
A list of media_urls
- Return type
List[str]
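A minimal usage sketch (the paths and dataset id are illustrative):

from relevanceai import Client

client = Client()
ds = client.Dataset("sample_dataset_id")

# Upload images in parallel batches and collect the hosted media URLs
media_urls = ds.batched_upsert_media(
    ["medias/a.jpg", "medias/b.jpg"],
    show_progress_bar=True,
)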
- prepare_media_documents(self, documents: List[Dict[str, Any]], media_fields: List[str], max_workers: Optional[int] = None) List[Dict[str, Any]] #