diff --git a/docarray/array/doc_list/doc_list.py b/docarray/array/doc_list/doc_list.py index 6dbfca3fa76..529fb87904f 100644 --- a/docarray/array/doc_list/doc_list.py +++ b/docarray/array/doc_list/doc_list.py @@ -19,7 +19,7 @@ from typing_inspect import is_union_type from docarray.array.any_array import AnyDocArray -from docarray.array.doc_list.io import IOMixinArray +from docarray.array.doc_list.io import IOMixinDocList from docarray.array.doc_list.pushpull import PushPullMixin from docarray.array.list_advance_indexing import IndexIterType, ListAdvancedIndexing from docarray.base_doc import AnyDoc, BaseDoc @@ -41,7 +41,7 @@ class DocList( ListAdvancedIndexing[T_doc], PushPullMixin, - IOMixinArray, + IOMixinDocList, AnyDocArray[T_doc], ): """ diff --git a/docarray/array/doc_list/io.py b/docarray/array/doc_list/io.py index 7dc5d5979d6..9ddc308641c 100644 --- a/docarray/array/doc_list/io.py +++ b/docarray/array/doc_list/io.py @@ -23,6 +23,7 @@ Type, TypeVar, Union, + cast, ) import orjson @@ -40,9 +41,12 @@ if TYPE_CHECKING: import pandas as pd + from docarray.array.doc_vec.doc_vec import DocVec + from docarray.array.doc_vec.io import IOMixinDocVec from docarray.proto import DocListProto + from docarray.typing.tensor.abstract_tensor import AbstractTensor -T = TypeVar('T', bound='IOMixinArray') +T = TypeVar('T', bound='IOMixinDocList') T_doc = TypeVar('T_doc', bound=BaseDoc) ARRAY_PROTOCOLS = {'protobuf-array', 'pickle-array', 'json-array'} @@ -96,7 +100,7 @@ def __getitem__(self, item: slice): return self.content[item] -class IOMixinArray(Iterable[T_doc]): +class IOMixinDocList(Iterable[T_doc]): doc_type: Type[T_doc] @abstractmethod @@ -515,8 +519,6 @@ class Person(BaseDoc): doc_dict = _access_path_dict_to_nested_dict(access_path2val) docs.append(doc_type.parse_obj(doc_dict)) - if not isinstance(docs, cls): - return cls(docs) return docs def to_dataframe(self) -> 'pd.DataFrame': @@ -577,11 +579,13 @@ def _load_binary_all( protocol: Optional[str], compress: Optional[str], show_progress: bool, + tensor_type: Optional[Type['AbstractTensor']] = None, ): """Read a `DocList` object from a binary file :param protocol: protocol to use. 
It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf` + :param tensor_type: only relevant for DocVec; tensor_type of the DocVec :return: a `DocList` """ with file_ctx as fp: @@ -603,12 +607,20 @@ def _load_binary_all( proto = cls._get_proto_class()() proto.ParseFromString(d) - return cls.from_protobuf(proto) + if tensor_type is not None: + cls_ = cast('IOMixinDocVec', cls) + return cls_.from_protobuf(proto, tensor_type=tensor_type) + else: + return cls.from_protobuf(proto) elif protocol is not None and protocol == 'pickle-array': return pickle.loads(d) elif protocol is not None and protocol == 'json-array': - return cls.from_json(d) + if tensor_type is not None: + cls_ = cast('IOMixinDocVec', cls) + return cls_.from_json(d, tensor_type=tensor_type) + else: + return cls.from_json(d) # Binary format for streaming case else: @@ -658,6 +670,10 @@ def _load_binary_all( pbar.update( t, advance=1, total_size=str(filesize.decimal(_total_size)) ) + if tensor_type is not None: + cls__ = cast(Type['DocVec'], cls) + # mypy doesn't realize that cls_ is callable + return cls__(docs, tensor_type=tensor_type) # type: ignore return cls(docs) @classmethod @@ -724,6 +740,27 @@ def _load_binary_stream( t, advance=1, total_size=str(filesize.decimal(_total_size)) ) + @staticmethod + def _get_file_context( + file: Union[str, bytes, pathlib.Path, io.BufferedReader, _LazyRequestReader], + protocol: str, + compress: Optional[str] = None, + ) -> Tuple[Union[nullcontext, io.BufferedReader], Optional[str], Optional[str]]: + load_protocol: Optional[str] = protocol + load_compress: Optional[str] = compress + file_ctx: Union[nullcontext, io.BufferedReader] + if isinstance(file, (io.BufferedReader, _LazyRequestReader, bytes)): + file_ctx = nullcontext(file) + # by checking path existence we allow file to be of type Path, LocalPath, PurePath and str + elif isinstance(file, (str, pathlib.Path)) and os.path.exists(file): + load_protocol, load_compress = _protocol_and_compress_from_file_path( + file, protocol, compress + ) + file_ctx = open(file, 'rb') + else: + raise FileNotFoundError(f'cannot find file {file}') + return file_ctx, load_protocol, load_compress + @classmethod def load_binary( cls: Type[T], @@ -753,19 +790,9 @@ def load_binary( :return: a `DocList` object """ - load_protocol: Optional[str] = protocol - load_compress: Optional[str] = compress - file_ctx: Union[nullcontext, io.BufferedReader] - if isinstance(file, (io.BufferedReader, _LazyRequestReader, bytes)): - file_ctx = nullcontext(file) - # by checking path existence we allow file to be of type Path, LocalPath, PurePath and str - elif isinstance(file, (str, pathlib.Path)) and os.path.exists(file): - load_protocol, load_compress = _protocol_and_compress_from_file_path( - file, protocol, compress - ) - file_ctx = open(file, 'rb') - else: - raise FileNotFoundError(f'cannot find file {file}') + file_ctx, load_protocol, load_compress = cls._get_file_context( + file, protocol, compress + ) if streaming: if load_protocol not in SINGLE_PROTOCOLS: raise ValueError( diff --git a/docarray/array/doc_vec/doc_vec.py b/docarray/array/doc_vec/doc_vec.py index fab2c7313bc..57678b28736 100644 --- a/docarray/array/doc_vec/doc_vec.py +++ b/docarray/array/doc_vec/doc_vec.py @@ -17,34 +17,23 @@ overload, ) -import numpy as np -import orjson from pydantic import BaseConfig, parse_obj_as 
from typing_inspect import typingGenericAlias from docarray.array.any_array import AnyDocArray from docarray.array.doc_list.doc_list import DocList -from docarray.array.doc_list.io import IOMixinArray from docarray.array.doc_vec.column_storage import ColumnStorage, ColumnStorageView +from docarray.array.doc_vec.io import IOMixinDocVec from docarray.array.list_advance_indexing import ListAdvancedIndexing from docarray.base_doc import AnyDoc, BaseDoc -from docarray.base_doc.mixins.io import _type_to_protobuf from docarray.typing import NdArray from docarray.typing.tensor.abstract_tensor import AbstractTensor from docarray.utils._internal._typing import is_tensor_union from docarray.utils._internal.misc import is_tf_available, is_torch_available if TYPE_CHECKING: - import csv - from pydantic.fields import ModelField - from docarray.proto import ( - DocVecProto, - ListOfDocArrayProto, - ListOfDocVecProto, - NdArrayProto, - ) torch_available = is_torch_available() if torch_available: @@ -62,62 +51,13 @@ T_doc = TypeVar('T_doc', bound=BaseDoc) T = TypeVar('T', bound='DocVec') -T_io_mixin = TypeVar('T_io_mixin', bound='IOMixinArray') +T_io_mixin = TypeVar('T_io_mixin', bound='IOMixinDocVec') IndexIterType = Union[slice, Iterable[int], Iterable[bool], None] -NONE_NDARRAY_PROTO_SHAPE = (0,) -NONE_NDARRAY_PROTO_DTYPE = 'None' - - -def _none_ndarray_proto() -> 'NdArrayProto': - from docarray.proto import NdArrayProto - - zeros_arr = parse_obj_as(NdArray, np.zeros(NONE_NDARRAY_PROTO_SHAPE)) - nd_proto = NdArrayProto() - nd_proto.dense.buffer = zeros_arr.tobytes() - nd_proto.dense.ClearField('shape') - nd_proto.dense.shape.extend(list(zeros_arr.shape)) - nd_proto.dense.dtype = NONE_NDARRAY_PROTO_DTYPE - - return nd_proto - - -def _none_docvec_proto() -> 'DocVecProto': - from docarray.proto import DocVecProto - - return DocVecProto() - - -def _none_list_of_docvec_proto() -> 'ListOfDocArrayProto': - from docarray.proto import ListOfDocVecProto - return ListOfDocVecProto() - - -def _is_none_ndarray_proto(proto: 'NdArrayProto') -> bool: - return ( - proto.dense.shape == list(NONE_NDARRAY_PROTO_SHAPE) - and proto.dense.dtype == NONE_NDARRAY_PROTO_DTYPE - ) - - -def _is_none_docvec_proto(proto: 'DocVecProto') -> bool: - return ( - proto.tensor_columns == {} - and proto.doc_columns == {} - and proto.docs_vec_columns == {} - and proto.any_columns == {} - ) - - -def _is_none_list_of_docvec_proto(proto: 'ListOfDocVecProto') -> bool: - from docarray.proto import ListOfDocVecProto - - return isinstance(proto, ListOfDocVecProto) and len(proto.data) == 0 - - -class DocVec(IOMixinArray, AnyDocArray[T_doc]): +# type ignore because from_protobuf has a different signature +class DocVec(IOMixinDocVec, AnyDocArray[T_doc]): # type: ignore """ DocVec is a container of Documents appropriates to perform computation that require batches of data (ex: matrix multiplication, distance @@ -620,206 +560,6 @@ def _docarray_to_json_compatible(self) -> Dict[str, Dict[str, Any]]: tup = self._storage.columns_json_compatible() return tup._asdict() - @classmethod - def from_json( - cls: Type[T], - file: Union[str, bytes, bytearray], - tensor_type: Type[AbstractTensor] = NdArray, - ) -> T: - """Deserialize JSON strings or bytes into a `DocList`. - - :param file: JSON object from where to deserialize a `DocList` - :param tensor_type: the tensor type to use for the tensor columns. - Could be NdArray, TorchTensor, or TensorFlowTensor. Defaults to NdArray. - All tensors of the output DocVec will be of this type. 
- :return: the deserialized `DocList` - """ - json_columns = orjson.loads(file) - return cls._from_json_col_dict(json_columns, tensor_type=tensor_type) - - @classmethod - def _from_json_col_dict( - cls: Type[T], - json_columns: Dict[str, Any], - tensor_type: Type[AbstractTensor] = NdArray, - ) -> T: - - tensor_cols = json_columns['tensor_columns'] - doc_cols = json_columns['doc_columns'] - docs_vec_cols = json_columns['docs_vec_columns'] - any_cols = json_columns['any_columns'] - - for key, col in tensor_cols.items(): - if col is not None: - tensor_cols[key] = parse_obj_as(tensor_type, col) - else: - tensor_cols[key] = None - - for key, col in doc_cols.items(): - if col is not None: - col_doc_type = cls.doc_type._get_field_type(key) - doc_cols[key] = DocVec.__class_getitem__( - col_doc_type - )._from_json_col_dict(col, tensor_type=tensor_type) - else: - doc_cols[key] = None - - for key, col in docs_vec_cols.items(): - if col is not None: - col_doc_type = cls.doc_type._get_field_type(key).doc_type - col_ = ListAdvancedIndexing( - DocVec.__class_getitem__(col_doc_type)._from_json_col_dict( - vec, tensor_type=tensor_type - ) - for vec in col - ) - docs_vec_cols[key] = col_ - else: - docs_vec_cols[key] = None - - for key, col in any_cols.items(): - if col is not None: - col_type = cls.doc_type._get_field_type(key) - col_type = ( - col_type - if cls.doc_type.__fields__[key].required - else Optional[col_type] - ) - col_ = ListAdvancedIndexing(parse_obj_as(col_type, val) for val in col) - any_cols[key] = col_ - else: - any_cols[key] = None - - return cls.from_columns_storage( - ColumnStorage( - tensor_cols, doc_cols, docs_vec_cols, any_cols, tensor_type=tensor_type - ) - ) - - @classmethod - def from_protobuf( - cls: Type[T], pb_msg: 'DocVecProto', tensor_type: Type[AbstractTensor] = NdArray - ) -> T: - """create a DocVec from a protobuf message - :param pb_msg: the protobuf message to deserialize - :param tensor_type: the tensor type to use for the tensor columns. - Could be NdArray, TorchTensor, or TensorFlowTensor. Defaults to NdArray. - All tensors of the output DocVec will be of this type. 
- :return: The deserialized DocVec - """ - - tensor_columns: Dict[str, Optional[AbstractTensor]] = {} - doc_columns: Dict[str, Optional['DocVec']] = {} - docs_vec_columns: Dict[str, Optional[ListAdvancedIndexing['DocVec']]] = {} - any_columns: Dict[str, ListAdvancedIndexing] = {} - - for tens_col_name, tens_col_proto in pb_msg.tensor_columns.items(): - if _is_none_ndarray_proto(tens_col_proto): - # handle values that were None before serialization - tensor_columns[tens_col_name] = None - else: - tensor_columns[tens_col_name] = tensor_type.from_protobuf( - tens_col_proto - ) - - for doc_col_name, doc_col_proto in pb_msg.doc_columns.items(): - if _is_none_docvec_proto(doc_col_proto): - # handle values that were None before serialization - doc_columns[doc_col_name] = None - else: - col_doc_type: Type = cls.doc_type._get_field_type(doc_col_name) - doc_columns[doc_col_name] = DocVec.__class_getitem__( - col_doc_type - ).from_protobuf(doc_col_proto, tensor_type=tensor_type) - - for docs_vec_col_name, docs_vec_col_proto in pb_msg.docs_vec_columns.items(): - vec_list: Optional[ListAdvancedIndexing] - if _is_none_list_of_docvec_proto(docs_vec_col_proto): - # handle values that were None before serialization - vec_list = None - else: - vec_list = ListAdvancedIndexing() - for doc_list_proto in docs_vec_col_proto.data: - col_doc_type = cls.doc_type._get_field_type( - docs_vec_col_name - ).doc_type - vec_list.append( - DocVec.__class_getitem__(col_doc_type).from_protobuf( - doc_list_proto, tensor_type=tensor_type - ) - ) - docs_vec_columns[docs_vec_col_name] = vec_list - - for any_col_name, any_col_proto in pb_msg.any_columns.items(): - any_column: ListAdvancedIndexing = ListAdvancedIndexing() - for node_proto in any_col_proto.data: - content = cls.doc_type._get_content_from_node_proto( - node_proto, any_col_name - ) - any_column.append(content) - any_columns[any_col_name] = any_column - - storage = ColumnStorage( - tensor_columns=tensor_columns, - doc_columns=doc_columns, - docs_vec_columns=docs_vec_columns, - any_columns=any_columns, - tensor_type=tensor_type, - ) - - return cls.from_columns_storage(storage) - - def to_protobuf(self) -> 'DocVecProto': - """Convert DocVec into a Protobuf message""" - from docarray.proto import ( - DocVecProto, - ListOfAnyProto, - ListOfDocArrayProto, - ListOfDocVecProto, - NdArrayProto, - ) - - doc_columns_proto: Dict[str, DocVecProto] = dict() - tensor_columns_proto: Dict[str, NdArrayProto] = dict() - da_columns_proto: Dict[str, ListOfDocArrayProto] = dict() - any_columns_proto: Dict[str, ListOfAnyProto] = dict() - - for field, col_doc in self._storage.doc_columns.items(): - if col_doc is None: - # put dummy empty DocVecProto for serialization - doc_columns_proto[field] = _none_docvec_proto() - else: - doc_columns_proto[field] = col_doc.to_protobuf() - for field, col_tens in self._storage.tensor_columns.items(): - if col_tens is None: - # put dummy empty NdArrayProto for serialization - tensor_columns_proto[field] = _none_ndarray_proto() - else: - tensor_columns_proto[field] = ( - col_tens.to_protobuf() if col_tens is not None else None - ) - for field, col_da in self._storage.docs_vec_columns.items(): - list_proto = ListOfDocVecProto() - if col_da: - for docs in col_da: - list_proto.data.append(docs.to_protobuf()) - else: - # put dummy empty ListOfDocVecProto for serialization - list_proto = _none_list_of_docvec_proto() - da_columns_proto[field] = list_proto - for field, col_any in self._storage.any_columns.items(): - list_proto = ListOfAnyProto() - for data in 
col_any: - list_proto.data.append(_type_to_protobuf(data)) - any_columns_proto[field] = list_proto - - return DocVecProto( - doc_columns=doc_columns_proto, - tensor_columns=tensor_columns_proto, - docs_vec_columns=da_columns_proto, - any_columns=any_columns_proto, - ) - def to_doc_list(self: T) -> DocList[T_doc]: """Convert DocVec into a DocList. @@ -889,34 +629,7 @@ def traverse_flat( else: return flattened - def to_csv( - self, file_path: str, dialect: Union[str, 'csv.Dialect'] = 'excel' - ) -> None: - """ - DocVec does not support `.to_csv()`. This is because CSV is a row-based format - while DocVec has a column-based data layout. - To overcome this, do: `doc_vec.to_doc_list().to_csv(...)`. - """ - raise NotImplementedError( - f'{type(self)} does not support `.to_csv()`. This is because CSV is a row-based format' - f'while {type(self)} has a column-based data layout. ' - f'To overcome this, do: `doc_vec.to_doc_list().to_csv(...)`.' - ) - @classmethod - def from_csv( - cls: Type['T'], - file_path: str, - encoding: str = 'utf-8', - dialect: Union[str, 'csv.Dialect'] = 'excel', - ) -> 'T': - """ - DocVec does not support `.from_csv()`. This is because CSV is a row-based format - while DocVec has a column-based data layout. - To overcome this, do: `DocList[MyDoc].from_csv(...).to_doc_vec()`. - """ - raise NotImplementedError( - f'{cls} does not support `.from_csv()`. This is because CSV is a row-based format while' - f'{cls} has a column-based data layout. ' - f'To overcome this, do: `DocList[MyDoc].from_csv(...).to_doc_vec()`.' - ) + def __class_getitem__(cls, item: Union[Type[BaseDoc], TypeVar, str]): + # call implementation in AnyDocArray + return super(IOMixinDocVec, cls).__class_getitem__(item) diff --git a/docarray/array/doc_vec/io.py b/docarray/array/doc_vec/io.py new file mode 100644 index 00000000000..b044b315f45 --- /dev/null +++ b/docarray/array/doc_vec/io.py @@ -0,0 +1,503 @@ +import base64 +import io +import pathlib +from abc import abstractmethod +from contextlib import nullcontext +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Generator, + Optional, + Type, + TypeVar, + Union, + cast, +) + +import numpy as np +import orjson +from pydantic import parse_obj_as + +from docarray.array.doc_list.io import ( + SINGLE_PROTOCOLS, + IOMixinDocList, + _LazyRequestReader, +) +from docarray.array.doc_vec.column_storage import ColumnStorage +from docarray.array.list_advance_indexing import ListAdvancedIndexing +from docarray.base_doc import BaseDoc +from docarray.base_doc.mixins.io import _type_to_protobuf +from docarray.typing import NdArray +from docarray.typing.tensor.abstract_tensor import AbstractTensor + +if TYPE_CHECKING: + import csv + + import pandas as pd + + from docarray.array.doc_vec.doc_vec import DocVec + from docarray.proto import ( + DocVecProto, + ListOfDocArrayProto, + ListOfDocVecProto, + NdArrayProto, + ) + + +T = TypeVar('T', bound='IOMixinDocVec') +T_doc = TypeVar('T_doc', bound=BaseDoc) + +NONE_NDARRAY_PROTO_SHAPE = (0,) +NONE_NDARRAY_PROTO_DTYPE = 'None' + + +def _none_ndarray_proto() -> 'NdArrayProto': + from docarray.proto import NdArrayProto + + zeros_arr = parse_obj_as(NdArray, np.zeros(NONE_NDARRAY_PROTO_SHAPE)) + nd_proto = NdArrayProto() + nd_proto.dense.buffer = zeros_arr.tobytes() + nd_proto.dense.ClearField('shape') + nd_proto.dense.shape.extend(list(zeros_arr.shape)) + nd_proto.dense.dtype = NONE_NDARRAY_PROTO_DTYPE + + return nd_proto + + +def _none_docvec_proto() -> 'DocVecProto': + from docarray.proto import DocVecProto + + return 
DocVecProto()
+
+
+def _none_list_of_docvec_proto() -> 'ListOfDocArrayProto':
+    from docarray.proto import ListOfDocVecProto
+
+    return ListOfDocVecProto()
+
+
+def _is_none_ndarray_proto(proto: 'NdArrayProto') -> bool:
+    return (
+        proto.dense.shape == list(NONE_NDARRAY_PROTO_SHAPE)
+        and proto.dense.dtype == NONE_NDARRAY_PROTO_DTYPE
+    )
+
+
+def _is_none_docvec_proto(proto: 'DocVecProto') -> bool:
+    return (
+        proto.tensor_columns == {}
+        and proto.doc_columns == {}
+        and proto.docs_vec_columns == {}
+        and proto.any_columns == {}
+    )
+
+
+def _is_none_list_of_docvec_proto(proto: 'ListOfDocVecProto') -> bool:
+    from docarray.proto import ListOfDocVecProto
+
+    return isinstance(proto, ListOfDocVecProto) and len(proto.data) == 0
+
+
+class IOMixinDocVec(IOMixinDocList):
+    @classmethod
+    @abstractmethod
+    def from_columns_storage(cls: Type[T], storage: ColumnStorage) -> T:
+        ...
+
+    @classmethod
+    @abstractmethod
+    def __class_getitem__(cls, item: Union[Type[BaseDoc], TypeVar, str]):
+        ...
+
+    @classmethod
+    def from_json(
+        cls: Type[T],
+        file: Union[str, bytes, bytearray],
+        tensor_type: Type[AbstractTensor] = NdArray,
+    ) -> T:
+        """Deserialize JSON strings or bytes into a `DocVec`.
+
+        :param file: JSON object from where to deserialize a `DocVec`
+        :param tensor_type: the tensor type to use for the tensor columns.
+            Could be NdArray, TorchTensor, or TensorFlowTensor. Defaults to NdArray.
+            All tensors of the output DocVec will be of this type.
+        :return: the deserialized `DocVec`
+        """
+        json_columns = orjson.loads(file)
+        return cls._from_json_col_dict(json_columns, tensor_type=tensor_type)
+
+    @classmethod
+    def _from_json_col_dict(
+        cls: Type[T],
+        json_columns: Dict[str, Any],
+        tensor_type: Type[AbstractTensor] = NdArray,
+    ) -> T:
+
+        tensor_cols = json_columns['tensor_columns']
+        doc_cols = json_columns['doc_columns']
+        docs_vec_cols = json_columns['docs_vec_columns']
+        any_cols = json_columns['any_columns']
+
+        for key, col in tensor_cols.items():
+            if col is not None:
+                tensor_cols[key] = parse_obj_as(tensor_type, col)
+            else:
+                tensor_cols[key] = None
+
+        for key, col in doc_cols.items():
+            if col is not None:
+                col_doc_type = cls.doc_type._get_field_type(key)
+                doc_cols[key] = cls.__class_getitem__(col_doc_type)._from_json_col_dict(
+                    col, tensor_type=tensor_type
+                )
+            else:
+                doc_cols[key] = None
+
+        for key, col in docs_vec_cols.items():
+            if col is not None:
+                col_doc_type = cls.doc_type._get_field_type(key).doc_type
+                col_ = ListAdvancedIndexing(
+                    cls.__class_getitem__(col_doc_type)._from_json_col_dict(
+                        vec, tensor_type=tensor_type
+                    )
+                    for vec in col
+                )
+                docs_vec_cols[key] = col_
+            else:
+                docs_vec_cols[key] = None
+
+        for key, col in any_cols.items():
+            if col is not None:
+                col_type = cls.doc_type._get_field_type(key)
+                col_type = (
+                    col_type
+                    if cls.doc_type.__fields__[key].required
+                    else Optional[col_type]
+                )
+                col_ = ListAdvancedIndexing(parse_obj_as(col_type, val) for val in col)
+                any_cols[key] = col_
+            else:
+                any_cols[key] = None
+
+        return cls.from_columns_storage(
+            ColumnStorage(
+                tensor_cols, doc_cols, docs_vec_cols, any_cols, tensor_type=tensor_type
+            )
+        )
+
+    @classmethod
+    def from_protobuf(
+        cls: Type[T], pb_msg: 'DocVecProto', tensor_type: Type[AbstractTensor] = NdArray  # type: ignore
+    ) -> T:
+        """Create a DocVec from a protobuf message
+        :param pb_msg: the protobuf message to deserialize
+        :param tensor_type: the tensor type to use for the tensor columns.
+            Could be NdArray, TorchTensor, or TensorFlowTensor. Defaults to NdArray.
+ All tensors of the output DocVec will be of this type. + :return: The deserialized DocVec + """ + tensor_columns: Dict[str, Optional[AbstractTensor]] = {} + doc_columns: Dict[str, Optional['DocVec']] = {} + docs_vec_columns: Dict[str, Optional[ListAdvancedIndexing['DocVec']]] = {} + any_columns: Dict[str, ListAdvancedIndexing] = {} + + for tens_col_name, tens_col_proto in pb_msg.tensor_columns.items(): + if _is_none_ndarray_proto(tens_col_proto): + # handle values that were None before serialization + tensor_columns[tens_col_name] = None + else: + tensor_columns[tens_col_name] = tensor_type.from_protobuf( + tens_col_proto + ) + + for doc_col_name, doc_col_proto in pb_msg.doc_columns.items(): + if _is_none_docvec_proto(doc_col_proto): + # handle values that were None before serialization + doc_columns[doc_col_name] = None + else: + col_doc_type: Type = cls.doc_type._get_field_type(doc_col_name) + doc_columns[doc_col_name] = cls.__class_getitem__( + col_doc_type + ).from_protobuf(doc_col_proto, tensor_type=tensor_type) + + for docs_vec_col_name, docs_vec_col_proto in pb_msg.docs_vec_columns.items(): + vec_list: Optional[ListAdvancedIndexing] + if _is_none_list_of_docvec_proto(docs_vec_col_proto): + # handle values that were None before serialization + vec_list = None + else: + vec_list = ListAdvancedIndexing() + for doc_list_proto in docs_vec_col_proto.data: + col_doc_type = cls.doc_type._get_field_type( + docs_vec_col_name + ).doc_type + vec_list.append( + cls.__class_getitem__(col_doc_type).from_protobuf( + doc_list_proto, tensor_type=tensor_type + ) + ) + docs_vec_columns[docs_vec_col_name] = vec_list + + for any_col_name, any_col_proto in pb_msg.any_columns.items(): + any_column: ListAdvancedIndexing = ListAdvancedIndexing() + for node_proto in any_col_proto.data: + content = cls.doc_type._get_content_from_node_proto( + node_proto, any_col_name + ) + any_column.append(content) + any_columns[any_col_name] = any_column + + storage = ColumnStorage( + tensor_columns=tensor_columns, + doc_columns=doc_columns, + docs_vec_columns=docs_vec_columns, + any_columns=any_columns, + tensor_type=tensor_type, + ) + + return cls.from_columns_storage(storage) + + def to_protobuf(self) -> 'DocVecProto': + """Convert DocVec into a Protobuf message""" + from docarray.proto import ( + DocVecProto, + ListOfAnyProto, + ListOfDocArrayProto, + ListOfDocVecProto, + NdArrayProto, + ) + + self_ = cast('DocVec', self) + + doc_columns_proto: Dict[str, DocVecProto] = dict() + tensor_columns_proto: Dict[str, NdArrayProto] = dict() + da_columns_proto: Dict[str, ListOfDocArrayProto] = dict() + any_columns_proto: Dict[str, ListOfAnyProto] = dict() + + for field, col_doc in self_._storage.doc_columns.items(): + if col_doc is None: + # put dummy empty DocVecProto for serialization + doc_columns_proto[field] = _none_docvec_proto() + else: + doc_columns_proto[field] = col_doc.to_protobuf() + for field, col_tens in self_._storage.tensor_columns.items(): + if col_tens is None: + # put dummy empty NdArrayProto for serialization + tensor_columns_proto[field] = _none_ndarray_proto() + else: + tensor_columns_proto[field] = ( + col_tens.to_protobuf() if col_tens is not None else None + ) + for field, col_da in self_._storage.docs_vec_columns.items(): + list_proto = ListOfDocVecProto() + if col_da: + for docs in col_da: + list_proto.data.append(docs.to_protobuf()) + else: + # put dummy empty ListOfDocVecProto for serialization + list_proto = _none_list_of_docvec_proto() + da_columns_proto[field] = list_proto + for field, col_any in 
self_._storage.any_columns.items():
+            list_proto = ListOfAnyProto()
+            for data in col_any:
+                list_proto.data.append(_type_to_protobuf(data))
+            any_columns_proto[field] = list_proto
+
+        return DocVecProto(
+            doc_columns=doc_columns_proto,
+            tensor_columns=tensor_columns_proto,
+            docs_vec_columns=da_columns_proto,
+            any_columns=any_columns_proto,
+        )
+
+    def to_csv(
+        self, file_path: str, dialect: Union[str, 'csv.Dialect'] = 'excel'
+    ) -> None:
+        """
+        DocVec does not support `.to_csv()`. This is because CSV is a row-based format
+        while DocVec has a column-based data layout.
+        To overcome this, do: `doc_vec.to_doc_list().to_csv(...)`.
+        """
+        raise NotImplementedError(
+            f'{type(self)} does not support `.to_csv()`. This is because CSV is a row-based format '
+            f'while {type(self)} has a column-based data layout. '
+            f'To overcome this, do: `doc_vec.to_doc_list().to_csv(...)`.'
+        )
+
+    @classmethod
+    def from_csv(
+        cls: Type['T'],
+        file_path: str,
+        encoding: str = 'utf-8',
+        dialect: Union[str, 'csv.Dialect'] = 'excel',
+    ) -> 'T':
+        """
+        DocVec does not support `.from_csv()`. This is because CSV is a row-based format
+        while DocVec has a column-based data layout.
+        To overcome this, do: `DocList[MyDoc].from_csv(...).to_doc_vec()`.
+        """
+        raise NotImplementedError(
+            f'{cls} does not support `.from_csv()`. This is because CSV is a row-based format while '
+            f'{cls} has a column-based data layout. '
+            f'To overcome this, do: `DocList[MyDoc].from_csv(...).to_doc_vec()`.'
+        )
+
+    @classmethod
+    def from_base64(
+        cls: Type[T],
+        data: str,
+        protocol: str = 'protobuf-array',
+        compress: Optional[str] = None,
+        show_progress: bool = False,
+        tensor_type: Type['AbstractTensor'] = NdArray,
+    ) -> T:
+        """Deserialize base64 strings into a `DocVec`.
+
+        :param data: Base64 string to deserialize
+        :param protocol: protocol that was used to serialize
+        :param compress: compress algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
+        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
+        :param tensor_type: the tensor type of the resulting DocVec
+        :return: the deserialized `DocVec`
+        """
+        return cls._load_binary_all(
+            file_ctx=nullcontext(base64.b64decode(data)),
+            protocol=protocol,
+            compress=compress,
+            show_progress=show_progress,
+            tensor_type=tensor_type,
+        )
+
+    @classmethod
+    def from_bytes(
+        cls: Type[T],
+        data: bytes,
+        protocol: str = 'protobuf-array',
+        compress: Optional[str] = None,
+        show_progress: bool = False,
+        tensor_type: Type['AbstractTensor'] = NdArray,
+    ) -> T:
+        """Deserialize bytes into a `DocVec`.
+
+        :param data: Bytes from which to deserialize
+        :param protocol: protocol that was used to serialize
+        :param compress: compression algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
+        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
+        :param tensor_type: the tensor type of the resulting DocVec
+        :return: the deserialized `DocVec`
+        """
+        return cls._load_binary_all(
+            file_ctx=nullcontext(data),
+            protocol=protocol,
+            compress=compress,
+            show_progress=show_progress,
+            tensor_type=tensor_type,
+        )
+
+    @classmethod
+    def from_dataframe(
+        cls: Type['T'],
+        df: 'pd.DataFrame',
+        tensor_type: Type['AbstractTensor'] = NdArray,
+    ) -> 'T':
+        """
+        Load a `DocVec` from a `pandas.DataFrame` following the schema
+        defined in the [`.doc_type`][docarray.DocVec] attribute.
+        Every row of the dataframe will be mapped to one Document in the doc_vec.
+        The column names of the dataframe have to match the field names of the
+        Document type.
+        For nested fields use "__"-separated access paths as column names,
+        such as `'image__url'`.
+
+        List-like fields (including field of type DocList) are not supported.
+
+        ---
+
+        ```python
+        import pandas as pd
+
+        from docarray import BaseDoc, DocVec
+
+
+        class Person(BaseDoc):
+            name: str
+            follower: int
+
+
+        df = pd.DataFrame(
+            data=[['Maria', 12345], ['Jake', 54321]], columns=['name', 'follower']
+        )
+
+        docs = DocVec[Person].from_dataframe(df)
+
+        assert docs.name == ['Maria', 'Jake']
+        assert docs.follower == [12345, 54321]
+        ```
+
+        ---
+
+        :param df: `pandas.DataFrame` to extract Document's information from
+        :param tensor_type: the tensor type of the resulting DocVec
+        :return: `DocVec` where each Document contains the information of one
+            corresponding row of the `pandas.DataFrame`.
+        """
+        # type ignore could be avoided by simply putting this implementation in the DocVec class
+        # but leaving it here for code separation
+        return cls(super().from_dataframe(df), tensor_type=tensor_type)  # type: ignore
+
+    @classmethod
+    def load_binary(
+        cls: Type[T],
+        file: Union[str, bytes, pathlib.Path, io.BufferedReader, _LazyRequestReader],
+        protocol: str = 'protobuf-array',
+        compress: Optional[str] = None,
+        show_progress: bool = False,
+        streaming: bool = False,
+        tensor_type: Type['AbstractTensor'] = NdArray,
+    ) -> Union[T, Generator['T_doc', None, None]]:
+        """Load a `DocVec` object from a compressed binary file.
+
+        In case protocol is pickle the `Documents` are streamed from disk to save memory usage
+
+        !!! note
+            If `file` is `str` it can specify `protocol` and `compress` as file extensions.
+            This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
+            string interpolation of the respective `protocol` and `compress` methods.
+            For example if `file=my_docarray.protobuf.lz4` then the binary data will be loaded assuming `protocol=protobuf`
+            and `compress=lz4`.
+
+        :param file: File or filename or serialized bytes where the data is stored.
+        :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
+        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
+        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
+        :param streaming: if `True` returns a generator over `Document` objects.
+        :param tensor_type: the tensor type of the resulting DocVec
+
+        :return: a `DocVec` object
+
+        """
+        file_ctx, load_protocol, load_compress = cls._get_file_context(
+            file, protocol, compress
+        )
+        if streaming:
+            if load_protocol not in SINGLE_PROTOCOLS:
+                raise ValueError(
+                    f'`streaming` is only available when using {" or ".join(map(lambda x: f"`{x}`", SINGLE_PROTOCOLS))} as protocol, '
+                    f'got {load_protocol}'
+                )
+            else:
+                return cls._load_binary_stream(
+                    file_ctx,
+                    protocol=load_protocol,
+                    compress=load_compress,
+                    show_progress=show_progress,
+                )
+        else:
+            return cls._load_binary_all(
+                file_ctx,
+                load_protocol,
+                load_compress,
+                show_progress,
+                tensor_type=tensor_type,
+            )
diff --git a/docs/API_reference/array/da.md b/docs/API_reference/array/da.md
index e1f5b33f008..6de33ec06bc 100644
--- a/docs/API_reference/array/da.md
+++ b/docs/API_reference/array/da.md
@@ -1,5 +1,5 @@
 # DocList
 
 ::: docarray.array.doc_list.doc_list.DocList
-::: docarray.array.doc_list.io.IOMixinArray
+::: docarray.array.doc_list.io.IOMixinDocList
 ::: docarray.array.doc_list.pushpull.PushPullMixin
diff --git a/docs/API_reference/array/da_stack.md b/docs/API_reference/array/da_stack.md
index 917b4488d78..8aa38d46182 100644
--- a/docs/API_reference/array/da_stack.md
+++ b/docs/API_reference/array/da_stack.md
@@ -1,3 +1,4 @@
 # DocVec
 
-::: docarray.array.doc_vec.doc_vec.DocVec
\ No newline at end of file
+::: docarray.array.doc_vec.doc_vec.DocVec
+::: docarray.array.doc_vec.io.IOMixinDocVec
\ No newline at end of file
diff --git a/docs/data_types/table/table.md b/docs/data_types/table/table.md
index 45474ce80a1..3ebc90013d1 100644
--- a/docs/data_types/table/table.md
+++ b/docs/data_types/table/table.md
@@ -28,7 +28,7 @@ class Book(BaseDoc):
     author: str
     year: int
 ```
-Next, load the content of the CSV file to a [`DocList`][docarray.DocList] instance of `Book`s via [`.from_csv()`][docarray.array.doc_list.io.IOMixinArray.from_csv]:
+Next, load the content of the CSV file to a [`DocList`][docarray.DocList] instance of `Book`s via [`.from_csv()`][docarray.array.doc_list.io.IOMixinDocList.from_csv]:
 
 ```python
 from docarray import DocList
@@ -64,7 +64,7 @@ The resulting [`DocList`][docarray.DocList] object contains three `Book`s, since
 
 ## Save to CSV file
 
-Vice versa, you can also store your [`DocList`][docarray.DocList] data in a `.csv` file using [`.to_csv()`][docarray.array.doc_list.io.IOMixinArray.to_csv]:
+Vice versa, you can also store your [`DocList`][docarray.DocList] data in a `.csv` file using [`.to_csv()`][docarray.array.doc_list.io.IOMixinDocList.to_csv]:
 
 ``` { .python }
 docs.to_csv(file_path='/path/to/my_file.csv')
@@ -126,8 +126,8 @@ addca0475756fc12cdec8faf8fb10d71,03194cec1b75927c2259b3c0fff1ab6f,A little life,
 
 ## Handle TSV tables
 
 Not only can you load and save comma-separated values (`CSV`) data, but also tab-separated values (`TSV`),
-by adjusting the `dialect` parameter in [`.from_csv()`][docarray.array.doc_list.io.IOMixinArray.from_csv]
-and [`.to_csv()`][docarray.array.doc_list.io.IOMixinArray.to_csv].
+by adjusting the `dialect` parameter in [`.from_csv()`][docarray.array.doc_list.io.IOMixinDocList.from_csv]
+and [`.to_csv()`][docarray.array.doc_list.io.IOMixinDocList.to_csv].
 The dialect defaults to `'excel'`, which refers to comma-separated values. For tab-separated values, you can use `'excel-tab'`.
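+For example, a minimal sketch of a TSV round trip with the `Book` schema from above (the `books.tsv` and `books_copy.tsv` file names are illustrative):
+
+```python
+from docarray import BaseDoc, DocList
+
+
+class Book(BaseDoc):
+    title: str
+    author: str
+    year: int
+
+
+# read a tab-separated file by switching the dialect
+docs = DocList[Book].from_csv('books.tsv', dialect='excel-tab')
+
+# write the same data back out, again as TSV
+docs.to_csv(file_path='books_copy.tsv', dialect='excel-tab')
+```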
@@ -200,7 +200,7 @@ class SemicolonSeparator(csv.Dialect): quoting = csv.QUOTE_MINIMAL ``` -Finally, you can load your data by setting the `dialect` parameter in [`.from_csv()`][docarray.array.doc_list.io.IOMixinArray.from_csv] to an instance of your `SemicolonSeparator`. +Finally, you can load your data by setting the `dialect` parameter in [`.from_csv()`][docarray.array.doc_list.io.IOMixinDocList.from_csv] to an instance of your `SemicolonSeparator`. ```python docs = DocList[Book].from_csv( diff --git a/docs/user_guide/sending/serialization.md b/docs/user_guide/sending/serialization.md index 1986f12532b..145dd6befd0 100644 --- a/docs/user_guide/sending/serialization.md +++ b/docs/user_guide/sending/serialization.md @@ -59,8 +59,8 @@ When sending or storing [`DocList`][docarray.array.doc_list.doc_list.DocList], y ### JSON -- [`to_json()`][docarray.array.doc_list.io.IOMixinArray.to_json] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to JSON. It returns the binary representation of the JSON object. -- [`from_json()`][docarray.array.doc_list.io.IOMixinArray.from_json] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from JSON. It can load from either a `str` or `binary` representation of the JSON object. +- [`to_json()`][docarray.array.doc_list.io.IOMixinDocList.to_json] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to JSON. It returns the binary representation of the JSON object. +- [`from_json()`][docarray.array.doc_list.io.IOMixinDocList.from_json] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from JSON. It can load from either a `str` or `binary` representation of the JSON object. ```python from docarray import BaseDoc, DocList @@ -88,8 +88,8 @@ b'[{"id":"5540e72d407ae81abb2390e9249ed066","text":"doc 0"},{"id":"fbe9f80d2fa03 ### Protobuf -- [`to_protobuf()`][docarray.array.doc_list.io.IOMixinArray.to_protobuf] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to `protobuf`. It returns a `protobuf` object of `docarray_pb2.DocListProto` class. -- [`from_protobuf()`][docarray.array.doc_list.io.IOMixinArray.from_protobuf] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from `protobuf`. It accepts a `protobuf` message object to construct a [`DocList`][docarray.array.doc_list.doc_list.DocList]. +- [`to_protobuf()`][docarray.array.doc_list.io.IOMixinDocList.to_protobuf] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to `protobuf`. It returns a `protobuf` object of `docarray_pb2.DocListProto` class. +- [`from_protobuf()`][docarray.array.doc_list.io.IOMixinDocList.from_protobuf] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from `protobuf`. It accepts a `protobuf` message object to construct a [`DocList`][docarray.array.doc_list.doc_list.DocList]. ```python from docarray import BaseDoc, DocList @@ -112,8 +112,8 @@ print(dl_from_proto) When transferring data over the network, use `Base64` format to serialize the [`DocList`][docarray.array.doc_list.doc_list.DocList]. Serializing a [`DocList`][docarray.array.doc_list.doc_list.DocList] in Base64 supports both the `pickle` and `protobuf` protocols. You can also choose different compression methods. 
-- [`to_base64()`][docarray.array.doc_list.io.IOMixinArray.to_base64] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to Base64 -- [`from_base64()`][docarray.array.doc_list.io.IOMixinArray.from_base64] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from Base64: +- [`to_base64()`][docarray.array.doc_list.io.IOMixinDocList.to_base64] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to Base64 +- [`from_base64()`][docarray.array.doc_list.io.IOMixinDocList.from_base64] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from Base64: You can multiple compression methods: `lz4`, `bz2`, `lzma`, `zlib`, and `gzip`. @@ -138,8 +138,8 @@ dl_from_base64 = DocList[SimpleDoc].from_base64( These methods **serialize and save** your data: -- [`save_binary()`][docarray.array.doc_list.io.IOMixinArray.save_binary] saves a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a binary file. -- [`load_binary()`][docarray.array.doc_list.io.IOMixinArray.load_binary] loads a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a binary file. +- [`save_binary()`][docarray.array.doc_list.io.IOMixinDocList.save_binary] saves a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a binary file. +- [`load_binary()`][docarray.array.doc_list.io.IOMixinDocList.load_binary] loads a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a binary file. You can choose between multiple compression methods: `lz4`, `bz2`, `lzma`, `zlib`, and `gzip`. @@ -166,11 +166,11 @@ In the above snippet, the [`DocList`][docarray.array.doc_list.doc_list.DocList] These methods just serialize your data, without saving it to a file: -- [to_bytes()][docarray.array.doc_list.io.IOMixinArray.to_bytes] saves a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a byte object. -- [from_bytes()][docarray.array.doc_list.io.IOMixinArray.from_bytes] loads a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a byte object. +- [to_bytes()][docarray.array.doc_list.io.IOMixinDocList.to_bytes] saves a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a byte object. +- [from_bytes()][docarray.array.doc_list.io.IOMixinDocList.from_bytes] loads a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a byte object. !!! note - These methods are used under the hood by [save_binary()][docarray.array.doc_list.io.IOMixinArray.to_base64] and [`load_binary()`][docarray.array.doc_list.io.IOMixinArray.load_binary] to prepare/load/save to a binary file. You can also use them directly to work with byte files. + These methods are used under the hood by [save_binary()][docarray.array.doc_list.io.IOMixinDocList.to_base64] and [`load_binary()`][docarray.array.doc_list.io.IOMixinDocList.load_binary] to prepare/load/save to a binary file. You can also use them directly to work with byte files. Like working with binary files: @@ -196,8 +196,8 @@ dl_from_bytes = DocList[SimpleDoc].from_bytes( ### CSV -- [`to_csv()`][docarray.array.doc_list.io.IOMixinArray.to_csv] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a CSV file. -- [`from_csv()`][docarray.array.doc_list.io.IOMixinArray.from_csv] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a CSV file. +- [`to_csv()`][docarray.array.doc_list.io.IOMixinDocList.to_csv] serializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a CSV file. 
+- [`from_csv()`][docarray.array.doc_list.io.IOMixinDocList.from_csv] deserializes a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a CSV file.
 
 Use the `dialect` parameter to choose the [dialect of the CSV format](https://docs.python.org/3/library/csv.html#dialects-and-formatting-parameters):
 
@@ -218,8 +218,8 @@ print(dl_from_csv)
 
 ### Pandas.Dataframe
 
-- [`from_dataframe()`][docarray.array.doc_list.io.IOMixinArray.from_dataframe] loads a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
-- [`to_dataframe()`][docarray.array.doc_list.io.IOMixinArray.to_dataframe] saves a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
+- [`from_dataframe()`][docarray.array.doc_list.io.IOMixinDocList.from_dataframe] loads a [`DocList`][docarray.array.doc_list.doc_list.DocList] from a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
+- [`to_dataframe()`][docarray.array.doc_list.io.IOMixinDocList.to_dataframe] saves a [`DocList`][docarray.array.doc_list.doc_list.DocList] to a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
 
 ```python
 from docarray import BaseDoc, DocList
@@ -241,22 +241,38 @@ print(dl_from_dataframe)
 
 For sending or storing [`DocVec`][docarray.array.doc_vec.doc_vec.DocVec] it offers a very similar interface to that of [`DocList`][docarray.array.doc_list.doc_list.DocList].
 
+!!! note "Tensor type and (de)serialization"
+
+    You can deserialize any serialized [DocVec][docarray.array.doc_vec.doc_vec.DocVec] to any tensor type ([`NdArray`][docarray.typing.tensor.NdArray], [`TorchTensor`][docarray.typing.tensor.TorchTensor], or [`TensorFlowTensor`][docarray.typing.tensor.TensorFlowTensor]),
+    by passing the `tensor_type=...` parameter to the appropriate deserialization method.
+    This is analogous to the `tensor_type=...` parameter in the [DocVec][docarray.array.doc_vec.doc_vec.DocVec.__init__] constructor.
+
+    This means that you can choose at deserialization time if you are working with numpy, PyTorch, or TensorFlow tensors.
+
+    If no `tensor_type` is passed, the default is `NdArray`.
+
 ### JSON
 
-- [`to_json()`][docarray.array.doc_list.io.IOMixinArray.to_json] serializes a [`DocVec`][docarray.array.doc_vec.doc_vec.DocVec] to JSON. It returns the binary representation of the JSON object.
-- [`from_json()`][docarray.array.doc_list.io.IOMixinArray.from_json] deserializes a [`DocList`][docarray.array.doc_vec.doc_vec.DocVec] from JSON. It can load from either a `str` or `binary` representation of the JSON object.
+- [`to_json()`][docarray.array.doc_vec.io.IOMixinDocVec.to_json] serializes a [`DocVec`][docarray.array.doc_vec.doc_vec.DocVec] to JSON. It returns the binary representation of the JSON object.
+- [`from_json()`][docarray.array.doc_vec.io.IOMixinDocVec.from_json] deserializes a [`DocVec`][docarray.array.doc_vec.doc_vec.DocVec] from JSON. It can load from either a `str` or `binary` representation of the JSON object.
In contrast to [DocList's JSON format](#json-1), `DocVec.to_json()` outputs a column oriented JSON file: ```python +import torch from docarray import BaseDoc, DocVec +from docarray.typing import TorchTensor class SimpleDoc(BaseDoc): text: str + tensor: TorchTensor -dv = DocVec[SimpleDoc]([SimpleDoc(text=f'doc {i}') for i in range(2)]) +dv = DocVec[SimpleDoc]( + [SimpleDoc(text=f'doc {i}', tensor=torch.rand(64)) for i in range(2)] +) with open('simple-dv.json', 'wb') as f: json_dv = dv.to_json() @@ -264,7 +280,7 @@ with open('simple-dv.json', 'wb') as f: f.write(json_dv) with open('simple-dv.json', 'r') as f: - dv_load_from_json = DocVec[SimpleDoc].from_json(f.read()) + dv_load_from_json = DocVec[SimpleDoc].from_json(f.read(), tensor_type=TorchTensor) print(dv_load_from_json) ``` @@ -275,8 +291,8 @@ b'{"tensor_columns":{},"doc_columns":{},"docs_vec_columns":{},"any_columns":{"id ### Protobuf -- [`to_protobuf`][docarray.array.doc_list.doc_list.DocVec.to_protobuf] serializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] to `protobuf`. It returns a `protobuf` object of `docarray_pb2.DocVecProto` class. -- [`from_protobuf`][docarray.array.doc_list.doc_list.DocVec.from_protobuf] deserializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] from `protobuf`. It accepts a protobuf message object to construct a [DocVec][docarray.array.doc_list.doc_list.DocVec]. +- [`to_protobuf`][docarray.array.doc_vec.io.IOMixinDocVec.to_protobuf] serializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] to `protobuf`. It returns a `protobuf` object of `docarray_pb2.DocVecProto` class. +- [`from_protobuf`][docarray.array.doc_vec.io.IOMixinDocVec.from_protobuf] deserializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] from `protobuf`. It accepts a protobuf message object to construct a [DocVec][docarray.array.doc_list.doc_list.DocVec]. ```python import numpy as np @@ -297,7 +313,7 @@ dv_from_proto = DocVec[SimpleVecDoc].from_protobuf(proto_message_dv) ``` You can deserialize any [DocVec][docarray.array.doc_list.doc_list.DocVec] protobuf message to any tensor type, -by passing the `tensor_type=...` parameter to [`from_protobuf`][docarray.array.doc_list.doc_list.DocVec.from_protobuf] +by passing the `tensor_type=...` parameter to [`from_protobuf`][docarray.array.doc_vec.io.IOMixinDocVec.from_protobuf] This means that you can choose at deserialization time if you are working with numpy, PyTorch, or TensorFlow tensors. @@ -343,25 +359,30 @@ When transferring data over the network, use `Base64` format to serialize the [D Serializing a [DocVec][docarray.array.doc_list.doc_list.DocVec] in Base64 supports both the `pickle` and `protobuf` protocols. You can also choose different compression methods. -- [`to_base64()`][docarray.array.doc_list.io.IOMixinArray.to_base64] serializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] to Base64 -- [`from_base64()`][docarray.array.doc_list.io.IOMixinArray.from_base64] deserializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] from Base64: +- [`to_base64()`][docarray.array.doc_vec.io.IOMixinDocVec.to_base64] serializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] to Base64 +- [`from_base64()`][docarray.array.doc_vec.io.IOMixinDocVec.from_base64] deserializes a [DocVec][docarray.array.doc_list.doc_list.DocVec] from Base64: You can multiple compression methods: `lz4`, `bz2`, `lzma`, `zlib`, and `gzip`. 
```python
 from docarray import BaseDoc, DocVec
+from docarray.typing import TorchTensor
+import torch
 
 
 class SimpleDoc(BaseDoc):
     text: str
+    tensor: TorchTensor
 
 
-dv = DocVec[SimpleDoc]([SimpleDoc(text=f'doc {i}') for i in range(2)])
+dv = DocVec[SimpleDoc](
+    [SimpleDoc(text=f'doc {i}', tensor=torch.rand(64)) for i in range(2)]
+)
 
 base64_repr_dv = dv.to_base64(compress=None, protocol='pickle')
 
 dl_from_base64 = DocVec[SimpleDoc].from_base64(
-    base64_repr_dv, compress=None, protocol='pickle'
+    base64_repr_dv, compress=None, protocol='pickle', tensor_type=TorchTensor
 )
 ```
 
@@ -369,25 +390,30 @@ dl_from_base64 = DocVec[SimpleDoc].from_base64(
 
 These methods **serialize and save** your data:
 
-- [`save_binary()`][docarray.array.doc_list.io.IOMixinArray.save_binary] saves a [DocVec][docarray.array.doc_list.doc_list.DocVec] to a binary file.
-- [`load_binary()`][docarray.array.doc_list.io.IOMixinArray.load_binary] loads a [DocVec][docarray.array.doc_list.doc_list.DocVec] from a binary file.
+- [`save_binary()`][docarray.array.doc_vec.io.IOMixinDocVec.save_binary] saves a [DocVec][docarray.array.doc_list.doc_list.DocVec] to a binary file.
+- [`load_binary()`][docarray.array.doc_vec.io.IOMixinDocVec.load_binary] loads a [DocVec][docarray.array.doc_list.doc_list.DocVec] from a binary file.
 
 You can choose between multiple compression methods: `lz4`, `bz2`, `lzma`, `zlib`, and `gzip`.
 
 ```python
 from docarray import BaseDoc, DocVec
+from docarray.typing import TorchTensor
+import torch
 
 
 class SimpleDoc(BaseDoc):
     text: str
+    tensor: TorchTensor
 
 
-dv = DocVec[SimpleDoc]([SimpleDoc(text=f'doc {i}') for i in range(2)])
+dv = DocVec[SimpleDoc](
+    [SimpleDoc(text=f'doc {i}', tensor=torch.rand(64)) for i in range(2)]
+)
 
 dv.save_binary('simple-dl.pickle', compress=None, protocol='pickle')
 
 dv_from_binary = DocVec[SimpleDoc].load_binary(
-    'simple-dv.pickle', compress=None, protocol='pickle'
+    'simple-dl.pickle', compress=None, protocol='pickle', tensor_type=TorchTensor
 )
 ```
 
@@ -397,11 +423,11 @@ In the above snippet, the [DocVec][docarray.array.doc_list.doc_list.DocVec] is s
 
 These methods just serialize your data, without saving it to a file:
 
-- [to_bytes()][docarray.array.doc_list.io.IOMixinArray.to_bytes] saves a [DocVec][docarray.array.doc_list.doc_list.DocVec] to a byte object.
-- [from_bytes()][docarray.array.doc_list.io.IOMixinArray.from_bytes] loads a [DocVec][docarray.array.doc_list.doc_list.DocVec] from a byte object.
+- [to_bytes()][docarray.array.doc_vec.io.IOMixinDocVec.to_bytes] saves a [DocVec][docarray.array.doc_list.doc_list.DocVec] to a byte object.
+- [from_bytes()][docarray.array.doc_vec.io.IOMixinDocVec.from_bytes] loads a [DocVec][docarray.array.doc_list.doc_list.DocVec] from a byte object.
 
 !!! note
-    These methods are used under the hood by [save_binary()][docarray.array.doc_list.io.IOMixinArray.to_base64] and [`load_binary()`][docarray.array.doc_list.io.IOMixinArray.load_binary] to prepare/load/save to a binary file. You can also use them directly to work with byte files.
+    These methods are used under the hood by [save_binary()][docarray.array.doc_vec.io.IOMixinDocVec.save_binary] and [`load_binary()`][docarray.array.doc_vec.io.IOMixinDocVec.load_binary] to prepare/load/save to a binary file. You can also use them directly to work with byte files.
Like working with binary files: @@ -410,17 +436,24 @@ Like working with binary files: ```python from docarray import BaseDoc, DocVec +from docarray.typing import TorchTensor +import torch class SimpleDoc(BaseDoc): text: str + tensor: TorchTensor -dv = DocVec[SimpleDoc]([SimpleDoc(text=f'doc {i}') for i in range(2)]) +dv = DocVec[SimpleDoc]( + [SimpleDoc(text=f'doc {i}', tensor=torch.rand(64)) for i in range(2)] +) bytes_dv = dv.to_bytes(protocol='pickle', compress=None) -dv_from_bytes = DocVec[SimpleDoc].from_bytes(bytes_dv, compress=None, protocol='pickle') +dv_from_bytes = DocVec[SimpleDoc].from_bytes( + bytes_dv, compress=None, protocol='pickle', tensor_type=TorchTensor +) ``` ### CSV @@ -449,21 +482,26 @@ dv_from_bytes = DocVec[SimpleDoc].from_bytes(bytes_dv, compress=None, protocol=' ### Pandas.Dataframe -- [`from_dataframe()`][docarray.array.doc_list.io.IOMixinArray.from_dataframe] loads a [DocVec][docarray.array.doc_list.doc_list.DocVec] from a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). -- [`to_dataframe()`][docarray.array.doc_list.io.IOMixinArray.to_dataframe] saves a [DocVec][docarray.array.doc_list.doc_list.DocVec] to a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). +- [`from_dataframe()`][docarray.array.doc_vec.io.IOMixinDocVec.from_dataframe] loads a [DocVec][docarray.array.doc_list.doc_list.DocVec] from a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). +- [`to_dataframe()`][docarray.array.doc_vec.io.IOMixinDocVec.to_dataframe] saves a [DocVec][docarray.array.doc_list.doc_list.DocVec] to a [Pandas Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). ```python from docarray import BaseDoc, DocVec +from docarray.typing import TorchTensor +import torch class SimpleDoc(BaseDoc): text: str + tensor: TorchTensor -dv = DocVec[SimpleDoc]([SimpleDoc(text=f'doc {i}') for i in range(2)]) +dv = DocVec[SimpleDoc]( + [SimpleDoc(text=f'doc {i}', tensor=torch.rand(64)) for i in range(2)] +) df = dv.to_dataframe() -dv_from_dataframe = DocVec[SimpleDoc].from_dataframe(df) +dv_from_dataframe = DocVec[SimpleDoc].from_dataframe(df, tensor_type=TorchTensor) print(dv_from_dataframe) ``` diff --git a/tests/units/array/test_array_from_to_bytes.py b/tests/units/array/test_array_from_to_bytes.py index fba84cdfa80..d0c35b57907 100644 --- a/tests/units/array/test_array_from_to_bytes.py +++ b/tests/units/array/test_array_from_to_bytes.py @@ -2,7 +2,7 @@ from docarray import BaseDoc, DocList, DocVec from docarray.documents import ImageDoc -from docarray.typing import NdArray +from docarray.typing import NdArray, TorchTensor class MyDoc(BaseDoc): @@ -73,6 +73,48 @@ def test_from_to_base64(protocol, compress, show_progress, array_cls): assert da2[1].image.url is None +@pytest.mark.parametrize('tensor_type', [NdArray, TorchTensor]) +@pytest.mark.parametrize('protocol', ['protobuf-array', 'pickle-array']) +def test_from_to_base64_tensor_type(tensor_type, protocol): + class MyDoc(BaseDoc): + embedding: tensor_type + text: str + image: ImageDoc + + da = DocVec[MyDoc]( + [ + MyDoc( + embedding=[1, 2, 3, 4, 5], text='hello', image=ImageDoc(url='aux.png') + ), + MyDoc(embedding=[5, 4, 3, 2, 1], text='hello world', image=ImageDoc()), + ], + tensor_type=tensor_type, + ) + bytes_da = da.to_base64(protocol=protocol) + da2 = DocVec[MyDoc].from_base64( + bytes_da, tensor_type=tensor_type, protocol=protocol + ) + assert da2.tensor_type == tensor_type + assert 
isinstance(da2.embedding, tensor_type) + + +@pytest.mark.parametrize('tensor_type', [NdArray, TorchTensor]) +def test_from_to_bytes_tensor_type(tensor_type): + da = DocVec[MyDoc]( + [ + MyDoc( + embedding=[1, 2, 3, 4, 5], text='hello', image=ImageDoc(url='aux.png') + ), + MyDoc(embedding=[5, 4, 3, 2, 1], text='hello world', image=ImageDoc()), + ], + tensor_type=tensor_type, + ) + bytes_da = da.to_bytes() + da2 = DocVec[MyDoc].from_bytes(bytes_da, tensor_type=tensor_type) + assert da2.tensor_type == tensor_type + assert isinstance(da2.embedding, tensor_type) + + def test_union_type_error(tmp_path): from typing import Union diff --git a/tests/units/array/test_array_from_to_json.py b/tests/units/array/test_array_from_to_json.py index 030f023d82c..0569a566775 100644 --- a/tests/units/array/test_array_from_to_json.py +++ b/tests/units/array/test_array_from_to_json.py @@ -146,3 +146,20 @@ class CustomDoc(BaseDoc): docs_copy = docs.from_json(docs.to_json()) assert docs == docs_copy + + +@pytest.mark.parametrize('tensor_type', [NdArray, TorchTensor]) +def test_from_to_json_tensor_type(tensor_type): + da = DocVec[MyDoc]( + [ + MyDoc( + embedding=[1, 2, 3, 4, 5], text='hello', image=ImageDoc(url='aux.png') + ), + MyDoc(embedding=[5, 4, 3, 2, 1], text='hello world', image=ImageDoc()), + ], + tensor_type=tensor_type, + ) + json_da = da.to_json() + da2 = DocVec[MyDoc].from_json(json_da, tensor_type=tensor_type) + assert da2.tensor_type == tensor_type + assert isinstance(da2.embedding, tensor_type) diff --git a/tests/units/array/test_array_from_to_pandas.py b/tests/units/array/test_array_from_to_pandas.py index 463b3c62e86..c14f9529ff9 100644 --- a/tests/units/array/test_array_from_to_pandas.py +++ b/tests/units/array/test_array_from_to_pandas.py @@ -5,6 +5,7 @@ from docarray import BaseDoc, DocList, DocVec from docarray.documents import ImageDoc +from docarray.typing import NdArray, TorchTensor @pytest.fixture() @@ -132,3 +133,25 @@ class BasisUnion(BaseDoc): docs_basic = DocList[BasisUnion]([BasisUnion(ud="hello")]) docs_copy = DocList[BasisUnion].from_dataframe(docs_basic.to_dataframe()) assert docs_copy == docs_basic + + +@pytest.mark.parametrize('tensor_type', [NdArray, TorchTensor]) +def test_from_to_pandas_tensor_type(tensor_type): + class MyDoc(BaseDoc): + embedding: tensor_type + text: str + image: ImageDoc + + da = DocVec[MyDoc]( + [ + MyDoc( + embedding=[1, 2, 3, 4, 5], text='hello', image=ImageDoc(url='aux.png') + ), + MyDoc(embedding=[5, 4, 3, 2, 1], text='hello world', image=ImageDoc()), + ], + tensor_type=tensor_type, + ) + df_da = da.to_dataframe() + da2 = DocVec[MyDoc].from_dataframe(df_da, tensor_type=tensor_type) + assert da2.tensor_type == tensor_type + assert isinstance(da2.embedding, tensor_type) diff --git a/tests/units/array/test_array_save_load.py b/tests/units/array/test_array_save_load.py index fc9a3a22609..e3dc0bdaa80 100644 --- a/tests/units/array/test_array_save_load.py +++ b/tests/units/array/test_array_save_load.py @@ -5,7 +5,7 @@ from docarray import BaseDoc, DocList, DocVec from docarray.documents import ImageDoc -from docarray.typing import NdArray +from docarray.typing import NdArray, TorchTensor class MyDoc(BaseDoc): @@ -96,3 +96,27 @@ def _extend_da(num_docs=100): assert doc.image.url == da[i].image.url assert i == 99 + + +@pytest.mark.parametrize('tensor_type', [NdArray, TorchTensor]) +def test_save_load_tensor_type(tensor_type, tmp_path): + tmp_file = os.path.join(tmp_path, 'test123') + + class MyDoc(BaseDoc): + embedding: tensor_type + text: str + image: ImageDoc 
+ + da = DocVec[MyDoc]( + [ + MyDoc( + embedding=[1, 2, 3, 4, 5], text='hello', image=ImageDoc(url='aux.png') + ), + MyDoc(embedding=[5, 4, 3, 2, 1], text='hello world', image=ImageDoc()), + ], + tensor_type=tensor_type, + ) + da.save_binary(tmp_file) + da2 = DocVec[MyDoc].load_binary(tmp_file, tensor_type=tensor_type) + assert da2.tensor_type == tensor_type + assert isinstance(da2.embedding, tensor_type)
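
A minimal end-to-end sketch of the round trip this change enables, mirroring the tests above (the `my_docs.bin` file name is illustrative):

```python
import torch

from docarray import BaseDoc, DocVec
from docarray.typing import TorchTensor


class MyDoc(BaseDoc):
    embedding: TorchTensor
    text: str


# build a column-oriented DocVec backed by torch tensors
da = DocVec[MyDoc](
    [MyDoc(embedding=torch.rand(5), text=f'doc {i}') for i in range(3)],
    tensor_type=TorchTensor,
)

da.save_binary('my_docs.bin')

# the tensor backend is chosen at deserialization time; NdArray is the default
da2 = DocVec[MyDoc].load_binary('my_docs.bin', tensor_type=TorchTensor)
assert da2.tensor_type == TorchTensor
assert isinstance(da2.embedding, TorchTensor)
```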