Skip to content

FunctionObserver

autogen.agentchat.realtime.experimental.FunctionObserver #

FunctionObserver(*, logger=None)

Bases: RealtimeObserver

Observer for handling function calls from the OpenAI Realtime API.

Observer for handling function calls from the OpenAI Realtime API.

Source code in autogen/agentchat/realtime/experimental/function_observer.py
def __init__(self, *, logger: Optional["Logger"] = None) -> None:
    """Create an observer that handles function calls from the OpenAI Realtime API.

    Args:
        logger (Optional[Logger]): Logger forwarded unchanged to the base-class
            initializer. Defaults to None.
    """
    super().__init__(logger=logger)

logger property #

logger

agent property #

agent

realtime_client property #

realtime_client

run async #

run(agent)

Run the observer with the agent.

When implementing, be sure to call self._ready_event.set() when the observer is ready to process events.

PARAMETER DESCRIPTION
agent

The realtime agent attached to the observer.

TYPE: RealtimeAgent

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def run(self, agent: "RealtimeAgent") -> None:
    """Attach the observer to ``agent`` and drive it.

    The steps happen in a fixed order: the agent is stored, the session is
    initialized, the ready event is set, and finally the observer loop is
    awaited.

    Implementers overriding this method must still call
    `self._ready_event.set()` once the observer is able to process events.

    Args:
        agent (RealtimeAgent): The realtime agent attached to the observer.
    """
    # Store the agent first: session initialization may read it.
    self._agent = agent

    await self.initialize_session()

    # Signal readiness only after the session has been initialized.
    self._ready_event.set()

    await self.run_loop()

wait_for_ready async #

wait_for_ready()

Wait until the observer is ready, i.e. until its ready event has been set.

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def wait_for_ready(self) -> None:
    """Wait until the observer's ready event has been set."""
    ready_event = self._ready_event
    await ready_event.wait()

on_close async #

on_close()

Handle close of RealtimeClient.

Source code in autogen/agentchat/realtime/experimental/realtime_observer.py
async def on_close(self) -> None:
    """Hook invoked when the RealtimeClient closes; does nothing here."""
    return None

on_event async #

on_event(event)

Handle function call events from the OpenAI Realtime API.

PARAMETER DESCRIPTION
event

The event from the OpenAI Realtime API.

TYPE: RealtimeEvent

Source code in autogen/agentchat/realtime/experimental/function_observer.py
async def on_event(self, event: RealtimeEvent) -> None:
    """React to function call events coming from the OpenAI Realtime API.

    Events that are not `FunctionCall` instances are ignored. For a
    `FunctionCall`, the call is logged and forwarded to `call_function`.

    Args:
        event (RealtimeEvent): The event from the OpenAI Realtime API.
    """
    if not isinstance(event, FunctionCall):
        return

    self.logger.info("Received function call event")
    call_args = {
        "call_id": event.call_id,
        "name": event.name,
        "kwargs": event.arguments,
    }
    await self.call_function(**call_args)

call_function async #

call_function(call_id, name, kwargs)

Call a function registered with the agent.

PARAMETER DESCRIPTION
call_id

The ID of the function call.

TYPE: str

name

The name of the function to call.

TYPE: str

kwargs

The arguments to pass to the function.

TYPE: dict[str, Any]

Source code in autogen/agentchat/realtime/experimental/function_observer.py
async def call_function(self, call_id: str, name: str, kwargs: dict[str, Any]) -> None:
    """Call a function registered with the agent and send back its result.

    If ``name`` is not a registered realtime tool, a warning is logged and
    nothing is sent. Otherwise the tool's function is awaited (synchronous
    functions are wrapped with ``asyncify`` first) and its result is sent to
    the realtime client as a string.

    Any exception raised by the function is caught and logged together with
    its traceback; the string "Function call failed" is sent as the result
    instead.

    Args:
        call_id (str): The ID of the function call.
        name (str): The name of the function to call.
        kwargs (dict[str, Any]): The keyword arguments to pass to the function.
    """
    registered_tools = self.agent.registered_realtime_tools
    if name not in registered_tools:
        self.logger.warning(f"Function {name} called, but is not registered with the realtime agent.")
        return

    func = registered_tools[name].func
    if not asyncio.iscoroutinefunction(func):
        # Wrap synchronous tools so they can be awaited like coroutine functions.
        func = asyncify(func)

    try:
        result = await func(**kwargs)
    except Exception:
        # Deliberate best-effort boundary: a failing tool must not break the
        # observer, so the failure is reported back as the function result.
        result = "Function call failed"
        # exc_info (not stack_info) records the traceback of the exception
        # that was actually raised, not just the logger's call stack.
        self.logger.warning(f"Function call failed: {name=}, {kwargs=}", exc_info=True)

    # The client is always given a string: pydantic models are dumped to JSON,
    # other non-string values go through json.dumps with str() as a fallback.
    if isinstance(result, BaseModel):
        result = result.model_dump_json()
    elif not isinstance(result, str):
        try:
            result = json.dumps(result)
        except Exception:
            result = str(result)

    await self.realtime_client.send_function_result(call_id, result)

initialize_session async #

initialize_session()

Add registered tools to OpenAI with a session update.

Source code in autogen/agentchat/realtime/experimental/function_observer.py
async def initialize_session(self) -> None:
    """Send a session update carrying the schemas of all registered realtime tools.

    The update lists each registered tool's `realtime_tool_schema` under
    "tools" and sets "tool_choice" to "auto".
    """
    registered = self.agent.registered_realtime_tools
    tool_schemas = []
    for tool in registered.values():
        tool_schemas.append(tool.realtime_tool_schema)

    payload = {"tools": tool_schemas, "tool_choice": "auto"}
    await self.realtime_client.session_update(payload)

run_loop async #

run_loop()

Run the observer loop.

Source code in autogen/agentchat/realtime/experimental/function_observer.py
async def run_loop(self) -> None:
    """Run the observer loop; this implementation does nothing and returns immediately."""