superduperdb.misc package#

Subpackages#

Submodules#

superduperdb.misc.annotations module#

exception superduperdb.misc.annotations.SuperDuperDBDeprecationWarning[source]#

Bases: DeprecationWarning

Specialized deprecation warning for fine-grained filtering control.
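A dedicated warning subclass lets callers filter one library's deprecations without touching other `DeprecationWarning`s. The sketch below illustrates this with the standard `warnings` module; `MyDeprecationWarning`, `old_function` and `recorded_categories` are hypothetical stand-ins, not names from superduperdb.

```python
import warnings


class MyDeprecationWarning(DeprecationWarning):
    """Hypothetical stand-in for a library-specific deprecation warning."""


def old_function() -> int:
    # Emit the specialized warning instead of a plain DeprecationWarning.
    warnings.warn("old_function is deprecated", MyDeprecationWarning, stacklevel=2)
    return 1


def recorded_categories() -> list:
    """Return the categories of the warnings that pass the filters below."""
    with warnings.catch_warnings(record=True) as caught:
        # Filters added later are checked first, so the subclass rule wins.
        warnings.simplefilter("ignore", DeprecationWarning)
        warnings.simplefilter("always", MyDeprecationWarning)
        old_function()
        warnings.warn("some other deprecation", DeprecationWarning)
    return [w.category for w in caught]
```

Only the specialized warning is recorded; the generic `DeprecationWarning` is suppressed by the broader rule.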

superduperdb.misc.annotations.public_api(stability: str = 'stable')[source]#

Annotation for documenting public APIs.

If stability="alpha", the API can be used by advanced users who are tolerant to and expect breaking changes.

If stability="beta", the API is still public and can be used by early users, but is subject to change.

If stability="stable", the APIs will remain backwards compatible across minor releases.

superduperdb.misc.archives module#

superduperdb.misc.archives.from_tarball(tarball_path: str)[source]#

Extract the contents of a stack tarball.

Parameters:

tarball_path – Path to the tarball file.

superduperdb.misc.archives.to_tarball(folder_path: str, output_path: str)[source]#

Create a tarball (compressed archive) from a folder.

Parameters:
  • folder_path – Path to the folder to be archived.

  • output_path – Path of the tarball file to create.
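As a rough illustration of the archive round trip, the following sketch uses the standard `tarfile` module. The helper names `make_tarball`, `extract_tarball` and `roundtrip_demo`, the gzip compression, and the explicit destination argument are all assumptions for this example and may differ from what `to_tarball` and `from_tarball` actually do.

```python
import os
import tarfile
import tempfile


def make_tarball(folder_path: str, output_path: str) -> None:
    """Archive folder_path into a gzip-compressed tarball at output_path."""
    with tarfile.open(output_path, "w:gz") as tar:
        # Store entries relative to the folder name, not the absolute path.
        name = os.path.basename(os.path.normpath(folder_path))
        tar.add(folder_path, arcname=name)


def extract_tarball(tarball_path: str, dest: str) -> None:
    """Extract a gzip-compressed tarball into the directory dest."""
    os.makedirs(dest, exist_ok=True)
    with tarfile.open(tarball_path, "r:gz") as tar:
        tar.extractall(dest)


def roundtrip_demo() -> str:
    """Archive a small folder, extract it elsewhere, and read a file back."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "stack")
        os.makedirs(src)
        with open(os.path.join(src, "hello.txt"), "w") as f:
            f.write("hello")
        archive = os.path.join(tmp, "stack.tar.gz")
        make_tarball(src, archive)
        dest = os.path.join(tmp, "restored")
        extract_tarball(archive, dest)
        with open(os.path.join(dest, "stack", "hello.txt")) as f:
            return f.read()
```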

superduperdb.misc.colors module#

class superduperdb.misc.colors.Colors[source]#

Bases: object

BLACK = '\x1b[30m'#
BLUE = '\x1b[34m'#
CYAN = '\x1b[36m'#
GREEN = '\x1b[32m'#
MAGENTA = '\x1b[35m'#
RED = '\x1b[31m'#
RESET = '\x1b[0m'#
UNDERLINE = '\x1b[4m'#
WHITE = '\x1b[37m'#
YELLOW = '\x1b[33m'#
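These constants are ANSI escape sequences: writing one to a terminal switches the text style, and `RESET` restores the default. A minimal sketch of how such constants are typically combined, using a local copy of three of the values listed above and a hypothetical `colorize` helper:

```python
class Colors:
    """Local copy of a few of the escape codes listed above."""

    RED = "\x1b[31m"
    GREEN = "\x1b[32m"
    RESET = "\x1b[0m"


def colorize(text: str, color: str) -> str:
    """Wrap text in a color code and reset the style afterwards."""
    return f"{color}{text}{Colors.RESET}"


if __name__ == "__main__":
    # On an ANSI-capable terminal this prints "ok" in green.
    print(colorize("ok", Colors.GREEN))
```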

superduperdb.misc.compat module#

Functions from later standard libraries not available in Python 3.8

superduperdb.misc.compat.cache(user_function, /)[source]#
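The signature matches `functools.cache`, which was added in Python 3.9. One plausible way to provide it on Python 3.8 is an unbounded `lru_cache`; this is a sketch of that approach, not necessarily the code in this module.

```python
import functools


def cache(user_function, /):
    """Unbounded memoizing decorator, equivalent to lru_cache(maxsize=None)."""
    return functools.lru_cache(maxsize=None)(user_function)


@cache
def fib(n: int) -> int:
    # Each distinct n is computed once and then served from the cache.
    return n if n < 2 else fib(n - 1) + fib(n - 2)
```

Because the cache is unbounded, arguments must be hashable and results are kept for the lifetime of the decorated function.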

superduperdb.misc.data module#

superduperdb.misc.data.ibatch(iterable: Iterable[T], batch_size: int) Iterator[List[T]][source]#

Batch an iterable into chunks of size batch_size

Parameters:
  • iterable – the iterable to batch

  • batch_size – the maximum number of items in each batch
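A minimal sketch of this kind of batching, written with `itertools.islice`. The name `batched` and the `ValueError` check are choices made for this example; the real `ibatch` may be implemented differently.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to batch_size items; the last list may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```

The input is consumed lazily, so this also works on generators that do not fit in memory.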

superduperdb.misc.download module#

class superduperdb.misc.download.BaseDownloader(uris: List[str], n_workers: int = 0, timeout: int | None = None, headers: Dict | None = None, raises: bool = True)[source]#

Bases: object

Base class for downloading files

Parameters:
  • uris – list of uris/ file names to fetch

  • n_workers – number of multiprocessing workers

  • timeout – set seconds until request times out

  • headers – dictionary of request headers passed to the requests package

  • raises – whether to raise errors

go()[source]#

Download all files. Uses a multiprocessing.pool.ThreadPool to parallelize connections.
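The parallelization pattern can be sketched with `multiprocessing.pool.ThreadPool` as follows. The function `fetch_all`, its `fetch` callback, and the convention that `n_workers=0` means "run serially" are assumptions made for illustration, not the downloader's documented behavior.

```python
from multiprocessing.pool import ThreadPool
from typing import Callable, List, Sequence


def fetch_all(
    uris: Sequence[str],
    fetch: Callable[[str], object],
    n_workers: int = 0,
) -> List[object]:
    """Apply fetch to every URI, optionally using a pool of worker threads."""
    if n_workers <= 0:
        # Assumed convention: no workers means a plain serial loop.
        return [fetch(uri) for uri in uris]
    with ThreadPool(n_workers) as pool:
        # map() blocks until all tasks finish and preserves input order.
        return pool.map(fetch, uris)
```

Threads suit this workload because downloads spend most of their time waiting on the network rather than running Python code.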

class superduperdb.misc.download.Downloader(uris, update_one: Callable | None = None, ids: List[str] | List[int] | None = None, keys: List[str] | None = None, datatypes: List[str] | None = None, n_workers: int = 20, headers: Dict | None = None, skip_existing: bool = True, timeout: int | None = None, raises: bool = True)[source]#

Bases: BaseDownloader

Download files from a list of URIs.

Parameters:
  • uris – list of uris/ file names to fetch

  • update_one – function to call to insert data into table

  • ids – list of ids of rows/ documents to update

  • keys – list of keys in rows/ documents to insert to

  • n_workers – number of multiprocessing workers

  • headers – dictionary of request headers passed to the requests package

  • skip_existing – if True then don’t bother getting already present data

  • timeout – set seconds until request times out

  • raises – whether to raise errors

results: Dict[int, str]#
class superduperdb.misc.download.Fetcher(headers: Dict | None = None, n_workers: int = 0)[source]#

Bases: object

Fetches data from a URI

Parameters:
  • headers – headers to be used for download

  • n_workers – number of download workers

__call__(uri: str)[source]#

Download data from a URI

Parameters:

uri – uri to download from

exception superduperdb.misc.download.TimeoutException[source]#

Bases: Exception

class superduperdb.misc.download.Updater(db, query)[source]#

Bases: object

exists(uri, key, id, datatype)[source]#
superduperdb.misc.download._gather_uris_for_document(r: Document, id_field: str = '_id')[source]#
>>> _gather_uris_for_document({'a': {'_content': {'uri': 'test'}}})
(['test'], ['a'])
>>> d = {'b': {'a': {'_content': {'uri': 'test'}}}}
>>> _gather_uris_for_document(d)
(['test'], ['b.a'])
>>> d = {'b': {'a': {'_content': {'uri': 'test', 'bytes': b'abc'}}}}
>>> _gather_uris_for_document(d)
([], [])
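The doctests above suggest a recursive walk that records the dotted path to each `_content` subdocument that has a `uri` but no `bytes` yet. The sketch below reproduces those three results on plain dictionaries; `gather_uris_sketch` is a hypothetical name, and it ignores the `id_field` argument of the real function.

```python
from typing import Any, Dict, List, Tuple


def gather_uris_sketch(
    doc: Dict[str, Any], prefix: str = ""
) -> Tuple[List[str], List[str]]:
    """Collect (uris, dotted keys) for content that still needs downloading."""
    uris: List[str] = []
    keys: List[str] = []
    for key, value in doc.items():
        if not isinstance(value, dict):
            continue
        path = f"{prefix}{key}"
        content = value.get("_content")
        if isinstance(content, dict) and "uri" in content:
            # Skip entries whose bytes are already present.
            if "bytes" not in content:
                uris.append(content["uri"])
                keys.append(path)
        else:
            # Descend into nested subdocuments, extending the dotted path.
            sub_uris, sub_keys = gather_uris_sketch(value, prefix=f"{path}.")
            uris.extend(sub_uris)
            keys.extend(sub_keys)
    return uris, keys
```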
superduperdb.misc.download.download_content(db, query: Select | Insert | Dict, ids: Sequence[str] | None = None, documents: List[Document] | None = None, raises: bool = True, n_workers: int | None = None) Sequence[Document] | None[source]#

Download content contained in uploaded data. Items to be downloaded are identified via subdocuments of the form exemplified below. By default, items are downloaded to the database, unless a download_update function is provided.

Parameters:
  • db – database instance

  • query – query to be executed

  • ids – ids to be downloaded

  • documents – documents to be downloaded

  • timeout – timeout for download

  • raises – whether to raise errors

  • n_download_workers – number of download workers

  • headers – headers to be used for download

  • download_update – function to be used for updating the database

  • **kwargs

    additional keyword arguments

>>> d = {"_content": {"uri": "<uri>", "encoder": "<encoder-identifier>"}}
>>> def update(key, id, bytes):
...     with open(f'/tmp/{key}+{id}', 'wb') as f:
...         f.write(bytes)
>>> download_content(None, None, ids=["0"], documents=[d])
...
superduperdb.misc.download.download_from_one(r: Document)[source]#
superduperdb.misc.download.gather_uris(documents: Sequence[Document], gather_ids: bool = True) Tuple[List[str], List[str], List[Any], List[str]][source]#

Get the uris out of all documents as denoted by {"_content": ...}

Parameters:
  • documents – list of dictionaries

  • gather_ids – if True then gather ids of documents

superduperdb.misc.download.timeout(seconds)[source]#
superduperdb.misc.download.timeout_handler(signum, frame)[source]#

superduperdb.misc.files module#

superduperdb.misc.files.get_file_from_uri(uri)[source]#

Get file name from uri.

>>> get_file_from_uri('file://test.txt')
'test.txt'
>>> get_file_from_uri('http://test.txt')
'414388bd5644669b8a92e45a96318890f6e8de54'
superduperdb.misc.files.load_uris(r: dict, datatypes: Dict, root: str | None = None, raises: bool = False)[source]#

Load "bytes" into "_content" from "uri" inside r.

Parameters:
  • r – The dict to load the bytes into

  • root – The root directory to load the bytes from

  • raises – Whether to raise an error if the file is not found

>>> with open('/tmp/test.txt', 'wb') as f:
...     _ = f.write(bytes('test', 'utf-8'))
>>> r = {"_content": {"uri": "file://test.txt"}}
>>> load_uris(r, '/tmp')
>>> r
{'_content': {'uri': 'file://test.txt', 'bytes': b'test'}}

superduperdb.misc.retry module#

class superduperdb.misc.retry.Retry(exception_types: Type[BaseException] | Tuple[Type[BaseException], ...], cfg: Retry | None = None)[source]#

Bases: object

Retry a function until it succeeds.

This is a thin wrapper around the tenacity retry library, using our configs.

Parameters:
  • exception_types – The exception types to retry on.

  • cfg – The retry config.

cfg: Retry | None = None#
exception_types: Type[BaseException] | Tuple[Type[BaseException], ...]#
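To show the general idea of retrying on selected exception types, here is a plain-Python sketch. It deliberately does not use tenacity or the library's retry config: `retry_call`, `max_attempts` and `wait_seconds` are hypothetical names introduced for this example only.

```python
import time
from typing import Callable, Tuple, Type, TypeVar, Union

R = TypeVar("R")
ExceptionTypes = Union[Type[BaseException], Tuple[Type[BaseException], ...]]


def retry_call(
    fn: Callable[[], R],
    exception_types: ExceptionTypes,
    max_attempts: int = 3,
    wait_seconds: float = 0.0,
) -> R:
    """Call fn until it succeeds or max_attempts is reached."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn()
        except exception_types:
            # Exceptions of other types propagate immediately.
            if attempt >= max_attempts:
                raise
            time.sleep(wait_seconds)
```

A production wrapper would usually add backoff and jitter between attempts, which is the kind of policy a library such as tenacity provides.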

superduperdb.misc.run module#

exception superduperdb.misc.run.CalledProcessError(returncode, cmd, output=None, stderr=None)[source]#

Bases: SubprocessError

Raised when run() is called with check=True and the process returns a non-zero exit status.

Attributes: cmd, returncode, stdout, stderr, output
property stdout#

Alias for output attribute, to match stderr

superduperdb.misc.run.out(args: Sequence[str], **kwargs) str[source]#

Run a command and return the output.

Parameters:
  • args – The command to run.

  • **kwargs

    Additional arguments to pass to subprocess.run.

superduperdb.misc.run.run(args: Sequence[str], text: bool = True, check: bool = True, verbose: bool = False, **kwargs) CompletedProcess[source]#

Run a command, printing it if verbose is enabled.

Parameters:
  • args – The command to run.

  • text – Whether to use text mode.

  • check – Whether to raise an error if the command fails.

  • verbose – Whether to print the command before running it.

  • **kwargs

    Additional arguments to pass to subprocess.run.
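A sketch of how such wrappers around `subprocess.run` might look. The names `run_command` and `command_output` are hypothetical, and the exact output handling (capturing stdout and stripping trailing whitespace) is an assumption rather than the documented behavior of `run` and `out`.

```python
import subprocess
import sys
from typing import Sequence


def run_command(
    args: Sequence[str],
    text: bool = True,
    check: bool = True,
    verbose: bool = False,
    **kwargs,
) -> subprocess.CompletedProcess:
    """Run a command, echoing it first when verbose is set."""
    if verbose:
        print("$", " ".join(args))
    return subprocess.run(args, text=text, check=check, **kwargs)


def command_output(args: Sequence[str], **kwargs) -> str:
    """Run a command and return its standard output, stripped (assumed)."""
    result = run_command(args, stdout=subprocess.PIPE, **kwargs)
    return result.stdout.strip()


if __name__ == "__main__":
    # sys.executable keeps the example independent of the shell and PATH.
    print(command_output([sys.executable, "-c", "print('hi')"]))
```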

superduperdb.misc.serialization module#

superduperdb.misc.serialization.asdict(obj, *, copy_method=<function copy>) Dict[str, Any][source]#

Custom asdict function that exports a dataclass object to a dict, with an option to choose the copy strategy for nested non-atomic objects.
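The idea of a pluggable copy strategy can be sketched as a small recursive converter. `asdict_sketch` and the example dataclasses are hypothetical, and the default of `copy.copy` is an assumption based on the `<function copy>` shown in the signature.

```python
import copy
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Callable


def asdict_sketch(obj: Any, *, copy_method: Callable[[Any], Any] = copy.copy) -> Any:
    """Convert dataclasses to dicts, copying other values with copy_method."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return {
            f.name: asdict_sketch(getattr(obj, f.name), copy_method=copy_method)
            for f in fields(obj)
        }
    if isinstance(obj, list):
        return [asdict_sketch(v, copy_method=copy_method) for v in obj]
    if isinstance(obj, dict):
        return {k: asdict_sketch(v, copy_method=copy_method) for k, v in obj.items()}
    # Anything else is handed to the chosen copy strategy.
    return copy_method(obj)


@dataclass
class Point:
    x: int
    y: int


@dataclass
class Shape:
    name: str
    points: list
    meta: set
```

Passing `copy_method=lambda v: v` avoids copying large nested objects, whereas `copy.deepcopy` fully isolates the result from the original.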

superduperdb.misc.server module#

superduperdb.misc.server.request_server(service: str = 'vector_search', data=None, endpoint='add', args={}, type='post')[source]#

superduperdb.misc.special_dicts module#

class superduperdb.misc.special_dicts.ArgumentDefaultDict[source]#

Bases: defaultdict

class superduperdb.misc.special_dicts.MongoStyleDict[source]#

Bases: Dict[str, Any]

Dictionary object mirroring how fields can be referred to and set in MongoDB.

>>> d = MongoStyleDict({'a': {'b': 1}})
>>> d['a.b']
1

Set deep fields directly with string keys:

>>> d['a.c'] = 2
>>> d
{'a': {'b': 1, 'c': 2}}

Parent keys should exist in order to set subfields:

>>> d['a.d.e'] = 3
Traceback (most recent call last):
...
KeyError: 'd'
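The dotted-key behavior described here can be sketched with a small `dict` subclass. `DottedDict` is a hypothetical illustration that assumes string keys and reproduces the examples above; it is not the library's implementation.

```python
from typing import Any


class DottedDict(dict):
    """Dict that resolves 'a.b.c' style keys through nested dictionaries."""

    def __getitem__(self, key: str) -> Any:
        first, *rest = key.split(".")
        current = dict.__getitem__(self, first)
        for part in rest:
            current = current[part]
        return current

    def __setitem__(self, key: str, value: Any) -> None:
        *parents, last = key.split(".")
        if not parents:
            dict.__setitem__(self, last, value)
            return
        # Walk to the parent; a missing parent raises KeyError as in the docs.
        node = dict.__getitem__(self, parents[0])
        for part in parents[1:]:
            node = node[part]
        node[last] = value
```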

Module contents#