superduperdb package

Module contents

class superduperdb.CodeModel(identifier: str, artifacts: dataclasses.InitVar[typing.Optional[typing.Dict]] = None, *, signature: ~typing.Literal['*args', '**kwargs', '*args, **kwargs', 'singleton'] = '*args, **kwargs', datatype: ~superduperdb.components.datatype.DataType | ~superduperdb.backends.ibis.field_types.FieldType | None = None, output_schema: ~superduperdb.components.schema.Schema | None = None, flatten: bool = False, model_update_kwargs: ~typing.Dict = <factory>, predict_kwargs: ~typing.Dict = <factory>, compute_kwargs: ~typing.Dict = <factory>, validation: ~superduperdb.components.model.Validation | None = None, metric_values: ~typing.Dict = <factory>, num_workers: int = 0, object: ~superduperdb.base.code.Code)[source]

Bases: _ObjectModel

Model component which wraps a model to become serializable.

Parameters:
  • datatype – DataType instance

  • output_schema – Output schema (mapping of encoders)

  • flatten – Flatten the model outputs

  • collate_fn – Collate function

  • model_update_kwargs – The kwargs to use for model update

  • metrics – The metrics to evaluate on

  • validation_sets – The validation Dataset instances to use

  • predict_kwargs – Additional arguments to use at prediction time

  • compute_kwargs – Kwargs used for compute backend job submit. Example (Ray backend): compute_kwargs = dict(resources=…)

  • object – Code object, wrapping some foreign code

full_import_path = 'superduperdb.components.model.CodeModel'
classmethod handle_integration(kwargs)[source]
object: Code
ui_schema: t.ClassVar[t.List[t.Dict]] = [{'default': 'from superduperdb import code\n\n@code\ndef my_code(x):\n    return x\n', 'name': 'object', 'type': 'code'}]
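
A minimal sketch of constructing a CodeModel from a function serialized with the code decorator, assuming the decorator returns a Code object suitable for the object field (the function name and identifier are hypothetical):

    from superduperdb import CodeModel, code

    @code
    def double(x):
        return x * 2

    # Wrap the serialized Code object as a serializable model component
    model = CodeModel(identifier='double', object=double)
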
class superduperdb.DataType(identifier: str, artifacts: dataclasses.InitVar[Optional[Dict]] = None, *, encoder: Callable | None = None, decoder: Callable | None = None, info: Dict | None = None, shape: Sequence | None = None, directory: str | None = None, encodable: str = 'encodable', bytes_encoding: str | None = BytesEncoding.BYTES, media_type: str | None = None)[source]

Bases: Component

Parameters:
  • identifier – A unique identifier for the component

  • decoder – Callable converting a bytes string to an Encodable of this Encoder

  • encoder – Callable converting an Encodable of this Encoder to bytes

  • shape – Shape of the data

  • load_hybrid – Whether to load the data from the URI or return the URI in CFG.hybrid mode

bytes_encoding: str | None = 'Bytes'
bytes_encoding_after_encode(data)[source]
bytes_encoding_before_decode(data)[source]
decode_data(item, info: Dict | None = None)[source]
decoder: Callable | None = None
directory: str | None = None
encodable: str = 'encodable'
encode_data(item, info: Dict | None = None)[source]
encoder: Callable | None = None
full_import_path = 'superduperdb.components.datatype.DataType'
info: Dict | None = None
media_type: str | None = None
shape: Sequence | None = None
type_id: ClassVar[str] = 'datatype'
ui_schema: ClassVar[List[Dict]] = [{'choices': ['pickle', 'dill', 'torch'], 'default': 'dill', 'name': 'serializer', 'type': 'string'}, {'name': 'info', 'optional': True, 'type': 'json'}, {'name': 'shape', 'optional': True, 'type': 'json'}, {'name': 'directory', 'optional': True, 'type': 'str'}, {'choices': ['encodable', 'lazy_artifact', 'file'], 'default': 'lazy_artifact', 'name': 'encodable', 'type': 'str'}, {'choices': ['base64', 'bytes'], 'default': 'bytes', 'name': 'bytes_encoding', 'type': 'str'}, {'name': 'media_type', 'optional': True, 'type': 'str'}]
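
A rough sketch of a custom DataType, assuming the encoder and decoder callables simply map a value to bytes and back (the identifier is hypothetical):

    import pickle

    from superduperdb import DataType

    # Hypothetical datatype that serializes arbitrary Python objects with pickle
    pickle_dt = DataType(
        identifier='my-pickle',
        encoder=lambda x: pickle.dumps(x),
        decoder=lambda b: pickle.loads(b),
    )
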
class superduperdb.Dataset(identifier: str, artifacts: dataclasses.InitVar[Optional[Dict]] = None, *, select: Select | None = None, sample_size: int | None = None, random_seed: int | None = None, creation_date: str | None = None, raw_data: Sequence[Any] | None = None)[source]

Bases: Component

A dataset is an immutable collection of documents.

Parameters:
  • identifier – A unique identifier for the component

  • select – A query to select the documents for the dataset

  • sample_size – The number of documents to sample from the query

  • random_seed – The random seed to use for sampling

  • creation_date – The date the dataset was created

  • raw_data – The raw data for the dataset

creation_date: t.Optional[str] = None
property data
full_import_path = 'superduperdb.components.dataset.Dataset'
init()[source]
pre_create(db: Datalayer) None[source]

Called the first time this component is created

Parameters:

db – the db that creates the component

property random
random_seed: t.Optional[int] = None
raw_data: t.Optional[t.Sequence[t.Any]] = None
sample_size: t.Optional[int] = None
select: t.Optional[Select] = None
type_id: t.ClassVar[str] = 'dataset'
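
As an illustration, a Dataset sampling documents from a MongoDB collection might be declared as follows (the collection name is hypothetical, and the Collection import assumes the MongoDB backend):

    from superduperdb import Dataset
    from superduperdb.backends.mongodb import Collection

    # Sample 100 documents, with a fixed seed, from the 'documents' collection
    valid_set = Dataset(
        identifier='my-valid-set',
        select=Collection('documents').find(),
        sample_size=100,
        random_seed=42,
    )
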
class superduperdb.Document[source]

Bases: MongoStyleDict

A wrapper around an instance of dict or an Encodable which may be used to dump that resource to a mix of JSON-able content, ids and bytes.

Parameters:

content – The content to wrap

static decode(r: Dict, db: Datalayer | None = None) Any[source]
encode(schema: Schema | None = None, leaf_types_to_keep: Sequence[Type] = ()) Dict[source]

Make a copy of the content with all the Leaves encoded

get_leaves(leaf_type: str | None = None)[source]
set_variables(db, **kwargs) Document[source]
unpack(db=None, leaves_to_keep: Sequence = ()) Any[source]

Returns the content, but with any encodables replaced by their contents

property variables: List[str]
superduperdb.Encoder

alias of DataType

class superduperdb.Listener(artifacts: dataclasses.InitVar[typing.Optional[typing.Dict]] = None, *, identifier: str = '', key: str | ~typing.List[str] | ~typing.Tuple[~typing.List[str], ~typing.Dict[str, str]], model: ~superduperdb.components.model.Model, select: ~superduperdb.backends.base.query.CompoundSelect, active: bool = True, predict_kwargs: ~typing.Dict | None = <factory>)[source]

Bases: Component

Listener object which is used to process a column/key of a collection or table, and store the outputs.

Parameters:
  • identifier – A unique identifier for the component

  • key – Key to be bound to model

  • model – Model for processing data

  • select – Object for selecting which data is processed

  • active – Toggle to False to deactivate change data triggering

  • predict_kwargs – Keyword arguments to self.model.predict

active: bool = True
cleanup(database: Datalayer) None[source]

Clean up when the listener is deleted

Parameters:

database – The DB instance to process

property dependencies: List[ComponentTuple]
depends_on(other: Component)[source]
full_import_path = 'superduperdb.components.listener.Listener'
classmethod handle_integration(kwargs)[source]
property id_key: str
identifier: str = ''
key: str | List[str] | Tuple[List[str], Dict[str, str]]
property mapping
model: Model
property outputs
property outputs_key
property outputs_select
post_create(db: Datalayer) None[source]

Called after the first time this component is created. Generally used if self.version is important in this logic.

Parameters:

db – the db that creates the component

pre_create(db: Datalayer) None[source]

Called the first time this component is created

Parameters:

db – the db that creates the component

property predict_id
predict_kwargs: Dict | None
schedule_jobs(db: Datalayer, dependencies: Sequence[Job] = (), overwrite: bool = False) Sequence[Any][source]

Schedule jobs for the listener

Parameters:
  • db – The DB instance to process

  • dependencies – A list of dependencies

  • overwrite – Whether to overwrite all documents or only new documents

select: CompoundSelect
type_id: ClassVar[str] = 'listener'
ui_schema: ClassVar[List[Dict]] = [{'default': '', 'name': 'identifier', 'type': 'str'}, {'name': 'key', 'type': 'json'}, {'name': 'model', 'type': 'component/model'}, {'default': {'documents': [], 'query': '<collection_name>.find()'}, 'name': 'select', 'type': 'json'}, {'default': True, 'name': 'active', 'type': 'bool'}, {'default': {}, 'name': 'predict_kwargs', 'type': 'json'}]
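
A sketch of a Listener that applies a model to the 'txt' key of a MongoDB collection (the model variable, key and collection names are hypothetical, and the MongoDB backend is assumed):

    from superduperdb import Listener
    from superduperdb.backends.mongodb import Collection

    # Compute and store my_model outputs for the 'txt' field of each document
    listener = Listener(
        identifier='txt-listener',
        key='txt',
        model=my_model,  # any Model component
        select=Collection('documents').find(),
    )
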
class superduperdb.Metric(identifier: str, artifacts: dataclasses.InitVar[Optional[Dict]] = None, *, object: Callable)[source]

Bases: Component

Metric base object with which to evaluate performance on a dataset. These objects are callable, are applied row-wise to the data, and the results are averaged.

Parameters:
  • identifier – A unique identifier for the component

  • object – callable or Artifact to be applied to the data

public_api(beta): This API is in beta and may change before becoming stable.

full_import_path = 'superduperdb.components.metric.Metric'
object: Callable
type_id: ClassVar[str] = 'metric'
ui_schema: ClassVar[List[Dict]] = [{'name': 'object', 'type': 'artifact'}]
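
Since a Metric wraps a callable that is applied row-wise and averaged, a minimal sketch might be the following, assuming the callable receives a prediction and a target per row (the identifier is hypothetical):

    from superduperdb import Metric

    # Row-wise exact-match accuracy; the per-row results are averaged
    accuracy = Metric(identifier='accuracy', object=lambda x, y: float(x == y))
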
class superduperdb.Model(identifier: str, artifacts: dataclasses.InitVar[typing.Optional[typing.Dict]] = None, *, signature: ~typing.Literal['*args', '**kwargs', '*args, **kwargs', 'singleton'] = '*args, **kwargs', datatype: ~superduperdb.components.datatype.DataType | ~superduperdb.backends.ibis.field_types.FieldType | None = None, output_schema: ~superduperdb.components.schema.Schema | None = None, flatten: bool = False, model_update_kwargs: ~typing.Dict = <factory>, predict_kwargs: ~typing.Dict = <factory>, compute_kwargs: ~typing.Dict = <factory>, validation: ~superduperdb.components.model.Validation | None = None, metric_values: ~typing.Dict = <factory>)[source]

Bases: Component

Parameters:
  • datatype – DataType instance

  • output_schema – Output schema (mapping of encoders)

  • flatten – Flatten the model outputs

  • collate_fn – Collate function

  • model_update_kwargs – The kwargs to use for model update

  • metrics – The metrics to evaluate on

  • validation_sets – The validation Dataset instances to use

  • predict_kwargs – Additional arguments to use at prediction time

  • compute_kwargs – Kwargs used for compute backend job submit. Example (Ray backend): compute_kwargs = dict(resources=…)

compute_kwargs: t.Dict
datatype: EncoderArg = None
encode_outputs(outputs)[source]
encode_with_schema(outputs)[source]
flatten: bool = False
full_import_path = 'superduperdb.components.model.Model'
static handle_input_type(data, signature)[source]
property inputs: Inputs
metric_values: t.Dict
model_update_kwargs: t.Dict
output_schema: t.Optional[Schema] = None
abstract predict(dataset: List | QueryDataset) List[source]

Execute predictions on a dataset of datapoints.

Parameters:

dataset – the List or QueryDataset of datapoints to predict on

predict_in_db(X: ModelInputType, db: Datalayer, predict_id: str, select: CompoundSelect, ids: t.Optional[t.List[str]] = None, max_chunk_size: t.Optional[int] = None, in_memory: bool = True, overwrite: bool = False) t.Any[source]

Execute predictions over data selected from the database.

Parameters:
  • X – combination of input keys to be mapped to the model

  • db – SuperDuperDB instance

  • select – CompoundSelect query

  • ids – Iterable of ids

  • max_chunk_size – Chunks of data

  • in_memory – Load data into memory or not

  • overwrite – Overwrite all documents or only new documents

predict_in_db_job(X: ModelInputType, db: Datalayer, predict_id: str, select: t.Optional[CompoundSelect], ids: t.Optional[t.List[str]] = None, max_chunk_size: t.Optional[int] = None, dependencies: t.Sequence[Job] = (), in_memory: bool = True, overwrite: bool = False)[source]

Execute predictions over data selected from the database, as a job.

Parameters:
  • X – combination of input keys to be mapped to the model

  • db – SuperDuperDB instance

  • select – CompoundSelect query

  • ids – Iterable of ids

  • max_chunk_size – Chunks of data

  • dependencies – List of dependencies (jobs)

  • in_memory – Load data into memory or not

  • overwrite – Overwrite all documents or only new documents

predict_kwargs: t.Dict
abstract predict_one(*args, **kwargs) int[source]

Execute a single prediction on a datapoint given by positional and keyword arguments.

Parameters:
  • args – arguments handled by model

  • kwargs – keyword arguments handled by model

signature: Signature = '*args,**kwargs'
to_listener(key: str | List[str] | Tuple[List[str], Dict[str, str]], select: CompoundSelect, identifier='', predict_kwargs: dict | None = None, **kwargs)[source]

Convert the model to a listener.

Parameters:
  • key – Key to be bound to model

  • select – Object for selecting which data is processed

  • identifier – A string used to identify the model.

  • predict_kwargs – Keyword arguments to self.model.predict

type_id: t.ClassVar[str] = 'model'
ui_schema: t.ClassVar[t.Dict] = [{'name': 'datatype', 'optional': True, 'type': 'component/datatype'}, {'default': {}, 'name': 'predict_kwargs', 'type': 'json'}, {'default': '*args,**kwargs', 'name': 'signature', 'type': 'str'}]
validate(X, dataset: Dataset, metrics: t.Sequence[Metric])[source]
validate_in_db(db)[source]
validate_in_db_job(db, dependencies: Sequence[Job] = ())[source]
validation: t.Optional[Validation] = None
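
Model itself is abstract; below is a minimal sketch of a concrete subclass, assuming only predict_one and predict need to be overridden (in practice ObjectModel is usually the simpler choice):

    from superduperdb import Model

    class Doubler(Model):
        # Hypothetical subclass: doubles each input value
        def predict_one(self, x):
            return x * 2

        def predict(self, dataset):
            return [self.predict_one(x) for x in dataset]

    model = Doubler(identifier='doubler')
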
class superduperdb.ObjectModel(identifier: str, artifacts: dataclasses.InitVar[typing.Optional[typing.Dict]] = None, *, signature: ~typing.Literal['*args', '**kwargs', '*args, **kwargs', 'singleton'] = '*args, **kwargs', datatype: ~superduperdb.components.datatype.DataType | ~superduperdb.backends.ibis.field_types.FieldType | None = None, output_schema: ~superduperdb.components.schema.Schema | None = None, flatten: bool = False, model_update_kwargs: ~typing.Dict = <factory>, predict_kwargs: ~typing.Dict = <factory>, compute_kwargs: ~typing.Dict = <factory>, validation: ~superduperdb.components.model.Validation | None = None, metric_values: ~typing.Dict = <factory>, num_workers: int = 0, object: ~typing.Any)[source]

Bases: _ObjectModel

Model component which wraps a model to become serializable.

Parameters:
  • datatype – DataType instance

  • output_schema – Output schema (mapping of encoders)

  • flatten – Flatten the model outputs

  • collate_fn – Collate function

  • model_update_kwargs – The kwargs to use for model update

  • metrics – The metrics to evaluate on

  • validation_sets – The validation Dataset instances to use

  • predict_kwargs – Additional arguments to use at prediction time

  • compute_kwargs – Kwargs used for compute backend job submit. Example (Ray backend): compute_kwargs = dict(resources=…)

  • object – Model object, e.g. a sklearn model

full_import_path = 'superduperdb.components.model.ObjectModel'
ui_schema: t.ClassVar[t.List[t.Dict]] = [{'name': 'object', 'type': 'artifact'}]
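
For instance, any Python callable can be wrapped directly (the identifier and function are hypothetical):

    from superduperdb import ObjectModel

    # Wrap a plain Python function as a serializable model component
    model = ObjectModel(identifier='add-one', object=lambda x: x + 1)
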
class superduperdb.QueryModel(identifier: str, artifacts: dataclasses.InitVar[typing.Optional[typing.Dict]] = None, *, signature: ~typing.Literal['*args', '**kwargs', '*args, **kwargs', 'singleton'] = '*args, **kwargs', datatype: ~superduperdb.components.datatype.DataType | ~superduperdb.backends.ibis.field_types.FieldType | None = None, output_schema: ~superduperdb.components.schema.Schema | None = None, flatten: bool = False, model_update_kwargs: ~typing.Dict = <factory>, predict_kwargs: ~typing.Dict = <factory>, compute_kwargs: ~typing.Dict = <factory>, validation: ~superduperdb.components.model.Validation | None = None, metric_values: ~typing.Dict = <factory>, preprocess: ~typing.Callable | None = None, postprocess: ~typing.Callable | ~superduperdb.base.code.Code | None = None, select: ~superduperdb.backends.base.query.CompoundSelect)[source]

Bases: Model

Model which can be used to query data and return the results of a pre-configured query as its predictions.

Parameters:

select – query used to find data (can include like)

full_import_path = 'superduperdb.components.model.QueryModel'
classmethod handle_integration(kwargs)[source]
property inputs: Inputs
postprocess: t.Optional[t.Union[t.Callable, Code]] = None
predict(dataset: List | QueryDataset) List[source]

Execute predictions on a dataset of datapoints.

Parameters:

dataset – the List or QueryDataset of datapoints to predict on

predict_one(**kwargs)[source]

Execute a single prediction on a datapoint given by keyword arguments.

Parameters:

kwargs – keyword arguments handled by model

preprocess: t.Optional[t.Callable] = None
select: CompoundSelect
ui_schema: t.ClassVar[t.List[t.Dict]] = [{'default': 'from superduperdb import code\n\n@code\ndef my_code(x):\n    return x\n', 'name': 'postprocess', 'type': 'code'}, {'default': {'documents': [{'<key-1>': '$my_value'}, {'_id': 0, '_outputs': 0}], 'query': "<collection_name>.like(_documents[0], vector_index='<index_id>').find({}, _documents[1]).limit(10)"}, 'name': 'select', 'type': 'json'}]
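
A sketch loosely following the default query in ui_schema above; the collection, key and vector index names are hypothetical placeholders, and the MongoDB backend is assumed:

    from superduperdb import QueryModel
    from superduperdb.backends.mongodb import Collection

    collection = Collection('documents')

    # Return the ten documents nearest to the input, using an existing vector index
    qm = QueryModel(
        identifier='nearest-docs',
        select=collection.like({'txt': '$my_value'}, vector_index='my-index')
                         .find({})
                         .limit(10),
    )
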
class superduperdb.Schema(identifier: str, artifacts: dataclasses.InitVar[Optional[Dict]] = None, *, fields: Mapping[str, DataType])[source]

Bases: Component

A component carrying the information about the types or Encoders of a Table

Parameters:
  • identifier – A unique identifier for the component

  • fields – A mapping of field names to types or Encoders

public_api(beta): This API is in beta and may change before becoming stable.

__call__(data: Mapping[str, Any])[source]

Encode data using the schema’s encoders

Parameters:

data – data to encode

decode_data(data: Mapping[str, Any]) Mapping[str, Any][source]

Decode data using the schema’s encoders

Parameters:

data – data to decode

property encoded_types
property encoders
fields: Mapping[str, DataType]
full_import_path = 'superduperdb.components.schema.Schema'
pre_create(db) None[source]

Called the first time this component is created

Parameters:

db – the db that creates the component

property raw
property trivial
type_id: ClassVar[str] = 'schema'
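
A sketch of a Schema mapping field names to previously defined DataType instances (the field and datatype variables are hypothetical):

    from superduperdb import Schema

    # Map table fields to the encoders responsible for (de)serializing them
    schema = Schema(
        identifier='my-schema',
        fields={'img': image_datatype, 'txt': text_datatype},
    )
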
class superduperdb.Stack(identifier: str, artifacts: dataclasses.InitVar[Optional[Dict]] = None, *, components: Sequence[Component])[source]

Bases: Component

A placeholder that holds a list of components under a namespace and packages them as a tarball. This tarball can be restored to a Stack instance with the load method.

Parameters:
  • identifier – A unique identifier for the component

  • components – List of components to stack together and add to database.

public_api(alpha): This API is in alpha and may change before becoming stable.

components: Sequence[Component]
property db
static from_list(identifier, content, db: Datalayer | None = None)[source]
full_import_path = 'superduperdb.components.stack.Stack'
type_id: ClassVar[str] = 'stack'
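
A sketch of packaging several previously defined components under one namespace (the component variables are hypothetical):

    from superduperdb import Stack

    # Bundle related components so they can be added and archived together
    stack = Stack(
        identifier='my-app',
        components=[my_datatype, my_model, my_listener],
    )
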
class superduperdb.VectorIndex(identifier: str, artifacts: dataclasses.InitVar[typing.Optional[typing.Dict]] = None, *, indexing_listener: ~superduperdb.components.listener.Listener, compatible_listener: ~superduperdb.components.listener.Listener | None = None, measure: ~superduperdb.vector_search.base.VectorIndexMeasureType = VectorIndexMeasureType.cosine, metric_values: ~typing.Dict | None = <factory>)[source]

Bases: Component

A component carrying the information to apply a vector index to a DB instance

Parameters:
  • identifier – A unique identifier for the component

  • indexing_listener – Listener which is applied to created vectors

  • compatible_listener – Listener which is applied to vectors to be compared

  • measure – Measure to use for comparison

  • metric_values – Metric values for this index

compatible_listener: Listener | None = None
property dimensions: int
full_import_path = 'superduperdb.components.vector_index.VectorIndex'
get_nearest(like: Document, db: Any, id_field: str = '_id', outputs: Dict | None = None, ids: Sequence[str] | None = None, n: int = 100) Tuple[List[str], List[float]][source]

Given a document, find the nearest results in this vector index, returned as two parallel lists of result IDs and scores

Parameters:
  • like – The document to compare against

  • db – The datastore to use

  • outputs – An optional dictionary

  • ids – A list of ids to match

  • n – Number of items to return

get_vector(like: Document, models: List[str], keys: str | List | Dict, db: Any = None, outputs: Dict | None = None)[source]
indexing_listener: Listener
measure: VectorIndexMeasureType = 'cosine'
metric_values: Dict | None
property models_keys: Tuple[List[str], List[str | List[str] | Tuple[List[str], Dict[str, str]]]]

Return the lists of models and keys for each listener

on_load(db: Datalayer) None[source]

Called when this component is loaded from the data store

Parameters:

db – the db that loaded the component

schedule_jobs(db: Datalayer, dependencies: Sequence[Job] = ()) Sequence[Any][source]

Schedule jobs for the vector index

Parameters:
  • db – The DB instance to process

  • dependencies – A list of dependencies

type_id: ClassVar[str] = 'vector_index'
ui_schema: ClassVar[List[Dict]] = [{'name': 'indexing_listener', 'type': 'component/listener'}, {'name': 'compatible_listener', 'optional': True, 'type': 'component/listener'}, {'choices': ['cosine', 'dot', 'l2'], 'name': 'measure', 'type': 'str'}]
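
A sketch of a VectorIndex built on an indexing Listener (the embedding model, key and collection names are hypothetical, and the MongoDB backend is assumed):

    from superduperdb import Listener, VectorIndex
    from superduperdb.backends.mongodb import Collection

    # Index the vectors produced by a listener over the 'txt' key
    vi = VectorIndex(
        identifier='my-index',
        indexing_listener=Listener(
            identifier='txt-embeddings',
            key='txt',
            model=my_embedding_model,
            select=Collection('documents').find(),
        ),
        measure='cosine',
    )
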
superduperdb.code(my_callable)[source]
superduperdb.logging

alias of Logging

superduperdb.objectmodel(item: Callable | None = None, identifier: str | None = None, datatype=None, model_update_kwargs: Dict | None = None, flatten: bool = False, output_schema: Schema | None = None)[source]

When a function is wrapped with this decorator, it is converted into an ObjectModel.
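
For example, assuming the decorator can be applied directly to a function (the function name is hypothetical):

    from superduperdb import objectmodel

    @objectmodel
    def clean_text(txt):
        # The decorated function becomes an ObjectModel component
        return txt.strip().lower()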

superduperdb.superduper(item: Any | None = None, **kwargs) Any[source]

Attempts to automatically wrap an item in a superduperdb component by using duck typing to recognize it.

Parameters:

item – A database or model
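
A minimal sketch, assuming a scikit-learn estimator is one of the item types recognized by duck typing:

    from sklearn.svm import SVC

    from superduperdb import superduper

    # Wrap a native model so it can be used as a superduperdb component
    model = superduper(SVC())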

superduperdb.vector(shape, identifier: str | None = None)[source]

Create an encoder for a vector (list of ints/floats) of a given shape

Parameters:

shape – The shape of the vector
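
For example, an encoder for 384-dimensional embeddings might be created as follows (the dimensionality is illustrative):

    from superduperdb import vector

    # DataType describing a flat vector of 384 floats
    vec = vector(shape=(384,))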