superduperdb.backends.mongodb package#

Subpackages#

Submodules#

superduperdb.backends.mongodb.artifacts module#

class superduperdb.backends.mongodb.artifacts.MongoArtifactStore(conn, name: str)[source]#

Bases: ArtifactStore

Artifact store for MongoDB.

Parameters:
  • conn – MongoDB client connection

  • name – Name of database to host filesystem

disconnect()[source]#

Disconnect the client

drop(force: bool = False)[source]#

Drop the artifact store.

Parameters:

force – If True, don’t ask for confirmation

url()[source]#

Artifact store connection url

superduperdb.backends.mongodb.data_backend module#

class superduperdb.backends.mongodb.data_backend.MongoDataBackend(conn: MongoClient, name: str)[source]#

Bases: BaseDataBackend

Data backend for MongoDB.

Parameters:
  • conn – MongoDB client connection

  • name – Name of database to host filesystem

build_artifact_store()[source]#

Build a default artifact store based on current connection.

build_metadata()[source]#

Build a default metadata store based on current connection.

property db#
delete_vector_index(vector_index)[source]#

Delete a vector index in the data backend if an Atlas deployment.

Parameters:

vector_index – vector index to delete

disconnect()[source]#

Disconnect the client

drop(force: bool = False)[source]#

Drop the databackend.

exists(table_or_collection, id, key)[source]#
get_table_or_collection(identifier)[source]#
id_field = '_id'#
list_vector_indexes()[source]#
set_content_bytes(r, key, bytes_)[source]#
unset_outputs(info: Dict)[source]#
url()[source]#

Databackend connection url

superduperdb.backends.mongodb.metadata module#

class superduperdb.backends.mongodb.metadata.MongoMetaDataStore(conn: Any, name: str | None = None)[source]#

Bases: MetaDataStore

Metadata store for MongoDB.

Parameters:
  • conn – MongoDB client connection

  • name – Name of database to host filesystem

component_has_parents(type_id: str, identifier: str) int[source]#
component_version_has_parents(type_id: str, identifier: str, version: int) int[source]#

Check if a component version has parents.

Parameters:
  • type_id – type of component

  • identifier – identifier of component

  • version – version of component

create_component(info: Dict) InsertOneResult[source]#

Create a component in the metadata store.

Parameters:

info – dictionary containing information about the component.

create_job(info: Dict) InsertOneResult[source]#

Create a job in the metadata store.

Parameters:

info – dictionary containing information about the job.

create_metadata(key: str, value: str)[source]#
create_parent_child(parent: str, child: str) None[source]#

Create a parent-child relationship between two components.

Parameters:
  • parent – parent component

  • child – child component

delete_component_version(type_id: str, identifier: str, version: int) DeleteResult[source]#

Delete a component version from the metadata store.

Parameters:
  • type_id – type of component

  • identifier – identifier of component

  • version – version of component

disconnect()[source]#

Disconnect the client

drop(force: bool = False)[source]#

Drop the metadata store.

Parameters:

force – whether to force the drop (without confirmation)

get_component_version_children(unique_id: str)[source]#
get_component_version_parents(unique_id: str) List[str][source]#

Get the parents of a component version.

Parameters:

unique_id – unique identifier of component version

get_job(identifier: str)[source]#

Get a job from the metadata store.

Parameters:

identifier – job identifier

get_latest_version(type_id: str, identifier: str, allow_hidden: bool = False) int[source]#

Get the latest version of a component.

Parameters:
  • type_id – type of component

  • identifier – identifier of component

  • allow_hidden – whether to allow hidden components

get_metadata(key: str)[source]#

Get metadata from the metadata store.

Parameters:

key – key of metadata

get_parent_child_relations()[source]#
hide_component_version(type_id: str, identifier: str, version: int) None[source]#

Hide a component version.

Parameters:
  • type_id – type of component

  • identifier – identifier of component

  • version – version of component

list_components_in_scope(scope: str)[source]#
show_component_versions(type_id: str, identifier: str) List[Any | int][source]#

Show all versions of a component in the metadata store.

Parameters:
  • type_id – type of component

  • identifier – identifier of component

show_components(type_id: str, **kwargs) List[Any | str][source]#

Show all components in the metadata store.

Parameters:
  • type_id – type of component

  • **kwargs – additional arguments

show_job(job_id: str)[source]#
show_jobs(status=None)[source]#

Show all jobs in the metadata store.

