Skip to content

MongoDBQueryEngine

autogen.agentchat.contrib.rag.MongoDBQueryEngine #

MongoDBQueryEngine(connection_string, llm=None, database_name=None, embedding_function=None, embedding_model=None, collection_name=None)

A query engine backed by MongoDB Atlas that supports document insertion and querying.

This engine initializes a vector database, builds an index from input documents, and allows querying using the chat engine interface.

ATTRIBUTE DESCRIPTION
vector_db

The MongoDB vector database instance.

TYPE: MongoDBAtlasVectorDB

vector_search_engine

The vector search engine.

TYPE: MongoDBAtlasVectorSearch

storage_context

The storage context for the vector store.

TYPE: StorageContext

index

The index built from the documents.

TYPE: Optional[VectorStoreIndex]

Initializes a MongoDBQueryEngine instance.

PARAMETER DESCRIPTION
connection_string

Connection string used to connect to MongoDB.

TYPE: str

llm

Language model for querying. If not provided, an OpenAI model (`gpt-4o`, temperature 0.0) is used.

TYPE: Optional[LLM] DEFAULT: None

database_name

Name of the MongoDB database.

TYPE: Optional[str] DEFAULT: None

embedding_function

Custom embedding function. If not provided, the `encode` method of a SentenceTransformer "all-MiniLM-L6-v2" model is used.

TYPE: Optional[Union[BaseEmbedding, Callable[..., Any]]] DEFAULT: None

embedding_model

Embedding model identifier or instance. If None (default), "local:all-MiniLM-L6-v2" will be used.

TYPE: Optional[Union[BaseEmbedding, str]] DEFAULT: None

collection_name

Name of the MongoDB collection. If None (default), DEFAULT_COLLECTION_NAME will be used.

TYPE: Optional[str] DEFAULT: None

RAISES DESCRIPTION
ValueError

If no connection string is provided.

Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def __init__(  # type: ignore[no-any-unimported]
    self,
    connection_string: str,
    llm: Optional["LLM"] = None,
    database_name: Optional[str] = None,
    embedding_function: Optional[Union["BaseEmbedding", Callable[..., Any]]] = None,  # type: ignore[type-arg]
    embedding_model: Optional[Union["BaseEmbedding", str]] = None,
    collection_name: Optional[str] = None,
):
    """
    Store the connection settings and model choices for a MongoDBQueryEngine.

    No connection is opened here: the vector database, vector search engine, storage
    context and index are only declared (as None) and are populated by later calls.

    Args:
        connection_string (str): MongoDB connection string. Must be non-empty.
        llm (Optional[LLM]): Language model used for querying. A falsy value selects
            ``OpenAI(model="gpt-4o", temperature=0.0)``.
        database_name (Optional[str]): Name of the MongoDB database; stored as given.
        embedding_function (Optional[Union["BaseEmbedding", Callable[..., Any]]]): Custom embedding function.
            A falsy value selects ``SentenceTransformer("all-MiniLM-L6-v2").encode``.
        embedding_model (Optional[Union["BaseEmbedding", str]]): Embedding model identifier or instance.
            A falsy value selects "local:all-MiniLM-L6-v2".
        collection_name (Optional[str]): Name of the MongoDB collection. A falsy value selects
            `DEFAULT_COLLECTION_NAME`.

    Raises:
        ValueError: If ``connection_string`` is empty or otherwise falsy.
    """
    if not connection_string:
        raise ValueError("Connection string is required to connect to MongoDB.")

    # --- Where to connect ---------------------------------------------------
    self.connection_string = connection_string
    # ToDo: Is it okay if database_name is None?
    self.database_name = database_name
    self.collection_name = collection_name if collection_name else DEFAULT_COLLECTION_NAME

    # --- Models: each default is only constructed when the caller gave nothing ---
    self.llm: LLM = llm if llm else OpenAI(model="gpt-4o", temperature=0.0)  # type: ignore[no-any-unimported]
    self.embedding_model = (  # type: ignore[no-any-unimported]
        embedding_model if embedding_model else "local:all-MiniLM-L6-v2"
    )
    self.embedding_function = (
        embedding_function if embedding_function else SentenceTransformer("all-MiniLM-L6-v2").encode
    )

    # --- Placeholders filled in by later setup calls --------------------------
    self.vector_db: Optional[MongoDBAtlasVectorDB] = None
    self.vector_search_engine: Optional["MongoDBAtlasVectorSearch"] = None  # type: ignore[no-any-unimported]
    self.storage_context: Optional["StorageContext"] = None  # type: ignore[no-any-unimported]
    self.index: Optional[VectorStoreIndex] = None  # type: ignore[no-any-unimported]

