Skip to content

GeminiRealtimeClient

autogen.agentchat.realtime.experimental.clients.GeminiRealtimeClient #

GeminiRealtimeClient(*, llm_config, logger=None)

Bases: RealtimeClientBase

(Experimental) Client for Gemini Realtime API.

(Experimental) Client for Gemini Realtime API.

PARAMETER DESCRIPTION
llm_config

The config for the client.

TYPE: dict[str, Any]

logger

The logger for the client.

TYPE: Optional[Logger] DEFAULT: None

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
def __init__(
    self,
    *,
    llm_config: dict[str, Any],
    logger: Optional[Logger] = None,
) -> None:
    """(Experimental) Set up a client for the Gemini Realtime API.

    Only the first entry of ``llm_config["config_list"]`` is read. From it the
    constructor takes the required ``model`` and the optional ``voice``
    (default ``"charon"``), ``temperature`` (default ``0.8``), ``api_key`` and
    ``base_url`` settings.

    Args:
        llm_config: The config for the client.
        logger: The logger for the client.
    """
    super().__init__()
    self._llm_config = llm_config
    self._logger = logger

    # No connection is held at construction time.
    self._connection: Optional["ClientConnection"] = None

    primary_config = llm_config["config_list"][0]

    self._model: str = primary_config["model"]
    self._voice = primary_config.get("voice", "charon")
    self._temperature: float = primary_config.get("temperature", 0.8)  # type: ignore[union-attr]
    self._response_modality = "AUDIO"

    self._api_key = primary_config.get("api_key", None)

    # Endpoint used when the config does not carry an explicit "base_url";
    # the API key is embedded as a query parameter.
    default_base_url = f"wss://{HOST}/ws/google.ai.generativelanguage.{API_VERSION}.GenerativeService.BidiGenerateContent?key={self._api_key}"
    # TODO: add test with base_url just to make sure it works
    self._base_url: str = primary_config.get("base_url", default_base_url)

    self._final_config: dict[str, Any] = {}
    # Session options collected by session_update().
    self._pending_session_updates: dict[str, Any] = {}
    # Set to True by read_events(); the send_* methods only transmit while it is set.
    self._is_reading_events = False

logger property #

logger

Get the logger for the Gemini Realtime API.

connection property #

connection

Get the Gemini WebSocket connection.

add_event async #

add_event(event)
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
async def add_event(self, event: Optional[RealtimeEvent]):
    """Put ``event`` onto the client's internal event queue.

    Args:
        event: The event to enqueue; ``None`` is accepted by the signature.
    """
    event_queue = self._eventQueue
    await event_queue.put(event)

get_event async #

get_event()
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
async def get_event(self) -> Optional[RealtimeEvent]:
    """Await the next item from the client's internal event queue and return it."""
    next_event = await self._eventQueue.get()
    return next_event

queue_input_audio_buffer_delta async #

queue_input_audio_buffer_delta(audio)

Queue an InputAudioBufferDelta event.

PARAMETER DESCRIPTION
audio

The audio.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
async def queue_input_audio_buffer_delta(self, audio: str) -> None:
    """Wrap ``audio`` in an ``InputAudioBufferDelta`` event and enqueue it.

    The event is created with ``item_id=None`` and an empty ``raw_message``,
    then handed to ``add_event``.

    Args:
        audio (str): The audio.
    """
    delta_event = InputAudioBufferDelta(delta=audio, item_id=None, raw_message={})
    await self.add_event(delta_event)

send_function_result async #

send_function_result(call_id, result)

Send the result of a function call to the Gemini Realtime API.

PARAMETER DESCRIPTION
call_id

The ID of the function call.

TYPE: str

result

The result of the function call.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
async def send_function_result(self, call_id: str, result: str) -> None:
    """Send the result of a function call to the Gemini Realtime API.

    The message is transmitted only while the client is reading events;
    otherwise the call returns without sending anything.

    Args:
        call_id (str): The ID of the function call.
        result (str): The result of the function call.
    """
    if not self._is_reading_events:
        return

    function_response = {"id": call_id, "response": {"result": {"string_value": result}}}
    payload = {"tool_response": {"function_responses": [function_response]}}
    await self.connection.send(json.dumps(payload))

send_text async #

send_text(*, role, text, turn_complete=True)

Send a text message to the Gemini Realtime API.

PARAMETER DESCRIPTION
role

The role of the message.

TYPE: Role

text

The text of the message.

TYPE: str

turn_complete

A flag indicating if the turn is complete.

TYPE: bool DEFAULT: True

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
async def send_text(self, *, role: Role, text: str, turn_complete: bool = True) -> None:
    """Send a text message to the Gemini Realtime API.

    The message is transmitted only while the client is reading events;
    otherwise the call returns without sending anything.

    Args:
        role: The role of the message.
        text: The text of the message.
        turn_complete: A flag indicating if the turn is complete.
    """
    if not self._is_reading_events:
        return

    turn = {"role": role, "parts": [{"text": text}]}
    payload = {"client_content": {"turn_complete": turn_complete, "turns": [turn]}}
    await self.connection.send(json.dumps(payload))

send_audio async #

send_audio(audio)

Send audio to the Gemini Realtime API.

PARAMETER DESCRIPTION
audio

The audio to send.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
async def send_audio(self, audio: str) -> None:
    """Send audio to the Gemini Realtime API.

    The audio is always queued locally as an input-audio delta first. It is
    forwarded over the connection, tagged as ``audio/pcm``, only while the
    client is reading events.

    Args:
        audio (str): The audio to send.
    """
    media_chunk = {"data": audio, "mime_type": "audio/pcm"}
    payload = {"realtime_input": {"media_chunks": [media_chunk]}}

    await self.queue_input_audio_buffer_delta(audio)

    # The flag is checked after queuing, exactly as the local queue step requires.
    if not self._is_reading_events:
        return
    await self.connection.send(json.dumps(payload))

truncate_audio async #

truncate_audio(audio_end_ms, content_index, item_id)
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
async def truncate_audio(self, audio_end_ms: int, content_index: int, item_id: str) -> None:
    """Log that audio truncation is unsupported; nothing is truncated.

    Args:
        audio_end_ms: Unused.
        content_index: Unused.
        item_id: Unused.
    """
    log = self.logger
    log.info("This is not natively supported by Gemini Realtime API.")

session_update async #

session_update(session_options)

Record session updates to be applied when the connection is established.

PARAMETER DESCRIPTION
session_options

The session options to update.

TYPE: dict[str, Any]

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
async def session_update(self, session_options: dict[str, Any]) -> None:
    """Merge ``session_options`` into the pending session updates.

    Once the client is reading events the options are not stored; a warning
    is logged instead.

    Args:
        session_options (dict[str, Any]): The session options to update.
    """
    if not self._is_reading_events:
        self._pending_session_updates.update(session_options)
        return

    self.logger.warning("Is reading events. Session update will be ignored.")

connect async #

connect()

Connect to the Gemini Realtime API.

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
@asynccontextmanager
async def connect(self) -> AsyncGenerator[None, None]:
    """Connect to the Gemini Realtime API.

    Async context manager: the connection to ``self._base_url`` is stored on
    the client for the duration of the ``async with`` body and cleared again
    on exit, whether or not an error occurred.
    """
    request_headers = {"Content-Type": "application/json"}
    try:
        async with connect(self._base_url, additional_headers=request_headers) as websocket:
            self._connection = websocket
            yield
    finally:
        # Always drop the reference so the client reports itself as disconnected.
        self._connection = None

read_events async #

read_events()

Read events from the Gemini Realtime client.

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
async def read_events(self) -> AsyncGenerator[RealtimeEvent, None]:
    """Read events from the Gemini Realtime client.

    Initializes the session, marks the client as reading events, then yields
    every event produced by ``_read_events()``.

    Raises:
        RuntimeError: If no connection is currently stored on the client.
    """
    if self._connection is None:
        raise RuntimeError("Client is not connected, call connect() first.")

    await self._initialize_session()

    # Set only after session initialization has completed.
    self._is_reading_events = True

    async for incoming_event in self._read_events():
        yield incoming_event

get_factory classmethod #

get_factory(llm_config, logger, **kwargs)

Create a Realtime API client.

PARAMETER DESCRIPTION
llm_config

The LLM config for the client.

TYPE: dict[str, Any]

logger

The logger for the client.

TYPE: Logger

**kwargs

Additional arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
RealtimeClientProtocol

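A factory for the Realtime API client, returned when the first entry in config_list has api_type "google" and no additional keyword arguments are passed; otherwise None.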

TYPE: Optional[Callable[[], RealtimeClientProtocol]]

Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
@classmethod
def get_factory(
    cls, llm_config: dict[str, Any], logger: Logger, **kwargs: Any
) -> Optional[Callable[[], "RealtimeClientProtocol"]]:
    """Create a factory for a Gemini Realtime API client, if applicable.

    Args:
        llm_config: The LLM config for the client.
        logger: The logger for the client.
        **kwargs: Additional arguments.

    Returns:
        A zero-argument callable that builds a ``GeminiRealtimeClient`` when the
        first ``config_list`` entry has ``api_type == "google"`` and no extra
        keyword arguments were given; ``None`` otherwise.
    """
    first_config = llm_config["config_list"][0]
    if first_config.get("api_type") != "google" or kwargs:
        return None
    return lambda: GeminiRealtimeClient(llm_config=llm_config, logger=logger, **kwargs)