Skip to content

TwilioAudioAdapter

autogen.agentchat.realtime.experimental.TwilioAudioAdapter #

TwilioAudioAdapter(websocket, *, logger=None)

Bases: RealtimeObserver

Adapter for streaming audio from Twilio to OpenAI Realtime API and vice versa.

Adapter for streaming audio from Twilio to OpenAI Realtime API and vice versa.

PARAMETER DESCRIPTION
websocket

the websocket connection to the Twilio service

TYPE: WebSocketProtocol

logger

the logger to use for logging events

TYPE: Optional[Logger] DEFAULT: None

Source code in autogen/agentchat/realtime/experimental/audio_adapters/twilio_audio_adapter.py
def __init__(self, websocket: "WebSocket", *, logger: Optional[Logger] = None):
    """Create an adapter that relays audio between a Twilio websocket and the OpenAI Realtime API.

    Args:
        websocket: the websocket connection to the Twilio service
        logger: the logger to use for logging events
    """
    super().__init__(logger=logger)
    self.websocket = websocket

    # Connection specific state
    # Marks sent to Twilio that have not yet been echoed back.
    self.mark_queue: list[str] = []
    # ID of the assistant item whose audio was forwarded most recently.
    self.last_assistant_item: Optional[str] = None
    # Twilio media timestamp captured when the current response started.
    self.response_start_timestamp_twilio: Optional[int] = None
    # Timestamp of the most recent inbound Twilio media message.
    self.latest_media_timestamp = 0
    # Stream identifier; stays None until a Twilio "start" message is handled.
    self.stream_sid = None

logger property #

logger

agent property #

agent

realtime_client property #

realtime_client

websocket instance-attribute #

websocket = websocket

stream_sid instance-attribute #

stream_sid = None

latest_media_timestamp instance-attribute #

latest_media_timestamp = 0

last_assistant_item instance-attribute #

last_assistant_item = None

mark_queue instance-attribute #

mark_queue = []

response_start_timestamp_twilio instance-attribute #

response_start_timestamp_twilio = None

run async #

run(agent)

Run the observer with the agent.

When implementing, be sure to call self._ready_event.set() when the observer is ready to process events.

PARAMETER DESCRIPTION
agent

The realtime agent attached to the observer.

TYPE: RealtimeAgent

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def run(self, agent: "RealtimeAgent") -> None:
    """Attach the agent, initialize the session, signal readiness, then run the observer loop.

    When implementing, be sure to call `self._ready_event.set()` when the observer is ready to process events.

    Args:
        agent (RealtimeAgent): The realtime agent attached to the observer.
    """
    self._agent = agent

    # The session is initialized before the observer is flagged as ready.
    await self.initialize_session()
    ready_event = self._ready_event
    ready_event.set()

    # Runs until the observer's loop returns.
    await self.run_loop()

wait_for_ready async #

wait_for_ready()

Wait until the observer's ready event has been set.

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def wait_for_ready(self) -> None:
    """Wait until the observer's ready event has been set."""
    ready_event = self._ready_event
    await ready_event.wait()

on_close async #

on_close()

Handle close of RealtimeClient.

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def on_close(self) -> None:
    """Handle close of RealtimeClient.

    This implementation performs no action.
    """

on_event async #

on_event(event)

Receive events from the OpenAI Realtime API, send audio back to Twilio.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/twilio_audio_adapter.py
async def on_event(self, event: RealtimeEvent) -> None:
    """Handle an event from the OpenAI Realtime API.

    Audio deltas are forwarded to Twilio as "media" messages followed by a mark;
    a speech-started event triggers interruption handling when an assistant item
    is currently being tracked.
    """
    log = self.logger

    if isinstance(event, AudioDelta):
        # Decode and re-encode the delta so the outgoing payload is a base64 text string.
        raw_audio = base64.b64decode(event.delta)
        payload = base64.b64encode(raw_audio).decode("utf-8")
        media_message = {
            "event": "media",
            "streamSid": self.stream_sid,
            "media": {"payload": payload},
        }
        await self.websocket.send_json(media_message)

        # The first delta of a response pins the Twilio timestamp used later for truncation.
        if self.response_start_timestamp_twilio is None:
            start_timestamp = self.latest_media_timestamp
            self.response_start_timestamp_twilio = start_timestamp
            if SHOW_TIMING_MATH:
                log.info(f"Setting start timestamp for new response: {start_timestamp}ms")

        # Update last_assistant_item safely
        item_id = event.item_id
        if item_id:
            self.last_assistant_item = item_id

        await self.send_mark()

    # Trigger an interruption. Your use case might work better using `input_audio_buffer.speech_stopped`, or combining the two.
    if isinstance(event, SpeechStarted):
        log.info("Speech start detected.")
        active_item = self.last_assistant_item
        if active_item:
            log.info(f"Interrupting response with id: {active_item}")
            await self.handle_speech_started_event()

