Bases: RealtimeClientBase
(Experimental) Client for Gemini Realtime API.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `llm_config` | The config for the client. TYPE: `dict[str, Any]` |
| `logger` | The logger for the client. TYPE: `Optional[Logger]` DEFAULT: `None` |
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
def __init__(
    self,
    *,
    llm_config: dict[str, Any],
    logger: Optional[Logger] = None,
) -> None:
    """(Experimental) Client for Gemini Realtime API.

    Args:
        llm_config: The config for the client.
        logger: The logger for the client.
    """
    super().__init__()
    self._llm_config = llm_config
    self._logger = logger
    self._connection: Optional["ClientConnection"] = None
    config = llm_config["config_list"][0]
    self._model: str = config["model"]
    self._voice = config.get("voice", "charon")
    self._temperature: float = config.get("temperature", 0.8)  # type: ignore[union-attr]
    self._response_modality = "AUDIO"
    self._api_key = config.get("api_key", None)
    # todo: add test with base_url just to make sure it works
    self._base_url: str = config.get(
        "base_url",
        f"wss://{HOST}/ws/google.ai.generativelanguage.{API_VERSION}.GenerativeService.BidiGenerateContent?key={self._api_key}",
    )
    self._final_config: dict[str, Any] = {}
    self._pending_session_updates: dict[str, Any] = {}
    self._is_reading_events = False
```
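A minimal construction sketch, assuming the class is importable from the module path shown above; the model name and API key are placeholders. Only the first entry of `config_list` is read.

```python
from autogen.agentchat.realtime.experimental.clients.gemini.client import GeminiRealtimeClient

# Only config_list[0] is read; "voice", "temperature" and "base_url" are optional
# and fall back to the defaults shown in __init__ above.
llm_config = {
    "config_list": [
        {
            "api_type": "google",
            "model": "gemini-2.0-flash-exp",  # placeholder model name
            "api_key": "YOUR_GEMINI_API_KEY",  # placeholder key
            "voice": "charon",
            "temperature": 0.8,
        }
    ]
}
client = GeminiRealtimeClient(llm_config=llm_config)
```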
logger property
Get the logger for the Gemini Realtime API.
connection property
Get the Gemini WebSocket connection.
add_event async
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
```python
async def add_event(self, event: Optional[RealtimeEvent]):
    await self._eventQueue.put(event)
```
get_event async
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
```python
async def get_event(self) -> Optional[RealtimeEvent]:
    return await self._eventQueue.get()
```
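These two base-class helpers expose the client's internal event queue. A minimal consumer sketch, assuming events are produced elsewhere (for example by `queue_input_audio_buffer_delta` below); treating `None` as a shutdown sentinel is an assumption of the sketch, not documented behaviour.

```python
async def drain_events(client) -> None:
    # Pull queued events until a None sentinel arrives (None is accepted by
    # add_event's Optional[RealtimeEvent] signature; using it as a stop signal
    # is this sketch's assumption).
    while True:
        event = await client.get_event()
        if event is None:
            break
        print(type(event).__name__)
```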
queue_input_audio_buffer_delta async
queue_input_audio_buffer_delta(audio)
Queue an InputAudioBufferDelta event on the client's internal event queue.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `audio` | The audio. TYPE: `str` |
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
```python
async def queue_input_audio_buffer_delta(self, audio: str) -> None:
    """queue InputAudioBufferDelta.

    Args:
        audio (str): The audio.
    """
    await self.add_event(InputAudioBufferDelta(delta=audio, item_id=None, raw_message=dict()))
```
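A short sketch of mirroring a locally produced audio chunk into that queue; the base64-encoded string is an assumption carried over from `send_audio` below.

```python
async def mirror_chunk(client, audio_b64: str) -> None:
    # Enqueue the chunk as an InputAudioBufferDelta and read it straight back,
    # just to show where the event ends up.
    await client.queue_input_audio_buffer_delta(audio_b64)
    event = await client.get_event()
    print(type(event).__name__)  # InputAudioBufferDelta
```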
send_function_result async
send_function_result(call_id, result)
Send the result of a function call to the Gemini Realtime API.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `call_id` | The ID of the function call. TYPE: `str` |
| `result` | The result of the function call. TYPE: `str` |
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
async def send_function_result(self, call_id: str, result: str) -> None:
    """Send the result of a function call to the Gemini Realtime API.

    Args:
        call_id (str): The ID of the function call.
        result (str): The result of the function call.
    """
    msg = {
        "tool_response": {
            "function_responses": [{"id": call_id, "response": {"result": {"string_value": result}}}]
        }
    }
    if self._is_reading_events:
        await self.connection.send(json.dumps(msg))
```
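A hedged sketch of the tool-response round trip, assuming a function-call event carrying `call_id` has already been received via `read_events()` and that event reading is active (otherwise the send is silently skipped).

```python
async def reply_to_tool_call(client, call_id: str) -> None:
    result = "42"  # placeholder: result of running the requested tool locally
    await client.send_function_result(call_id, result)
```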
send_text async
send_text(*, role, text, turn_complete=True)
Send a text message to the Gemini Realtime API.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `role` | The role of the message. TYPE: `Role` |
| `text` | The text of the message. TYPE: `str` |
| `turn_complete` | A flag indicating if the turn is complete. TYPE: `bool` DEFAULT: `True` |
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
async def send_text(self, *, role: Role, text: str, turn_complete: bool = True) -> None:
    """Send a text message to the Gemini Realtime API.

    Args:
        role: The role of the message.
        text: The text of the message.
        turn_complete: A flag indicating if the turn is complete.
    """
    msg = {
        "client_content": {
            "turn_complete": turn_complete,
            "turns": [{"role": role, "parts": [{"text": text}]}],
        }
    }
    if self._is_reading_events:
        await self.connection.send(json.dumps(msg))
```
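A minimal usage sketch; `"user"` is assumed to be a valid `Role` value, and the message only reaches the wire once `read_events()` has started.

```python
async def greet(client) -> None:
    # turn_complete defaults to True, telling Gemini the user turn is finished
    # so it can start responding.
    await client.send_text(role="user", text="Hello, Gemini!")
```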
send_audio async
Send audio to the Gemini Realtime API.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `audio` | The audio to send. TYPE: `str` |
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
async def send_audio(self, audio: str) -> None:
    """Send audio to the Gemini Realtime API.

    Args:
        audio (str): The audio to send.
    """
    msg = {
        "realtime_input": {
            "media_chunks": [
                {
                    "data": audio,
                    "mime_type": "audio/pcm",
                }
            ]
        }
    }
    await self.queue_input_audio_buffer_delta(audio)
    if self._is_reading_events:
        await self.connection.send(json.dumps(msg))
```
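A sketch of streaming one audio chunk, assuming the string is base64-encoded raw PCM (the outgoing message is tagged with mime_type `"audio/pcm"`, and the same string is also queued locally as an `InputAudioBufferDelta`).

```python
import base64

async def stream_chunk(client, pcm_bytes: bytes) -> None:
    # Base64 encoding is an assumption of this sketch; adjust to whatever format
    # the surrounding audio pipeline already produces.
    await client.send_audio(base64.b64encode(pcm_bytes).decode("utf-8"))
```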
truncate_audio async
truncate_audio(audio_end_ms, content_index, item_id)
Truncating audio is not natively supported by the Gemini Realtime API; this method only logs a notice.
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
async def truncate_audio(self, audio_end_ms: int, content_index: int, item_id: str) -> None:
    self.logger.info("This is not natively supported by Gemini Realtime API.")
    pass
```
session_update async
session_update(session_options)
Record session updates to be applied when the connection is established.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `session_options` | The session options to update. TYPE: `dict[str, Any]` |
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
async def session_update(self, session_options: dict[str, Any]) -> None:
    """Record session updates to be applied when the connection is established.

    Args:
        session_options (dict[str, Any]): The session options to update.
    """
    if self._is_reading_events:
        self.logger.warning("Is reading events. Session update will be ignored.")
    else:
        self._pending_session_updates.update(session_options)
```
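A sketch of recording configuration before reading starts; the option key below is an illustrative assumption, since the accepted keys depend on `_initialize_session()`, which is not shown here. Updates issued after `read_events()` has started are ignored with a warning.

```python
async def configure(client) -> None:
    # Recorded now, applied by _initialize_session() when read_events() starts.
    # "system_instruction" is an assumed key, not a documented option name.
    await client.session_update({"system_instruction": "You are a concise voice assistant."})
```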
connect async
Connect to the Gemini Realtime API.
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
@asynccontextmanager
async def connect(self) -> AsyncGenerator[None, None]:
    """Connect to the Gemini Realtime API."""
    try:
        async with connect(
            self._base_url, additional_headers={"Content-Type": "application/json"}
        ) as self._connection:
            yield
    finally:
        self._connection = None
```
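A lifecycle sketch: `connect()` is an async context manager, so the WebSocket is only available inside the block and `self._connection` is cleared on exit. Note that the `send_*` methods above are silently skipped until `read_events()` has begun, so sending should happen alongside event reading.

```python
async def session(client) -> None:
    async with client.connect():
        # The connection exists only inside this block; consume
        # client.read_events() here and send messages once reading has started.
        ...
    # On exit the connection is closed and client._connection is reset to None.
```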
read_events async
Read Events from the Gemini Realtime Client
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
async def read_events(self) -> AsyncGenerator[RealtimeEvent, None]:
    """Read Events from the Gemini Realtime Client"""
    if self._connection is None:
        raise RuntimeError("Client is not connected, call connect() first.")
    await self._initialize_session()
    self._is_reading_events = True
    async for event in self._read_events():
        yield event
```
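Putting the pieces together, a minimal end-to-end sketch, assuming the import path shown above; the model name and API key are placeholders, and printing the event class name stands in for real event handling.

```python
import asyncio

from autogen.agentchat.realtime.experimental.clients.gemini.client import GeminiRealtimeClient

async def main() -> None:
    llm_config = {
        "config_list": [
            {"api_type": "google", "model": "gemini-2.0-flash-exp", "api_key": "YOUR_GEMINI_API_KEY"}
        ]
    }
    client = GeminiRealtimeClient(llm_config=llm_config)
    async with client.connect():
        # read_events() applies pending session updates first, then starts
        # yielding events; only after that do outgoing send_* calls go through.
        async for event in client.read_events():
            print(type(event).__name__)

asyncio.run(main())
```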
get_factory classmethod
get_factory(llm_config, logger, **kwargs)
Create a Realtime API client.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `llm_config` | The LLM config for the client. TYPE: `dict[str, Any]` |
| `logger` | The logger for the client. TYPE: `Logger` |
| `**kwargs` | Additional arguments. TYPE: `Any` DEFAULT: `{}` |
Source code in autogen/agentchat/realtime/experimental/clients/gemini/client.py
```python
@classmethod
def get_factory(
    cls, llm_config: dict[str, Any], logger: Logger, **kwargs: Any
) -> Optional[Callable[[], "RealtimeClientProtocol"]]:
    """Create a Realtime API client.

    Args:
        llm_config: The LLM config for the client.
        logger: The logger for the client.
        **kwargs: Additional arguments.

    Returns:
        A factory for the Realtime API client if the api_type of the first config entry is "google"; otherwise None.
    """
    if llm_config["config_list"][0].get("api_type") == "google" and list(kwargs.keys()) == []:
        return lambda: GeminiRealtimeClient(llm_config=llm_config, logger=logger, **kwargs)
    return None
```
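A factory-selection sketch, assuming the import path shown above: a factory is returned only when the first config entry has `api_type` `"google"` and no extra keyword arguments are passed; otherwise `None`, which lets other realtime client implementations claim the config.

```python
from logging import getLogger

from autogen.agentchat.realtime.experimental.clients.gemini.client import GeminiRealtimeClient

llm_config = {
    "config_list": [{"api_type": "google", "model": "gemini-2.0-flash-exp", "api_key": "YOUR_GEMINI_API_KEY"}]
}
factory = GeminiRealtimeClient.get_factory(llm_config, getLogger(__name__))
client = factory() if factory is not None else None  # None means this client did not match
```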