Skip to content

IOWebsockets

autogen.io.IOWebsockets #

IOWebsockets(websocket)

Bases: IOStream

A websocket input/output stream.

Initialize the websocket input/output stream.

PARAMETER DESCRIPTION
websocket

The websocket server.

TYPE: ServerConnection

Source code in autogen/io/websockets.py
def __init__(self, websocket: ServerConnection) -> None:
    """Create a stream that reads from and writes to a websocket connection.

    Args:
        websocket (ServerConnection): The connection that this stream sends
            to and receives from.
    """
    # Stored privately; the stream's I/O methods reach the connection through it.
    self._websocket = websocket

websocket property #

websocket

The websocket server connection used by this stream.

set_global_default staticmethod #

set_global_default(stream)

Set the default input/output stream.

PARAMETER DESCRIPTION
stream

The input/output stream to set as the default.

TYPE: IOStream

Source code in autogen/io/base.py
@staticmethod
def set_global_default(stream: "IOStream") -> None:
    """Store ``stream`` on the ``IOStream`` class as the global default.

    Args:
        stream (IOStream): The input/output stream to use as the global default.
    """
    # Class-level attribute, so the value is shared by every user of IOStream.
    IOStream._global_default = stream

get_global_default staticmethod #

get_global_default()

Get the default input/output stream.

RETURNS DESCRIPTION
IOStream

The default input/output stream.

TYPE: IOStream

Source code in autogen/io/base.py
@staticmethod
def get_global_default() -> "IOStream":
    """Return the global default input/output stream.

    Returns:
        IOStream: The stream currently stored as the global default.

    Raises:
        RuntimeError: If no global default has been set.
    """
    global_stream = IOStream._global_default
    if global_stream is not None:
        return global_stream
    raise RuntimeError("No global default IOStream has been set")

get_default staticmethod #

get_default()

Get the default input/output stream.

RETURNS DESCRIPTION
IOStream

The default input/output stream.

TYPE: IOStream

Source code in autogen/io/base.py
@staticmethod
def get_default() -> "IOStream":
    """Get the default input/output stream.

    The stream set for the current context is preferred. When none is set,
    the global default is returned instead.

    Returns:
        IOStream: The default input/output stream.

    Raises:
        RuntimeError: If no stream is set for the current context and no
            global default has been set either.
    """
    iostream = IOStream._default_io_stream.get()
    if iostream is None:
        # Fall back to the global default. The value is deliberately not
        # written back through set_default(): that function is a context
        # manager, so a bare call only builds a manager that is never
        # entered and therefore stores nothing.
        iostream = IOStream.get_global_default()
    return iostream

set_default staticmethod #

set_default(stream)

Set the default input/output stream.

PARAMETER DESCRIPTION
stream

The input/output stream to set as the default.

TYPE: IOStream

Source code in autogen/io/base.py
@staticmethod
@contextmanager
def set_default(stream: Optional["IOStream"]) -> Iterator[None]:
    """Temporarily set the default input/output stream.

    This is a context manager: ``stream`` is the default while the ``with``
    block runs, and the previous value is restored when the block exits,
    including when it exits with an exception.

    Args:
        stream (Optional[IOStream]): The input/output stream to set as the default.

    Yields:
        None: Control is handed to the body of the ``with`` block.
    """
    # Set the value before entering ``try`` so that ``token`` is always bound
    # when the ``finally`` clause runs; a failure in ``set`` then surfaces as
    # itself instead of being masked by an UnboundLocalError.
    token = IOStream._default_io_stream.set(stream)
    try:
        yield
    finally:
        IOStream._default_io_stream.reset(token)

run_server_in_thread staticmethod #

run_server_in_thread(*, host='127.0.0.1', port=8765, on_connect, ssl_context=None, **kwargs)

Factory function to create a websocket input/output stream.

PARAMETER DESCRIPTION
host

The host to bind the server to. Defaults to "127.0.0.1".

TYPE: str DEFAULT: '127.0.0.1'

port

The port to bind the server to. Defaults to 8765.

TYPE: int DEFAULT: 8765

on_connect

The function to be executed on client connection. Typically creates agents and initiates a chat.

TYPE: Callable[[IOWebsockets], None]

ssl_context

The SSL context to use for secure connections. Defaults to None.

TYPE: Optional[SSLContext] DEFAULT: None

kwargs

Additional keyword arguments to pass to the websocket server.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
str

The URI of the websocket server.

TYPE: str

