superduperdb.base package#

Submodules#

superduperdb.base.build module#

superduperdb.base.build.build(uri, mapping, type: str = 'data_backend')[source]#
superduperdb.base.build.build_artifact_store(artifact_store: str | None = None, databackend: BaseDataBackend | None = None)[source]#
superduperdb.base.build.build_compute(compute)[source]#
superduperdb.base.build.build_databackend(cfg, databackend=None)[source]#
superduperdb.base.build.build_datalayer(cfg=None, databackend=None, **kwargs) Datalayer[source]#

Build a Datalayer object as per db = superduper(db) from configuration.

Parameters:
  • cfg – Configuration to use. If None, use superduperdb.CFG.

  • databackend – Databackend to use. If None, use superduperdb.CFG.data_backend.

superduperdb.base.build.build_metadata(cfg, databackend: BaseDataBackend | None = None)[source]#

superduperdb.base.config module#

The classes in this file define the configuration variables for SuperDuperDB, which means that this file gets imported before almost anything else, and cannot contain any other imports from this project.

class superduperdb.base.config.BaseConfig[source]#

Bases: object

force_set(name, value)[source]#

Forcefully setattr of BaseConfigJSONable instance

class superduperdb.base.config.BytesEncoding(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

BASE64 = 'Str'#
BYTES = 'Bytes'#
_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

name: the name of the member start: the initial start value or None count: the number of existing members last_values: the list of values assigned

class superduperdb.base.config.CDCConfig(uri: str | None = None, strategy: superduperdb.base.config.PollingStrategy | superduperdb.base.config.LogBasedStrategy | NoneType = None)[source]#

Bases: BaseConfig

strategy: PollingStrategy | LogBasedStrategy | None = None#
uri: str | None = None#
class superduperdb.base.config.CDCStrategy(type: str)[source]#

Bases: object

Base CDC strategy dataclass

type: str#
class superduperdb.base.config.Cluster(compute: str = 'local', vector_search: str = 'in_memory', cdc: ~superduperdb.base.config.CDCConfig = <factory>, backfill_batch_size: int = 100)[source]#

Bases: BaseConfig

Describes a connection to distributed work via Dask

Parameters:
  • backfill_batch_size – The number of rows to backfill at a time for vector-search loading

  • compute – The URI for compute i.e ‘local’, ‘dask+tcp://localhost:8786’ “None”: Run all jobs in local mode i.e simple function call “local”: same as above “dask+thread”: Run all jobs on a local threaded dask cluster “dask+tcp://<host>:<port>”: Run all jobs on a remote dask cluster “ray://<host>:<port>”: Run all jobs on a remote ray cluster

  • vector_search – The URI for the vector search service “None”: Run vector search on local “http://<host>:<port>”: Connect a remote vector search service

  • cdc – The URI for the change data capture service (if “None” then no cdc assumed) “None”: Run cdc on local as a thread. “http://<host>:<port>”: Connect a remote cdc service

backfill_batch_size: int = 100#
cdc: CDCConfig#
compute: str = 'local'#
property vector_search_type#
class superduperdb.base.config.Config(data_backend: str = 'mongodb://superduper:superduper@localhost:27017/test_db', lance_home: str = '.superduperdb/vector_indices', artifact_store: str | None = None, metadata_store: str | None = None, cluster: ~superduperdb.base.config.Cluster = <factory>, retries: ~superduperdb.base.config.Retry = <factory>, downloads: ~superduperdb.base.config.Downloads = <factory>, fold_probability: float = 0.05, log_level: ~superduperdb.base.config.LogLevel = LogLevel.INFO, logging_type: ~superduperdb.base.config.LogType = LogType.SYSTEM, dot_env: str | None = None, bytes_encoding: ~superduperdb.base.config.BytesEncoding = BytesEncoding.BYTES)[source]#

Bases: BaseConfig

The data class containing all configurable superduperdb values

Parameters:
  • data_backend – The URI for the data backend

  • vector_search – The configuration for the vector search {‘in_memory’, ‘lance’}

  • artifact_store – The URI for the artifact store

  • metadata_store – The URI for the metadata store

  • cluster – Settings for distributed computing and change data capture

  • retries – Settings for retrying failed operations

  • downloads – Settings for downloading files

  • fold_probability – The probability of validation fold

  • log_level – The severity level of the logs

  • logging_type – The type of logging to use

  • bytes_encoding – The encoding of bytes in the data backend

artifact_store: str | None = None#
bytes_encoding: BytesEncoding = 'Bytes'#
cluster: Cluster#
property comparables#

A dict of self excluding some defined attributes.

data_backend: str = 'mongodb://superduper:superduper@localhost:27017/test_db'#
dict()[source]#
dot_env: str | None = None#
downloads: Downloads#
fold_probability: float = 0.05#
force_set(name, value)[source]#

Brings immutable behaviour to CFG instance.

CAUTION: Only use it in development mode with caution, as this can bring unexpected behaviour.

property hybrid_storage#
lance_home: str = '.superduperdb/vector_indices'#
log_level: LogLevel = 'INFO'#
logging_type: LogType = 'SYSTEM'#
match(cfg: Dict)[source]#

Match the target cfg dict with self comparables dict.

metadata_store: str | None = None#
retries: Retry#
class superduperdb.base.config.Downloads(folder: Optional[str] = None, n_workers: int = 0, headers: Dict = <factory>, timeout: Optional[int] = None)[source]#

Bases: BaseConfig

folder: str | None = None#
headers: Dict#
n_workers: int = 0#
timeout: int | None = None#
class superduperdb.base.config.LogBasedStrategy(type: str = 'logbased', resume_token: Dict[str, str] | None = None)[source]#

Bases: CDCStrategy

resume_token: Dict[str, str] | None = None#
type: str = 'logbased'#
class superduperdb.base.config.LogLevel(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

Enumerate log severity level

DEBUG = 'DEBUG'#
ERROR = 'ERROR'#
INFO = 'INFO'#
SUCCESS = 'SUCCESS'#
WARN = 'WARN'#
_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

name: the name of the member start: the initial start value or None count: the number of existing members last_values: the list of values assigned

class superduperdb.base.config.LogType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

Enumerate the standard logs

LOKI = 'LOKI'#
SYSTEM = 'SYSTEM'#
_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

name: the name of the member start: the initial start value or None count: the number of existing members last_values: the list of values assigned

class superduperdb.base.config.PollingStrategy(type: 'str' = 'incremental', auto_increment_field: str | None = None, frequency: float = 3600)[source]#

Bases: CDCStrategy

auto_increment_field: str | None = None#
frequency: float = 3600#
type: str = 'incremental'#
class superduperdb.base.config.Retry(stop_after_attempt: int = 2, wait_max: float = 10.0, wait_min: float = 4.0, wait_multiplier: float = 1.0)[source]#

Bases: BaseConfig

Describes how to retry using the tenacity library

Parameters:
  • stop_after_attempt – The number of attempts to make

  • wait_max – The maximum time to wait between attempts

  • wait_min – The minimum time to wait between attempts

  • wait_multiplier – The multiplier for the wait time between attempts

stop_after_attempt: int = 2#
wait_max: float = 10.0#
wait_min: float = 4.0#
wait_multiplier: float = 1.0#

superduperdb.base.config_dicts module#

Operations on dictionaries used to fill and combine config files and environment variables

superduperdb.base.config_dicts.combine_configs(dicts: Sequence[Dict[str, object]]) Dict[str, object][source]#
superduperdb.base.config_dicts.environ_to_config_dict(prefix: str, parent: ~typing.Dict[str, str], environ: ~typing.Dict[str, str] | None = None, err: ~typing.TextIO | None = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>, fail: bool = False)[source]#

superduperdb.base.configs module#

class superduperdb.base.configs.ConfigSettings(cls: Type, default_files: Sequence[Path] | str, prefix: str, environ: Dict | None = None, base_config: Config | None = None)[source]#

Bases: object

A class that reads a Pydantic class from config files and environment variables.

Parameters:
  • cls – The Pydantic class to read.

  • default_files – The default config files to read.

  • prefix – The prefix to use for environment variables.

  • environ – The environment variables to read from.

base_config: Config | None = None#
cls: Type#
property config: Any#

Read a Pydantic class

default_files: Sequence[Path] | str#
environ: Dict | None = None#
prefix: str#
superduperdb.base.configs.build_config(cfg: Config | None = None) Config[source]#

Build the config object from the environment variables and config files.

superduperdb.base.cursor module#

class superduperdb.base.cursor.SuperDuperCursor(raw_cursor: Any, id_field: str, db: Datalayer | None = None, scores: Dict[str, float] | None = None, reference: bool = False, _it: int = 0)[source]#

Bases: object

A cursor that wraps a cursor and returns Document wrapping a dict including Encodable objects.

Parameters:
  • raw_cursor – the cursor to wrap

  • id_field – the field to use as the document id

  • encoders – a dict of encoders to use to decode the documents

  • scores – a dict of scores to add to the documents

cursor_next()[source]#
db: Datalayer | None = None#
id_field: str#
limit(*args, **kwargs) SuperDuperCursor[source]#

Limit the number of results returned by the cursor.

next()#
raw_cursor: Any#
reference: bool = False#
scores: Dict[str, float] | None = None#

superduperdb.base.datalayer module#

class superduperdb.base.datalayer.Datalayer(databackend: ~superduperdb.backends.base.data_backend.BaseDataBackend, metadata: ~superduperdb.backends.base.metadata.MetaDataStore, artifact_store: ~superduperdb.backends.base.artifact.ArtifactStore, compute: ~superduperdb.backends.base.compute.ComputeBackend = <superduperdb.backends.local.compute.LocalComputeBackend object>)[source]#

Bases: object

Base database connector for SuperDuperDB

Parameters:
  • databackend – object containing connection to Datastore

  • metadata – object containing connection to Metadatastore

  • artifact_store – object containing connection to Artifactstore

  • compute – object containing connection to ComputeBackend

__init__(databackend: ~superduperdb.backends.base.data_backend.BaseDataBackend, metadata: ~superduperdb.backends.base.metadata.MetaDataStore, artifact_store: ~superduperdb.backends.base.artifact.ArtifactStore, compute: ~superduperdb.backends.base.compute.ComputeBackend = <superduperdb.backends.local.compute.LocalComputeBackend object>)[source]#
Parameters:
  • databackend – object containing connection to Datastore

  • metadata – object containing connection to Metadatastore

  • artifact_store – object containing connection to Artifactstore

  • compute – object containing connection to ComputeBackend

_add_component_to_cache(component: Component)[source]#

Add component to cache when it is added to the db. Avoiding the need to load it from the db again.

add(object: Component | Sequence[Any] | Any, dependencies: Sequence[Job] = ())[source]#

Add functionality in the form of components. Components are stored in the configured artifact store, and linked to the primary db through the metadata.

Parameters:
  • object – Object to be stored

  • dependencies – list of jobs which should execute before component init begins

async apredict(model_name: str, input: Document | Any, context_select: Select | None = None, context_key: str = '_base', **kwargs)[source]#

Apply model to input using asyncio.

Parameters:
  • model_name – model identifier

  • input – input to be passed to the model. Must be possible to encode with registered datatypes

  • context_select – select query object to provide context

  • context_key – key to use to extract context from context_select

close()[source]#

Gracefully shutdown the Datalayer

delete(delete: Delete, refresh: bool = True) Any[source]#

Delete data.

Parameters:

delete – delete query object

drop(force: bool = False)[source]#

Drop all data, artifacts and metadata

execute(query: Select | Delete | Update | Insert | str, *args, **kwargs) SuperDuperCursor | Any | Tuple[Any, Any | None][source]#

Execute a query on the db.

Parameters:

query – select, insert, delete, update,

get_compute()[source]#
initialize_vector_searcher(identifier, searcher_type: str | None = None, backfill=False) BaseVectorSearcher | None[source]#
insert(insert: Insert, refresh: bool = True, datatypes: Sequence[DataType] = ()) Tuple[Any, Any | None][source]#

Insert data.

Parameters:

insert – insert query object

load(type_id: str, identifier: str, version: int | None = None, allow_hidden: bool = False, info_only: bool = False) Component | Dict[str, Any][source]#

Load component using uniquely identifying information.

Parameters:
  • type_id – type_id of component to remove [‘datatype’, ‘model’, ‘listener’, …]

  • identifier – identifier of component (see container.base.Component)

  • version – [optional] numerical version

  • allow_hidden – toggle to True to allow loading of deprecated components

  • info_only – toggle to True to return metadata only

predict(model_name: str, input: Document | Any, context_select: str | Select | None = None, context_key: str | None = None, **kwargs) Tuple[Document, List[Document]][source]#

Apply model to input.

Parameters:
  • model_name – model identifier

  • input – input to be passed to the model. Must be possible to encode with registered datatypes

  • context_select – select query object to provide context

  • context_key – key to use to extract context from context_select

rebuild(cfg=None)[source]#
refresh_after_delete(query: Delete, ids: Sequence[str], verbose: bool = False)[source]#

Trigger cleanup jobs after data deletion.

Parameters:
  • query – Select or Update which reduces scope of computations

  • ids – ids which reduce scope of computations

  • verbose – Toggle to True to get more output

refresh_after_update_or_insert(query: Insert | Select | Update, ids: Sequence[str], verbose: bool = False)[source]#

Trigger computation jobs after data insertion.

Parameters:
  • query – Select or Update which reduces scope of computations

  • ids – ids which reduce scope of computations

  • verbose – Toggle to True to get more output

remove(type_id: str, identifier: str, version: int | None = None, force: bool = False)[source]#

Remove component (version: optional)

Parameters:
  • type_id – type_id of component to remove [‘datatype’, ‘model’, ‘listener’, ‘training_configuration’, ‘vector_index’]

  • identifier – identifier of component (see container.base.Component)

  • version – [optional] numerical version to remove

  • force – force skip confirmation (use with caution)

replace(object: Any, upsert: bool = False)[source]#

(Use-with caution!!) Replace a model in artifact store with updated object. :param object: object to replace :param upsert: toggle to True to enable even if object doesn’t exist yet

select(select: Select, reference: bool = True) SuperDuperCursor[source]#

Select data.

Parameters:

select – select query object

select_nearest(like: Dict | Document, vector_index: str, ids: Sequence[str] | None = None, outputs: Document | None = None, n: int = 100) Tuple[List[str], List[float]][source]#
property server_mode#
set_compute(new: ComputeBackend)[source]#

Set a new compute engine at runtime. Use it only if you know what you do. The standard procedure is to set compute engine during initialization.

show(type_id: str, identifier: str | None = None, version: int | None = None)[source]#

Show available functionality which has been added using self.add. If version is specified, then print full metadata

Parameters:
  • type_id – type_id of component to show [‘datatype’, ‘model’, ‘listener’, ‘learning_task’, ‘training_configuration’, ‘metric’, ‘vector_index’, ‘job’]

  • identifier – identifying string to component

  • version – (optional) numerical version - specify for full metadata

type_id_to_cache_mapping = {'datatype': 'datatypes', 'metric': 'metrics', 'model': 'models', 'vector_index': 'vector_indices'}#
update(update: Update, refresh: bool = True) Any[source]#

Update data.

Parameters:

update – update query object

validate(identifier: str, type_id: str, validation_set: str, metrics: Sequence[str])[source]#

Evaluate quality of component, using Component.validate, if implemented.

Parameters:
  • identifier – identifier of semantic index

  • type_id – type_id of component

  • validation_set – validation dataset on which to validate

  • metrics – metric functions to compute

class superduperdb.base.datalayer.LoadDict(database: superduperdb.base.datalayer.Datalayer, field: str | None = None, callable: Callable | None = None)[source]#

Bases: dict

callable: Callable | None = None#
database: Datalayer#
field: str | None = None#

superduperdb.base.document module#

class superduperdb.base.document.Document[source]#

Bases: MongoStyleDict

A wrapper around an instance of dict or an Encodable which may be used to dump that resource to a mix of json-able content, ids and bytes

Parameters:

content – The content to wrap

static decode(r: Dict, db, bytes_encoding: BytesEncoding | None = None, reference: bool = False) Any[source]#
encode(schema: Schema | None = None, leaf_types_to_keep: Sequence[Type] = (), bytes_encoding: BytesEncoding | None = None) Tuple[dict, List[Leaf]][source]#

Make a copy of the content with all the Leaves encoded

get_leaves(leaf_type: str | None = None)[source]#
outputs(key: str, model: str, version: int | None = None) Any[source]#

Get document outputs on key from model

Parameters:
  • key – Document key to get outputs from.

  • model – Model name to get outputs from.

set_variables(db, **kwargs) Document[source]#
unpack() Any[source]#

Returns the content, but with any encodables replaced by their contents

property variables: List[str]#
class superduperdb.base.document.Reference(identifier: str, leaf_type: str)[source]#

Bases: Serializable

identifier: str#
leaf_type: str#

superduperdb.base.exceptions module#

exception superduperdb.base.exceptions.BaseException(msg)[source]#

Bases: Exception

BaseException which logs a message after exception

exception superduperdb.base.exceptions.ComponentException(msg)[source]#

Bases: BaseException

exception superduperdb.base.exceptions.ComponentInUseError[source]#

Bases: Exception

exception superduperdb.base.exceptions.ComponentInUseWarning[source]#

Bases: Warning

exception superduperdb.base.exceptions.DatabackendException(msg)[source]#

Bases: BaseException

exception superduperdb.base.exceptions.MetadataException(msg)[source]#

Bases: BaseException

exception superduperdb.base.exceptions.QueryException(msg)[source]#

Bases: BaseException

exception superduperdb.base.exceptions.ServiceRequestException(msg)[source]#

Bases: BaseException

superduperdb.base.leaf module#

class superduperdb.base.leaf.Leaf[source]#

Bases: ABC

abstract classmethod decode(r, db)[source]#
abstract encode(bytes_encoding: BytesEncoding | None = None, leaf_types_to_keep: Sequence = ())[source]#
abstract property unique_id#

superduperdb.base.logger module#

class superduperdb.base.logger.Logging[source]#

Bases: object

static debug(msg: str, *args)#
static error(msg: str, *args)#
static exception(msg: str, *args, e=None)#
fmt = '<green> {time:YYYY-MMM-DD HH:mm:ss.SS}</green>| <level>{level: <8}</level> | <cyan>{extra[hostname]: <8}</cyan>| <cyan>{name}</cyan>:<cyan>{line: <4}</cyan> | <level>{message}</level>'#
static info(msg: str, *args)#
static multikey_debug(msg: str, *args)[source]#
static multikey_error(msg: str, *args)[source]#
static multikey_exception(msg: str, *args, e=None)[source]#
static multikey_info(msg: str, *args)[source]#
static multikey_success(msg: str, *args)[source]#
static multikey_warn(msg: str, *args)[source]#
static success(msg: str, *args)#
static warn(msg: str, *args)#

superduperdb.base.serializable module#

class superduperdb.base.serializable.Serializable[source]#

Bases: Leaf

Base class for serializable objects. This class is used to serialize and deserialize objects to and from JSON + Artifact instances.

classmethod decode(r, db: Any | None = None, reference: bool = False)[source]#
dict()[source]#
encode(bytes_encoding: BytesEncoding | None = None, leaf_types_to_keep: Sequence = ())[source]#
set_post_init: ClassVar[Sequence] = ()#
set_variables(db, **kwargs) Serializable[source]#
property unique_id#
property variables: List[Variable]#
class superduperdb.base.serializable.Variable(value: Any, setter_callback: dataclasses.InitVar[Optional[Callable]] = None)[source]#

Bases: Serializable

Mechanism for allowing “free variables” in a serializable object. The idea is to allow a variable to be set at runtime, rather than at object creation time.

Parameters:
  • value – The name of the variable to be set at runtime.

  • setter_callback – A callback function that takes the value, datalayer and kwargs as input and returns the formatted variable.

set(db, **kwargs)[source]#

Get the intended value from the values of the global variables.

>>> Variable('number').set(db, number=1.5, other='test')
1.5
Parameters:
  • db – The datalayer instance.

  • kwargs – Variables to be used in the setter_callback or as formatting variables.

setter_callback: dataclasses.InitVar[Optional[Callable]] = None#
value: Any#
exception superduperdb.base.serializable.VariableError[source]#

Bases: Exception

superduperdb.base.superduper module#

superduperdb.base.superduper.superduper(item: Any | None = None, **kwargs) Any[source]#

Attempts to automatically wrap an item in a superduperdb component by using duck typing to recognize it.

Parameters:

item – A database or model

Module contents#