superduperdb.backends.mongodb.cdc package#

Submodules#

superduperdb.backends.mongodb.cdc.base module#

class superduperdb.backends.mongodb.cdc.base.CachedTokens[source]#

Bases: object

append(token: Dict[str, str]) None[source]#
load() Sequence[Dict[str, str]][source]#
separate = '\n'#
token_path = '.superduperdb/.cdc.tokens'#
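
The attributes above suggest a token file whose entries are separated by newlines. The following is a minimal sketch of such a cache, assuming each token is stored as one JSON object per line. The class name `TokenFileCache` and the JSON encoding are illustrative assumptions, not the library's implementation.

```python
import json
import os
import tempfile
from typing import Dict, List


class TokenFileCache:
    """Hypothetical stand-in for a newline-separated resume-token cache."""

    separate = "\n"

    def __init__(self, token_path: str) -> None:
        self.token_path = token_path

    def append(self, token: Dict[str, str]) -> None:
        # Create the parent directory on first use, then append one JSON line.
        os.makedirs(os.path.dirname(self.token_path) or ".", exist_ok=True)
        with open(self.token_path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(token) + self.separate)

    def load(self) -> List[Dict[str, str]]:
        # Return every cached token in the order it was written.
        if not os.path.exists(self.token_path):
            return []
        with open(self.token_path, encoding="utf-8") as fh:
            lines = fh.read().split(self.separate)
        return [json.loads(line) for line in lines if line]


# Usage: write two made-up tokens to a temporary location and read them back.
cache_dir = tempfile.mkdtemp()
cache = TokenFileCache(os.path.join(cache_dir, ".cdc.tokens"))
cache.append({"_data": "token-1"})
cache.append({"_data": "token-2"})
tokens = cache.load()
```

Appending rather than rewriting the file keeps each write cheap, at the cost of a file that grows until it is cleared.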
class superduperdb.backends.mongodb.cdc.base.MongoDBPacket(ids: List[ObjectId | str], query: Serializable | None = None, event_type: DBEvent = DBEvent.insert)[source]#

Bases: Packet

A base packet representing a message in the task queue.

event_type: DBEvent = 'insert'#
ids: List[ObjectId | str]#
query: Serializable | None = None#
class superduperdb.backends.mongodb.cdc.base.ObjectId(oid: str | ObjectId | bytes | None = None)[source]#

Bases: ObjectId

Initialize a new ObjectId.

An ObjectId is a 12-byte unique identifier consisting of:

  • a 4-byte value representing the seconds since the Unix epoch,

  • a 5-byte random value,

  • a 3-byte counter, starting with a random value.

By default, ObjectId() creates a new unique identifier. The optional parameter oid can be an ObjectId, or any 12 bytes.

For example, the 12 bytes b'foo-bar-quux' do not follow the ObjectId specification but they are acceptable input:

>>> ObjectId(b'foo-bar-quux')
ObjectId('666f6f2d6261722d71757578')

oid can also be a str of 24 hex digits:

>>> ObjectId('0123456789ab0123456789ab')
ObjectId('0123456789ab0123456789ab')

Raises InvalidId if oid is neither 12 bytes nor 24 hex digits, or TypeError if oid is not an accepted type.

Parameters:
  • oid (optional): a valid ObjectId.

See also

The MongoDB documentation on ObjectIds.

Changed in version 3.8: ObjectId now implements the ObjectID specification version 0.2.
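
The 12-byte layout described above can be illustrated with the standard library alone. This sketch does not use the ObjectId class itself; it only splits raw bytes into the three documented fields, with variable names chosen for illustration.

```python
import struct
from datetime import datetime, timezone

raw = bytes.fromhex("0123456789ab0123456789ab")  # 24 hex digits -> 12 bytes

# Split the 12 bytes into the three fields: 4 + 5 + 3 bytes.
timestamp_bytes, random_bytes, counter_bytes = raw[:4], raw[4:9], raw[9:]

# The timestamp is a big-endian unsigned 32-bit count of seconds since the epoch.
(seconds,) = struct.unpack(">I", timestamp_bytes)
created_at = datetime.fromtimestamp(seconds, tz=timezone.utc)

# The counter is a big-endian 3-byte integer.
counter = int.from_bytes(counter_bytes, "big")

# Any 12 bytes map to 24 hex digits, which is why b'foo-bar-quux' is accepted.
hex_form = b"foo-bar-quux".hex()
```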

classmethod validate(v)[source]#

superduperdb.backends.mongodb.cdc.listener module#

class superduperdb.backends.mongodb.cdc.listener.CDCKeys(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

An enum representing the keys of a MongoDB change document.

_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

  • name: the name of the member

  • start: the initial start value or None

  • count: the number of existing members

  • last_values: the list of values assigned

deleted_document_data_key = 'documentKey'#
document_data_key = 'fullDocument'#
document_key = 'documentKey'#
operation_type = 'operationType'#
update_descriptions_key = 'updateDescription'#
update_field_key = 'updatedFields'#
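
As a rough illustration of how these keys index into a change document, the sketch below uses a local copy of some of the values listed above (the `ChangeKeys` name and the sample event are made up for this example) and reads fields from a hand-written update event.

```python
from enum import Enum


class ChangeKeys(str, Enum):
    """Local copy of some key values listed above, for illustration only."""

    operation_type = "operationType"
    document_key = "documentKey"
    document_data_key = "fullDocument"
    update_descriptions_key = "updateDescription"
    update_field_key = "updatedFields"


# A hand-written update event shaped like a MongoDB change document.
change = {
    "operationType": "update",
    "documentKey": {"_id": "doc-1"},
    "updateDescription": {"updatedFields": {"title": "new"}, "removedFields": []},
}

# Use .value to index with the plain string behind each member.
operation = change[ChangeKeys.operation_type.value]
doc_id = change[ChangeKeys.document_key.value]["_id"]
updated = change[ChangeKeys.update_descriptions_key.value][
    ChangeKeys.update_field_key.value
]
```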
class superduperdb.backends.mongodb.cdc.listener.MongoChangePipeline(matching_operations: ~typing.Sequence[str] = <factory>)[source]#

Bases: object

MongoChangePipeline represents a listen pipeline for the MongoDB watch API.

build_matching() Sequence[Dict][source]#

A helper function that builds a listen pipeline for the MongoDB watch API.

Parameters:

matching_operations – A list of operations to watch.
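
A pipeline that filters a change stream by operation type is commonly a single `$match` stage. The function below is a sketch under that assumption; it is not taken from the library source, and the exact stage that `build_matching` returns may differ.

```python
from typing import Dict, List, Sequence


def build_matching(matching_operations: Sequence[str]) -> List[Dict]:
    """Build a one-stage pipeline that keeps only the listed operation types."""
    return [{"$match": {"operationType": {"$in": list(matching_operations)}}}]


# Usage: watch inserts and updates only.
pipeline = build_matching(["insert", "update"])
```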

matching_operations: Sequence[str]#
validate()[source]#
class superduperdb.backends.mongodb.cdc.listener.MongoDatabaseListener(db: Datalayer, on: Collection, stop_event: Event, identifier: str = '', timeout: float | None = None, resume_token: Dict[str, str] | None = None)[source]#

Bases: BaseDatabaseListener

A class that captures data changes from a MongoDB database and handles them accordingly.

It accepts options and a db instance from the user and starts a scheduler, which schedules a listening service that listens to the change stream.

A workflow graph is built for each change observed.

Parameters:
  • db – A superduperdb instance.

  • on – The Collection on which CDC is performed.

  • stop_event – A threading event used to signal the listener to stop.

  • identifier – An identifier for the listener service.

  • resume_token – A token used to resume the change stream in MongoDB.

DEFAULT_ID: str = '_id'#
EXCLUSION_KEYS: Sequence[str] = ['_id']#
IDENTITY_SEP: str = '/'#
__init__(db: Datalayer, on: Collection, stop_event: Event, identifier: str = '', timeout: float | None = None, resume_token: Dict[str, str] | None = None)[source]#

__init__.

Parameters:
  • db – A superduperdb instance.

  • on – The Collection on which CDC is performed.

  • stop_event – A threading event used to signal the listener to stop.

  • identifier – An identifier for the listener service.

  • resume_token – A token used to resume the change stream in MongoDB.

_get_reference_id(document: Dict) str | None[source]#

_get_reference_id.

Parameters:

document

static _get_stream_pipeline(change: str) Sequence[Any] | None[source]#

_get_stream_pipeline.

Parameters:

change – Either a prebuilt listen pipeline, such as 'generic', or a user-defined listen pipeline.

check_if_taskgraph_change(change: Dict) bool[source]#

A helper method to check whether the CDC change was made by taskgraph nodes.

dump_token(change: Dict) None[source]#

A helper utility that dumps the resume token from the changed document.

Parameters:

change
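
In a MongoDB change event, the `_id` field holds the resume token. The sketch below shows how such a token could be pulled out and later handed back to a `watch` call through pymongo's `resume_after` argument. Both helper names and the sample token are hypothetical, and no database connection is made.

```python
from typing import Any, Dict, Optional


def extract_resume_token(change: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Return the resume token of a change event, or None if it has none."""
    token = change.get("_id")
    return token if isinstance(token, dict) else None


def watch_kwargs(token: Optional[Dict[str, str]]) -> Dict[str, Any]:
    """Build keyword arguments for a later Collection.watch(...) call."""
    # Omit resume_after entirely when starting a fresh stream.
    return {"resume_after": token} if token else {}


# A made-up change event; real tokens carry an opaque hex string in "_data".
change = {"_id": {"_data": "EXAMPLE"}, "operationType": "insert"}
token = extract_resume_token(change)
kwargs = watch_kwargs(token)
```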

listen(change_pipeline: str | Sequence[Dict] | None = None) None[source]#

Primary function that starts listening to the database on the collection, using the change_pipeline defined by the user.

Parameters:

change_pipeline – A MongoDB listen pipeline defined by the user for more fine-grained listening.
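
Because change_pipeline may be a name, an explicit pipeline, or None, some resolution step is needed before the stream is opened. The sketch below shows one way to do that. The `PREBUILT` registry, its contents, and the choice to treat None as "no filter" are all assumptions made for illustration.

```python
from typing import Dict, List, Optional, Sequence, Union

# Hypothetical registry of named pipelines; the real contents of 'generic'
# are not documented here.
PREBUILT: Dict[str, List[Dict]] = {
    "generic": [{"$match": {"operationType": {"$in": ["insert", "update"]}}}],
}


def resolve_pipeline(
    change_pipeline: Union[str, Sequence[Dict], None],
) -> Optional[List[Dict]]:
    """Turn a name, an explicit pipeline, or None into a pipeline value."""
    if change_pipeline is None:
        return None  # assumed to mean: no filtering
    if isinstance(change_pipeline, str):
        if change_pipeline not in PREBUILT:
            raise ValueError(f"unknown pipeline name: {change_pipeline!r}")
        return PREBUILT[change_pipeline]
    return list(change_pipeline)


# Usage: a user-defined pipeline passes through unchanged.
custom = resolve_pipeline([{"$match": {"operationType": "delete"}}])
```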

next_cdc(stream: CollectionChangeStream) None[source]#

Get the next stream of changes observed on the given Collection.

on_create(ids: Sequence, db: Datalayer, collection: Collection) None[source]#

An event handler for create events, which handles inserted documents in the change stream. It extracts the change document and builds the taskflow graph to execute.

Parameters:
  • ids – Changed document ids.

  • db – a superduperdb instance.

  • collection – The collection on which change was observed.

on_delete(ids: Sequence, db: Datalayer, collection: Collection) None[source]#

An event handler for delete events, which handles deleted documents in the change stream. It extracts the change document and builds the taskflow graph to execute.

Parameters:
  • ids – Changed document ids.

  • db – a superduperdb instance.

  • collection – The collection on which change was observed.

on_update(ids: Sequence, db: Datalayer, collection: Collection) None[source]#

An event handler for update events, which handles updated documents in the change stream. It extracts the change document and builds the taskflow graph to execute.

Parameters:
  • ids – Changed document ids.

  • db – a superduperdb instance.

  • collection – The collection on which change was observed.
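
The three handlers above cover inserts, updates, and deletes, so a listener needs some way to route each change event to the right one. The sketch below shows a plain dictionary dispatch keyed on operationType. The handlers are simplified stand-ins that only record their input; they do not build a task graph, and the `dispatch` function is hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

handled: List[Tuple[str, List[Any]]] = []


def on_create(ids: List[Any]) -> None:
    handled.append(("create", ids))


def on_update(ids: List[Any]) -> None:
    handled.append(("update", ids))


def on_delete(ids: List[Any]) -> None:
    handled.append(("delete", ids))


# Map MongoDB operation types to the stand-in handlers.
HANDLERS: Dict[str, Callable[[List[Any]], None]] = {
    "insert": on_create,
    "update": on_update,
    "delete": on_delete,
}


def dispatch(change: Dict[str, Any]) -> bool:
    """Route one change event to its handler; return False if unhandled."""
    handler = HANDLERS.get(change.get("operationType", ""))
    if handler is None:
        return False
    handler([change["documentKey"]["_id"]])
    return True


# Usage: one insert, one delete, and one operation type with no handler.
results = [
    dispatch({"operationType": "insert", "documentKey": {"_id": 1}}),
    dispatch({"operationType": "delete", "documentKey": {"_id": 2}}),
    dispatch({"operationType": "drop"}),
]
```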

resume_tokens() Sequence[Dict[str, str]][source]#

Get the resume tokens from the change stream.

running() bool[source]#
set_change_pipeline(change_pipeline: str | Sequence[Dict] | None) None[source]#

Set the change pipeline for the listener.

setup_cdc() CollectionChangeStream[source]#

Set up the CDC change stream from user provided

stop() None[source]#

Stop listening for CDC changes. This also stops the corresponding services.
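
The stop_event parameter points to a common pattern: a worker loop that checks a threading.Event between polls. The sketch below demonstrates that pattern with an in-memory queue standing in for the change stream. The `listen` function here is a simplified illustration, not the method documented above.

```python
import queue
import threading
from typing import Any, Dict, List

changes: "queue.Queue[Dict[str, Any]]" = queue.Queue()
seen: List[Dict[str, Any]] = []


def listen(stop_event: threading.Event, poll_seconds: float = 0.01) -> None:
    """Consume queued changes until stop_event is set."""
    while not stop_event.is_set():
        try:
            change = changes.get(timeout=poll_seconds)
        except queue.Empty:
            continue  # nothing arrived; re-check the stop flag
        seen.append(change)
        changes.task_done()


stop_event = threading.Event()
worker = threading.Thread(target=listen, args=(stop_event,), daemon=True)
worker.start()

changes.put({"operationType": "insert"})
changes.join()  # block until the worker has processed the change

stop_event.set()  # signal the loop to exit
worker.join(timeout=1.0)
```

Polling with a short timeout keeps the loop responsive to the stop flag even when no changes arrive.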

Module contents#