connection_string instance-attribute #

connection_string = connection_string

database_name instance-attribute #

database_name = database_name

collection_name instance-attribute #

collection_name = collection_name or DEFAULT_COLLECTION_NAME

llm instance-attribute #

llm = llm or OpenAI(model='gpt-4o', temperature=0.0)

embedding_model instance-attribute #

embedding_model = embedding_model or 'local:all-MiniLM-L6-v2'

embedding_function instance-attribute #

embedding_function = embedding_function or encode

vector_db instance-attribute #

vector_db = None

vector_search_engine instance-attribute #

vector_search_engine = None

storage_context instance-attribute #

storage_context = None

index instance-attribute #

index = None

connect_db #

connect_db(*args, **kwargs)

Connects to the MongoDB database and initializes the query index from the existing collection.

This method verifies the existence of the collection, sets up the database connection, builds the vector store index, and pings the MongoDB server.

RETURNS DESCRIPTION
bool

True if connection is successful; False otherwise.

TYPE: bool

Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def connect_db(self, *args: Any, **kwargs: Any) -> bool:
    """
    Attach to an already-existing collection and build the query index from it.

    Steps, in order: confirm the collection exists, run the internal set-up without
    overwriting, create the vector store index from the vector search engine, and
    ping the MongoDB server. Any exception raised along the way (including the
    "collection not found" ValueError) is logged and reported as a False result
    rather than propagated.

    Args:
        *args (Any): Extra positional arguments; not used by this method.
        **kwargs (Any): Extra keyword arguments; not used by this method.

    Returns:
        bool: True if every step succeeded; False if any step raised.
    """
    try:
        collection_found = self._check_existing_collection()
        if not collection_found:
            # Raised inside the try on purpose: it is logged below and becomes False.
            raise ValueError(
                f"Collection '{self.collection_name}' not found in database '{self.database_name}'. "
                "Please run init_db to create a new collection."
            )

        # Keep the existing collection's contents intact.
        self._set_up(overwrite=False)

        index_from_store = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_search_engine,  # type: ignore[arg-type]
            storage_context=self.storage_context,
            embed_model=self.embedding_model,
        )
        self.index = index_from_store

        # Round-trip to the server to confirm the connection is live.
        self.vector_db.client.admin.command("ping")  # type: ignore[union-attr]
        logger.info("Connected to MongoDB successfully.")
        return True
    except Exception as exc:
        logger.error("Failed to connect to MongoDB: %s", exc)
        return False

init_db #

init_db(new_doc_dir=None, new_doc_paths_or_urls=None, *args, **kwargs)

Initializes the MongoDB database by creating or overwriting the collection and indexing documents.

This method sets up the database with overwriting enabled (a warning is logged if the collection already exists), loads documents from a directory or provided file paths, builds the vector store index, and inserts the documents.

PARAMETER DESCRIPTION
new_doc_dir

Directory containing documents to be indexed.

TYPE: Optional[Union[Path, str]] DEFAULT: None

new_doc_paths_or_urls

List of file paths or URLs for documents.

TYPE: Optional[Sequence[Union[Path, str]]] DEFAULT: None

*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
bool

True if the database is successfully initialized; False otherwise.

TYPE: bool

Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def init_db(
    self,
    new_doc_dir: Optional[Union[Path, str]] = None,
    new_doc_paths_or_urls: Optional[Sequence[Union[Path, str]]] = None,
    *args: Any,
    **kwargs: Any,
) -> bool:
    """
    Set up the collection with overwriting enabled and index the given documents.

    Steps, in order: warn if the collection already exists, run the internal set-up
    with ``overwrite=True``, ping the MongoDB server, load the documents, create the
    vector store index, and insert the documents one by one. Any exception raised
    along the way is logged and reported as a False result rather than propagated.

    Args:
        new_doc_dir (Optional[Union[Path, str]]): Directory containing documents to be indexed.
        new_doc_paths_or_urls (Optional[Sequence[Union[Path, str]]]): List of file paths or URLs for documents.
        *args (Any): Extra positional arguments; not used by this method.
        **kwargs (Any): Extra keyword arguments; not used by this method.

    Returns:
        bool: True if every step succeeded; False if any step raised.
    """
    try:
        # An existing collection only triggers a warning; set-up still proceeds.
        already_present = self._check_existing_collection()
        if already_present:
            logger.warning(
                f"Collection '{self.collection_name}' already exists in database '{self.database_name}'. "
                "Please use connect_db to connect to the existing collection or use init_db to overwrite it."
            )

        self._set_up(overwrite=True)
        # Round-trip to the server to confirm the connection is live.
        self.vector_db.client.admin.command("ping")  # type: ignore[union-attr]

        # NOTE: this message is emitted whether or not the collection existed beforehand.
        logger.info("Setting up the database with existing collection.")
        loaded_docs = self._load_doc(input_dir=new_doc_dir, input_docs=new_doc_paths_or_urls)

        self.index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_search_engine,  # type: ignore[arg-type]
            storage_context=self.storage_context,
            embed_model=self.embedding_model,
        )
        for document in loaded_docs:
            self.index.insert(document)

        logger.info("Database initialized with %d documents.", len(loaded_docs))
        return True
    except Exception as exc:
        logger.error("Failed to initialize the database: %s", exc)
        return False

