superduperdb.cdc package

Submodules

superduperdb.cdc.app module

superduperdb.cdc.cdc module

Change Data Capture (CDC) is a mechanism used in database systems to track and capture changes made to a table or collection in real time. It allows applications to stay up to date with the latest changes in the database and to perform tasks such as data synchronization, auditing, or data integration. The DatabaseChangeDataCapture class is designed to handle CDC tasks for a specified table/collection in a database.

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

ref: https://www.mongodb.com/docs/manual/changeStreams/
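Because change streams are built on the aggregation framework, a `$match` stage can restrict which events an application receives. The minimal sketch below, assuming a pipeline that keeps only inserts and updates, writes that pipeline as plain data in the shape pymongo's `Collection.watch(pipeline=...)` accepts. Running it against MongoDB needs a replica set, so the sketch instead applies the same `$in` filter in memory to hand-written sample events; the helper `filter_by_operation_type` and the sample events are hypothetical.

```python
from typing import Any, Dict, Iterable, List

# Aggregation pipeline keeping only insert and update events; this is the
# shape that pymongo's Collection.watch(pipeline=...) accepts.
PIPELINE: List[Dict[str, Any]] = [
    {'$match': {'operationType': {'$in': ['insert', 'update']}}},
]


def filter_by_operation_type(
    events: Iterable[Dict[str, Any]], pipeline: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Hypothetical helper: apply the single $match/$in stage above in memory."""
    allowed = set(pipeline[0]['$match']['operationType']['$in'])
    return [event for event in events if event.get('operationType') in allowed]


# Hand-written events mimicking the operationType/documentKey fields of
# MongoDB change events.
sample_events = [
    {'operationType': 'insert', 'documentKey': {'_id': 1}},
    {'operationType': 'delete', 'documentKey': {'_id': 2}},
    {'operationType': 'update', 'documentKey': {'_id': 3}},
]

kept_ids = [e['documentKey']['_id'] for e in filter_by_operation_type(sample_events, PIPELINE)]
print(kept_ids)  # prints [1, 3]
```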

Use this module like this:

    db = any_arbitrary_database.connect(…)
    db = superduper(db)
    db.cdc.start()
    # or, to listen on a single collection:
    db.cdc.listen(on=Collection('test_collection'))

class superduperdb.cdc.cdc.BaseDatabaseListener(db: Datalayer, on: Table | TableOrCollection, stop_event: Event, identifier: str = '', timeout: float | None = None)

Bases: ABC

An abstract base class that defines the basic methods a database listener must implement.

IDENTITY_SEP: str = '/'
Packet: Packet
classmethod _build_identifier(identifiers) → str

Build a composite listener identifier from a list of identifiers.

Parameters:

identifiers – list of identifiers.

Return type:

str
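`IDENTITY_SEP` suggests that identifier parts are joined with `'/'`. A minimal sketch, assuming `_build_identifier` simply joins its inputs with that separator (the helper name `build_identifier` and the sample names are hypothetical):

```python
from typing import Sequence

IDENTITY_SEP: str = '/'  # same separator as BaseDatabaseListener.IDENTITY_SEP


def build_identifier(identifiers: Sequence[str]) -> str:
    """Hypothetical stand-in for _build_identifier: join the parts with IDENTITY_SEP."""
    return IDENTITY_SEP.join(identifiers)


print(build_identifier(['my_listener', 'test_collection']))  # prints my_listener/test_collection
```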

create_event(ids: Sequence, db: Datalayer, table_or_collection: Table | TableOrCollection, event: DBEvent)

A helper that creates a packet based on the event type and puts it on the CDC queue.

Parameters:
  • ids – The IDs of the changed documents.

  • db – A superduperdb instance.

  • table_or_collection – The collection on which the change was observed.

  • event – The CDC event type.
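A minimal sketch of the create_event pattern, assuming a `queue.Queue` as the CDC queue and simplified local stand-ins for `Packet` and `DBEvent` (field names taken from the `Packet` entry in this module). The hypothetical `create_event` here omits the `db` and `table_or_collection` arguments and leaves `query` as `None`, whereas the real method presumably derives a query from the collection:

```python
import queue
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional, Sequence


class DBEvent(str, Enum):
    delete = 'delete'
    insert = 'insert'
    update = 'update'


@dataclass
class Packet:
    # Simplified stand-in for superduperdb.cdc.cdc.Packet.
    ids: Any
    query: Optional[Any]
    event_type: DBEvent = DBEvent.insert


cdc_queue: 'queue.Queue[Packet]' = queue.Queue()


def create_event(ids: Sequence[str], event: DBEvent, query: Optional[Any] = None) -> None:
    """Hypothetical sketch: wrap the changed ids in a Packet and enqueue it."""
    cdc_queue.put(Packet(ids=list(ids), query=query, event_type=event))


create_event(['a1', 'a2'], DBEvent.update)
packet = cdc_queue.get_nowait()
print(packet.ids, packet.event_type.value)  # prints ['a1', 'a2'] update
```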

event_handler(ids: Sequence, event: DBEvent) → None

A helper function to handle incoming changes from the change stream on a collection.

Parameters:
  • ids – The IDs of the changed documents.

  • event – The CDC event type.

property identity: str
info() → Dict

Get information on the current state of the listener.
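One plausible way for event_handler and info() to work together, assuming the listener keeps a per-event-type tally. The `CountingListener` class and the keys returned by its `info()` are hypothetical, not the library's actual output:

```python
from collections import Counter
from enum import Enum
from typing import Dict, List, Sequence


class DBEvent(str, Enum):
    delete = 'delete'
    insert = 'insert'
    update = 'update'


class CountingListener:
    """Hypothetical listener: event_handler tallies events and info() reports them."""

    def __init__(self, identity: str) -> None:
        self.identity = identity
        self._counts: Counter = Counter()
        self.received_ids: List[str] = []

    def event_handler(self, ids: Sequence[str], event: DBEvent) -> None:
        # Count one change notification per call and remember the affected ids.
        self._counts[event.value] += 1
        self.received_ids.extend(ids)

    def info(self) -> Dict:
        return {'identifier': self.identity, **self._counts}


listener = CountingListener('my_listener/test_collection')
listener.event_handler(['a'], DBEvent.insert)
listener.event_handler(['b', 'c'], DBEvent.insert)
listener.event_handler(['a'], DBEvent.delete)
print(listener.info())
# prints {'identifier': 'my_listener/test_collection', 'insert': 2, 'delete': 1}
```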

abstract listen()
abstract next_cdc(stream: CollectionChangeStream) → None
abstract on_create(*args, **kwargs)
abstract on_delete(*args, **kwargs)
abstract on_update(*args, **kwargs)
abstract setup_cdc() → CollectionChangeStream
abstract stop()
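The abstract methods above form the interface a concrete listener must implement. The sketch below, assuming a trimmed-down interface (only setup_cdc, next_cdc, listen and stop; the on_create/on_update/on_delete hooks are left out), shows an in-memory subclass that replays a fixed list of changes instead of reading a real change stream. All class names are hypothetical:

```python
import inspect
import threading
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List


class MiniListener(ABC):
    """Hypothetical, trimmed-down analogue of the abstract listener interface."""

    @abstractmethod
    def setup_cdc(self) -> Any: ...

    @abstractmethod
    def next_cdc(self, stream: Any) -> None: ...

    @abstractmethod
    def listen(self) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...


class InMemoryListener(MiniListener):
    """Replays a fixed list of changes instead of reading a real change stream."""

    def __init__(self, changes: List[Dict[str, Any]]) -> None:
        self.changes = list(changes)
        self.seen: List[Dict[str, Any]] = []
        self.stop_event = threading.Event()

    def setup_cdc(self) -> Iterator[Dict[str, Any]]:
        return iter(self.changes)

    def next_cdc(self, stream: Iterator[Dict[str, Any]]) -> None:
        change = next(stream, None)
        if change is None:
            self.stop()  # stream exhausted: request stoppage
        else:
            self.seen.append(change)

    def listen(self) -> None:
        stream = self.setup_cdc()
        while not self.stop_event.is_set():
            self.next_cdc(stream)

    def stop(self) -> None:
        self.stop_event.set()


print(inspect.isabstract(MiniListener))  # prints True
listener = InMemoryListener([{'op': 'insert'}, {'op': 'delete'}])
listener.listen()
print(len(listener.seen))  # prints 2
```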
class superduperdb.cdc.cdc.CDCHandler(db: Datalayer, stop_event: Event, queue)

Bases: Thread

This class is responsible for handling changes by executing the taskflow. It also extends the task graph by adding a function job node that performs post-model-execution jobs, i.e. copy_vectors.

__init__(db: Datalayer, stop_event: Event, queue)

Initialize the CDC handler.

Parameters:
  • db – A superduperdb instance.

  • stop_event – A threading event flag to notify for stoppage.

  • queue – The CDC queue from which change packets are consumed.

property is_running
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
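CDCHandler is documented as a Thread that handles CDC work until a stop event is set. A minimal sketch of that consumer pattern, assuming a standard `queue.Queue` and replacing the taskflow execution with a simple append (the `QueueConsumer` class is hypothetical):

```python
import queue
import threading
from typing import Any, List


class QueueConsumer(threading.Thread):
    """Hypothetical analogue of CDCHandler: drain a queue until stop_event is set."""

    def __init__(self, stop_event: threading.Event, q: 'queue.Queue[Any]') -> None:
        super().__init__(daemon=True)
        self.stop_event = stop_event
        self.queue = q
        self.handled: List[Any] = []

    def run(self) -> None:
        while not self.stop_event.is_set():
            try:
                item = self.queue.get(timeout=0.05)  # wake up periodically to check stop_event
            except queue.Empty:
                continue
            self.handled.append(item)  # stand-in for executing the taskflow
            self.queue.task_done()


stop = threading.Event()
q: 'queue.Queue[Any]' = queue.Queue()
consumer = QueueConsumer(stop, q)
consumer.start()
for i in range(3):
    q.put(i)
q.join()    # block until every item has been handled
stop.set()  # ask the thread to exit
consumer.join()
print(consumer.handled)  # prints [0, 1, 2]
```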

class superduperdb.cdc.cdc.DBEvent(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: str, Enum

A simple enum holding the basic MongoDB change events.

_generate_next_value_(start, count, last_values)

Generate the next value when not given.

  • name – the name of the member

  • start – the initial start value or None

  • count – the number of existing members

  • last_values – the list of values assigned

delete = 'delete'
insert = 'insert'
update = 'update'
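Because DBEvent mixes in `str`, its members compare equal to their raw string values, and a raw value can be converted back to a member. The sketch re-creates the enum locally from the members listed above. MongoDB change events report their type in an `operationType` field, and values outside this enum (for example `'replace'`) raise `ValueError`:

```python
from enum import Enum


class DBEvent(str, Enum):
    # Local re-creation of the three members documented above.
    delete = 'delete'
    insert = 'insert'
    update = 'update'


print(DBEvent.insert == 'insert')           # prints True
print(DBEvent('update') is DBEvent.update)  # prints True

try:
    DBEvent('replace')  # not one of the three members
except ValueError as err:
    print('unsupported event:', err)
```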
class superduperdb.cdc.cdc.DatabaseChangeDataCapture(db: Datalayer)

Bases: object

DatabaseChangeDataCapture is a Python class that provides a flexible and extensible framework for capturing and managing data changes in a database.

This class is responsible for the CDC service on the provided db instance. It is designed to simplify the process of tracking changes to database records, allowing you to monitor and respond to data modifications efficiently.

add(listener: Listener)

Register the given listener's collection for CDC.

listen(on: Table | TableOrCollection, identifier: str = '', *args, **kwargs)

Start the CDC service on the provided collection. Not to be confused with superduperdb.container.listener.Listener.

Parameters:
  • on – The collection/table on which the listener service is invoked.

  • identifier – An identifier given to the listener service.

property running: bool
start()

Start the CDC process on the database.

stop(name: str = '')

Stop all registered listeners.

Parameters:
  • name – Listener name.

stop_handler()

Stop the CDC handler thread.
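A minimal registry sketch of listen(), stop(name) and running, assuming that listeners are keyed by an identity built from `identifier` and the collection name, and that an empty `name` stops every listener while a non-empty one stops only the matching listener. `MiniCDC` and `FakeListener` are hypothetical and stand in for real change-stream listeners:

```python
from typing import Dict


class FakeListener:
    """Hypothetical listener that only records whether it is running."""

    def __init__(self, identity: str) -> None:
        self.identity = identity
        self.running = False

    def listen(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


class MiniCDC:
    """Hypothetical registry mirroring listen()/stop()/running."""

    def __init__(self) -> None:
        self._listeners: Dict[str, FakeListener] = {}

    def listen(self, on: str, identifier: str = '') -> FakeListener:
        listener = FakeListener('/'.join([identifier, on]))
        listener.listen()
        self._listeners[listener.identity] = listener
        return listener

    def stop(self, name: str = '') -> None:
        # Assumption: an empty name stops every listener, otherwise only the named one.
        names = [name] if name else list(self._listeners)
        for key in names:
            self._listeners.pop(key).stop()

    @property
    def running(self) -> bool:
        return any(listener.running for listener in self._listeners.values())


cdc = MiniCDC()
docs = cdc.listen(on='docs', identifier='x')
images = cdc.listen(on='images', identifier='y')
cdc.stop('x/docs')
print(docs.running, images.running, cdc.running)  # prints False True True
cdc.stop()
print(cdc.running)  # prints False
```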

class superduperdb.cdc.cdc.DatabaseListenerFactory(db_type: str = 'mongodb')

Bases: Generic[DBListenerType]

A factory class that creates an instance of the DatabaseListener corresponding to db_type.

SUPPORTED_LISTENERS: List[str] = ['mongodb', 'ibis']
create(*args, **kwargs) → DBListenerType
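A minimal sketch of the factory idea, assuming `create()` forwards its arguments to a listener class chosen by `db_type` from `SUPPORTED_LISTENERS`. `MongoListener`, `IbisListener` and `ListenerFactory` are hypothetical placeholders, not the library's classes:

```python
from typing import Dict, Type


class MongoListener:
    """Placeholder for a MongoDB-backed listener."""


class IbisListener:
    """Placeholder for an Ibis-backed listener."""


class ListenerFactory:
    """Hypothetical analogue of DatabaseListenerFactory: pick a class by db_type."""

    SUPPORTED_LISTENERS = ['mongodb', 'ibis']
    _registry: Dict[str, Type] = {'mongodb': MongoListener, 'ibis': IbisListener}

    def __init__(self, db_type: str = 'mongodb') -> None:
        if db_type not in self.SUPPORTED_LISTENERS:
            raise ValueError(f'{db_type!r} is not one of {self.SUPPORTED_LISTENERS}')
        self.db_type = db_type

    def create(self, *args, **kwargs):
        # Forward all arguments to the listener class registered for db_type.
        return self._registry[self.db_type](*args, **kwargs)


print(type(ListenerFactory('ibis').create()).__name__)  # prints IbisListener
```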
class superduperdb.cdc.cdc.DatabaseListenerThreadScheduler(listener: BaseDatabaseListener, stop_event: Event, start_event: Event)

Bases: Thread

A thread that runs the given database listener.

Parameters:
  • listener – The database listener to run.

  • stop_event – A threading event flag to notify for stoppage.

  • start_event – A threading event flag used to signal that the listener has started.

run() → None

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
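A minimal sketch of a listener-driving thread, assuming the scheduler calls `setup_cdc()` once, sets `start_event`, and then calls `next_cdc()` until `stop_event` is set. This ordering is an assumption inferred from the constructor's arguments and the abstract methods of BaseDatabaseListener; `ReplayListener` and `ListenerThread` are hypothetical:

```python
import threading
import time
from typing import Iterator, List


class ReplayListener:
    """Hypothetical listener exposing setup_cdc()/next_cdc() over a fixed list."""

    def __init__(self, changes: List[str]) -> None:
        self.changes = changes
        self.seen: List[str] = []

    def setup_cdc(self) -> Iterator[str]:
        return iter(self.changes)

    def next_cdc(self, stream: Iterator[str]) -> None:
        change = next(stream, None)
        if change is not None:
            self.seen.append(change)


class ListenerThread(threading.Thread):
    """Hypothetical scheduler: set up the stream, signal start, poll until stopped."""

    def __init__(self, listener: ReplayListener, stop_event: threading.Event,
                 start_event: threading.Event) -> None:
        super().__init__(daemon=True)
        self.listener = listener
        self.stop_event = stop_event
        self.start_event = start_event

    def run(self) -> None:
        stream = self.listener.setup_cdc()
        self.start_event.set()  # tell the caller the stream is ready
        while not self.stop_event.is_set():
            self.listener.next_cdc(stream)
            time.sleep(0.01)  # avoid a busy loop between polls


listener = ReplayListener(['c1', 'c2', 'c3'])
stop, started = threading.Event(), threading.Event()
worker = ListenerThread(listener, stop, started)
worker.start()
started.wait(timeout=1)

# Wait (up to 2 s) for the worker to consume all three changes, then stop it.
deadline = time.monotonic() + 2
while len(listener.seen) < 3 and time.monotonic() < deadline:
    time.sleep(0.01)
stop.set()
worker.join()
print(listener.seen)  # prints ['c1', 'c2', 'c3']
```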

class superduperdb.cdc.cdc.Packet(ids: Any, query: Optional[ForwardRef('Serializable')], event_type: superduperdb.cdc.cdc.DBEvent = <DBEvent.insert: 'insert'>)

Bases: object

static collate(packets: Sequence[Packet]) → Packet

Collate a batch of packets into one.

event_type: DBEvent = 'insert'
ids: Any
property is_delete: bool
query: Serializable | None
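A minimal sketch of collate(), assuming it concatenates the `ids` of every packet and keeps the first packet's `query` and `event_type`; a real implementation might instead group packets by event type. The local `Packet` and `DBEvent` are simplified stand-ins:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional, Sequence


class DBEvent(str, Enum):
    delete = 'delete'
    insert = 'insert'
    update = 'update'


@dataclass
class Packet:
    ids: Any
    query: Optional[Any]
    event_type: DBEvent = DBEvent.insert

    @staticmethod
    def collate(packets: Sequence['Packet']) -> 'Packet':
        """Hypothetical sketch: concatenate ids and keep the first packet's event type."""
        if not packets:
            raise ValueError('cannot collate an empty batch')
        ids: List[Any] = []
        for p in packets:
            ids.extend(p.ids)
        return Packet(ids=ids, query=packets[0].query, event_type=packets[0].event_type)


batch = [Packet(ids=['a'], query=None), Packet(ids=['b', 'c'], query=None)]
merged = Packet.collate(batch)
print(merged.ids, merged.event_type.value)  # prints ['a', 'b', 'c'] insert
```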

Module contents