MongoDBQueryEngine(connection_string, llm=None, database_name=None, embedding_function=None, embedding_model=None, collection_name=None)
A query engine backed by MongoDB Atlas that supports document insertion and querying.
This engine initializes a vector database, builds an index from input documents, and allows querying through the query engine interface of that index.
ATTRIBUTE | DESCRIPTION |
vector_db | The MongoDB vector database instance. TYPE: MongoDBAtlasVectorDB |
vector_search_engine | The vector search engine. TYPE: MongoDBAtlasVectorSearch |
storage_context | The storage context for the vector store. TYPE: StorageContext |
index | The index built from the documents. TYPE: Optional[VectorStoreIndex] |
Initializes a MongoDBQueryEngine instance.
PARAMETER | DESCRIPTION |
connection_string | Connection string used to connect to MongoDB. TYPE: str |
llm | Language model for querying. Defaults to an OpenAI model if not provided. TYPE: Optional[LLM] DEFAULT: None |
database_name | Name of the MongoDB database. TYPE: Optional[str] DEFAULT: None |
embedding_function | Custom embedding function. If None (default), defaults to SentenceTransformer encoding. TYPE: Optional[Union[BaseEmbedding, Callable[..., Any]]] DEFAULT: None |
embedding_model | Embedding model identifier or instance. If None (default), "local:all-MiniLM-L6-v2" will be used. TYPE: Optional[Union[BaseEmbedding, str]] DEFAULT: None |
collection_name | Name of the MongoDB collection. If None (default), DEFAULT_COLLECTION_NAME will be used. TYPE: Optional[str] DEFAULT: None |
RAISES | DESCRIPTION |
ValueError | If no connection string is provided. |
Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def __init__(  # type: ignore[no-any-unimported]
    self,
    connection_string: str,
    llm: Optional["LLM"] = None,
    database_name: Optional[str] = None,
    embedding_function: Optional[Union["BaseEmbedding", Callable[..., Any]]] = None,  # type: ignore[type-arg]
    embedding_model: Optional[Union["BaseEmbedding", str]] = None,
    collection_name: Optional[str] = None,
):
    """
    Record the connection settings and resolve defaults for a new MongoDBQueryEngine.

    The vector database, vector search engine, storage context and index are only
    declared here (as None); they are populated by later calls.

    Args:
        connection_string (str): Connection string used to connect to MongoDB. Must be non-empty.
        llm (Optional[LLM]): Language model for querying. A falsy value selects
            `OpenAI(model="gpt-4o", temperature=0.0)`.
        database_name (Optional[str]): Name of the MongoDB database. Stored as given, including None.
        embedding_function (Optional[Union["BaseEmbedding", Callable[..., Any]]]): Custom embedding function.
            A falsy value selects the `encode` method of `SentenceTransformer("all-MiniLM-L6-v2")`.
        embedding_model (Optional[Union["BaseEmbedding", str]]): Embedding model identifier or instance.
            A falsy value selects "local:all-MiniLM-L6-v2".
        collection_name (Optional[str]): Name of the MongoDB collection. A falsy value selects
            `DEFAULT_COLLECTION_NAME`.

    Raises:
        ValueError: If `connection_string` is empty or otherwise falsy.
    """
    # Guard clause: nothing else is assigned when the connection string is missing.
    if not connection_string:
        raise ValueError("Connection string is required to connect to MongoDB.")

    # Placeholders for the objects that are built after construction.
    self.vector_db: Optional[MongoDBAtlasVectorDB] = None
    self.vector_search_engine: Optional["MongoDBAtlasVectorSearch"] = None  # type: ignore[no-any-unimported]
    self.storage_context: Optional["StorageContext"] = None  # type: ignore[no-any-unimported]
    self.index: Optional[VectorStoreIndex] = None  # type: ignore[no-any-unimported]

    self.connection_string = connection_string
    # ToDo: Is it okay if database_name is None?
    self.database_name = database_name

    # Each fallback is built only when the caller's value is falsy, so a supplied
    # LLM or embedding function never triggers construction of the default one.
    self.collection_name = collection_name if collection_name else DEFAULT_COLLECTION_NAME
    self.llm: LLM = llm if llm else OpenAI(model="gpt-4o", temperature=0.0)  # type: ignore[no-any-unimported]
    self.embedding_model = embedding_model if embedding_model else "local:all-MiniLM-L6-v2"  # type: ignore[no-any-unimported]
    self.embedding_function = (
        embedding_function if embedding_function else SentenceTransformer("all-MiniLM-L6-v2").encode
    )
