(Experimental) A client for communicating with a Jupyter gateway server.
Source code in autogen/coding/jupyter/jupyter_client.py
| def __init__(self, connection_info: JupyterConnectionInfo):
"""(Experimental) A client for communicating with a Jupyter gateway server.
Args:
connection_info (JupyterConnectionInfo): Connection information
"""
self._connection_info = connection_info
self._session = requests.Session()
retries = Retry(total=5, backoff_factor=0.1)
self._session.mount("http://", HTTPAdapter(max_retries=retries))
|
list_kernel_specs
Source code in autogen/coding/jupyter/jupyter_client.py
| def list_kernel_specs(self) -> dict[str, dict[str, str]]:
response = self._session.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers())
return cast(dict[str, dict[str, str]], response.json())
|
list_kernels
Source code in autogen/coding/jupyter/jupyter_client.py
| def list_kernels(self) -> list[dict[str, str]]:
response = self._session.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers())
return cast(list[dict[str, str]], response.json())
|
start_kernel
start_kernel(kernel_spec_name)
Start a new kernel.
PARAMETER | DESCRIPTION |
kernel_spec_name | Name of the kernel spec to start TYPE: str |
RETURNS | DESCRIPTION |
str | TYPE: str |
Source code in autogen/coding/jupyter/jupyter_client.py
| def start_kernel(self, kernel_spec_name: str) -> str:
"""Start a new kernel.
Args:
kernel_spec_name (str): Name of the kernel spec to start
Returns:
str: ID of the started kernel
"""
response = self._session.post(
f"{self._get_api_base_url()}/api/kernels",
headers=self._get_headers(),
json={"name": kernel_spec_name},
)
return cast(str, response.json()["id"])
|
delete_kernel
Source code in autogen/coding/jupyter/jupyter_client.py
| def delete_kernel(self, kernel_id: str) -> None:
response = self._session.delete(
f"{self._get_api_base_url()}/api/kernels/{kernel_id}", headers=self._get_headers()
)
response.raise_for_status()
|
restart_kernel
restart_kernel(kernel_id)
Source code in autogen/coding/jupyter/jupyter_client.py
| def restart_kernel(self, kernel_id: str) -> None:
response = self._session.post(
f"{self._get_api_base_url()}/api/kernels/{kernel_id}/restart", headers=self._get_headers()
)
response.raise_for_status()
|
get_kernel_client
get_kernel_client(kernel_id)
Source code in autogen/coding/jupyter/jupyter_client.py
| @require_optional_import("websocket", "jupyter-executor")
def get_kernel_client(self, kernel_id: str) -> JupyterKernelClient:
ws_url = f"{self._get_ws_base_url()}/api/kernels/{kernel_id}/channels"
ws = websocket.create_connection(ws_url, header=self._get_headers())
return JupyterKernelClient(ws)
|