superduperdb package#

Subpackages#

Module contents#

class superduperdb.Dataset(identifier: str, select: Select | None = None, sample_size: int | None = None, random_seed: int | None = None, creation_date: str | None = None, raw_data: Artifact | Any | None = None, version: int | None = None)[source]#

Bases: Component

A dataset is an immutable collection of documents used for training.

Parameters:
  • identifier – A unique identifier for the dataset

  • select – A query to select the documents for the dataset

  • sample_size – The number of documents to sample from the query

  • random_seed – The random seed to use for sampling

  • creation_date – The date the dataset was created

  • raw_data – The raw data for the dataset

  • version – The version of the dataset
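The interplay of `sample_size` and `random_seed` can be sketched with the standard library (a hypothetical helper, not the library's implementation): a fixed seed makes the sampled snapshot reproducible.

```python
import random

def sample_documents(documents, sample_size=None, random_seed=None):
    """Deterministically sample documents, mirroring how a fixed
    random_seed makes a Dataset snapshot reproducible."""
    if sample_size is None or sample_size >= len(documents):
        return list(documents)
    rng = random.Random(random_seed)
    return rng.sample(documents, sample_size)

docs = [{"_id": i} for i in range(100)]
a = sample_documents(docs, sample_size=5, random_seed=42)
b = sample_documents(docs, sample_size=5, random_seed=42)
assert a == b  # same seed, same snapshot
```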

creation_date: str | None = None#
identifier: str#
on_load(db: Datalayer) None[source]#

Called when this component is loaded from the data store

Parameters:

db – the db that loaded the component

post_create(db: Datalayer) None[source]#

Called after the first time this component is created. Generally used if self.version is important in this logic.

Parameters:

db – the db that creates the component

pre_create(db: Datalayer) None[source]#

Called the first time this component is created

Parameters:

db – the db that creates the component

property random#
random_seed: int | None = None#
raw_data: Artifact | Any | None = None#
sample_size: int | None = None#
select: Select | None = None#
type_id: ClassVar[str] = 'dataset'#
version: int | None = None#
class superduperdb.Document(content: Dict | Encodable)[source]#

Bases: object

A wrapper around an instance of dict or an Encodable, which may be used to dump that resource to a mix of JSON-able data and bytes

Parameters:

content – The content to wrap

content: Dict | Encodable#
static decode(r: Dict, encoders: Dict) Any[source]#
dump_bson() bytes[source]#

Dump this document into BSON and encode as bytes

encode(schema: Schema | None = None) Any[source]#

Make a copy of the content with all the Encodables encoded

outputs(key: str, model: str, version: int | None = None) Any[source]#

Get document outputs on key from model

Parameters:
  • key – Document key to get outputs from.

  • model – Model name to get outputs from.

  • version – Version of the model to get outputs from (optional)

unpack() Any[source]#

Returns the content, but with any Encodables replaced by their contents
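
The recursive replacement that `unpack` performs can be sketched in plain Python (the `Encodable` stand-in below is illustrative, not the library's class):

```python
class Encodable:
    """Stand-in for superduperdb's Encodable wrapper (illustrative only)."""
    def __init__(self, x):
        self.x = x

def unpack(content):
    """Recursively replace Encodable wrappers with their raw contents,
    mirroring Document.unpack."""
    if isinstance(content, Encodable):
        return unpack(content.x)
    if isinstance(content, dict):
        return {k: unpack(v) for k, v in content.items()}
    if isinstance(content, list):
        return [unpack(v) for v in content]
    return content

doc = {"img": Encodable([1, 2, 3]), "label": "cat"}
assert unpack(doc) == {"img": [1, 2, 3], "label": "cat"}
```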

class superduperdb.Encoder(identifier: str, decoder: ~typing.Callable | ~superduperdb.base.artifact.Artifact = <factory>, encoder: ~typing.Callable | ~superduperdb.base.artifact.Artifact = <factory>, shape: ~typing.Sequence | None = None, load_hybrid: bool = True, version: int | None = None)[source]#

Bases: Component

Storable Component allowing byte encoding of primary data, i.e. data inserted using db.base.db.Datalayer.insert

Parameters:
  • identifier – Unique identifier

  • decoder – Callable converting bytes to an Encodable of this Encoder

  • encoder – Callable converting an Encodable of this Encoder to bytes

  • shape – Shape of the data

  • version – Version of the encoder (don’t use this)

  • load_hybrid – Whether to load the data from the URI or return the URI in CFG.hybrid mode
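
The `encoder`/`decoder` callables must be mutually inverse: one serializes an object to bytes, the other restores it. A minimal sketch, assuming pickle-based serialization (the library itself does not mandate pickle):

```python
import pickle

def encoder(x) -> bytes:
    """Hypothetical encoder callable: object -> bytes."""
    return pickle.dumps(x)

def decoder(b: bytes):
    """Hypothetical decoder callable: bytes -> object, inverting encoder."""
    return pickle.loads(b)

obj = {"weights": [0.1, 0.2]}
assert decoder(encoder(obj)) == obj  # round trip
```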

artifact_artibutes: ClassVar[Sequence[str]] = ['decoder', 'encoder']#
decode(b: bytes) Any[source]#
decoder: Callable | Artifact#
dump(other)[source]#
encode(x: Any | None = None, uri: str | None = None, wrap: bool = True) str | None | Dict[str, Any][source]#
encoder: Callable | Artifact#
encoders: ClassVar[List] = ['_default']#
identifier: str#
load_hybrid: bool = True#
shape: Sequence | None = None#
type_id: ClassVar[str] = 'encoder'#
version: int | None = None#
class superduperdb.JSONable[source]#

Bases: BaseModel

A base class for classes that can be converted to and from JSON

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

__init__ uses __pydantic_self__ instead of the more common self for the first arg to allow self as a field name.

class Config[source]#

Bases: object

extra = 'forbid'#
ignored_types = (<class 'functools.cached_property'>,)#
SUBCLASSES: ClassVar[Set[Type]] = {<class 'superduperdb.base.config.BaseConfigJSONable'>, <class 'superduperdb.base.config.Cluster'>, <class 'superduperdb.base.config.Config'>, <class 'superduperdb.base.config.Retry'>}#
TYPE_ID_TO_CLASS: ClassVar[Dict[str, Type]] = {}#
deepcopy() JSONable[source]#
dict(*, include: IncEx = None, exclude: IncEx = None, by_alias: bool = False, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) Dict[str, Any]#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'ignored_types': (<class 'functools.cached_property'>,)}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class superduperdb.Listener(key: str, model: str | ~superduperdb.components.model.Model, select: ~superduperdb.backends.base.query.CompoundSelect, active: bool = True, identifier: str | None = None, predict_kwargs: ~typing.Dict | None = <factory>, version: int | None = None)[source]#

Bases: Component

Listener object which is used to process a column or key of a collection or table, and to store the outputs.

Parameters:
  • key – Key to be bound to model

  • model – Model for processing data

  • select – Object for selecting which data is processed

  • active – Toggle to False to deactivate change data triggering

  • identifier – A string used to identify the model.

  • predict_kwargs – Keyword arguments to self.model.predict

  • version – Version number of the listener
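
What a Listener does to the selected data can be sketched with plain Python: apply a model to the bound key of each document and store the result under an outputs field. The `_outputs` layout below is illustrative, not the library's storage format.

```python
def run_listener(documents, key, model, model_name):
    """Sketch of a Listener: apply ``model`` to ``key`` of each selected
    document and record the result under an outputs field."""
    for doc in documents:
        doc.setdefault("_outputs", {})[(key, model_name)] = model(doc[key])
    return documents

docs = [{"text": "hello"}, {"text": "world"}]
run_listener(docs, "text", str.upper, "upper")
assert docs[0]["_outputs"][("text", "upper")] == "HELLO"
```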

active: bool = True#
property child_components: Sequence[Tuple[str, str]]#

Returns a list of child components as pairs of strings

cleanup(database: Datalayer) None[source]#

Clean up when the listener is deleted

Parameters:

database – The DB instance to process

property dependencies: List[str]#
property id_key: str#
identifier: str | None = None#
key: str#
model: str | Model#
property outputs#
post_create(db: Datalayer) None[source]#

Called after the first time this component is created. Generally used if self.version is important in this logic.

Parameters:

db – the db that creates the component

pre_create(db: Datalayer) None[source]#

Called the first time this component is created

Parameters:

db – the db that creates the component

predict_kwargs: Dict | None#
schedule_jobs(db: Datalayer, dependencies: Sequence[Job] = (), verbose: bool = False) Sequence[Any][source]#

Schedule jobs for the listener

Parameters:
  • db – The DB instance to process

  • dependencies – A list of dependencies

  • verbose – Whether to print verbose output

select: CompoundSelect#
type_id: ClassVar[str] = 'listener'#
version: int | None = None#
class superduperdb.Metric(identifier: str, object: Artifact | Callable | None = None, version: int | None = None)[source]#

Bases: Component

Metric base object with which to evaluate performance on a dataset. These objects are callable and are applied row-wise to the data, then averaged.

Parameters:
  • identifier – unique identifier

  • object – callable or Artifact to be applied to the data

  • version – version of the Metric
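
The row-wise-then-averaged evaluation described above can be sketched directly (the `evaluate` helper is hypothetical, not part of the library):

```python
def evaluate(metric, predictions, targets):
    """Apply a Metric-style callable row-wise and average the results."""
    scores = [metric(p, t) for p, t in zip(predictions, targets)]
    return sum(scores) / len(scores)

def accuracy(p, t):
    # A simple row-wise metric: 1.0 on a match, 0.0 otherwise.
    return float(p == t)

assert evaluate(accuracy, [1, 0, 1, 1], [1, 1, 1, 1]) == 0.75
```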

artifacts: ClassVar[List[str]] = ['object']#
identifier: str#
object: Artifact | Callable | None = None#
type_id: ClassVar[str] = 'metric'#
version: int | None = None#
class superduperdb.Model(identifier: str, object: t.Union[Artifact, t.Any], flatten: bool = False, output_schema: t.Optional[t.Union[Schema, dict]] = None, encoder: EncoderArg = None, preprocess: t.Union[t.Callable, Artifact, None] = None, postprocess: t.Union[t.Callable, Artifact, None] = None, collate_fn: t.Union[t.Callable, Artifact, None] = None, metrics: t.Sequence[t.Union[str, Metric, None]] = (), predict_method: t.Optional[str] = None, model_to_device_method: t.Optional[str] = None, batch_predict: bool = False, takes_context: bool = False, train_X: t.Optional[str] = None, train_y: t.Optional[str] = None, training_select: t.Union[Select, None] = None, metric_values: t.Optional[t.Dict] = <factory>, training_configuration: t.Union[str, _TrainingConfiguration, None] = None, model_update_kwargs: dict = <factory>, serializer: str = 'dill', device: str = 'cpu', preferred_devices: t.Union[None, t.Sequence[str]] = ('cuda', 'mps', 'cpu'), validation_sets: t.Optional[t.Sequence[t.Union[str, Dataset]]] = None, version: t.Optional[int] = None)[source]#

Bases: Component, Predictor

Model component which wraps a model to become serializable

Parameters:
  • identifier – Unique identifier of model

  • object – Model object, e.g. an sklearn model

  • encoder – Encoder instance (optional)

  • flatten – Flatten the model outputs

  • output_schema – Output schema (mapping of encoders) (optional)

  • preprocess – Preprocess function (optional)

  • postprocess – Postprocess function (optional)

  • collate_fn – Collate function (optional)

  • metrics – Metrics to use (optional)

  • predict_method – The method to use for prediction (optional)

  • model_to_device_method – The method to transfer the model to a device

  • batch_predict – Whether to batch predict (optional)

  • takes_context – Whether the model takes context into account (optional)

  • train_X – The key of the input data to use for training (optional)

  • train_y – The key of the target data to use for training (optional)

  • training_select – The select to use for training (optional)

  • metric_values – The metric values (optional)

  • training_configuration – The training configuration (optional)

  • model_update_kwargs – The kwargs to use for model update (optional)

  • serializer – Serializer to store model to artifact store (optional)

  • device – The device to use (optional)

  • preferred_devices – The preferred devices to use (optional)
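
The prediction flow implied by the `preprocess`, `predict_method`, and `postprocess` parameters can be sketched as follows (a simplified stand-in, not the library's implementation; `float` stands in for a wrapped model object):

```python
def model_predict(x, object_, preprocess=None, postprocess=None,
                  predict_method=None):
    """Sketch of Model's prediction flow: optional preprocess, then the
    wrapped object (or a named method on it), then optional postprocess."""
    if preprocess is not None:
        x = preprocess(x)
    fn = getattr(object_, predict_method) if predict_method else object_
    out = fn(x)
    if postprocess is not None:
        out = postprocess(out)
    return out

result = model_predict(
    " 3.5 ",
    object_=float,                 # stands in for e.g. an sklearn model
    preprocess=str.strip,
    postprocess=lambda y: round(y),
)
assert result == 4
```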

append_metrics(d: Dict[str, float]) None[source]#
artifact_attributes: t.ClassVar[t.Sequence[str]] = ['object']#
batch_predict: bool = False#
property child_components: Sequence[Tuple[str, str]]#
collate_fn: t.Union[t.Callable, Artifact, None] = None#
create_fit_job(X: str | Sequence[str], select: Select | None = None, y: str | None = None, **kwargs)[source]#
device: str = 'cpu'#
encoder: EncoderArg = None#
fit(X: t.Any, y: t.Any = None, configuration: t.Optional[_TrainingConfiguration] = None, data_prefetch: bool = False, db: t.Optional[Datalayer] = None, dependencies: t.Sequence[Job] = (), metrics: t.Optional[t.Sequence[Metric]] = None, select: t.Optional[Select] = None, validation_sets: t.Optional[t.Sequence[t.Union[str, Dataset]]] = None, **kwargs) t.Optional[Pipeline][source]#

Fit the model on the given data.

Parameters:
  • X – The key of the input data to use for training

  • y – The key of the target data to use for training

  • configuration – The training configuration (optional)

  • data_prefetch – Whether to prefetch the data (optional)

  • db – The datalayer (optional)

  • dependencies – The dependencies (optional)

  • metrics – The metrics to evaluate on (optional)

  • select – The select to use for training (optional)

  • validation_sets – The validation Dataset instances to use (optional)

flatten: bool = False#
identifier: str#
metric_values: t.Optional[t.Dict]#
metrics: t.Sequence[t.Union[str, Metric, None]] = ()#
model_to_device_method: t.Optional[str] = None#
model_update_kwargs: dict#
object: t.Union[Artifact, t.Any]#
on_load(db: Datalayer) None[source]#

Called when this component is loaded from the data store

Parameters:

db – the db that loaded the component

output_schema: t.Optional[t.Union[Schema, dict]] = None#
post_create(db: Datalayer) None[source]#

Called after the first time this component is created. Generally used if self.version is important in this logic.

Parameters:

db – the db that creates the component

postprocess: t.Union[t.Callable, Artifact, None] = None#
pre_create(db: Datalayer)[source]#

Called the first time this component is created

Parameters:

db – the db that creates the component

predict_method: t.Optional[str] = None#
preferred_devices: t.Union[None, t.Sequence[str]] = ('cuda', 'mps', 'cpu')#
preprocess: t.Union[t.Callable, Artifact, None] = None#
schedule_jobs(db: Datalayer, dependencies: t.Sequence[Job] = (), verbose: bool = False) t.Sequence[t.Any][source]#

Schedule jobs for this model

Parameters:
  • db – The db to process

  • dependencies – A sequence of dependencies

  • verbose – If True, print more information

serializer: str = 'dill'#
takes_context: bool = False#
train_X: t.Optional[str] = None#
train_y: t.Optional[str] = None#
training_configuration: t.Union[str, _TrainingConfiguration, None] = None#
property training_keys: List#
training_select: t.Union[Select, None] = None#
type_id: t.ClassVar[str] = 'model'#
validate(db, validation_set: t.Union[Dataset, str], metrics: t.Sequence[Metric])[source]#
validation_sets: t.Optional[t.Sequence[t.Union[str, Dataset]]] = None#
version: t.Optional[int] = None#
class superduperdb.Schema(identifier: str, fields: Mapping[str, superduperdb.components.encoder.Encoder | str], version: int | None = None)[source]#

Bases: Component

decode(data: Mapping[str, Any]) Mapping[str, Any][source]#

Decode data using the schema’s encoders

Parameters:

data – data to decode

encode(data: Mapping[str, Any])[source]#

Encode data using the schema’s encoders

Parameters:

data – data to encode
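
The field-wise encoding a Schema performs can be sketched with a mapping of field names to encode/decode pairs (JSON here is only an example codec; the hypothetical `fields` dict mirrors `Schema.fields`):

```python
import json

# Hypothetical per-field codecs, keyed like Schema.fields:
# each value is an (encode, decode) pair for that field.
fields = {"vec": (json.dumps, json.loads)}

def encode(data):
    """Encode each field that has a registered codec; pass others through."""
    return {k: fields[k][0](v) if k in fields else v for k, v in data.items()}

def decode(data):
    """Invert encode using each field's decoder."""
    return {k: fields[k][1](v) if k in fields else v for k, v in data.items()}

row = {"vec": [1.0, 2.0], "id": "a"}
assert decode(encode(row)) == row
```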

property encoded_types#
property encoders#
fields: Mapping[str, Encoder | str]#
identifier: str#
pre_create(db) None[source]#

Called the first time this component is created

Parameters:

db – the db that creates the component

property raw#
property trivial#
type_id: ClassVar[str] = 'schema'#
version: int | None = None#
class superduperdb.Serializer(identifier: str, object: Type, version: int | None)[source]#

Bases: Component

identifier: str#
object: Type#
pre_create(db: Datalayer)[source]#

Called the first time this component is created

Parameters:

db – the db that creates the component

type_id: ClassVar[str] = 'serializer'#
version: int | None#
class superduperdb.VectorIndex(identifier: str, indexing_listener: ~superduperdb.components.listener.Listener | str, compatible_listener: None | ~superduperdb.components.listener.Listener | str = None, measure: ~superduperdb.vector_search.base.VectorIndexMeasureType = VectorIndexMeasureType.cosine, version: int | None = None, metric_values: ~typing.Dict | None = <factory>)[source]#

Bases: Component

A component carrying the information to apply a vector index to a DB instance

Parameters:
  • identifier – Unique string identifier of index

  • indexing_listener – Listener which is applied to created vectors

  • compatible_listener – Listener which is applied to vectors to be compared

  • measure – Measure to use for comparison

  • version – version of this index

  • metric_values – Metric values for this index

property child_components: Sequence[Tuple[str, str]]#
compatible_listener: None | Listener | str = None#
property dimensions: int#
get_nearest(like: Document, db: Any, id_field: str = '_id', outputs: Dict | None = None, ids: Sequence[str] | None = None, n: int = 100) Tuple[List[str], List[float]][source]#

Given a document, find the nearest results in this vector index, returned as two parallel lists of result IDs and scores

Parameters:
  • like – The document to compare against

  • db – The datastore to use

  • outputs – An optional dictionary

  • ids – A list of ids to match

  • n – Number of items to return
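
The parallel ids/scores return shape can be illustrated with a brute-force sketch under the default cosine measure (the real index delegates to a vector-search backend; this is not the library's implementation):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

def get_nearest(query, vectors, n=100):
    """Brute-force sketch of get_nearest: rank stored vectors by cosine
    similarity and return parallel lists of ids and scores."""
    scored = sorted(
        ((id_, cosine(query, v)) for id_, v in vectors.items()),
        key=lambda pair: pair[1],
        reverse=True,
    )[:n]
    return [id_ for id_, _ in scored], [s for _, s in scored]

vectors = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
ids, scores = get_nearest([1.0, 0.1], vectors, n=2)
assert ids == ["a", "c"]
```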

get_vector(like: Document, models: List[str], keys: List[str], db: Any = None, outputs: Dict | None = None)[source]#
identifier: str#
indexing_listener: Listener | str#
measure: VectorIndexMeasureType = 'cosine'#
metric_values: Dict | None#
property models_keys: Tuple[List[str], List[str]]#

Return a list of model and keys for each listener

on_load(db: Datalayer) None[source]#

Called when this component is loaded from the data store

Parameters:

db – the db that loaded the component

post_create(db: Datalayer) None[source]#

Called after the first time this component is created. Generally used if self.version is important in this logic.

Parameters:

db – the db that creates the component

type_id: ClassVar[str] = 'vector_index'#
version: int | None = None#
superduperdb.logging#

alias of Logging

superduperdb.superduper(item: Any | None = None, **kwargs) Any[source]#

Attempts to automatically wrap an item in a superduperdb component by using duck typing to recognize it.

Parameters:

item – A database or model

superduperdb.vector(shape)[source]#

Create an encoder for a vector (list of ints/floats) of a given shape

Parameters:

shape – The shape of the vector
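
What such an encoder must do for a fixed one-dimensional shape can be sketched with `struct` (the `make_vector_codec` helper and float32 layout are illustrative assumptions, not the library's actual serialization):

```python
import struct

def make_vector_codec(shape: int):
    """Sketch of a vector(shape)-style codec: serialize a fixed-length
    list of floats to little-endian float32 bytes and back."""
    fmt = f"<{shape}f"

    def encode(v) -> bytes:
        return struct.pack(fmt, *v)

    def decode(b: bytes):
        return list(struct.unpack(fmt, b))

    return encode, decode

enc, dec = make_vector_codec(3)
v = [1.0, 2.0, 3.0]
assert dec(enc(v)) == v  # exact round trip for float32-representable values
```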