
OpenAIRealtimeWebRTCClient

autogen.agentchat.realtime.experimental.clients.oai.OpenAIRealtimeWebRTCClient #

OpenAIRealtimeWebRTCClient(*, llm_config, websocket, logger=None)

Bases: RealtimeClientBase

(Experimental) Client for the OpenAI Realtime API that uses the WebRTC protocol.

PARAMETER DESCRIPTION
llm_config

The config for the client.

TYPE: dict[str, Any]

websocket

The websocket to use for the connection.

TYPE: WebSocketProtocol

logger

The logger to use for logging events.

TYPE: Optional[Logger] DEFAULT: None

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
def __init__(
    self,
    *,
    llm_config: dict[str, Any],
    websocket: "WebSocket",
    logger: Optional[Logger] = None,
) -> None:
    """(Experimental) Client for OpenAI Realtime API.

    Args:
        llm_config: The config for the client.
        websocket: the websocket to use for the connection
        logger: the logger to use for logging events
    """
    super().__init__()
    self._llm_config = llm_config
    self._logger = logger
    self._websocket = websocket

    config = llm_config["config_list"][0]
    self._model: str = config["model"]
    self._voice: str = config.get("voice", "alloy")
    self._temperature: float = llm_config.get("temperature", 0.8)  # type: ignore[union-attr]
    self._config = config
    self._base_url = config.get("base_url", "https://api.openai.com/v1/realtime/sessions")
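
A minimal construction sketch, assuming a server-side websocket to the browser (for example a FastAPI WebSocket) and placeholder config values; only model is required in the first config_list entry, while voice, temperature and base_url fall back to the defaults shown above:

from logging import getLogger

# Sketch only: the llm_config values are placeholders and `websocket` is assumed
# to be the server-side socket connected to the browser running the WebRTC client.
llm_config = {
    "config_list": [
        {
            "api_type": "openai",
            "model": "gpt-4o-realtime-preview",  # placeholder model name
            "api_key": "sk-...",                 # placeholder key
            "voice": "alloy",
        }
    ],
    "temperature": 0.8,
}

client = OpenAIRealtimeWebRTCClient(
    llm_config=llm_config,
    websocket=websocket,
    logger=getLogger(__name__),
)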

logger property #

logger

Get the logger for the OpenAI Realtime API.

add_event async #

add_event(event)
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
async def add_event(self, event: Optional[RealtimeEvent]):
    await self._eventQueue.put(event)

get_event async #

get_event()
Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
async def get_event(self) -> Optional[RealtimeEvent]:
    return await self._eventQueue.get()
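
These two coroutines form a simple producer/consumer pair around the internal event queue: producers call add_event (directly or via helpers such as queue_input_audio_buffer_delta below) and a consumer drains the queue with get_event. A small sketch, assuming None is used as a stop sentinel (add_event accepts Optional[RealtimeEvent], but that convention is not enforced by the client itself):

# Sketch: drain the internal event queue until a None sentinel arrives.
async def drain_events(client: OpenAIRealtimeWebRTCClient) -> None:
    while True:
        event = await client.get_event()
        if event is None:  # assumed convention: a None entry stops the consumer
            break
        print(f"received {type(event).__name__}")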

queue_input_audio_buffer_delta async #

queue_input_audio_buffer_delta(audio)

Queue an InputAudioBufferDelta event.

PARAMETER DESCRIPTION
audio

The audio data to queue.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/realtime_client.py
async def queue_input_audio_buffer_delta(self, audio: str) -> None:
    """queue InputAudioBufferDelta.

    Args:
        audio (str): The audio.
    """
    await self.add_event(InputAudioBufferDelta(delta=audio, item_id=None, raw_message=dict()))

send_function_result async #

send_function_result(call_id, result)

Send the result of a function call to the OpenAI Realtime API.

PARAMETER DESCRIPTION
call_id

The ID of the function call.

TYPE: str

result

The result of the function call.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
async def send_function_result(self, call_id: str, result: str) -> None:
    """Send the result of a function call to the OpenAI Realtime API.

    Args:
        call_id (str): The ID of the function call.
        result (str): The result of the function call.
    """
    await self._websocket.send_json({
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": call_id,
            "output": result,
        },
    })
    await self._websocket.send_json({"type": "response.create"})
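
A usage sketch: the call_id would come from a function-call event previously read from the API, and result is whatever your tool produced, serialized to a string; both values below are placeholders:

import json

# Sketch: report a (placeholder) tool result back to the model,
# which also asks the model to generate a new response.
async def reply_with_weather(client: OpenAIRealtimeWebRTCClient, call_id: str) -> None:
    result = json.dumps({"city": "Zagreb", "forecast": "sunny"})  # placeholder payload
    await client.send_function_result(call_id, result)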

send_text async #

send_text(*, role, text)

Send a text message to the OpenAI Realtime API.

PARAMETER DESCRIPTION
role

The role of the message.

TYPE: str

text

The text of the message.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
async def send_text(self, *, role: Role, text: str) -> None:
    """Send a text message to the OpenAI Realtime API.

    Args:
        role (str): The role of the message.
        text (str): The text of the message.
    """
    # await self.connection.response.cancel() #why is this here?
    await self._websocket.send_json({
        "type": "response.cancel",
    })
    await self._websocket.send_json({
        "type": "conversation.item.create",
        "item": {"type": "message", "role": role, "content": [{"type": "input_text", "text": text}]},
    })
    # await self.connection.response.create()
    await self._websocket.send_json({"type": "response.create"})
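
A usage sketch, assuming Role accepts the plain string "user" (the parameters above document it as a string role); the call cancels any in-flight response, injects the message, and requests a new response:

# Sketch: inject a user message and let the model respond to it.
async def greet(client: OpenAIRealtimeWebRTCClient) -> None:
    await client.send_text(role="user", text="Hello, can you hear me?")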

send_audio async #

send_audio(audio)

Send audio to the OpenAI Realtime API. In the case of WebRTC the audio is already sent by the JavaScript client, so we only queue it here so that it can be logged.

PARAMETER DESCRIPTION
audio

The audio to send.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
async def send_audio(self, audio: str) -> None:
    """Send audio to the OpenAI Realtime API.
    in case of WebRTC, audio is already sent by js client, so we just queue it in order to be logged.

    Args:
        audio (str): The audio to send.
    """
    await self.queue_input_audio_buffer_delta(audio)
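
Because the browser-side WebRTC connection already carries the audio to OpenAI, this method (like queue_input_audio_buffer_delta above) only records the chunk locally. A sketch, assuming the chunk is passed as a base64-encoded string in the same format the Realtime API uses for input audio deltas:

import base64

# Sketch: queue a (placeholder) audio chunk so it appears in the event log.
async def log_audio_chunk(client: OpenAIRealtimeWebRTCClient, pcm_bytes: bytes) -> None:
    await client.send_audio(base64.b64encode(pcm_bytes).decode("ascii"))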

truncate_audio async #

truncate_audio(audio_end_ms, content_index, item_id)

Truncate audio in the OpenAI Realtime API.

PARAMETER DESCRIPTION
audio_end_ms

The end of the audio to truncate, in milliseconds.

TYPE: int

content_index

The index of the content to truncate.

TYPE: int

item_id

The ID of the item to truncate.

TYPE: str

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
async def truncate_audio(self, audio_end_ms: int, content_index: int, item_id: str) -> None:
    """Truncate audio in the OpenAI Realtime API.

    Args:
        audio_end_ms (int): The end of the audio to truncate.
        content_index (int): The index of the content to truncate.
        item_id (str): The ID of the item to truncate.
    """
    await self._websocket.send_json({
        "type": "conversation.item.truncate",
        "content_index": content_index,
        "item_id": item_id,
        "audio_end_ms": audio_end_ms,
    })
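
A usage sketch for the common barge-in case: when the user interrupts, truncate the assistant item that is currently playing. The item_id and elapsed playback time are placeholders that would normally come from the events you have read and from your own playback tracking:

# Sketch: cut the assistant's audio off at the point where the user interrupted.
async def handle_interruption(
    client: OpenAIRealtimeWebRTCClient, item_id: str, elapsed_ms: int
) -> None:
    await client.truncate_audio(audio_end_ms=elapsed_ms, content_index=0, item_id=item_id)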

session_update async #

session_update(session_options)

Send a session update to the OpenAI Realtime API.

In the case of WebRTC we cannot send it directly; instead, we send it to the JavaScript client over the websocket and rely on it to forward the session update to OpenAI.

PARAMETER DESCRIPTION
session_options

The session options to update.

TYPE: dict[str, Any]

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
async def session_update(self, session_options: dict[str, Any]) -> None:
    """Send a session update to the OpenAI Realtime API.

    In the case of WebRTC we can not send it directly, but we can send it
    to the javascript over the websocket, and rely on it to send session
    update to OpenAI

    Args:
        session_options (dict[str, Any]): The session options to update.
    """
    logger = self.logger
    logger.info(f"Sending session update: {session_options}")
    # await self.connection.session.update(session=session_options)  # type: ignore[arg-type]
    await self._websocket.send_json({"type": "session.update", "session": session_options})
    logger.info("Sending session update finished")

session_init_data #

session_init_data()

Build the messages used to initialize the session with OpenAI.

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
def session_init_data(self) -> list[dict[str, Any]]:
    """Control initial session with OpenAI."""
    session_update = {
        "turn_detection": {"type": "server_vad"},
        "voice": self._voice,
        "modalities": ["audio", "text"],
        "temperature": self._temperature,
    }
    return [{"type": "session.update", "session": session_update}]

connect async #

connect()

Connect to the OpenAI Realtime API.

In the case of WebRTC, we pass connection information over the websocket so that the JavaScript client on the other end of the websocket opens the actual connection to OpenAI.

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
@asynccontextmanager
async def connect(self) -> AsyncGenerator[None, None]:
    """Connect to the OpenAI Realtime API.

    In the case of WebRTC, we pass connection information over the
    websocket, so that javascript on the other end of websocket open
    actual connection to OpenAI
    """
    try:
        base_url = self._base_url
        api_key = self._config.get("api_key", None)
        headers = {
            "Authorization": f"Bearer {api_key}",  # Use os.getenv to get from environment
            "Content-Type": "application/json",
        }
        data = {
            # "model": "gpt-4o-realtime-preview-2024-12-17",
            "model": self._model,
            "voice": self._voice,
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(base_url, headers=headers, json=data)
            response.raise_for_status()
            json_data = response.json()
            json_data["model"] = self._model
        if self._websocket is not None:
            session_init = self.session_init_data()
            await self._websocket.send_json({"type": "ag2.init", "config": json_data, "init": session_init})
        yield
    finally:
        pass

read_events async #

read_events()

Read events from the OpenAI Realtime API.

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
async def read_events(self) -> AsyncGenerator[RealtimeEvent, None]:
    """Read events from the OpenAI Realtime API."""
    async for event in self._read_events():
        yield event
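
A minimal end-to-end sketch, assuming client was constructed as shown earlier with a live websocket to the browser: connect() creates the ephemeral session and hands the connection details to the JavaScript client, after which incoming events can be consumed with read_events:

# Sketch: open the browser-mediated connection and consume incoming events.
async def run(client: OpenAIRealtimeWebRTCClient) -> None:
    async with client.connect():
        async for event in client.read_events():
            ...  # dispatch on the concrete RealtimeEvent type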

get_factory classmethod #

get_factory(llm_config, logger, **kwargs)

Create a factory for the Realtime API client.

PARAMETER DESCRIPTION
llm_config

The config for the client.

TYPE: dict[str, Any]

logger

The logger to use for logging events.

TYPE: Logger

**kwargs

Additional arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
RealtimeClientProtocol

A factory for the Realtime API client, returned if the configuration matches the expected pattern; otherwise None.

TYPE: Optional[Callable[[], RealtimeClientProtocol]]

Source code in autogen/agentchat/realtime/experimental/clients/oai/rtc_client.py
@classmethod
def get_factory(
    cls, llm_config: dict[str, Any], logger: Logger, **kwargs: Any
) -> Optional[Callable[[], "RealtimeClientProtocol"]]:
    """Create a Realtime API client.

    Args:
        llm_config: The config for the client.
        logger: The logger to use for logging events.
        **kwargs: Additional arguments.

    Returns:
        RealtimeClientProtocol: The Realtime API client is returned if the model matches the pattern
    """
    if llm_config["config_list"][0].get("api_type", "openai") == "openai" and list(kwargs.keys()) == ["websocket"]:
        return lambda: OpenAIRealtimeWebRTCClient(llm_config=llm_config, logger=logger, **kwargs)

    return None
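
A usage sketch: a factory is only returned when the first config_list entry has api_type "openai" (or omits it) and the sole extra keyword argument is websocket; llm_config and websocket are the same placeholders as in the constructor sketch above:

from logging import getLogger

# Sketch: build the client through the factory only if this config applies.
factory = OpenAIRealtimeWebRTCClient.get_factory(llm_config, getLogger(__name__), websocket=websocket)
if factory is not None:
    client = factory()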