From 64e1bb4f917f015190a070bc15d4c5a298816d3e Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 13 Apr 2024 18:48:00 +0900 Subject: [PATCH 1/6] Add a very minimal prometheus exporter interface --- aiomonitor/monitor.py | 83 +++++++++++++++++++----------- aiomonitor/telemetry/__init__.py | 0 aiomonitor/telemetry/app.py | 25 +++++++++ aiomonitor/telemetry/prometheus.py | 22 ++++++++ 4 files changed, 101 insertions(+), 29 deletions(-) create mode 100644 aiomonitor/telemetry/__init__.py create mode 100644 aiomonitor/telemetry/app.py create mode 100644 aiomonitor/telemetry/prometheus.py diff --git a/aiomonitor/monitor.py b/aiomonitor/monitor.py index 5a2ad70d..fb2c733a 100644 --- a/aiomonitor/monitor.py +++ b/aiomonitor/monitor.py @@ -16,7 +16,9 @@ from types import TracebackType from typing import ( Any, + AsyncIterator, Awaitable, + Callable, Coroutine, Dict, Final, @@ -35,6 +37,7 @@ from .exceptions import MissingTask from .task import TracedTask, persistent_coro +from .telemetry.app import init_telemetry from .termui.commands import interact from .types import ( CancellationChain, @@ -68,6 +71,7 @@ MONITOR_TERMUI_PORT: Final = 20101 MONITOR_WEBUI_PORT: Final = 20102 CONSOLE_PORT: Final = 20103 +TELEMETRY_PORT: Final = 20104 T = TypeVar("T") T_co = TypeVar("T_co", covariant=True) @@ -120,6 +124,8 @@ def __init__( webui_port: int = MONITOR_WEBUI_PORT, console_port: int = CONSOLE_PORT, console_enabled: bool = True, + telemetry_port: int = TELEMETRY_PORT, + telemetry_enabled: bool = True, hook_task_factory: bool = False, max_termination_history: int = 1000, locals: Optional[Dict[str, Any]] = None, @@ -134,6 +140,8 @@ def __init__( self.console_locals = {"__name__": "__console__", "__doc__": None} else: self.console_locals = locals + self._telemetry_port = telemetry_port + self._telemetry_enabled = telemetry_enabled self.prompt = "monitor >>> " log.info( @@ -545,6 +553,26 @@ def _create_task( def _ui_main(self) -> None: asyncio.run(self._ui_main_async()) + @contextlib.asynccontextmanager + async def _webapp_ctx( + self, + app_factory: Callable[[], Awaitable[web.Application]], + port: int, + ) -> AsyncIterator[None]: + runner = web.AppRunner(await app_factory()) + await runner.setup() + site = web.TCPSite( + runner, + str(self._host), + port, + reuse_port=True, + ) + await site.start() + try: + yield + finally: + await runner.cleanup() + async def _ui_main_async(self) -> None: loop = asyncio.get_running_loop() self._termination_info_queue = janus.Queue() @@ -562,36 +590,29 @@ async def _ui_main_async(self) -> None: host=self._host, port=self._termui_port, ) - webui_app = await init_webui(self) - webui_runner = web.AppRunner(webui_app) - await webui_runner.setup() - webui_site = web.TCPSite( - webui_runner, - str(self._host), - self._webui_port, - reuse_port=True, - ) - await webui_site.start() telnet_server.start() - await asyncio.sleep(0) - self._ui_started.set() - try: - await self._ui_forever_future - except asyncio.CancelledError: - pass - finally: - termui_tasks = {*self._termui_tasks} - for termui_task in termui_tasks: - termui_task.cancel() - await asyncio.gather(*termui_tasks, return_exceptions=True) - self._ui_termination_handler_task.cancel() - self._ui_cancellation_handler_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await self._ui_termination_handler_task - with contextlib.suppress(asyncio.CancelledError): - await self._ui_cancellation_handler_task - await telnet_server.stop() - await webui_runner.cleanup() + async with ( + self._webapp_ctx(lambda: init_webui(self), self._webui_port), + self._webapp_ctx(lambda: init_telemetry(self), self._telemetry_port), + ): + await asyncio.sleep(0) + self._ui_started.set() + try: + await self._ui_forever_future + except asyncio.CancelledError: + pass + finally: + termui_tasks = {*self._termui_tasks} + for termui_task in termui_tasks: + termui_task.cancel() + await asyncio.gather(*termui_tasks, return_exceptions=True) + self._ui_termination_handler_task.cancel() + self._ui_cancellation_handler_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._ui_termination_handler_task + with contextlib.suppress(asyncio.CancelledError): + await self._ui_cancellation_handler_task + await telnet_server.stop() async def _ui_handle_termination_updates(self) -> None: while True: @@ -633,7 +654,9 @@ def start_monitor( port: int = MONITOR_TERMUI_PORT, # kept the name for backward compatibility console_port: int = CONSOLE_PORT, webui_port: int = MONITOR_WEBUI_PORT, + telemetry_port: int = TELEMETRY_PORT, console_enabled: bool = True, + telemetry_enabled: bool = True, hook_task_factory: bool = False, max_termination_history: Optional[int] = None, locals: Optional[Dict[str, Any]] = None, @@ -659,6 +682,8 @@ def start_monitor( webui_port=webui_port, console_port=console_port, console_enabled=console_enabled, + telemetry_port=telemetry_port, + telemetry_enabled=telemetry_enabled, hook_task_factory=hook_task_factory, max_termination_history=( max_termination_history diff --git a/aiomonitor/telemetry/__init__.py b/aiomonitor/telemetry/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/aiomonitor/telemetry/app.py b/aiomonitor/telemetry/app.py new file mode 100644 index 00000000..d6c96a04 --- /dev/null +++ b/aiomonitor/telemetry/app.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import dataclasses +from typing import TYPE_CHECKING + +from .prometheus import prometheus_exporter + +if TYPE_CHECKING: + from ..monitor import Monitor + +from aiohttp import web + + +@dataclasses.dataclass +class TelemetryContext: + monitor: Monitor + + +async def init_telemetry(monitor: Monitor) -> web.Application: + app = web.Application() + app["ctx"] = TelemetryContext( + monitor=monitor, + ) + app.router.add_route("GET", "/prometheus", prometheus_exporter) + return app diff --git a/aiomonitor/telemetry/prometheus.py b/aiomonitor/telemetry/prometheus.py new file mode 100644 index 00000000..320709b1 --- /dev/null +++ b/aiomonitor/telemetry/prometheus.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import asyncio +import io +import time +from typing import TYPE_CHECKING + +from aiohttp import web + +if TYPE_CHECKING: + from .app import TelemetryContext + + +async def prometheus_exporter(request: web.Request) -> web.Response: + ctx: TelemetryContext = request.app["ctx"] + out = io.StringIO() + print("# TYPE asyncio_tasks gauge", file=out) + now = time.time_ns() // 1000 # unix timestamp in msec + all_task_count = len(asyncio.all_tasks(ctx.monitor._monitored_loop)) + print(f"asyncio_running_tasks {all_task_count} {now}", file=out) + # TODO: count per name of explicitly named tasks (using labels) + return web.Response(body=out.getvalue()) From 4e99161ab3b3bc0db93a6d2547cc130a2f1f8d5f Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 13 Apr 2024 18:48:23 +0900 Subject: [PATCH 2/6] Use the latest ruff/black preview code style --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 884e9827..7131b4bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,9 @@ ignore = ["E203", "E731", "E501", "Q000"] known-first-party = ["aiomonitor"] split-on-trailing-comma = true +[tool.ruff.format] +preview = true # enable the black's preview style + [tool.mypy] ignore_missing_imports = true From 9204f03c31199bf8519a299c886e9b49a2687248 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Apr 2024 09:50:21 +0000 Subject: [PATCH 3/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- aiomonitor/task.py | 10 ++--- aiomonitor/termui/commands.py | 78 ++++++++++++++--------------------- aiomonitor/webui/app.py | 32 ++++++-------- examples/extension.py | 12 +++--- 4 files changed, 54 insertions(+), 78 deletions(-) diff --git a/aiomonitor/task.py b/aiomonitor/task.py index 3d385fa4..966965ef 100644 --- a/aiomonitor/task.py +++ b/aiomonitor/task.py @@ -50,12 +50,10 @@ def __init__( self._persistent = persistent def get_trace_id(self) -> str: - h = hash( - ( - id(self), - self.get_name(), - ) - ) + h = hash(( + id(self), + self.get_name(), + )) b = struct.pack("P", h) return base64.b32encode(b).rstrip(b"=").decode() diff --git a/aiomonitor/termui/commands.py b/aiomonitor/termui/commands.py index eeb0e643..98fa52c9 100644 --- a/aiomonitor/termui/commands.py +++ b/aiomonitor/termui/commands.py @@ -66,23 +66,19 @@ def _get_current_stderr() -> TextIO: def print_ok(msg: str) -> None: print_formatted_text( - FormattedText( - [ - ("ansibrightgreen", "✓ "), - ("", msg), - ] - ) + FormattedText([ + ("ansibrightgreen", "✓ "), + ("", msg), + ]) ) def print_fail(msg: str) -> None: print_formatted_text( - FormattedText( - [ - ("ansibrightred", "✗ "), - ("", msg), - ] - ) + FormattedText([ + ("ansibrightred", "✗ "), + ("", msg), + ]) ) @@ -117,11 +113,9 @@ async def interact(self: Monitor, connection: TelnetConnection) -> None: try: user_input = ( await prompt_session.prompt_async( - FormattedText( - [ - (style_prompt, self.prompt), - ] - ) + FormattedText([ + (style_prompt, self.prompt), + ]) ) ).strip() except KeyboardInterrupt: @@ -422,16 +416,14 @@ def do_ps( table_data: List[Tuple[str, str, str, str, str, str]] = [headers] tasks = self.format_running_task_list(filter_, persistent) for task in tasks: - table_data.append( - ( - task.task_id, - task.state, - task.name, - task.coro, - task.created_location, - task.since, - ) - ) + table_data.append(( + task.task_id, + task.state, + task.name, + task.coro, + task.created_location, + task.since, + )) table = AsciiTable(table_data) table.inner_row_border = False table.inner_column_border = False @@ -469,15 +461,13 @@ def do_ps_terminated( table_data: List[Tuple[str, str, str, str, str]] = [headers] tasks = self.format_terminated_task_list(filter_, persistent) for task in tasks: - table_data.append( - ( - task.task_id, - task.name, - task.coro, - task.started_since, - task.terminated_since, - ) - ) + table_data.append(( + task.task_id, + task.name, + task.coro, + task.started_since, + task.terminated_since, + )) table = AsciiTable(table_data) table.inner_row_border = False table.inner_column_border = False @@ -509,11 +499,9 @@ def do_where(ctx: click.Context, taskid: str) -> None: if item_type == "header": stdout.write("\n") print_formatted_text( - FormattedText( - [ - ("ansiwhite", item_text), - ] - ) + FormattedText([ + ("ansiwhite", item_text), + ]) ) else: stdout.write(textwrap.indent(item_text.strip("\n"), " ")) @@ -537,11 +525,9 @@ def do_where_terminated(ctx: click.Context, trace_id: str) -> None: if item_type == "header": stdout.write("\n") print_formatted_text( - FormattedText( - [ - ("ansiwhite", item_text), - ] - ) + FormattedText([ + ("ansiwhite", item_text), + ]) ) else: stdout.write(textwrap.indent(item_text.strip("\n"), " ")) diff --git a/aiomonitor/webui/app.py b/aiomonitor/webui/app.py index 22e6283f..2416662a 100644 --- a/aiomonitor/webui/app.py +++ b/aiomonitor/webui/app.py @@ -39,14 +39,12 @@ class TaskTypeParams(APIParams): @classmethod def get_checker(cls): - return t.Dict( - { - t.Key("task_type", default=TaskTypes.RUNNING): t.Enum( - TaskTypes.RUNNING, - TaskTypes.TERMINATED, - ), - } - ) + return t.Dict({ + t.Key("task_type", default=TaskTypes.RUNNING): t.Enum( + TaskTypes.RUNNING, + TaskTypes.TERMINATED, + ), + }) @dataclasses.dataclass @@ -55,11 +53,9 @@ class TaskIdParams(APIParams): @classmethod def get_checker(cls) -> t.Trafaret: - return t.Dict( - { - t.Key("task_id"): t.String, - } - ) + return t.Dict({ + t.Key("task_id"): t.String, + }) @dataclasses.dataclass @@ -69,12 +65,10 @@ class ListFilterParams(APIParams): @classmethod def get_checker(cls) -> t.Trafaret: - return t.Dict( - { - t.Key("filter", default=""): t.String(allow_blank=True), - t.Key("persistent", default=False): t.ToBool, - } - ) + return t.Dict({ + t.Key("filter", default=""): t.String(allow_blank=True), + t.Key("persistent", default=False): t.ToBool, + }) @dataclasses.dataclass diff --git a/examples/extension.py b/examples/extension.py index e98d76af..87082b71 100644 --- a/examples/extension.py +++ b/examples/extension.py @@ -35,13 +35,11 @@ def do_hello(ctx: click.Context) -> None: from prompt_toolkit.formatted_text import FormattedText print_formatted_text( - FormattedText( - [ - ("ansibrightblue", "Hello, "), - ("ansibrightyellow", "world, "), - ("ansibrightmagenta", "with color!"), - ] - ) + FormattedText([ + ("ansibrightblue", "Hello, "), + ("ansibrightyellow", "world, "), + ("ansibrightmagenta", "with color!"), + ]) ) # or: From cd9932f7e9cc7a1abaffcef35706760d86320662 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 13 Apr 2024 19:18:38 +0900 Subject: [PATCH 4/6] Add pid and explicit content type to the prometheus output --- aiomonitor/telemetry/prometheus.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/aiomonitor/telemetry/prometheus.py b/aiomonitor/telemetry/prometheus.py index 320709b1..de432a5a 100644 --- a/aiomonitor/telemetry/prometheus.py +++ b/aiomonitor/telemetry/prometheus.py @@ -2,6 +2,7 @@ import asyncio import io +import os import time from typing import TYPE_CHECKING @@ -16,7 +17,10 @@ async def prometheus_exporter(request: web.Request) -> web.Response: out = io.StringIO() print("# TYPE asyncio_tasks gauge", file=out) now = time.time_ns() // 1000 # unix timestamp in msec + pid = os.getpid() # we use threads to run the aiomonitor UI thread, so the pid is same to the monitored program. all_task_count = len(asyncio.all_tasks(ctx.monitor._monitored_loop)) - print(f"asyncio_running_tasks {all_task_count} {now}", file=out) + print( + 'asyncio_running_tasks{pid="%d"} %d %d' % (pid, all_task_count, now), file=out + ) # TODO: count per name of explicitly named tasks (using labels) - return web.Response(body=out.getvalue()) + return web.Response(body=out.getvalue(), content_type="text/plain") From bb53eb9cbeef8217878533449867ad536ae8eb05 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Thu, 25 Apr 2024 00:00:18 +0900 Subject: [PATCH 5/6] Make it Python 3.8 friendly --- aiomonitor/monitor.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/aiomonitor/monitor.py b/aiomonitor/monitor.py index fb2c733a..6a4ada8b 100644 --- a/aiomonitor/monitor.py +++ b/aiomonitor/monitor.py @@ -591,10 +591,9 @@ async def _ui_main_async(self) -> None: port=self._termui_port, ) telnet_server.start() - async with ( - self._webapp_ctx(lambda: init_webui(self), self._webui_port), - self._webapp_ctx(lambda: init_telemetry(self), self._telemetry_port), - ): + async with self._webapp_ctx( + lambda: init_webui(self), self._webui_port + ), self._webapp_ctx(lambda: init_telemetry(self), self._telemetry_port): await asyncio.sleep(0) self._ui_started.set() try: From 450a617113a1153f8c88b0691538f048b3edb5b4 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Thu, 25 Apr 2024 00:33:06 +0900 Subject: [PATCH 6/6] Add a log message to show the telemetry service status --- aiomonitor/monitor.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/aiomonitor/monitor.py b/aiomonitor/monitor.py index 6a4ada8b..fc465b22 100644 --- a/aiomonitor/monitor.py +++ b/aiomonitor/monitor.py @@ -144,14 +144,23 @@ def __init__( self._telemetry_enabled = telemetry_enabled self.prompt = "monitor >>> " - log.info( - "Starting aiomonitor at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d", - { - "host": host, - "tport": termui_port, - "wport": webui_port, - }, - ) + if console_enabled: + log.info( + "Starting aiomonitor console at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d", + { + "host": host, + "tport": termui_port, + "wport": webui_port, + }, + ) + if telemetry_enabled: + log.info( + "Starting aiomonitor telemetry service at http://%(host)s:%(port)d", + { + "host": host, + "port": telemetry_port, + }, + ) self._closed = False self._started = False