|
connection_string instance-attribute
connection_string = connection_string
database_name instance-attribute
database_name = database_name
collection_name instance-attribute
collection_name = collection_name or DEFAULT_COLLECTION_NAME
llm instance-attribute
llm = llm or OpenAI(model='gpt-4o', temperature=0.0)
embedding_model instance-attribute
embedding_model = embedding_model or 'local:all-MiniLM-L6-v2'
embedding_function instance-attribute
embedding_function = embedding_function or SentenceTransformer('all-MiniLM-L6-v2').encode
vector_db instance-attribute
vector_db = None
vector_search_engine instance-attribute
vector_search_engine = None
storage_context instance-attribute
storage_context = None
index instance-attribute
index = None
connect_db
connect_db(*args, **kwargs)
Connects to the MongoDB database and initializes the query index from the existing collection.
This method verifies the existence of the collection, sets up the database connection, builds the vector store index, and pings the MongoDB server.
RETURNS | DESCRIPTION |
bool | True if connection is successful; False otherwise. TYPE: bool |
Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def connect_db(self, *args: Any, **kwargs: Any) -> bool:
    """
    Attach to an existing collection and build the query index from it.

    Inside a single guarded section this checks that the collection exists, runs the
    non-overwriting set-up, builds the vector store index and pings the MongoDB server.
    Any exception raised along the way is logged and reported as a False result.

    Args:
        *args (Any): Accepted but not used by this method.
        **kwargs (Any): Accepted but not used by this method.

    Returns:
        bool: True if every step succeeds; False if any step raises.
    """
    try:
        collection_found = self._check_existing_collection()
        if not collection_found:
            missing_msg = (
                f"Collection '{self.collection_name}' not found in database '{self.database_name}'. "
                "Please run init_db to create a new collection."
            )
            # Raised inside the try block, so the handler below logs it and returns False.
            raise ValueError(missing_msg)

        # overwrite=False keeps the existing collection as it is.
        self._set_up(overwrite=False)
        self.index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_search_engine,  # type: ignore[arg-type]
            storage_context=self.storage_context,
            embed_model=self.embedding_model,
        )
        self.vector_db.client.admin.command("ping")  # type: ignore[union-attr]
        logger.info("Connected to MongoDB successfully.")
    except Exception as exc:
        logger.error("Failed to connect to MongoDB: %s", exc)
        return False
    return True
|
init_db
init_db(new_doc_dir=None, new_doc_paths_or_urls=None, *args, **kwargs)
Initializes the MongoDB database by creating or overwriting the collection and indexing documents.
This method loads documents from a directory or provided file paths, sets up the database (optionally overwriting any existing collection), builds the vector store index, and inserts the documents.
PARAMETER | DESCRIPTION |
new_doc_dir | Directory containing documents to be indexed. TYPE: Optional[Union[Path, str]] DEFAULT: None |
new_doc_paths_or_urls | List of file paths or URLs for documents. TYPE: Optional[Sequence[Union[Path, str]]] DEFAULT: None |
*args | Additional positional arguments. TYPE: Any DEFAULT: () |
**kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
bool | True if the database is successfully initialized; False otherwise. TYPE: bool |
Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def init_db(
    self,
    new_doc_dir: Optional[Union[Path, str]] = None,
    new_doc_paths_or_urls: Optional[Sequence[Union[Path, str]]] = None,
    *args: Any,
    **kwargs: Any,
) -> bool:
    """
    Initializes the MongoDB database by creating or overwriting the collection and indexing documents.

    The database is set up with overwriting enabled, the server is pinged, the documents are
    loaded from the given directory and/or paths, the vector store index is built, and every
    loaded document is inserted into it. An already existing collection only produces a
    warning; it does not stop the overwrite.

    Args:
        new_doc_dir (Optional[Union[Path, str]]): Directory containing documents to be indexed.
        new_doc_paths_or_urls (Optional[Sequence[Union[Path, str]]]): List of file paths or URLs for documents.
        *args (Any): Accepted but not used by this method.
        **kwargs (Any): Accepted but not used by this method.

    Returns:
        bool: True if the database is successfully initialized; False if any step raises
        (the exception is logged, not propagated).
    """
    try:
        # Warn before the overwrite below replaces an existing collection.
        if self._check_existing_collection():
            logger.warning(
                "Collection '%s' already exists in database '%s' and will be overwritten. "
                "Use connect_db instead to keep the existing collection.",
                self.collection_name,
                self.database_name,
            )

        # Set up the database with overwriting.
        self._set_up(overwrite=True)
        self.vector_db.client.admin.command("ping")  # type: ignore[union-attr]
        logger.info("Database set up; loading documents.")

        # Load the documents from the directory and/or the explicit paths.
        documents = self._load_doc(input_dir=new_doc_dir, input_docs=new_doc_paths_or_urls)

        self.index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_search_engine,  # type: ignore[arg-type]
            storage_context=self.storage_context,
            embed_model=self.embedding_model,
        )
        for doc in documents:
            self.index.insert(doc)

        logger.info("Database initialized with %d documents.", len(documents))
        return True
    except Exception as e:
        logger.error("Failed to initialize the database: %s", e)
        return False