Source code in autogen/io/websockets.py
@staticmethod
@contextmanager
def run_server_in_thread(
    *,
    host: str = "127.0.0.1",
    port: int = 8765,
    on_connect: Callable[["IOWebsockets"], None],
    ssl_context: Optional[ssl.SSLContext] = None,
    **kwargs: Any,
) -> Iterator[str]:
    """Run a websocket server in a background thread for the duration of a ``with`` block.

    The server is started in a separate thread, its URI is yielded once the
    server object is available, and on exit the server is shut down and the
    thread is joined.

    Args:
        host (str, optional): The host to bind the server to. Defaults to "127.0.0.1".
        port (int, optional): The port to bind the server to. Defaults to 8765.
        on_connect (Callable[[IOWebsockets], None]): The function to be executed on client connection. Typically creates agents and initiates a chat.
        ssl_context (Optional[ssl.SSLContext], optional): The SSL context to use for secure connections. Defaults to None.
        kwargs (Any): Additional keyword arguments to pass to the websocket server.

    Yields:
        str: The URI of the websocket server.

    Raises:
        RuntimeError: If the server thread exits before the server has started.
    """
    # Shared between the two threads: the server thread publishes the server
    # object here, and the calling thread reads it to detect startup and to
    # shut the server down.
    server_dict: dict[str, WebSocketServer] = {}

    def _run_server() -> None:
        with ws_serve(
            handler=partial(IOWebsockets._handler, on_connect=on_connect),
            host=host,
            port=port,
            ssl_context=ssl_context,
            **kwargs,
        ) as server:
            server_dict["server"] = server

            # runs until the server is shutdown
            server.serve_forever()

    # start server in a separate thread
    thread = threading.Thread(target=_run_server)
    thread.start()
    try:
        while "server" not in server_dict:
            # If the thread has already finished without publishing a server,
            # startup failed and waiting any longer would block forever.
            if not thread.is_alive() and "server" not in server_dict:
                raise RuntimeError(f"Websocket server failed to start on ws://{host}:{port}")
            sleep(0.1)

        # NOTE(review): the URI always uses the "ws" scheme, even when
        # ssl_context is given - confirm whether "wss" is expected there.
        yield f"ws://{host}:{port}"

    finally:
        # gracefully stop server
        if "server" in server_dict:
            server_dict["server"].shutdown()

        # wait for the thread to stop
        thread.join()

print #

print(*objects, sep=' ', end='\n', flush=False)

Print data to the output stream.

    Args:
        objects (any): The data to print.
        sep (str, optional): The separator between objects. Defaults to " ".
        end (str, optional): The end of the output. Defaults to "\n".
        flush (bool, optional): Whether to flush the output. Defaults to False.

Source code in autogen/io/websockets.py
def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
    """Wrap the given objects in a ``PrintMessage`` and send it to the output stream.

    Args:
        objects (any): The data to print.
        sep (str, optional): The separator between objects. Defaults to " ".
        end (str, optional): The end of the output. Defaults to "\n".
        flush (bool, optional): Accepted but not used by this method.
            Defaults to False.
    """
    # Delivery is delegated to send(); only objects, sep and end are forwarded.
    self.send(PrintMessage(*objects, sep=sep, end=end))

send #

send(message)

Send a message to the output stream.

PARAMETER DESCRIPTION
message

The message to send.

TYPE: BaseMessage

Source code in autogen/io/websockets.py
def send(self, message: BaseMessage) -> None:
    """Serialize a message to JSON and send it over the websocket.

    Args:
        message (BaseMessage): The message to send.
    """
    # The message is serialized with its own model_dump_json() before sending.
    payload = message.model_dump_json()
    self._websocket.send(payload)

input #

input(prompt='', *, password=False)

Read a line from the input stream.

PARAMETER DESCRIPTION
prompt

The prompt to display. Defaults to "".

TYPE: str DEFAULT: ''

password

Whether to read a password. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

The line read from the input stream.

TYPE: str

Source code in autogen/io/websockets.py
def input(self, prompt: str = "", *, password: bool = False) -> str:
    """Send an optional prompt and return the next message received from the websocket.

    Args:
        prompt (str, optional): The prompt to send before reading. Nothing is
            sent when it is the empty string. Defaults to "".
        password (bool, optional): Accepted but not used by this method.
            Defaults to False.

    Returns:
        str: The received message; a bytes message is decoded as UTF-8.

    """
    if prompt != "":
        self._websocket.send(prompt)

    reply = self._websocket.recv()

    # The websocket may deliver either text or bytes; always hand back text.
    if isinstance(reply, bytes):
        return reply.decode("utf-8")
    return reply