Skip to content

WebSocketAudioAdapter

autogen.agentchat.realtime.experimental.WebSocketAudioAdapter #

WebSocketAudioAdapter(websocket, *, logger=None)

Bases: RealtimeObserver

Observer that streams audio between a websocket connection and the OpenAI Realtime API.

PARAMETER DESCRIPTION
websocket

The websocket connection.

TYPE: WebSocketProtocol

logger

The logger for the observer.

TYPE: Logger DEFAULT: None

Source code in autogen/agentchat/realtime/experimental/audio_adapters/websocket_audio_adapter.py
def __init__(self, websocket: "WebSocket", *, logger: Optional[Logger] = None) -> None:
    """Observer that streams audio between a websocket connection and the OpenAI Realtime API.

    Args:
        websocket (WebSocket): The websocket connection used to receive caller audio
            and to send generated audio and control events back.
        logger (Logger): The logger for the observer. Defaults to None.
    """
    super().__init__(logger=logger)
    self.websocket = websocket

    # Connection specific state
    self.stream_sid: Optional[str] = None  # stream id, captured from the "start" websocket event
    self.latest_media_timestamp: int = 0  # timestamp (ms) of the most recent inbound media frame
    self.last_assistant_item: Optional[str] = None  # item id of the assistant response currently playing
    self.mark_queue: list[str] = []  # outstanding "mark" events not yet acknowledged by the caller
    self.response_start_timestamp_socket: Optional[int] = None  # media timestamp when the current response started

logger property #

logger

agent property #

agent

realtime_client property #

realtime_client

websocket instance-attribute #

websocket = websocket

stream_sid instance-attribute #

stream_sid = None

latest_media_timestamp instance-attribute #

latest_media_timestamp = 0

last_assistant_item instance-attribute #

last_assistant_item = None

mark_queue instance-attribute #

mark_queue = []

response_start_timestamp_socket instance-attribute #

response_start_timestamp_socket = None

run async #

run(agent)

Run the observer with the agent.

When implementing, be sure to call self._ready_event.set() when the observer is ready to process events.

PARAMETER DESCRIPTION
agent

The realtime agent attached to the observer.

TYPE: RealtimeAgent

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def run(self, agent: "RealtimeAgent") -> None:
    """Attach the agent to this observer and process events until the loop ends.

    Sets `self._ready_event` once the session is initialized, so that
    `wait_for_ready()` callers unblock before the long-running loop starts.

    Args:
        agent (RealtimeAgent): The realtime agent attached to the observer.
    """
    # Remember the agent and prepare the session first ...
    self._agent = agent
    await self.initialize_session()
    # ... then signal readiness and enter the main event loop.
    self._ready_event.set()
    await self.run_loop()

wait_for_ready async #

wait_for_ready()

Wait until the event that signals observer readiness is set.

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def wait_for_ready(self) -> None:
    """Block until the observer has signalled that it is ready to process events."""
    await self._ready_event.wait()

on_close async #

on_close()

Handle close of RealtimeClient.

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def on_close(self) -> None:
    """Hook invoked when the RealtimeClient closes; the base implementation does nothing."""
    pass

on_event async #

on_event(event)

Receive events from the OpenAI Realtime API, send audio back to websocket.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/websocket_audio_adapter.py
async def on_event(self, event: RealtimeEvent) -> None:
    """Receive events from the OpenAI Realtime API, send audio back to websocket."""
    logger = self.logger

    if isinstance(event, AudioDelta):
        # Round-trip through base64 to normalize the payload, then forward it to the caller.
        payload = base64.b64encode(base64.b64decode(event.delta)).decode("utf-8")
        await self.websocket.send_json(
            {"event": "media", "streamSid": self.stream_sid, "media": {"payload": payload}}
        )

        # First audio chunk of a response: remember when playback started.
        if self.response_start_timestamp_socket is None:
            self.response_start_timestamp_socket = self.latest_media_timestamp
            if SHOW_TIMING_MATH:
                logger.info(f"Setting start timestamp for new response: {self.response_start_timestamp_socket}ms")

        # Track the item id so a later interruption can truncate the right item.
        item_id = event.item_id
        if item_id:
            self.last_assistant_item = item_id

        await self.send_mark()

    # Trigger an interruption. Your use case might work better using `input_audio_buffer.speech_stopped`, or combining the two.
    if isinstance(event, SpeechStarted):
        logger.info("Speech start detected.")
        if self.last_assistant_item:
            logger.info(f"Interrupting response with id: {self.last_assistant_item}")
            await self.handle_speech_started_event()

