Retrieval augmented generation
The first step in any SuperDuperDB application is to connect to your data-backend with SuperDuperDB:
Configure your production system
If you would like to use the production features of SuperDuperDB, then you should set the relevant connections and configurations in a configuration file. Otherwise you are welcome to use "development" mode to get going with SuperDuperDB quickly.
import os
os.makedirs('.superduperdb', exist_ok=True)
os.environ['SUPERDUPERDB_CONFIG'] = '.superduperdb/config.yaml'
- MongoDB Community
- MongoDB Atlas
- SQLite
- MySQL
- MSSQL
- PostgreSQL
- Snowflake
- Clickhouse
CFG = '''
data_backend: mongodb://127.0.0.1:27017/documents
artifact_store: filesystem://./artifact_store
cluster:
  cdc:
    strategy: null
    uri: ray://127.0.0.1:20000
  compute:
    uri: ray://127.0.0.1:10001
  vector_search:
    backfill_batch_size: 100
    type: in_memory
    uri: http://127.0.0.1:21000
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
    type: native
data_backend: mongodb+srv://<user>:<password>@<mongo-host>:27017/documents
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: sqlite://<path-to-db>.db
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: mysql://<user>:<password>@<host>:<port>/<database>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: mssql://<user>:<password>@<host>:<port>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: postgres://<user>:<password>@<host>:<port>/<database>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
metadata_store: sqlite://<path-to-sqlite-db>.db
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: snowflake://<user>:<password>@<account>/<database>
'''
CFG = '''
artifact_store: filesystem://<path-to-artifact-store>
metadata_store: sqlite://<path-to-sqlite-db>.db
cluster:
  compute: ray://<ray-host>
  cdc:
    uri: http://<cdc-host>:<cdc-port>
  vector_search:
    uri: http://<vector-search-host>:<vector-search-port>
data_backend: clickhouse://<user>:<password>@<host>:<port>
'''
with open(os.environ['SUPERDUPERDB_CONFIG'], 'w') as f:
    f.write(CFG)
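You can optionally confirm the configuration landed where the client will look for it:
# Optional check: print the config file the connection will read
with open(os.environ['SUPERDUPERDB_CONFIG']) as f:
    print(f.read())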
Start your cluster
Starting a SuperDuperDB cluster is useful in production and during model development if you want scalable compute, multi-user access to models for collaboration, and monitoring.
If you don't need this, it is simpler to start in development mode.
- Experimental Cluster
- Docker-Compose
!python -m superduperdb local-cluster up
!make testenv_image
!make testenv_init
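Once the cluster is up, a quick reachability probe can confirm the services started; the port here is an assumption taken from the sample configuration above, so adjust it to your deployment:
!curl -s -o /dev/null -w "%{http_code}\n" http://localhost:21000  # vector-search service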
Connect to SuperDuperDB
Note that this is only relevant if you are running SuperDuperDB in development mode. Otherwise, refer to "Configuring your production system".
- MongoDB
- SQLite
- MySQL
- MSSQL
- PostgreSQL
- Snowflake
- Clickhouse
- DuckDB
- Pandas
- MongoMock
from superduperdb import superduper
db = superduper('mongodb://localhost:27017/documents')
from superduperdb import superduper
db = superduper('sqlite://my_db.db')
from superduperdb import superduper
user = 'superduper'
password = 'superduper'
port = 3306
host = 'localhost'
database = 'test_db'
db = superduper(f"mysql://{user}:{password}@{host}:{port}/{database}")
from superduperdb import superduper
user = 'sa'
password = 'Superduper#1'
port = 1433
host = 'localhost'
db = superduper(f"mssql://{user}:{password}@{host}:{port}")
!pip install psycopg2
from superduperdb import superduper
user = 'postgres'
password = 'postgres'
port = 5432
host = 'localhost'
database = 'test_db'
db_uri = f"postgres://{user}:{password}@{host}:{port}/{database}"
db = superduper(db_uri, metadata_store=db_uri.replace('postgres://', 'postgresql://'))
from superduperdb import superduper
user = "superduperuser"
password = "superduperpassword"
account = "XXXX-XXXX" # ORGANIZATIONID-USERID
database = "FREE_COMPANY_DATASET/PUBLIC"
snowflake_uri = f"snowflake://{user}:{password}@{account}/{database}"
db = superduper(
    snowflake_uri,
    metadata_store='sqlite:///your_database_name.db',
)
from superduperdb import superduper
user = 'default'
password = ''
port = 8123
host = 'localhost'
db = superduper(f"clickhouse://{user}:{password}@{host}:{port}", metadata_store=f'mongomock://meta')
from superduperdb import superduper
db = superduper('duckdb://mydb.duckdb')
from superduperdb import superduper
db = superduper(['my.csv'], metadata_store='mongomock://meta')
from superduperdb import superduper
db = superduper('mongomock:///test_db')
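With a connection in hand, you can run an optional smoke test using db.show, which lists components registered with the datalayer (expect an empty list on a fresh database):
# Optional smoke test: list models already registered with the datalayer
print(db.show('model'))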
Get useful sample data
from superduperdb import dtype
- Text
- PDF
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/text.json
import json
with open('text.json', 'r') as f:
    data = json.load(f)
sample_datapoint = "What is mongodb?"
chunked_model_datatype = dtype('str')
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/pdfs.zip && unzip -o pdfs.zip
import os
data = [f'pdfs/{x}' for x in os.listdir('./pdfs')]
sample_datapoint = data[-1]
chunked_model_datatype = dtype('str')
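A quick look at what was downloaded (plain Python, no SuperDuperDB machinery involved):
# Inspect the sample data: count and a truncated first datapoint
print(f'{len(data)} datapoints; first: {str(data[0])[:100]!r}')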
Setup tables or collections
- MongoDB
- SQL
# Note this is an optional step for MongoDB
# Users can also work directly with `DataType` if they want to add
# custom data
from superduperdb import Schema, DataType
from superduperdb.backends.mongodb import Collection

table_or_collection = Collection('documents')
USE_SCHEMA = False

if USE_SCHEMA and isinstance(datatype, DataType):
    schema = Schema(fields={'x': datatype})
    db.apply(schema)
from superduperdb.backends.ibis import Table
from superduperdb import Schema, DataType
from superduperdb.backends.ibis.field_types import dtype

datatype = "str"

if isinstance(datatype, DataType):
    schema = Schema(identifier="schema", fields={"id": dtype("str"), "x": datatype})
else:
    schema = Schema(
        identifier="schema", fields={"id": dtype("str"), "x": dtype(datatype)}
    )

table_or_collection = Table('documents', schema=schema)
db.apply(table_or_collection)
Insert data
In order to insert data, we need to create a Schema for encoding our special DataType column(s) in the databackend.
- MongoDB
- SQL
from superduperdb import Document


def do_insert(data):
    schema = None
    if schema is None and (datatype is None or isinstance(datatype, str)):
        data = [Document({'x': x}) for x in data]
        db.execute(table_or_collection.insert_many(data))
    elif schema is None and datatype is not None and isinstance(datatype, DataType):
        data = [Document({'x': datatype(x)}) for x in data]
        db.execute(table_or_collection.insert_many(data))
    else:
        data = [Document({'x': x}) for x in data]
        db.execute(table_or_collection.insert_many(data, schema='my_schema'))
from superduperdb import Document


def do_insert(data):
    db.execute(table_or_collection.insert([Document({'id': str(idx), 'x': x}) for idx, x in enumerate(data)]))
do_insert(data[:-len(data) // 4])
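Note that this inserts only the first three quarters of the data. The held-back remainder can be inserted later, e.g. to watch the listeners and vector-index set up below process fresh data incrementally (a sketch, commented out so it isn't run prematurely):
# Hold-out insert, to run after the listeners below have been applied:
# do_insert(data[-len(data) // 4:])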
Build simple select queries
- MongoDB
- SQL
select = table_or_collection.find({})
select = table_or_collection.to_query()
Create Model Output Type
- MongoDB
- SQL
chunked_model_datatype = None
from superduperdb.backends.ibis.field_types import dtype
chunked_model_datatype = dtype('str')
Apply a chunker for search
Note that applying a chunker is not mandatory for search. If your data is already chunked (e.g. short text snippets or audio) or if you are searching through something like images, which can't be chunked, then this won't be necessary.
- Text
- PDF
from superduperdb import objectmodel

CHUNK_SIZE = 200


@objectmodel(flatten=True, model_update_kwargs={'document_embedded': False}, datatype=chunked_model_datatype)
def chunker(text):
    text = text.split()
    chunks = [' '.join(text[i:i + CHUNK_SIZE]) for i in range(0, len(text), CHUNK_SIZE)]
    return chunks
!pip install -q "unstructured[pdf]"
from superduperdb import objectmodel
from unstructured.partition.pdf import partition_pdf

CHUNK_SIZE = 500


@objectmodel(flatten=True, model_update_kwargs={'document_embedded': False}, datatype=chunked_model_datatype)
def chunker(pdf_file):
    elements = partition_pdf(pdf_file)
    text = '\n'.join([e.text for e in elements])
    chunks = [text[i:i + CHUNK_SIZE] for i in range(0, len(text), CHUNK_SIZE)]
    return chunks
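As an optional sanity check, the decorated chunker exposes the same predict_one interface used for the other models in this notebook:
# Chunk the first datapoint and show the first two chunks
print(chunker.predict_one(data[0])[:2])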
Now we apply this chunker to the data by wrapping the chunker in a Listener:
from superduperdb import Listener

upstream_listener = Listener(
    model=chunker,
    select=select,
    key='x',
)
db.apply(upstream_listener)
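Optionally, confirm the listener was registered (in development mode its outputs are computed inline) via the same db.show introspection used earlier:
# Optional check: list registered listeners
print(db.show('listener'))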
Select outputs of upstream listener
This is useful if you have performed a first step, such as pre-computing features, or chunking your data. You can use this query to operate on those outputs.
- MongoDB
- SQL
from superduperdb.backends.mongodb import Collection
indexing_key = upstream_listener.outputs
select = Collection(upstream_listener.outputs).find()
indexing_key = upstream_listener.outputs_key
select = db.load("table", upstream_listener.outputs).to_query()
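To preview a few chunk outputs before building the embedding model (an optional check; dict-style access with indexing_key mirrors the QueryModel postprocess later in this notebook):
# Print the first few chunks produced by the upstream listener
for doc in list(db.execute(select))[:3]:
    print(doc[indexing_key])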
Build text embedding model
- OpenAI
- JinaAI
- Sentence-Transformers
- Transformers
!pip install openai
from superduperdb.ext.openai import OpenAIEmbedding
model = OpenAIEmbedding(identifier='text-embedding-ada-002')
import os
from superduperdb.ext.jina import JinaEmbedding
os.environ["JINA_API_KEY"] = "jina_xxxx"
# define the model
model = JinaEmbedding(identifier='jina-embeddings-v2-base-en')
!pip install sentence-transformers
from superduperdb import vector
import sentence_transformers
from superduperdb.ext.sentence_transformers import SentenceTransformer
model = SentenceTransformer(
    identifier="embedding",
    object=sentence_transformers.SentenceTransformer("BAAI/bge-small-en"),
    datatype=vector(shape=(384,)),  # BAAI/bge-small-en produces 384-dimensional embeddings
    postprocess=lambda x: x.tolist(),
    predict_kwargs={"show_progress_bar": True},
)
import dataclasses as dc

import torch
from transformers import AutoTokenizer, AutoModel

from superduperdb import vector
from superduperdb.components.model import Model, ensure_initialized, Signature


@dc.dataclass(kw_only=True)
class TransformerEmbedding(Model):
    signature: Signature = 'singleton'
    pretrained_model_name_or_path: str

    def init(self):
        self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_model_name_or_path)
        self.model = AutoModel.from_pretrained(self.pretrained_model_name_or_path)
        self.model.eval()

    @ensure_initialized
    def predict_one(self, x):
        return self.predict([x])[0]

    @ensure_initialized
    def predict(self, dataset):
        encoded_input = self.tokenizer(dataset, padding=True, truncation=True, return_tensors='pt')
        # Compute token embeddings
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        # Perform pooling. In this case, cls pooling.
        sentence_embeddings = model_output[0][:, 0]
        # normalize embeddings
        sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
        return sentence_embeddings.tolist()


model = TransformerEmbedding(identifier="embedding", pretrained_model_name_or_path="BAAI/bge-small-en", datatype=vector(shape=(384,)))
print(len(model.predict_one("What is SuperDuperDB")))
Create vector-index
vector_index_name = 'my-vector-index'
- 1-Modality
- 2-Modalities
from superduperdb import VectorIndex, Listener

jobs, _ = db.add(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,  # the `Document` key the model should ingest to create embeddings
            select=select,     # a `Select` query telling which data to search over
            model=model,       # a `_Predictor` telling how to convert data to embeddings
        )
    )
)
from superduperdb import VectorIndex, Listener

jobs, _ = db.add(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,        # the `Document` key the model should ingest to create embeddings
            select=select,           # a `Select` query telling which data to search over
            model=model,             # a `_Predictor` telling how to convert data to embeddings
        ),
        compatible_listener=Listener(
            key=compatible_key,      # the `Document` key the model should ingest to create embeddings
            model=compatible_model,  # a `_Predictor` telling how to convert data to embeddings
            active=False,
            select=None,
        )
    )
)
query_table_or_collection = select.table_or_collection
sample_datapoint = data[0]
query = "Tell me about the SuperDuperDb"
Perform a vector search
from superduperdb import Document


def get_sample_item(key, sample_datapoint, datatype=None):
    if not isinstance(datatype, DataType):
        item = Document({key: sample_datapoint})
    else:
        item = Document({key: datatype(sample_datapoint)})
    return item


compatible_key = None  # only set when using the 2-modality vector-index above
if compatible_key:
    item = get_sample_item(compatible_key, sample_datapoint, None)
else:
    item = get_sample_item(indexing_key, sample_datapoint, datatype=datatype)
Once we have this search target, we can execute a search as follows:
- MongoDB
- SQL
select = query_table_or_collection.like(item, vector_index=vector_index_name, n=10).find()
select = query_table_or_collection.like(item, vector_index=vector_index_name, n=10).limit(10)
results = db.execute(select)
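The hits can be inspected directly; dict-style access with the indexing key is the same pattern the QueryModel below uses in its postprocess:
# Print the matched chunks
for r in results:
    print(r[indexing_key])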
Create Vector Search Model
from superduperdb.base.serializable import Variable
item = {indexing_key: Variable('query')}
from superduperdb.components.model import QueryModel

vector_search_model = QueryModel(
    identifier="VectorSearch",
    select=select,
    postprocess=lambda docs: [{"text": doc[indexing_key], "_source": doc["_source"]} for doc in docs],
)
vector_search_model.db = db
vector_search_model.predict_one(query=query)
Build LLM
- OpenAI
- Anthropic
- vLLM
- Transformers
- Llama.cpp
!pip install openai
from superduperdb.ext.openai import OpenAIChatCompletion
llm = OpenAIChatCompletion(identifier='llm', model='gpt-3.5-turbo')
!pip install anthropic
from superduperdb.ext.anthropic import AnthropicCompletions
import os
os.environ["ANTHROPIC_API_KEY"] = "sk-xxx"
predict_kwargs = {
    "max_tokens": 1024,
    "temperature": 0.8,
}
llm = AnthropicCompletions(identifier='llm', model='claude-2.1', predict_kwargs=predict_kwargs)
!pip install vllm
from superduperdb.ext.vllm import VllmModel
predict_kwargs = {
    "max_tokens": 1024,
    "temperature": 0.8,
}

llm = VllmModel(
    identifier="llm",
    model_name="TheBloke/Mistral-7B-Instruct-v0.2-AWQ",
    vllm_kwargs={
        "gpu_memory_utilization": 0.7,
        "max_model_len": 1024,
        "quantization": "awq",
    },
    predict_kwargs=predict_kwargs,
)
!pip install transformers datasets bitsandbytes accelerate
from superduperdb.ext.transformers import LLM
llm = LLM.from_pretrained(
    "mistralai/Mistral-7B-Instruct-v0.2",
    load_in_8bit=True,
    device_map="cuda",
    identifier="llm",
    predict_kwargs=dict(max_new_tokens=128),
)
!pip install llama_cpp_python
# !huggingface-cli download TheBloke/Mistral-7B-Instruct-v0.2-GGUF mistral-7b-instruct-v0.2.Q4_K_M.gguf --local-dir . --local-dir-use-symlinks False
from superduperdb.ext.llamacpp.model import LlamaCpp
llm = LlamaCpp(identifier="llm", model_name_or_path="mistral-7b-instruct-v0.2.Q4_K_M.gguf")
# test the llm model
llm.predict_one("Hello")
Answer question with LLM
- No-context
- Prompt
- Context
llm.predict_one(query)
from superduperdb import objectmodel
from superduperdb.components.graph import Graph, input_node


@objectmodel
def build_prompt(query):
    return f"Translate the sentence into German: {query}"


in_ = input_node('query')
prompt = build_prompt(query=in_)
answer = llm(prompt)
prompt_llm = answer.to_graph("prompt_llm")
prompt_llm.predict_one(query)[0]
from superduperdb import objectmodel
from superduperdb.components.graph import Graph, input_node

prompt_template = (
    "Use the following context snippets (these snippets are not ordered!) to answer the question.\n"
    "{context}\n\n"
    "Here's the question: {query}"
)


@objectmodel
def build_prompt(query, docs):
    chunks = [doc["text"] for doc in docs]
    context = "\n\n".join(chunks)
    prompt = prompt_template.format(context=context, query=query)
    return prompt


in_ = input_node('query')
vector_search_results = vector_search_model(query=in_)
prompt = build_prompt(query=in_, docs=vector_search_results)
answer = llm(prompt)
context_llm = answer.to_graph("context_llm")
context_llm.predict_one(query)
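Since the graph has been compiled into a model, it can also be registered with the datalayer for later reuse (a sketch, assuming graph models are applied the same way as the listeners and vector-index above):
# Persist the assembled RAG pipeline as a reusable component
db.apply(context_llm)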