update_job(identifier: str, key: str, value: Any) UpdateResult[source]#

Update a job in the metadata store.

Parameters:
  • identifier – job identifier

  • key – key to be updated

  • value – value to be updated

update_metadata(key: str, value: str)[source]#

Update metadata in the metadata store.

Parameters:
  • key – Key of metadata

  • value – Value of metadata

url()[source]#

Metadata store connection url

write_output_to_job(identifier, msg, stream)[source]#

Write output to a job in the metadata store.

Parameters:
  • identifier – identifier of job

  • msg – message to be written

  • stream – stream to be written to

superduperdb.backends.mongodb.query module#

class superduperdb.backends.mongodb.query.Aggregate(table_or_collection: ~superduperdb.backends.base.query.TableOrCollection, vector_index: str | None = None, args: ~typing.Tuple[~typing.Any, ...] = <factory>, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>)[source]#

Bases: Select

Wrapper around pymongo.Collection.aggregate

Parameters:
  • table_or_collection – The table or collection to perform the query on

  • vector_index – The vector index to use

  • args – Positional arguments to pymongo.Collection.aggregate

  • kwargs – Named arguments to pymongo.Collection.aggregate

add_fold()[source]#
args: Tuple[Any, ...]#
execute(db, reference=False)[source]#

Execute the query on the DB instance.

property id_field#
kwargs: Dict[str, Any]#
property select_ids#
select_ids_of_missing_outputs(key: str, model: str, version: int)[source]#
select_single_id(id: str)[source]#
property select_table#
select_using_ids()[source]#
table_or_collection: TableOrCollection#
vector_index: str | None = None#
class superduperdb.backends.mongodb.query.ChangeStream(collection: ~superduperdb.backends.mongodb.query.Collection, args: ~typing.Sequence = <factory>, kwargs: ~typing.Dict = <factory>)[source]#

Bases: object

Request a stream of changes from a db

Parameters:
  • collection – The collection to perform the query on

  • args – Positional query arguments to pymongo.Collection.watch

  • kwargs – Named query arguments to pymongo.Collection.watch

args: Sequence#
collection: Collection#
kwargs: Dict#
class superduperdb.backends.mongodb.query.Collection(identifier: str | superduperdb.base.serializable.Variable)[source]#

Bases: TableOrCollection

aggregate(*args, vector_index: str | None = None, **kwargs) Aggregate[source]#
change_stream(*args, **kwargs)[source]#
delete_one(*args, **kwargs)[source]#
get_table(db)[source]#
insert(*args, **kwargs)[source]#
insert_many(*args, **kwargs)[source]#
insert_one(document, *args, **kwargs)[source]#
model_update(db, ids: List[Any], key: str, model: str, version: int, outputs: Sequence[Any], flatten: bool = False, **kwargs)[source]#
primary_id: ClassVar[str] = '_id'#
query_components: ClassVar[Dict] = {'find': <class 'superduperdb.backends.mongodb.query.Find'>, 'find_one': <class 'superduperdb.backends.mongodb.query.FindOne'>}#
replace_one(filter, replacement, *args, **kwargs)[source]#
type_id: ClassVar[str] = 'collection'#
update_many(filter, update, *args, **kwargs)[source]#
update_one(filter, update, *args, **kwargs)[source]#
class superduperdb.backends.mongodb.query.Find(name: str, type: str = QueryType.ATTR, args: ~typing.Sequence = <factory>, kwargs: ~typing.Dict = <factory>, output_fields: ~typing.Dict[str, str] | None = None)[source]#

Bases: QueryComponent

Wrapper around pymongo.Collection.find

Parameters:
  • args – Positional arguments to pymongo.Collection.find

  • kwargs – Named arguments to pymongo.Collection.find

add_fold(fold: str)[source]#
output_fields: Dict[str, str] | None = None#
outputs(**kwargs)[source]#

Join the query with the outputs for a table.

Parameters:

**kwargs – key=model/version or key=model pairs

property select_ids#
select_ids_of_missing_outputs(key: str, model: str, version: int)[source]#
select_single_id(id)[source]#
select_using_ids(ids)[source]#
class superduperdb.backends.mongodb.query.FindOne(name: str, type: str = QueryType.ATTR, args: ~typing.Sequence = <factory>, kwargs: ~typing.Dict = <factory>)[source]#

Bases: QueryComponent

Wrapper around pymongo.Collection.find_one