handle_speech_started_event async #

handle_speech_started_event()

Handle interruption when the caller's speech starts.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/twilio_audio_adapter.py
async def handle_speech_started_event(self) -> None:
    """Interrupt the response in progress when the caller starts speaking.

    Nothing is done unless marks are pending and a response start timestamp is
    recorded. Otherwise the current assistant item (if any) is truncated at the
    elapsed time, a "clear" message is sent to Twilio, and the per-response
    state is reset.
    """
    log = self.logger
    log.info("Handling speech started event.")

    response_start = self.response_start_timestamp_twilio
    if not self.mark_queue or response_start is None:
        return

    # Elapsed time is measured between Twilio media timestamps.
    latest = self.latest_media_timestamp
    elapsed_ms = latest - response_start
    if SHOW_TIMING_MATH:
        log.info(f"Calculating elapsed time for truncation: {latest} - {response_start} = {elapsed_ms}ms")

    item_id = self.last_assistant_item
    if item_id:
        if SHOW_TIMING_MATH:
            log.info(f"Truncating item with ID: {item_id}, Truncated at: {elapsed_ms}ms")

        await self.realtime_client.truncate_audio(
            audio_end_ms=elapsed_ms,
            content_index=0,
            item_id=item_id,
        )

    clear_message = {"event": "clear", "streamSid": self.stream_sid}
    await self.websocket.send_json(clear_message)

    # Reset per-response tracking.
    self.mark_queue.clear()
    self.last_assistant_item = None
    self.response_start_timestamp_twilio = None

send_mark async #

send_mark()

Send a "responsePart" mark event to the Twilio websocket and record it in the mark queue; nothing is sent while no stream SID is set.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/twilio_audio_adapter.py
async def send_mark(self) -> None:
    """Send a "responsePart" mark to the Twilio websocket and record it in the mark queue.

    Does nothing while no stream SID is set.
    """
    stream_sid = self.stream_sid
    if not stream_sid:
        return

    mark_name = "responsePart"
    await self.websocket.send_json({
        "event": "mark",
        "streamSid": stream_sid,
        "mark": {"name": mark_name},
    })
    # Remember the outstanding mark; entries are removed when Twilio reports a mark event.
    self.mark_queue.append(mark_name)

run_loop async #

run_loop()

Run the adapter loop.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/twilio_audio_adapter.py
async def run_loop(self) -> None:
    """Run the adapter loop.

    Reads text messages from the Twilio websocket until its iterator ends,
    parses each one as JSON, and dispatches on its ``event`` field:

    - ``media``: record the media timestamp and forward the audio payload to
      the realtime client.
    - ``start``: store the stream SID and reset the per-stream state.
    - ``mark``: drop the oldest pending mark, if any.

    Messages with any other event type are ignored. A failure while handling
    a message is logged, with the exception traceback, and the loop moves on
    to the next message.
    """
    logger = self.logger

    async for message in self.websocket.iter_text():
        try:
            data = json.loads(message)
            event_type = data["event"]
            if event_type == "media":
                media = data["media"]
                self.latest_media_timestamp = int(media["timestamp"])
                await self.realtime_client.send_audio(audio=media["payload"])
            elif event_type == "start":
                self.stream_sid = data["start"]["streamSid"]
                logger.info(f"Incoming stream has started {self.stream_sid}")
                self.response_start_timestamp_twilio = None
                self.latest_media_timestamp = 0
                self.last_assistant_item = None
            elif event_type == "mark":
                if self.mark_queue:
                    self.mark_queue.pop(0)
        except Exception as e:
            # exc_info attaches the traceback of the caught exception; stack_info
            # would only record the stack of this logging call.
            logger.warning(f"Error processing Twilio message: {e}", exc_info=True)

initialize_session async #

initialize_session()

Configure the initial OpenAI session to use the g711_ulaw format for both input and output audio.

Source code in autogen/agentchat/realtime/experimental/audio_adapters/twilio_audio_adapter.py
async def initialize_session(self) -> None:
    """Configure the OpenAI session to use the g711_ulaw format for input and output audio."""
    audio_format = "g711_ulaw"
    await self.realtime_client.session_update({
        "input_audio_format": audio_format,
        "output_audio_format": audio_format,
    })