Skip to main content

Multimodal vector search

Configure your production system

note

If you would like to use the production features of SuperDuperDB, then you should set the relevant connections and configurations in a configuration file. Otherwise you are welcome to use "development" mode to get going with SuperDuperDB quickly.

import os

# Create the directory that holds the configuration file and export the file's
# location through the SUPERDUPERDB_CONFIG environment variable.
os.makedirs('.superduperdb', exist_ok=True)
os.environ['SUPERDUPERDB_CONFIG'] = '.superduperdb/config.yaml'

# YAML is indentation-sensitive: the `cdc`, `compute` and `vector_search`
# sections must be nested under `cluster`, each carrying its own `uri`.
# Without the nesting the three `uri` keys collide at the top level.
CFG = '''
data_backend: mongodb://127.0.0.1:27017/documents
artifact_store: filesystem://./artifact_store
cluster:
  cdc:
    strategy: null
    uri: ray://127.0.0.1:20000
  compute:
    uri: ray://127.0.0.1:10001
  vector_search:
    backfill_batch_size: 100
    type: in_memory
    uri: http://127.0.0.1:21000
'''
with open(os.environ['SUPERDUPERDB_CONFIG'], 'w') as f:
    f.write(CFG)

Start your cluster

note

Starting a SuperDuperDB cluster is useful in production and model development if you want to enable scalable compute, access to the models by multiple users for collaboration, and monitoring.

If you don't need this, then it is simpler to start in development mode.

!python -m superduperdb local-cluster up        

Connect to SuperDuperDB

note

Note that this is only relevant if you are running SuperDuperDB in development mode. Otherwise refer to "Configuring your production system".

from superduperdb import superduper

# Connect to the `documents` database of the MongoDB instance on
# localhost:27017; `db` is the handle used for every `db.apply` /
# `db.execute` call further down.
db = superduper('mongodb://localhost:27017/documents')

Get useful sample data

from superduperdb.backends.ibis import dtype

!curl -O https://superduperdb-public-demo.s3.amazonaws.com/text.json
import json

# Parse the downloaded `text.json` into `data` and pick the query string
# used later as the vector-search target.
with open('text.json', 'r') as fh:
    data = json.loads(fh.read())
sample_datapoint = "What is mongodb?"

chunked_model_datatype = dtype('str')

Create datatype

Data types such as "text" or "integer", which are natively supported by your db.databackend, don't need a datatype.

# No custom datatype: the column type is native to the databackend.
datatype = None

Otherwise do one of the following:

!pip install PyPDF2
from superduperdb import DataType
from superduperdb.components.datatype import File

datatype = DataType('pdf', encodable='file')
from superduperdb import DataType
# Register the datatype only when a custom `DataType` was configured.
# `isinstance` is already False for `None`, so no separate truthiness
# check is needed.
if isinstance(datatype, DataType):
    db.apply(datatype)

Setup tables or collections

# Note this is an optional step for MongoDB
# Users can also work directly with `DataType` if they want to add
# custom data
from superduperdb import Schema, DataType
from superduperdb.backends.mongodb import Collection

# Collection that all inserts and the base `select` operate on.
table_or_collection = Collection('documents')
USE_SCHEMA = False

# A schema is only registered when explicitly requested AND a custom
# `DataType` is in use for the `x` field.
needs_schema = USE_SCHEMA and isinstance(datatype, DataType)
if needs_schema:
    schema = Schema(fields={'x': datatype})
    db.apply(schema)

Insert data

In order to create data, we need to create a Schema for encoding our special Datatype column(s) in the databackend.

from superduperdb import Document, DataType