|
add_docs
add_docs(new_doc_dir=None, new_doc_paths_or_urls=None, *args, **kwargs)
Adds new documents to the existing vector store index.
This method validates that the index exists, loads documents from the specified directory or file paths, and inserts them into the vector store index.
PARAMETER | DESCRIPTION |
new_doc_dir | Directory containing new documents. TYPE: Optional[Union[Path, str]] DEFAULT: None |
new_doc_paths_or_urls | List of file paths or URLs for new documents. TYPE: Optional[Sequence[Union[Path, str]]] DEFAULT: None |
*args | Additional positional arguments. TYPE: Any DEFAULT: () |
**kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def add_docs(
    self,
    new_doc_dir: Optional[Union[Path, str]] = None,
    new_doc_paths_or_urls: Optional[Sequence[Union[Path, str]]] = None,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Load more documents and insert each of them into the current index.

    Index validation runs first, before any document is loaded; the documents are then
    read from the given directory and/or paths and inserted one at a time.

    Args:
        new_doc_dir (Optional[Union[Path, str]]): Directory containing new documents.
        new_doc_paths_or_urls (Optional[Sequence[Union[Path, str]]]): List of file paths or URLs for new documents.
        *args (Any): Accepted but not used by this method.
        **kwargs (Any): Accepted but not used by this method.
    """
    self._validate_query_index()

    loaded_docs = self._load_doc(
        input_dir=new_doc_dir,
        input_docs=new_doc_paths_or_urls,
    )
    for document in loaded_docs:
        self.index.insert(document)  # type: ignore[union-attr]
|
query
query(question, *args, **kwargs)
Queries the indexed documents using the provided question.
This method validates that the query index is initialized, creates a query engine from the vector store index, and executes the query. If the response is empty, a default reply is returned.
PARAMETER | DESCRIPTION |
question | The query question. TYPE: str |
args | Additional positional arguments. TYPE: Any DEFAULT: () |
kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
Any | The query response as a string, or a default reply if no results are found. TYPE: Any |
Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def query(self, question: str, *args: Any, **kwargs: Any) -> Any:  # type: ignore[no-any-unimported, type-arg]
    """
    Answer a question from the indexed documents.

    After index validation, a query engine is created from the index with this
    instance's LLM, stored on `self.query_engine`, and asked the question. The response
    is converted to a string; when that string equals `EMPTY_RESPONSE_TEXT`, the
    default `EMPTY_RESPONSE_REPLY` is returned in its place.

    Args:
        question (str): The query question.
        args (Any): Accepted but not used by this method.
        kwargs (Any): Accepted but not used by this method.

    Returns:
        Any: The response text, or `EMPTY_RESPONSE_REPLY` if the response was empty.
    """
    self._validate_query_index()

    # A new query engine is created on every call and kept on the instance.
    self.query_engine = self.index.as_query_engine(llm=self.llm)  # type: ignore[union-attr]
    answer = str(self.query_engine.query(question))

    return EMPTY_RESPONSE_REPLY if answer == EMPTY_RESPONSE_TEXT else answer
|
get_collection_name
get_collection_name()
Retrieves the name of the MongoDB collection.
RETURNS | DESCRIPTION |
str | The collection name. TYPE: str |
RAISES | DESCRIPTION |
ValueError | If the collection name is not set. |
Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def get_collection_name(self) -> str:
    """
    Return the name of the MongoDB collection this engine uses.

    Returns:
        str: The value of `self.collection_name`.

    Raises:
        ValueError: If `self.collection_name` is empty or otherwise falsy.
    """
    name = self.collection_name
    if not name:
        raise ValueError("Collection name not set.")
    return name
|