Parameters:
  • args – Positional arguments to pymongo.Collection.find_one

  • kwargs – Named arguments to pymongo.Collection.find_one

add_fold(fold: str)[source]#

Modify the query to add a fold to filter {‘_fold’: fold}

Parameters:

fold – The fold to add

select_using_ids(ids)[source]#
class superduperdb.backends.mongodb.query.MongoCompoundSelect(table_or_collection: 'TableOrCollection', pre_like: Optional[ForwardRef('Like')] = None, post_like: Optional[ForwardRef('Like')] = None, query_linker: Optional[ForwardRef('QueryLinker')] = None)[source]#

Bases: CompoundSelect

change_stream(*args, **kwargs)[source]#
check_exists(db)[source]#
download_update(db, id: str, key: str, bytes: bytearray) None[source]#

Update to set the content of key in the document id.

Parameters:
  • db – The db to query

  • id – The id to filter on

  • key – The key whose content is set

  • bytes – The bytes to update

execute(db, reference=False)[source]#

Execute the compound query on the DB instance.

Parameters:

db – The DB instance to use

property output_fields#
outputs(**kwargs)[source]#

This method returns a query which joins a query with the outputs for a table.

Parameters:

**kwargs – key=model/version or key=model pairs, where model is the model identifier for which to get the outputs

>>> q = Collection(...).find(...).outputs(key='model_name')
property select_table#
class superduperdb.backends.mongodb.query.MongoDelete(table_or_collection: 'TableOrCollection', args: Sequence = <factory>, kwargs: Dict = <factory>, one: bool = False)[source]#

Bases: Delete

property collection#
execute(db)[source]#
one: bool = False#
class superduperdb.backends.mongodb.query.MongoInsert(table_or_collection: 'TableOrCollection', documents: Sequence[ForwardRef('Document')] = <factory>, verbose: bool = True, kwargs: Dict = <factory>, one: bool = False)[source]#

Bases: Insert

execute(db)[source]#

Insert the data.

Parameters:

db – The DB instance to use for insertion

one: bool = False#
property select_table#
class superduperdb.backends.mongodb.query.MongoQueryLinker(table_or_collection: 'TableOrCollection', members: List = <factory>)[source]#

Bases: QueryLinker

add_fold(fold)[source]#
execute(db)[source]#
property output_fields#
outputs(**kwargs)[source]#
property query_components#
property select_ids#
select_single_id(id)[source]#
select_using_ids(ids)[source]#
class superduperdb.backends.mongodb.query.MongoReplaceOne(table_or_collection: 'TableOrCollection', replacement: superduperdb.base.document.Document, filter: Dict, args: Sequence = <factory>, kwargs: Dict = <factory>)[source]#

Bases: Update

args: Sequence#
property collection#
execute(db)[source]#
filter: Dict#
kwargs: Dict#
replacement: Document#
property select_table#
class superduperdb.backends.mongodb.query.MongoUpdate(table_or_collection: 'TableOrCollection', update: superduperdb.base.document.Document, filter: Dict, one: bool = False, args: Sequence = <factory>, kwargs: Dict = <factory>)[source]#

Bases: Update

args: Sequence#
execute(db)[source]#
filter: Dict#
kwargs: Dict#
one: bool = False#
property select_table#
update: Document#

Module contents#

class superduperdb.backends.mongodb.Collection(identifier: str | superduperdb.base.serializable.Variable)[source]#

Bases: TableOrCollection

aggregate(*args, vector_index: str | None = None, **kwargs) Aggregate[source]#
change_stream(*args, **kwargs)[source]#
delete_one(*args, **kwargs)[source]#
get_table(db)[source]#
identifier: str | Variable#
insert(*args, **kwargs)[source]#
insert_many(*args, **kwargs)[source]#
insert_one(document, *args, **kwargs)[source]#
model_update(db, ids: List[Any], key: str, model: str, version: int, outputs: Sequence[Any], flatten: bool = False, **kwargs)[source]#
primary_id: ClassVar[str] = '_id'#
query_components: ClassVar[Dict] = {'find': <class 'superduperdb.backends.mongodb.query.Find'>, 'find_one': <class 'superduperdb.backends.mongodb.query.FindOne'>}#
replace_one(filter, replacement, *args, **kwargs)[source]#
type_id: ClassVar[str] = 'collection'#
update_many(filter, update, *args, **kwargs)[source]#
update_one(filter, update, *args, **kwargs)[source]#