def do_insert(data, schema=None):
    """Wrap raw records in ``Document`` objects and insert them.

    Each record that is a dict holding both ``'x'`` and ``'y'`` keeps those
    two fields; any other record is stored whole under ``'x'``.

    The module-level ``datatype`` encodes the ``'x'`` value only when no
    ``schema`` is passed and ``datatype`` is a ``DataType`` instance. When a
    ``schema`` is passed, values are inserted unencoded and the schema is
    forwarded to ``insert_many``.

    :param data: Iterable of raw records.
    :param schema: Optional schema forwarded to ``insert_many``.
    """
    use_datatype = schema is None and isinstance(datatype, DataType)

    def encode(value):
        # Identity unless a custom DataType has to wrap the value.
        return datatype(value) if use_datatype else value

    def to_document(record):
        if isinstance(record, dict) and 'x' in record and 'y' in record:
            return Document({'x': encode(record['x']), 'y': record['y']})
        return Document({'x': encode(record)})

    documents = [to_document(record) for record in data]
    if schema is None:
        db.execute(table_or_collection.insert_many(documents))
    else:
        db.execute(table_or_collection.insert_many(documents, schema=schema))

# `-len(data) // 4` parses as `(-len(data)) // 4`, so this inserts everything
# except roughly the last quarter of `data`; the matching tail slice
# `data[-len(data) // 4:]` is inserted at the end of the page.
do_insert(data[:-len(data) // 4])

Define the embedding model datatype

from superduperdb.components.vector_index import sqlvector
def get_chunking_datatype(shape):
    """Return ``sqlvector`` built for the one-element shape ``(shape,)``.

    :param shape: Length of the vector (a single integer).
    """
    return sqlvector(shape=(shape,))

Build simple select queries


# Base query: `find` with an empty filter dict on `table_or_collection`.
select = table_or_collection.find({})

Create Model Output Type

# Rebinds `chunked_model_datatype` (set to `dtype('str')` when the sample
# data was loaded) to None; this is the value passed as `datatype=` to the
# `@objectmodel` decorator of `chunker` below.
chunked_model_datatype = None
note

Note that applying a chunker is not mandatory for search. If your data is already chunked (e.g. short text snippets or audio) or if you are searching through something like images, which can't be chunked, then this won't be necessary.

from superduperdb import objectmodel

# Maximum number of whitespace-separated words per chunk.
CHUNK_SIZE = 200


@objectmodel(flatten=True, model_update_kwargs={'document_embedded': False}, datatype=chunked_model_datatype)
def chunker(text):
    """Split ``text`` on whitespace into chunks of at most ``CHUNK_SIZE`` words.

    :param text: The string to split.
    :return: List of chunks, each the words of one window joined by a
        single space (empty list for text without words).
    """
    words = text.split()
    chunks = []
    for start in range(0, len(words), CHUNK_SIZE):
        window = words[start:start + CHUNK_SIZE]
        chunks.append(' '.join(window))
    return chunks

Now we apply this chunker to the data by wrapping the chunker in Listener:

from superduperdb import Listener

# Listener that runs `chunker` on the `x` field of the rows matched by
# `select`, then registers it with the datalayer.
upstream_listener = Listener(model=chunker, select=select, key='x')
db.apply(upstream_listener)

Build multimodal embedding models

Some embedding models such as CLIP come in pairs of model and compatible_model. Otherwise:

compatible_model = None
from superduperdb.ext.sentence_transformers import SentenceTransformer

# `vector` is only needed by the fallback branch, but it must be in scope
# there; it was never imported on this page, so that branch would raise
# NameError. It lives next to `sqlvector` in the same module.
from superduperdb.components.vector_index import vector

# Pick the datatype describing the 384-long embedding vectors: use the
# `get_chunking_datatype` factory when one is defined, else a plain `vector`.
if not get_chunking_datatype:
    model_dtype = vector(shape=(384,))
else:
    model_dtype = get_chunking_datatype(384)

# Load the pre-trained sentence transformer model
model = SentenceTransformer(
    identifier='all-MiniLM-L6-v2',
    # Convert the model output via `.tolist()` before it is stored.
    postprocess=lambda x: x.tolist(),
    datatype=model_dtype,
)

Select outputs of upstream listener

note

This is useful if you have performed a first step, such as pre-computing features, or chunking your data. You can use this query to operate on those outputs.

from superduperdb.backends.mongodb import Collection

# Both the key to embed and the collection to read from are named by the
# upstream listener's `outputs` attribute. Note that `select` is rebound
# here and no longer refers to the base query on `table_or_collection`.
indexing_key = upstream_listener.outputs
select = Collection(upstream_listener.outputs).find()

Depending on whether we have chunked the data, the indexing key will be different:

# A compatible key ('y') only exists when a `compatible_model` is configured.
compatible_key = 'y' if compatible_model else None

Create vector-index

vector_index_name = 'my-vector-index'
from superduperdb import VectorIndex, Listener

# Listener that computes the embeddings the vector index is built on.
indexing_listener = Listener(
    key=indexing_key,  # the `Document` key `model` should ingest to create embedding
    select=select,  # a `Select` query telling which data to search over
    model=model,  # a `_Predictor` how to convert data to embeddings
)

# Register the index; the second element of the returned pair is not used.
jobs, _ = db.add(VectorIndex(vector_index_name, indexing_listener=indexing_listener))

# Table/collection that search queries are issued against.
query_table_or_collection = select.table_or_collection
from superduperdb import Document

def get_sample_item(key, sample_datapoint, datatype=None):
    """Build the ``Document`` used as the search target.

    :param key: Field name under which the datapoint is stored.
    :param sample_datapoint: The raw query value.
    :param datatype: If this is a ``DataType`` instance, the datapoint is
        passed through it first; any other value leaves it untouched.
    :return: ``Document({key: value})``.
    """
    if isinstance(datatype, DataType):
        value = datatype(sample_datapoint)
    else:
        value = sample_datapoint
    return Document({key: value})

# Query through the compatible key (no datatype) when one is configured,
# otherwise through the indexing key with the configured datatype.
item = (
    get_sample_item(compatible_key, sample_datapoint, None)
    if compatible_key
    else get_sample_item(indexing_key, sample_datapoint, datatype=datatype)
)

Once we have this search target, we can execute a search as follows:

# Vector search against `vector_index_name` using `item` as the target
# (n=10 presumably caps the number of neighbours — confirm against the
# `like` API), followed by a plain `find`. `select` is rebound again here.
select = query_table_or_collection.like(item, vector_index=vector_index_name, n=10).find()
results = db.execute(select)

Visualize Results

from IPython.display import Markdown, display

def visualize(item, source):
    """Display ``item`` as Markdown in the notebook output.

    :param item: Markdown text to render.
    :param source: Accepted for interface compatibility; not used here.
    """
    rendered = Markdown(item)
    display(rendered)

def show(results, output_key, get_original_callable=None):
    """Visualize the ``output_key`` field of every search result.

    :param results: Iterable of result mappings.
    :param output_key: Key of the field to display for each result.
    :param get_original_callable: Optional callable that maps a result's
        ``'_source'`` value to its original row/document. When omitted,
        ``None`` is passed to ``visualize`` as the source.
    """
    for result in results:
        source = None
        # The callable is optional (default None); calling it
        # unconditionally would raise TypeError for chunked results.
        if get_original_callable is not None and '_source' in result:
            source = get_original_callable(result['_source'])
        visualize(result[output_key], source)

If your use-case involved chunking, you will want to be able to recover the original rows/documents after getting the result of a vector-search:

def get_original(_source):
    """Fetch the entry of ``table_or_collection`` whose ``_id`` equals ``_source``.

    :param _source: The ``_id`` value to look up.
    :return: Whatever ``db.execute`` returns for the ``find_one`` query.
    """
    lookup = table_or_collection.find_one({'_id': _source})
    return db.execute(lookup)

# Display the upstream listener's output field of each result, resolving
# each result's `_source` back to its original document via `get_original`.
visualization_key = upstream_listener.outputs
show(results, visualization_key, get_original)

Check the system stays updated


# `-len(data) // 4` parses as `(-len(data)) // 4`; this inserts the tail of
# `data` that the earlier `do_insert(data[:-len(data) // 4])` call left out.
do_insert(data[-len(data) // 4:])