add_docs #

add_docs(new_doc_dir=None, new_doc_paths_or_urls=None, *args, **kwargs)

Adds new documents to the existing vector store index.

This method validates that the index exists, loads documents from the specified directory or file paths, and inserts them into the vector store index.

PARAMETER DESCRIPTION
new_doc_dir

Directory containing new documents.

TYPE: Optional[Union[Path, str]] DEFAULT: None

new_doc_paths_or_urls

List of file paths or URLs for new documents.

TYPE: Optional[Sequence[Union[Path, str]]] DEFAULT: None

*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def add_docs(
    self,
    new_doc_dir: Optional[Union[Path, str]] = None,
    new_doc_paths_or_urls: Optional[Sequence[Union[Path, str]]] = None,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Insert additional documents into the current vector store index.

    The index is validated first; the documents are then loaded from the given
    directory and/or paths and inserted one at a time, in the order they were loaded.

    Args:
        new_doc_dir (Optional[Union[Path, str]]): Directory containing new documents.
        new_doc_paths_or_urls (Optional[Sequence[Union[Path, str]]]): List of file paths or URLs for new documents.
        *args (Any): Extra positional arguments; not used by this method.
        **kwargs (Any): Extra keyword arguments; not used by this method.
    """
    # Validation runs before any loading work is attempted.
    self._validate_query_index()

    loaded_docs = self._load_doc(
        input_dir=new_doc_dir,
        input_docs=new_doc_paths_or_urls,
    )
    for document in loaded_docs:
        self.index.insert(document)  # type: ignore[union-attr]

query #

query(question, *args, **kwargs)

Queries the indexed documents using the provided question.

This method validates that the query index is initialized, creates a query engine from the vector store index, and executes the query. If the response is empty, a default reply is returned.

PARAMETER DESCRIPTION
question

The query question.

TYPE: str

args

Additional positional arguments.

TYPE: Any DEFAULT: ()

kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

The query response as a string, or a default reply if no results are found.

TYPE: Any

Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def query(self, question: str, *args: Any, **kwargs: Any) -> Any:  # type: ignore[no-any-unimported, type-arg]
    """
    Answer a question against the indexed documents.

    The index is validated first; a query engine is then created from it with the
    configured LLM, stored on ``self.query_engine``, and used to run the question.

    Args:
        question (str): The question to ask.
        args (Any): Extra positional arguments; not used by this method.
        kwargs (Any): Extra keyword arguments; not used by this method.

    Returns:
        Any: ``EMPTY_RESPONSE_REPLY`` when the response's string form equals
        ``EMPTY_RESPONSE_TEXT``; otherwise the response converted to ``str``.
    """
    self._validate_query_index()

    # A new query engine is built on each call and kept on the instance.
    self.query_engine = self.index.as_query_engine(llm=self.llm)  # type: ignore[union-attr]
    result = self.query_engine.query(question)

    return EMPTY_RESPONSE_REPLY if str(result) == EMPTY_RESPONSE_TEXT else str(result)

get_collection_name #

get_collection_name()

Retrieves the name of the MongoDB collection.

RETURNS DESCRIPTION
str

The collection name.

TYPE: str

RAISES DESCRIPTION
ValueError

If the collection name is not set.

Source code in autogen/agentchat/contrib/rag/mongodb_query_engine.py
def get_collection_name(self) -> str:
    """
    Return the name of the MongoDB collection this engine uses.

    Returns:
        str: The stored collection name.

    Raises:
        ValueError: If the stored collection name is empty or otherwise falsy.
    """
    name = self.collection_name
    if not name:
        raise ValueError("Collection name not set.")
    return name