Skip to content

parse_oai_message

autogen.agentchat.realtime.experimental.clients.oai.utils.parse_oai_message #

parse_oai_message(message)

Parse a message from the OpenAI Realtime API.

PARAMETER DESCRIPTION
message

The message to parse.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
RealtimeEvent

The parsed event.

TYPE: RealtimeEvent

Source code in autogen/agentchat/realtime/experimental/clients/oai/utils.py
def parse_oai_message(message: dict[str, Any]) -> RealtimeEvent:
    """Parse a message from the OpenAI Realtime API.

    Args:
        message (dict[str, Any]): The message to parse.

    Returns:
        RealtimeEvent: The parsed event.
    """
    if message.get("type") == "session.created":
        return SessionCreated(raw_message=message)
    elif message.get("type") == "session.updated":
        return SessionUpdated(raw_message=message)
    elif message.get("type") == "response.audio.delta":
        return AudioDelta(raw_message=message, delta=message["delta"], item_id=message["item_id"])
    elif message.get("type") == "input_audio_buffer.speech_started":
        return SpeechStarted(raw_message=message)
    elif message.get("type") == "input_audio_buffer.delta":
        return InputAudioBufferDelta(delta=message["delta"], item_id=None, raw_message=message)
    elif message.get("type") == "response.function_call_arguments.done":
        return FunctionCall(
            raw_message=message,
            call_id=message["call_id"],
            name=message["name"],
            arguments=json.loads(message["arguments"]),
        )
    else:
        return RealtimeEvent(raw_message=message)