Multimodal vector search
Configure your production system
To use the production features of SuperDuperDB, set the relevant connections and configurations in a configuration file. Otherwise, you can use "development" mode to get going with SuperDuperDB quickly.
import os
os.makedirs('.superduperdb', exist_ok=True)
os.environ['SUPERDUPERDB_CONFIG'] = '.superduperdb/config.yaml'
- MongoDB Community
- MongoDB Atlas
- SQLite
- MySQL
- MSSQL
- PostgreSQL
- Snowflake
- Clickhouse
CFG = '''
data_backend: mongodb://127.0.0.1:27017/documents
artifact_store: filesystem://./artifact_store
cluster:
  cdc:
    strategy: null
    uri: ray://127.0.0.1:20000
  compute:
    uri: ray://127.0.0.1:10001
  vector_search:
    backfill_batch_size: 100
    type: in_memory
    uri: http://127.0.0.1:21000
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
    type: native
data_backend: mongodb+srv://<user>:<password>@<mongo-host>:27017/documents
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: sqlite://<path-to-db>.db
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: mysql://<user>:<password>@<host>:<port>/<database>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: mssql://<user>:<password>@<host>:<port>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: postgres://<user>:<password>@<host>:<port>/<database>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
metadata_store: sqlite://<path-to-sqlite-db>.db
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: snowflake://<user>:<password>@<account>/<database>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
metadata_store: sqlite://<path-to-sqlite-db>.db
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: clickhouse://<user>:<password>@<host>:<port>
'''
with open(os.environ['SUPERDUPERDB_CONFIG'], 'w') as f:
    f.write(CFG)
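Before starting any services against this file, it can save time to confirm that it parses as valid YAML. A minimal sanity check (assuming PyYAML is installed, which SuperDuperDB itself depends on):
import yaml

with open(os.environ['SUPERDUPERDB_CONFIG']) as f:
    print(yaml.safe_load(f))  # a syntax error would surface here, before any service reads the file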
Start your cluster
Starting a SuperDuperDB cluster is useful in production and during model development if you want scalable compute, multi-user access to models for collaboration, and monitoring.
If you don't need this, then it is simpler to start in development mode.
- Experimental Cluster
- Docker-Compose
!python -m superduperdb local-cluster up
!make testenv_image
!make testenv_init
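Once the images are built and the containers are up, a quick way to confirm that the services started is plain Docker (not a SuperDuperDB command):
!docker ps --format '{{.Names}}: {{.Status}}'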
Connect to SuperDuperDB
Note that this is only relevant if you are running SuperDuperDB in development mode. Otherwise, refer to "Configure your production system".
- MongoDB
- SQLite
- MySQL
- MSSQL
- PostgreSQL
- Snowflake
- Clickhouse
- DuckDB
- Pandas
- MongoMock
from superduperdb import superduper
db = superduper('mongodb://localhost:27017/documents')
from superduperdb import superduper
db = superduper('sqlite://my_db.db')
from superduperdb import superduper
user = 'superduper'
password = 'superduper'
port = 3306
host = 'localhost'
database = 'test_db'
db = superduper(f"mysql://{user}:{password}@{host}:{port}/{database}")
from superduperdb import superduper
user = 'sa'
password = 'Superduper#1'
port = 1433
host = 'localhost'
db = superduper(f"mssql://{user}:{password}@{host}:{port}")
!pip install psycopg2
from superduperdb import superduper
user = 'postgres'
password = 'postgres'
port = 5432
host = 'localhost'
database = 'test_db'
db_uri = f"postgres://{user}:{password}@{host}:{port}/{database}"
db = superduper(db_uri, metadata_store=db_uri.replace('postgres://', 'postgresql://'))
from superduperdb import superduper
user = "superduperuser"
password = "superduperpassword"
account = "XXXX-XXXX" # ORGANIZATIONID-USERID
database = "FREE_COMPANY_DATASET/PUBLIC"
snowflake_uri = f"snowflake://{user}:{password}@{account}/{database}"
db = superduper(
    snowflake_uri,
    metadata_store='sqlite:///your_database_name.db',
)
from superduperdb import superduper
user = 'default'
password = ''
port = 8123
host = 'localhost'
db = superduper(f"clickhouse://{user}:{password}@{host}:{port}", metadata_store=f'mongomock://meta')
from superduperdb import superduper
db = superduper('duckdb://mydb.duckdb')
from superduperdb import superduper
db = superduper(['my.csv'], metadata_store='mongomock://meta')
from superduperdb import superduper
db = superduper('mongomock:///test_db')
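Whichever backend you connect to, superduper(...) returns the same Datalayer object. Printing it and its databackend attribute is a quick way to confirm the connection (the exact repr varies by backend and version):
print(db)
print(db.databackend)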
Get useful sample data
from superduperdb import dtype
- Text
- PDF
- Image
- Video
- Audio
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/text.json
import json
with open('text.json', 'r') as f:
    data = json.load(f)
sample_datapoint = "What is mongodb?"
chunked_model_datatype = dtype('str')
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/pdfs.zip && unzip -o pdfs.zip
import os
data = [f'pdfs/{x}' for x in os.listdir('./pdfs')]
sample_datapoint = data[-1]
chunked_model_datatype = dtype('str')
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/images.zip && unzip images.zip
import os
from PIL import Image
data = [f'images/{x}' for x in os.listdir('./images')]
data = [Image.open(path) for path in data]
sample_datapoint = data[-1]
from superduperdb.ext.pillow import pil_image
chunked_model_datatype = pil_image
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/videos.zip && unzip videos.zip
import os
data = [f'videos/{x}' for x in os.listdir('./videos')]
sample_datapoint = data[-1]
from superduperdb.ext.pillow import pil_image
chunked_model_datatype = pil_image
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/audio.zip && unzip audio.zip
import os
data = [f'audios/{x}' for x in os.listdir('./audios')]
sample_datapoint = data[-1]
chunked_model_datatype = dtype('str')
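Whichever modality you picked, data is now a plain Python list and sample_datapoint a single element of it. A quick check before moving on:
print(len(data))
print(type(sample_datapoint))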
Create datatype
Data types such as "text" or "integer" which are natively supported by your db.databackend don't need a datatype.
datatype = None
Otherwise do one of the following:
- PDF
- Text
- Image
- Audio
- Video
!pip install PyPDF2
from superduperdb import DataType
from superduperdb.components.datatype import File
datatype = DataType('pdf', encodable='file')
datatype = 'str'
from superduperdb.ext.pillow import pil_image
import PIL.Image
datatype = pil_image
from superduperdb.ext.numpy import array
from superduperdb import DataType
import scipy.io.wavfile
import io
def encoder(data):
    buffer = io.BytesIO()
    fs = data[0]
    content = data[1]
    scipy.io.wavfile.write(buffer, fs, content)
    return buffer.getvalue()

def decoder(data):
    buffer = io.BytesIO(data)
    content = scipy.io.wavfile.read(buffer)
    return content

datatype = DataType(
    'wav',
    encoder=encoder,
    decoder=decoder,
    encodable='artifact',
)
from superduperdb import DataType
# Create a DataType with the identifier 'video_on_file'; the raw video is stored as an artifact
datatype = DataType(
    identifier='video_on_file',
    encodable='artifact',
)
from superduperdb import DataType
if datatype and isinstance(datatype, DataType):
    db.apply(datatype)
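If you want to confirm that the component was registered, db.show lists applied components by type; a minimal check (assuming your version exposes this helper):
print(db.show('datatype'))  # identifiers of datatypes applied so far (empty if none was needed)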
Setup tables or collections
- MongoDB
- SQL
# Note this is an optional step for MongoDB
# Users can also work directly with `DataType` if they want to add
# custom data
from superduperdb import Schema, DataType
from superduperdb.backends.mongodb import Collection
table_or_collection = Collection('documents')
USE_SCHEMA = False
if USE_SCHEMA and isinstance(datatype, DataType):
    schema = Schema(fields={'x': datatype})
    db.apply(schema)
from superduperdb.backends.ibis import Table
from superduperdb import Schema, DataType
from superduperdb.backends.ibis.field_types import dtype
datatype = "str"
if isinstance(datatype, DataType):
    schema = Schema(identifier="schema", fields={"id": dtype("str"), "x": datatype})
else:
    schema = Schema(
        identifier="schema", fields={"id": dtype("str"), "x": dtype(datatype)}
    )
table_or_collection = Table('documents', schema=schema)
db.apply(table_or_collection)
Insert data
In order to create data, we need to create a Schema for encoding our special Datatype column(s) in the databackend.
- MongoDB
- SQL
from superduperdb import Document
def do_insert(data):
    schema = None
    if schema is None and (datatype is None or isinstance(datatype, str)):
        data = [Document({'x': x}) for x in data]
        db.execute(table_or_collection.insert_many(data))
    elif schema is None and datatype is not None and isinstance(datatype, DataType):
        data = [Document({'x': datatype(x)}) for x in data]
        db.execute(table_or_collection.insert_many(data))
    else:
        data = [Document({'x': x}) for x in data]
        db.execute(table_or_collection.insert_many(data, schema='my_schema'))
from superduperdb import Document
def do_insert(data):
    db.execute(table_or_collection.insert([Document({'id': str(idx), 'x': x}) for idx, x in enumerate(data)]))
do_insert(data[:-len(data) // 4])  # insert the first three quarters; the rest is held back for the update demo below
Define the embedding model datatype
- SQL
- MongoDB
from superduperdb.components.vector_index import sqlvector
get_chunking_datatype = lambda shape: sqlvector(shape=(shape,))
from superduperdb.components.vector_index import vector
get_chunking_datatype = lambda shape: vector(shape=(shape,))
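Either way, get_chunking_datatype maps an embedding dimension to a concrete vector datatype. As a purely illustrative example (the name embedding_datatype is ours, not part of this recipe), the 384-dimensional case matching the sentence-transformer used later would be:
embedding_datatype = get_chunking_datatype(384)  # illustrative only; the model cells below call this themselves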
Build simple select queries
- MongoDB
- SQL
select = table_or_collection.find({})
select = table_or_collection.to_query()
Create model output type
- MongoDB
- SQL
chunked_model_datatype = None
from superduperdb.backends.ibis.field_types import dtype
chunked_model_datatype = dtype('str')
Apply a chunker for search
Note that applying a chunker is not mandatory for search. If your data is already chunked (e.g. short text snippets or audio) or if you are searching through something like images, which can't be chunked, then this won't be necessary.
- Text
- PDF
- Video
- Audio
from superduperdb import objectmodel
CHUNK_SIZE = 200
@objectmodel(flatten=True, model_update_kwargs={'document_embedded': False}, datatype=chunked_model_datatype)
def chunker(text):
    text = text.split()
    chunks = [' '.join(text[i:i + CHUNK_SIZE]) for i in range(0, len(text), CHUNK_SIZE)]
    return chunks
!pip install -q "unstructured[pdf]"
from superduperdb import objectmodel
from unstructured.partition.pdf import partition_pdf
import PyPDF2
CHUNK_SIZE = 500
@objectmodel(flatten=True, model_update_kwargs={'document_embedded': False}, datatype=chunked_model_datatype)
def chunker(pdf_file):
    elements = partition_pdf(pdf_file)
    text = '\n'.join([e.text for e in elements])
    chunks = [text[i:i + CHUNK_SIZE] for i in range(0, len(text), CHUNK_SIZE)]
    return chunks
!pip install opencv-python
import cv2
import tqdm
from PIL import Image
from superduperdb.ext.pillow import pil_image
from superduperdb import objectmodel, Schema
@objectmodel(
    flatten=True,
    model_update_kwargs={'document_embedded': False},
    output_schema=Schema(identifier='output-schema', fields={'image': pil_image}),
)
def chunker(video_file):
    # Set the sampling frequency for frames
    sample_freq = 10
    # Open the video file using OpenCV
    cap = cv2.VideoCapture(video_file)
    # Initialize variables
    frame_count = 0
    fps = cap.get(cv2.CAP_PROP_FPS)
    extracted_frames = []
    progress = tqdm.tqdm()
    # Iterate through video frames
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Get the current timestamp based on frame count and FPS
        current_timestamp = frame_count // fps
        # Sample frames based on the specified frequency
        if frame_count % sample_freq == 0:
            extracted_frames.append({
                'image': Image.fromarray(frame[:, :, ::-1]),  # Convert BGR to RGB
                'current_timestamp': current_timestamp,
            })
        frame_count += 1
        progress.update(1)
    # Release resources
    cap.release()
    cv2.destroyAllWindows()
    # Return the list of extracted frames
    return extracted_frames
from superduperdb import objectmodel, Schema
CHUNK_SIZE = 10 # in seconds
@objectmodel(
    flatten=True,
    model_update_kwargs={'document_embedded': False},
    output_schema=Schema(identifier='output-schema', fields={'audio': datatype}),
)
def chunker(audio):
    # `audio` is a (sampling_rate, samples) pair as produced by the 'wav' datatype above
    fs, content = audio
    chunk_samples = CHUNK_SIZE * fs  # CHUNK_SIZE is in seconds; convert to samples
    chunks = [content[i:i + chunk_samples] for i in range(0, len(content), chunk_samples)]
    return [(fs, chunk) for chunk in chunks]
Now we apply this chunker to the data by wrapping the chunker in a Listener:
from superduperdb import Listener
upstream_listener = Listener(
    model=chunker,
    select=select,
    key='x',
)
db.apply(upstream_listener)
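Applying the listener triggers chunking over the selected data. The chunks are stored under an automatically generated outputs key, which is reused below when selecting the listener's outputs:
print(upstream_listener.outputs)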
Build multimodal embedding models
Some embedding models, such as CLIP, come in pairs of model and compatible_model. Otherwise:
compatible_model = None
- Text
- Image
- Text+Image
- Audio
from superduperdb.ext.sentence_transformers import SentenceTransformer
from superduperdb import vector  # needed for the fallback datatype below

if not get_chunking_datatype:
    model_dtype = vector(shape=(384,))
else:
    model_dtype = get_chunking_datatype(384)
# Load the pre-trained sentence transformer model
model = SentenceTransformer(
    identifier='all-MiniLM-L6-v2',
    postprocess=lambda x: x.tolist(),
    datatype=model_dtype,
)
from torchvision import transforms
import torch
import torch.nn as nn
import torchvision.models as models
import warnings
# Import custom modules
from superduperdb.ext.torch import TorchModel, tensor
# Define a series of image transformations using torchvision.transforms.Compose
t = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the input image to 224x224 pixels (must match the crop size below)
    transforms.CenterCrop((224, 224)),  # Perform a center crop on the resized image
    transforms.ToTensor(),  # Convert the image to a PyTorch tensor
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize with the specified mean and standard deviation
])
# Define a preprocess function that applies the defined transformations to an input image
def preprocess(x):
    try:
        return t(x)
    except Exception as e:
        # If an exception occurs during preprocessing, issue a warning and return a tensor of zeros
        warnings.warn(str(e))
        return torch.zeros(3, 224, 224)
# Load the pre-trained ResNet-50 model from torchvision
resnet50 = models.resnet50(pretrained=True)
# Extract all layers of the ResNet-50 model except the last one
modules = list(resnet50.children())[:-1]
resnet50 = nn.Sequential(*modules)
# Create a TorchModel instance with the ResNet-50 model, preprocessing function, and postprocessing lambda
model = TorchModel(
    identifier='resnet50',
    preprocess=preprocess,
    object=resnet50,
    postprocess=lambda x: x[:, 0, 0],  # Postprocess by extracting the top-left element of the output tensor
    datatype=tensor(torch.float, shape=(2048,))  # Specify the encoder configuration
)
import clip
from superduperdb import vector
from superduperdb.ext.torch import TorchModel
# Load the CLIP model and obtain the preprocessing function
model, preprocess = clip.load("RN50", device='cpu')
# Define a vector with shape (1024,)
if not get_chunking_datatype:
    e = vector(shape=(1024,))
else:
    e = get_chunking_datatype(1024)

# Create a TorchModel for text encoding
compatible_model = TorchModel(
    identifier='clip_text',  # Unique identifier for the model
    object=model,  # CLIP model
    preprocess=lambda x: clip.tokenize(x)[0],  # Model input preprocessing using CLIP
    postprocess=lambda x: x.tolist(),  # Convert the model output to a list
    datatype=e,  # Vector encoder with shape (1024,)
    forward_method='encode_text',  # Use the 'encode_text' method for forward pass
)

# Create a TorchModel for visual encoding
model = TorchModel(
    identifier='clip_image',  # Unique identifier for the model
    object=model.visual,  # Visual part of the CLIP model
    preprocess=preprocess,  # Visual preprocessing using CLIP
    postprocess=lambda x: x.tolist(),  # Convert the output to a list
    datatype=e,  # Vector encoder with shape (1024,)
)
!pip install librosa
import librosa
import numpy as np
from superduperdb import ObjectModel
from superduperdb import vector
def audio_embedding(audio_file):
    # Load the audio file
    MAX_SIZE = 10000
    y, sr = librosa.load(audio_file)
    y = y[:MAX_SIZE]
    mfccs = librosa.feature.mfcc(y=y, sr=44000, n_mfcc=1)
    mfccs = mfccs.squeeze().tolist()
    return mfccs

if not get_chunking_datatype:
    e = vector(shape=(1000,))
else:
    e = get_chunking_datatype(1000)

model = ObjectModel(identifier='my-model-audio', object=audio_embedding, datatype=e)
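Since audio_embedding is a plain function before being wrapped in ObjectModel, you can call it directly on a sample to check that the embedding length agrees with the shape declared in e, and adjust the shape argument if it doesn't (a sketch for the audio tab; the other models need their own preprocessing):
emb = audio_embedding(sample_datapoint)
print(len(emb))  # compare with the shape declared in `e`; adjust `shape` if they differ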
Select outputs of upstream listener
This is useful if you have performed a first step, such as pre-computing features, or chunking your data. You can use this query to operate on those outputs.
- MongoDB
- SQL
from superduperdb.backends.mongodb import Collection
indexing_key = upstream_listener.outputs
select = Collection(upstream_listener.outputs).find()
indexing_key = upstream_listener.outputs_key
select = db.load("table", upstream_listener.outputs).to_query()
Depending on whether we have chunked the data, the indexing key will be different:
- Chunked Search
- Un-chunked Search
compatible_key = None
if compatible_model:
    compatible_key = 'y'
indexing_key = 'x'
compatible_key = None
if compatible_model:
    compatible_key = 'y'
Create vector-index
vector_index_name = 'my-vector-index'
- 1-Modality
- 2-Modalities
from superduperdb import VectorIndex, Listener
jobs, _ = db.add(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,  # the `Document` key `model` should ingest to create embedding
            select=select,  # a `Select` query telling which data to search over
            model=model,  # a `_Predictor` how to convert data to embeddings
        )
    )
)
from superduperdb import VectorIndex, Listener
jobs, _ = db.add(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,  # the `Document` key `model` should ingest to create embedding
            select=select,  # a `Select` query telling which data to search over
            model=model,  # a `_Predictor` how to convert data to embeddings
        ),
        compatible_listener=Listener(
            key=compatible_key,  # the `Document` key `compatible_model` should ingest
            model=compatible_model,  # a `_Predictor` how to convert data to embeddings
            active=False,
            select=None,
        )
    )
)
query_table_or_collection = select.table_or_collection
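db.add returns the background jobs that compute the initial embeddings. In development mode these run in-process and have already finished; on a cluster you may want to inspect them before searching (the exact interface of the job objects varies by version):
print(jobs)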
Perform a vector search
from superduperdb import Document, DataType

def get_sample_item(key, sample_datapoint, datatype=None):
    if not isinstance(datatype, DataType):
        item = Document({key: sample_datapoint})
    else:
        item = Document({key: datatype(sample_datapoint)})
    return item
if compatible_key:
    item = get_sample_item(compatible_key, sample_datapoint, None)
else:
    item = get_sample_item(indexing_key, sample_datapoint, datatype=datatype)
Once we have this search target, we can execute a search as follows:
- MongoDB
- SQL
select = query_table_or_collection.like(item, vector_index=vector_index_name, n=10).find()
select = query_table_or_collection.like(item, vector_index=vector_index_name, n=10).limit(10)
results = db.execute(select)
Visualize results
- Text
- Image
- Audio
- PDF
- Video
from IPython.display import Markdown, display
def visualize(item, source):
    display(Markdown(item))

def show(results, output_key, get_original_callable=None):
    for result in results:
        source = None
        if '_source' in result:
            source = get_original_callable(result['_source'])
        visualize(result[output_key], source)
from IPython.display import display
def visualize(item, source):
    display(item)  # item is a PIL.Image

def show(results, output_key, get_original_callable=None):
    for result in results:
        source = None
        if '_source' in result:
            source = get_original_callable(result['_source'])
        visualize(result[output_key], source)
from IPython.display import Audio, display
def visualize(item, source):
    display(Audio(item[1], rate=item[0]))  # item is a (sampling_rate, samples) pair

def show(results, output_key, get_original_callable=None):
    for result in results:
        source = None
        if '_source' in result:
            source = get_original_callable(result['_source'])
        visualize(result[output_key], source)
from IPython.display import IFrame, display
def visualize(item, source):
    display(item)

def show(results, output_key, get_original_callable=None):
    for result in results:
        source = None
        if '_source' in result:
            source = get_original_callable(result['_source'])
        visualize(result[output_key], source)
from IPython.display import display, HTML
def visualize(uri, source):
    timestamp = source  # increment to the frame you want to start at
    # Create HTML code for the video player with a specified source and controls
    video_html = f"""
    <video width="640" height="480" controls>
        <source src="{uri}" type="video/mp4">
    </video>
    <script>
        // Get the video element
        var video = document.querySelector('video');
        // Set the current time of the video to the specified timestamp
        video.currentTime = {timestamp};
        // Play the video automatically
        video.play();
    </script>
    """
    display(HTML(video_html))

def show(results, output_key, get_original_callable=None):
    # show only the first video
    for result in results:
        result = result[output_key]
        timestamp = result['current_timestamp']
        source = result['_source']
        uri = get_original_callable(source)['x']
        visualize(uri, timestamp)
        break
If your use-case involved chunking, you will want to recover the original rows/documents after getting the result of a vector search:
- MongoDB
- SQL
def get_original(_source):
    return db.execute(table_or_collection.find_one({'_id': _source}))

visualization_key = upstream_listener.outputs
def get_original(_source):
    return next(db.execute(table_or_collection.select_using_ids([_source])))

visualization_key = indexing_key
show(results, visualization_key, get_original)
Check the system stays updated
- Development
- Cluster
do_insert(data[-len(data) // 4:])
# As an example with MongoDB, we show that inserting into / updating the DB with a different client
# (potentially from a different source) still keeps the system up-to-date. This works with any
# cluster-mode-compatible DB (see "Configure your production system").
import pymongo

collection = pymongo.MongoClient('mongodb://<mongo-host>:27017/<database>')['<database>'].documents
collection.insert_many([{'x': x} for x in data[-len(data) // 4:]])
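In either mode, the new quarter of the data should now be embedded and searchable. Re-running the earlier vector search is a simple end-to-end check, reusing the query, keys and callbacks defined above:
results = db.execute(select)
show(results, visualization_key, get_original)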