relevanceai.dataset.write.write#

Pandas-like dataset API

Module Contents#

class relevanceai.dataset.write.write.Write(*args, **kw)#

A Pandas-like dataset API for interacting with the RelevanceAI Python package

insert_df#
concat#
host_media_documents#
insert_documents(self, documents: list, max_workers: Optional[int] = None, media_workers: Optional[int] = None, show_progress_bar: bool = True, chunksize: Optional[int] = None, overwrite: bool = True, ingest_in_background: bool = True, media_fields: Optional[List[str]] = None) Dict#

Insert a list of documents with multi-threading automatically enabled.

  • When inserting a document you can optionally specify your own ID by using the field name “_id”; if not specified, a random ID is assigned.

  • When inserting or specifying vectors in a document, use the suffix (ends with) “_vector_” for the field name, e.g. “product_description_vector_”.

  • When inserting or specifying chunks in a document, use the suffix (ends with) “_chunk_” for the field name, e.g. “products_chunk_”.

  • When inserting or specifying chunk vectors in a document’s chunks, use the suffix (ends with) “_chunkvector_” for the field name, e.g. “products_chunk_.product_description_chunkvector_”. A sketch of a document that follows these conventions is shown below.
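
For instance, a single document that combines these naming conventions might look like the following (the “_id”, text, and vector values are illustrative only):

{
    "_id": "711158459",
    "product_description": "Durable water-resistant jacket",
    "product_description_vector_": [0.12, 0.03, 0.99],
    "products_chunk_": [
        {"product_description_chunkvector_": [0.31, 0.42, 0.11]}
    ]
}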

Documentation can be found here: https://ingest-api-dev-aueast.tryrelevance.com/latest/documentation#operation/InsertEncode

Parameters
  • documents (list) – A list of documents. A document is JSON-like data that stores your metadata and vectors. To specify the ID of a document use the field ‘_id’; to specify a vector field use the suffix ‘_vector_’

  • bulk_fn (callable) – Function to apply to documents before uploading

  • max_workers (int) – Number of workers active for multi-threading

  • retry_chunk_mult (int) – Multiplier to apply to chunksize if upload fails

  • chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb

  • use_json_encoder (bool) – Whether to automatically convert documents to json encodable format

  • media_fields (List[str]) – specifies which fields are local medias and need to be upserted to S3. These should be given in absolute path format

Example

from relevanceai import Client

client = Client()

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)

documents = [
    {
        "_id": "10",
        "value": 5
    },
    {
        "_id": "332",
        "value": 10
    }
]

df.insert_documents(documents)
insert_csv(self, filepath_or_buffer, chunksize: int = 10000, max_workers: int = 2, retry_chunk_mult: float = 0.5, show_progress_bar: bool = False, index_col: int = None, csv_args: Optional[dict] = None, col_for_id: str = None, auto_generate_id: bool = True) Dict#

Insert data from csv file

Parameters
  • filepath_or_buffer – Any valid string path is acceptable. The string could be a URL. Valid URL schemes include http, ftp, s3, gs, and file.

  • chunksize (int) – Number of lines to read from csv per iteration

  • max_workers (int) – Number of workers active for multi-threading

  • retry_chunk_mult (int) – Multiplier to apply to chunksize if upload fails

  • csv_args (dict) – Optional arguments to use when reading in csv. For more info, see https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html

  • index_col (int, optional) – Optional argument to specify if there is an index column to be skipped (e.g. index_col = 0)

  • col_for_id (str) – Optional argument to use when a specific field is supposed to be used as the unique identifier (‘_id’)

  • auto_generate_id (bool = True) – Automatically generates a UUID if auto_generate_id is True and the ‘_id’ field does not exist

Example

from relevanceai import Client
client = Client()
df = client.Dataset("sample_dataset_id")

csv_filename = "temp.csv"
df.insert_csv(csv_filename)
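
A sketch of the optional arguments, assuming a hypothetical “products.csv” file whose “product_id” column should become the document “_id” and which uses a semicolon separator:

df.insert_csv(
    "products.csv",            # hypothetical file path
    col_for_id="product_id",   # hypothetical ID column
    csv_args={"sep": ";"},
)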
insert_pandas_dataframe(self, df: pandas.DataFrame, col_for_id=None, *args, **kwargs)#

Insert a dataframe into the dataset. Additional args and kwargs are passed through to insert_documents.

import pandas as pd
from relevanceai import Client
client = Client()
df = client.Dataset("sample_dataset_id")
pandas_df = pd.DataFrame({"value": [3, 2, 1], "_id": ["10", "11", "12"]})
df.insert_pandas_dataframe(pandas_df)
insert_media_folder(self, path: Union[pathlib.Path, str], field: str = 'medias', recurse: bool = True, *args, **kwargs)#

Given a path to a directory, this method loads all media-related files into a Dataset.

Parameters
  • field (str) – A text field of a dataset.

  • path (Union[Path, str]) – The path to the directory containing medias.

  • recurse (bool) – Indicator that determines whether to recursively insert medias from subdirectories in the directory.

Return type

dict

Example

from relevanceai import Client
client = Client()
ds = client.Dataset("dataset_id")

from pathlib import Path
path = Path("medias/")
# list(path.iterdir()) returns
# [
#    PosixPath('media.jpg'),
#    PosixPath('more-medias'), # a directory
# ]

get_all_medias: bool = True
if get_all_medias:
    # Inserts all medias, even those in the more-medias directory
    ds.insert_media_folder(
        field="medias", path=path, recurse=True
    )
else:
    # Only inserts media.jpg
    ds.insert_media_folder(
        field="medias", path=path, recurse=False
    )
upsert_documents(self, documents: list, max_workers: Optional[int] = 2, media_workers: Optional[int] = None, show_progress_bar: bool = False, chunksize: Optional[int] = None, ingest_in_background: bool = True, media_fields: Optional[List[str]] = None) Dict#

Update a list of documents with multi-threading automatically enabled. Edits documents by providing key-value pairs for the fields you are adding or changing; make sure to include the “_id” field in each document.

Parameters
  • documents (list) – A list of documents. A document is JSON-like data that stores your metadata and vectors. To specify the ID of a document use the field ‘_id’; to specify a vector field use the suffix ‘_vector_’

  • bulk_fn (callable) – Function to apply to documents before uploading

  • max_workers (int) – Number of workers active for multi-threading

  • retry_chunk_mult (int) – Multiplier to apply to chunksize if upload fails

  • chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb

  • use_json_encoder (bool) – Whether to automatically convert documents to json encodable format

  • create_id (bool) – If True, creates ID for users automatically

Example

from relevanceai import Client

client = Client()

documents = [
    {
        "_id": "321",
        "value": 10
    },
    {
        "_id": "4243",
        "value": 100
    }
]

dataset_id = "sample_dataset_id"
ds = client.Dataset(dataset_id)
ds.upsert_documents(documents)
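
Because an upsert only needs the fields being added or changed plus the “_id”, a minimal sketch that adds a new (illustrative) “category” field to the documents above could be:

partial_updates = [
    {"_id": "321", "category": "a"},
    {"_id": "4243", "category": "b"},
]
ds.upsert_documents(partial_updates)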
apply(self, func: Callable, retrieve_chunksize: int = 100, filters: Optional[list] = None, select_fields: Optional[list] = None, show_progress_bar: bool = True, use_json_encoder: bool = True, axis: int = 0, log_to_file: bool = True, log_file: Optional[str] = None, **apply_args)#

Apply a function along an axis of the DataFrame.

Objects passed to the function are Series objects whose index is either the DataFrame’s index (axis=0) or the DataFrame’s columns (axis=1). By default (result_type=None), the final return type is inferred from the return type of the applied function. Otherwise, it depends on the result_type argument.

Parameters
  • func (function) – Function to apply to each document

  • retrieve_chunksize (int) – The number of documents that are received from the original collection with each loop iteration.

  • max_workers (int) – The number of processors you want to parallelize with

  • max_error (int) – How many failed uploads before the function breaks

  • json_encoder (bool) – Whether to automatically convert documents to json encodable format

  • axis (int) – Axis along which the function is applied. - 0 or ‘index’: apply function to each column - 1 or ‘columns’: apply function to each row

Example

from relevanceai import Client
from relevanceai.package_utils.datasets import mock_documents

client = Client()

ds = client.Dataset("sample_dataset_id")
ds.upsert_documents(mock_documents(100))

def update_doc(doc):
    doc["value"] = 2
    return doc

ds.apply(update_doc)

def update_doc_wargs(doc, value1, value2):
    doc["value"] += value1
    doc["value"] *= value2
    return doc

ds.apply(func=update_doc_wargs, value1=3, value2=2)
bulk_apply(self, bulk_func: Callable, bulk_func_args: Optional[Tuple[Any]] = None, bulk_func_kwargs: Optional[Dict[str, Any]] = None, chunksize: Optional[int] = None, filters: Optional[list] = None, select_fields: Optional[list] = None, transform_workers: int = 2, push_workers: int = 2, timeout: Optional[int] = None, buffer_size: int = 0, show_progress_bar: bool = True, transform_chunksize: int = 32, multithreaded_update: bool = True, ingest_in_background: bool = True, **kwargs)#

Apply a bulk function along an axis of the DataFrame.

Parameters
  • bulk_func (function) – Function to apply to a batch of documents at a time

  • retrieve_chunksize (int) – The number of documents that are received from the original collection with each loop iteration.

  • max_workers (int) – The number of processors you want to parallelize with

  • max_error (int) – How many failed uploads before the function breaks

  • json_encoder (bool) – Whether to automatically convert documents to json encodable format

  • axis (int) – Axis along which the function is applied. - 0 or ‘index’: apply function to each column - 1 or ‘columns’: apply function to each row

Example

from relevanceai import Client

client = Client()

df = client.Dataset("sample_dataset_id")

def update_documents(documents):
    for d in documents:
        d["value"] = 10
    return documents

df.bulk_apply(update_documents)
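
A sketch of passing extra arguments to the bulk function, assuming bulk_func_kwargs is forwarded to it as keyword arguments:

def scale_values(documents, multiplier):
    # multiply the "value" field of every document in the batch
    for d in documents:
        d["value"] = d["value"] * multiplier
    return documents

df.bulk_apply(scale_values, bulk_func_kwargs={"multiplier": 2})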
cat(self, vector_name: Union[str, None] = None, fields: Optional[List] = None)#

Concatenates numerical fields along an axis and reuploads this vector for other operations

Parameters
  • vector_name (str, default None) – name of the new concatenated vector field

  • fields (List) – fields from which the new vector will be concatenated

Example

from relevanceai import Client

client = Client()

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)

fields = [
    "numeric_field1",
    "numeric_field2",
    "numeric_field3"
]

df.concat(fields)

concat_vector_field_name = "concat_vector_"
df.concat(vector_name=concat_vector_field_name, fields=fields)
set_cluster_labels(self, vector_fields, alias, labels)#
create(self, schema: Optional[dict] = None) Dict#

A dataset can store documents to be searched, retrieved, filtered and aggregated (similar to Collections in MongoDB, Tables in SQL, Indexes in ElasticSearch). A powerful and core feature of Relevance is that you can store both your metadata and vectors in the same document. When specifying the schema of a dataset and inserting your own vector use the suffix (ends with) “_vector_” for the field name, and specify the length of the vector in dataset_schema.

These are the field types supported in our datasets: [“text”, “numeric”, “date”, “dict”, “chunks”, “vector”, “chunkvector”].

For example:

{
    "product_text_description" : "text",
    "price" : "numeric",
    "created_date" : "date",
    "product_texts_chunk_": "chunks",
    "product_text_chunkvector_" : 1024
}

You don’t have to specify the schema of every single field when creating a dataset, as Relevance will automatically detect the appropriate data type for each field (vectors will be automatically identified by their “_vector_” suffix). In fact, you don’t always have to use this endpoint to create a dataset, as /datasets/bulk_insert will infer and create the dataset and schema as you insert new documents.

Note

  • A dataset name/id can only contain lowercase letters, dashes, underscores and numbers.

  • “_id” is reserved as the key and id of a document.

  • Once a schema is set for a dataset it cannot be altered. If it has to be altered, use the copy dataset endpoint.

For more information about vectors check out the ‘Vectorizing’ section, services.search.vector or our blog at https://tryrelevance.com/blog. For more information about chunks and chunk vectors check out datasets.search.chunk.

Parameters

schema (dict) – Schema for specifying the fields that are vectors and their lengths

Example

from relevanceai import Client
client = Client()

documents = [
    {
        "_id": "321",
        "value": 10
    },
    {
        "_id": "4243",
        "value": 100
    }
]

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)
df.create()

df.insert_documents(documents)
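
A sketch that passes an explicit schema at creation time, reusing the field types shown above:

df.create(
    schema={
        "product_text_description": "text",
        "price": "numeric",
        "created_date": "date",
        "product_text_chunkvector_": 1024,
    }
)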
delete(self)#

Delete a dataset

Example

from relevanceai import Client
client = Client()

dataset_id = "sample_dataset_id"
df = client.Dataset(dataset_id)
df.delete()
insert_media_bytes(self, bytes: bytes, filename: str, verbose: bool = True)#

Insert a single media item from raw bytes

insert_media_url(self, media_url: str, verbose: bool = True)#

Insert a single media URL

insert_media_urls(self, media_urls: List[str], verbose: bool = True, file_log: str = 'insert_media_urls.log', logging: bool = True)#

Insert a list of media URLs
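
A minimal sketch of the URL-based helpers, using hypothetical media URLs:

from relevanceai import Client
client = Client()
ds = client.Dataset("sample_dataset_id")

# hypothetical URLs, for illustration only
ds.insert_media_url("https://example.com/image1.png")
ds.insert_media_urls([
    "https://example.com/image1.png",
    "https://example.com/image2.png",
])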

insert_local_media(self, media_fn: str, verbose: bool = True)#

Insert local media

Parameters
  • media_fn (str) – A local media to upload

  • verbose (bool) – If True, prints a statement after uploading each media

insert_local_medias(self, media_fns: List[str], verbose: bool = False, file_log='local_media_upload.log', logging: bool = True)#

Insert a list of local medias.

Parameters
  • media_fns (List[str]) – A list of local medias

  • verbose (bool) – If True, this will print after each successful upload.

  • file_log (str) – The log to write
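
A sketch for the local-file helpers, assuming the paths below exist on disk:

from relevanceai import Client
client = Client()
ds = client.Dataset("sample_dataset_id")

ds.insert_local_media("medias/photo1.png")
ds.insert_local_medias([
    "medias/photo1.png",
    "medias/photo2.png",
])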

get_media_documents(self, media_fns: List[str], verbose: bool = False, file_log: str = 'media_upload.log', logging: bool = True) dict#

Bulk insert medias. Returns a link to each media once it has been hosted

Parameters
  • media_fns (List[str]) – List of medias to upload

  • verbose (bool) – If True, prints statements after uploading

  • file_log (str) – The file log to write

upsert_media(self, media_fns: List[str], verbose: bool = False, file_log: str = 'media_upload.log', logging: bool = True, **kw)#

Insert medias into a dataset.

Parameters
  • media_fns (List[str]) – A list of medias to upsert

  • verbose (bool) – If True, prints statements after uploading

  • file_log (str) – The file log to write

delete_documents(self, document_ids: List[str])#

Delete documents in a dataset

Parameters

document_ids (List[str]) – A list of document IDs to delete
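
A minimal sketch, assuming the IDs below exist in the dataset:

from relevanceai import Client
client = Client()
ds = client.Dataset("sample_dataset_id")
ds.delete_documents(["10", "332"])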

update_where(self, update: dict, filters)#

Updates documents by filters. The update is applied to the documents that are returned by the filter.

For more information about filters refer to datasets.documents.get_where.

Example

from relevanceai import Client
client = Client()
ds = client.Dataset("sample_dataset_id")
ds.update_where(
    {"value": 3},
    filters=ds['value'] != 10 # apply a simple filter
)
insert_list(self, labels: list, label_field: str = 'label', **kwargs)#

Takes a list of labels and inserts them into the dataset as documents

Parameters
  • labels (list) – list of labels to insert

  • label_field (str, optional) – The field in the document that contains the label.

Return type

A list of the ids of the documents that were inserted.
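
A minimal sketch, using illustrative labels:

from relevanceai import Client
client = Client()
ds = client.Dataset("sample_dataset_id")
ds.insert_list(["cat", "dog", "fish"], label_field="label")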

batched_upsert_media(self, images: List[str], show_progress_bar: bool = False, n_workers: Optional[int] = None) List[str]#

It takes a list of images, splits it into batches, and then uses a thread pool to upsert the images in parallel

Parameters
  • images (List[str]) – A list of media src paths to upload

  • show_progress_bar (bool) – Show the progress bar

  • n_workers (Optional[int]) – The number of workers to use. If None, this is set to the max number in ThreadPoolExecutor

Returns

List[str]

Return type

A list of media_urls
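
A sketch, assuming the local paths below point to real image files:

from relevanceai import Client
client = Client()
ds = client.Dataset("sample_dataset_id")

media_urls = ds.batched_upsert_media(
    images=["medias/photo1.png", "medias/photo2.png"],
    show_progress_bar=True,
)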

prepare_media_documents(self, documents: List[Dict[str, Any]], media_fields: List[str], max_workers: Optional[int] = None) List[Dict[str, Any]]#