handle_speech_started_event async #

handle_speech_started_event()

Handle interruption when the caller's speech starts.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/websocket_audio_adapter.py
async def handle_speech_started_event(self) -> None:
    """Handle interruption when the caller's speech starts."""
    logger = self.logger
    logger.info("Handling speech started event.")

    # Nothing to interrupt unless audio was sent and its start time recorded.
    if not (self.mark_queue and self.response_start_timestamp_socket is not None):
        return

    elapsed_time = self.latest_media_timestamp - self.response_start_timestamp_socket
    if SHOW_TIMING_MATH:
        logger.info(
            f"Calculating elapsed time for truncation: {self.latest_media_timestamp} - {self.response_start_timestamp_socket} = {elapsed_time}ms"
        )

    if self.last_assistant_item:
        if SHOW_TIMING_MATH:
            logger.info(f"Truncating item with ID: {self.last_assistant_item}, Truncated at: {elapsed_time}ms")

        # Ask the realtime client to cut the assistant audio at the elapsed offset.
        await self.realtime_client.truncate_audio(
            audio_end_ms=elapsed_time,
            content_index=0,
            item_id=self.last_assistant_item,
        )

    # Tell the caller side to drop queued audio, then reset per-response state.
    await self.websocket.send_json({"event": "clear", "streamSid": self.stream_sid})

    self.mark_queue.clear()
    self.last_assistant_item = None
    self.response_start_timestamp_socket = None

send_mark async #

send_mark()
Source code in autogen/agentchat/realtime/experimental/audio_adapters/websocket_audio_adapter.py
async def send_mark(self) -> None:
    """Send a "mark" event over the websocket and record it in the mark queue.

    Does nothing until a stream id has been received from the "start" event.
    """
    if not self.stream_sid:
        return
    await self.websocket.send_json(
        {"event": "mark", "streamSid": self.stream_sid, "mark": {"name": "responsePart"}}
    )
    self.mark_queue.append("responsePart")

initialize_session async #

initialize_session()

Control initial session with OpenAI.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/websocket_audio_adapter.py
async def initialize_session(self) -> None:
    """Control initial session with OpenAI: request raw PCM16 audio in both directions."""
    await self.realtime_client.session_update(
        {"input_audio_format": "pcm16", "output_audio_format": "pcm16"}
    )

run_loop async #

run_loop()

Reads data from websocket and sends it to the RealtimeClient.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/websocket_audio_adapter.py
async def run_loop(self) -> None:
    """Reads data from websocket and sends it to the RealtimeClient."""
    logger = self.logger
    async for raw_message in self.websocket.iter_text():
        try:
            message = json.loads(raw_message)
            event_type = message["event"]
            if event_type == "media":
                # Forward caller audio and remember when it arrived.
                self.latest_media_timestamp = int(message["media"]["timestamp"])
                await self.realtime_client.send_audio(audio=message["media"]["payload"])
            elif event_type == "start":
                # New stream: capture its id and reset per-stream playback state.
                self.stream_sid = message["start"]["streamSid"]
                logger.info(f"Incoming stream has started {self.stream_sid}")
                self.response_start_timestamp_socket = None
                self.latest_media_timestamp = 0
                self.last_assistant_item = None
            elif event_type == "mark" and self.mark_queue:
                self.mark_queue.pop(0)
        except Exception as e:
            # Best-effort: log malformed frames and keep the loop alive.
            logger.warning(f"Failed to process message: {e}", stack_info=True)