superduperdb.misc.runnable package#

Submodules#

superduperdb.misc.runnable.collection module#

class superduperdb.misc.runnable.collection.HasRunnables[source]#

Bases: Runnable

Collect zero or more Runnable into one

finish()[source]#

Request an orderly shutdown where all existing work is completed.

Note that self.stopped might not be immediately true after this method completes

join(timeout: float | None = None)[source]#

Join this thread or process. Might block indefinitely, might do nothing

runnables: Sequence[Runnable]#
start()[source]#

Start this object.

Note that self.running might not be immediately true after this method completes

stop()[source]#

Stop as soon as possible. might not do anything, should never raise.

Note that self.stopped might not be immediately true after this method completes

class superduperdb.misc.runnable.collection.ThreadQueue(callback: ~typing.Callable[[~typing.Any], None], error: ~typing.Callable = <function none>, maxsize: int = 0, name: str = 'thread_queue', thread_count: int = 1, timeout: float | None = 0.1)[source]#

Bases: HasRunnables

A simple multi-producer, multi-consumer queue with one thread per consumer.

There is a special finish_message value, which when received shuts down that consumer. ThreadQueue.finish() puts one self.finish_message onto the queue for each consumer.

callback: Callable[[Any], None]#
error()#
finish() None[source]#

Put an empty message into the queue for each listener

maxsize: int = 0#
name: str = 'thread_queue'#
property queue: Queue#
thread_count: int = 1#
timeout: float | None = 0.1#

superduperdb.misc.runnable.queue_chunker module#

class superduperdb.misc.runnable.queue_chunker.QueueChunker(chunk_size: int, timeout: float, accumulate_timeouts: bool = False)[source]#

Bases: object

Chunk a queue into lists of length at most chunk_size within time timeout :param chunk_size: Maximum number of entries in a chunk :param timeout: Maximum amount of time to block :param accumulate_timeouts: If accumulate timeouts is True, then timeout is

the total timeout allowed over the whole chunk, otherwise the timeout is applied to each item.

accumulate_timeouts: bool = False#
chunk_size: int#
timeout: float#

superduperdb.misc.runnable.runnable module#

class superduperdb.misc.runnable.runnable.Event(*on_set: Callable[[], None])[source]#

Bases: Event

A threading.Event that also calls back to zero or more functions when its state is set or reset, and has a __bool__ method.

Note that the callback might happen on some completely different thread, so these functions cannot block

clear()[source]#

Reset the internal flag to false.

Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.

on_set: List[Callable[[], None]]#
set()[source]#

Set the internal flag to true.

All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.

class superduperdb.misc.runnable.runnable.Runnable[source]#

Bases: object

A base class for things that start, run, finish, stop and join

Stopping is requesting immediate termination: finishing is saying that there is no more work to be done, finish what you are doing.

A Runnable has two Event`s, `running and stopped, and you can either wait on either of these conditions to be true, or add a callback function (which must be non-blocking) to either of them.

running is not set until the setup for a Runnable has finished; stopped is not set until all the computations in a thread have ceased.

An Runnable can be used as a context manager:

with runnable:

# The runnable is running by this point do_stuff()

# By the time you get to here, the runnable has completely stopped

The above means roughly the same as

runnable.start() try:

do_stuff() runnable.finish() runnable.join()

finally:

runnable.stop()

finish()[source]#

Request an orderly shutdown where all existing work is completed.

Note that self.stopped might not be immediately true after this method completes

join(timeout: float | None = None)[source]#

Join this thread or process. Might block indefinitely, might do nothing

running: Event#

An Event that is only set once this object is actually running

start()[source]#

Start this object.

Note that self.running might not be immediately true after this method completes

stop()[source]#

Stop as soon as possible. might not do anything, should never raise.

Note that self.stopped might not be immediately true after this method completes

stopped: Event#

An event that is only set once this object is fully stopped

superduperdb.misc.runnable.thread module#

class superduperdb.misc.runnable.thread.HasThread(callback: ~typing.Callable[[], None] = <built-in function print>, daemon: bool = False, error: ~typing.Callable = <function none>, looping: bool = False, name: str = '')[source]#

Bases: ThreadBase

This ThreadBase contains a thread, and is constructed with a callback

callback(*, sep=' ', end='\n', file=None, flush=False)#

Prints the values to a stream, or to sys.stdout by default.

sep

string inserted between values, default a space.

end

string appended after the last value, default a newline.

file

a file-like object (stream); defaults to the current sys.stdout.

flush

whether to forcibly flush the stream.

daemon: bool = False#
error()#
join(timeout: float | None = None)[source]#

Join this thread or process. Might block indefinitely, might do nothing

looping: bool = False#
name: str = ''#
new_thread() Thread[source]#
start()[source]#

Start this object.

Note that self.running might not be immediately true after this method completes

property thread: Thread#
class superduperdb.misc.runnable.thread.IsThread(*args, **kwargs)[source]#

Bases: ThreadBase, Thread

This ThreadBase inherits from threading.Thread.

To use IsThread, derive from it and override either or both of self.callback() and self.pre_run()

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

callback()[source]#
error(item: Exception) None[source]#
join(timeout: float | None = None)[source]#

Join this thread or process. Might block indefinitely, might do nothing

start()[source]#

Start this object.

Note that self.running might not be immediately true after this method completes

class superduperdb.misc.runnable.thread.ThreadBase[source]#

Bases: Runnable

A base class for classes with a thread.

It adds the following features to threading.Thread:

  • Has Events running and stopped with on_set callbacks

  • Handles exceptions and prints or redirects them

  • Runs once, or multiple times, depending on self.looping

callback: Callable[[], None]#
daemon: bool = False#
error: Callable[[Exception], None]#
finish()[source]#

Request an orderly shutdown where all existing work is completed.

Note that self.stopped might not be immediately true after this method completes

looping: bool = False#
name: str = ''#
pre_run()[source]#
run()[source]#
stop()[source]#

Stop as soon as possible. might not do anything, should never raise.

Note that self.stopped might not be immediately true after this method completes

superduperdb.misc.runnable.thread.none(x)[source]#

Module contents#