diff --git a/CHANGES/7214.feature b/CHANGES/7214.feature
new file mode 100644
index 00000000000..bae4d16c821
--- /dev/null
+++ b/CHANGES/7214.feature
@@ -0,0 +1 @@
+Added a "debug-logs" option to the TASK_DIAGNOSTICS / X-Task-Diagnostics feature, which dumps task-specific logs into a profile artifact at a DEBUG level.
\ No newline at end of file
diff --git a/CHANGES/7215.feature b/CHANGES/7215.feature
new file mode 100644
index 00000000000..20d24923aa0
--- /dev/null
+++ b/CHANGES/7215.feature
@@ -0,0 +1 @@
+Added a "logs" option to the TASK_DIAGNOSTICS / X-Task-Diagnostics feature, which dumps task-specific logs into a profile artifact.
diff --git a/CHANGES/7219.bugfix b/CHANGES/7219.bugfix
new file mode 100644
index 00000000000..4788c96a705
--- /dev/null
+++ b/CHANGES/7219.bugfix
@@ -0,0 +1 @@
+"Profile options" designated using the X-Task-Diagnostics feature are now propagated to child tasks.
\ No newline at end of file
diff --git a/docs/dev/guides/pull-request-walkthrough.md b/docs/dev/guides/pull-request-walkthrough.md
index feb143ae036..d9865a2e817
--- a/docs/dev/guides/pull-request-walkthrough.md
+++ b/docs/dev/guides/pull-request-walkthrough.md
@@ -14,9 +14,9 @@ Please be sure to follow the [Pulp policy on AI Usage](site:help/more/governance
 
 1. Add `functional tests` or `unit tests` where appropriate and ensure tests are passing on the CI.
 
-2. Add a [`CHANGES entry`](site:pulpcore/docs/dev/guides/git/#markdown-header-changelog-update).
+2. Add a [`CHANGES entry`](site:pulpcore/docs/dev/guides/git/#changelog-update).
 3. Update relevent `documentation`. Please build the docs to test!
-4. If the PR is a simple feature or a bugfix, [`rebase and squash`](site:pulpcore/docs/dev/guides/git/#markdown-header-rebasing-and-squashing) to a single commit.
+4. If the PR is a simple feature or a bugfix, [`rebase and squash`](site:pulpcore/docs/dev/guides/git/#rebasing-and-squashing) to a single commit.
    If the PR is a complex feature, make sure that all commits are cleanly separated and have meaningful commit messages.
 5. Make sure you tag commits with `closes #IssueNumber` or `ref #IssueNumber` when working on a tracked issue.
 6. If AI was used, make sure you are following the [Pulp policy on AI Usage](site:help/more/governance/ai_policy/)
diff --git a/docs/dev/learn/tasks/diagnostics.md b/docs/dev/learn/tasks/diagnostics.md
index 514c28f992e..cabbc4ef8c1 100644
--- a/docs/dev/learn/tasks/diagnostics.md
+++ b/docs/dev/learn/tasks/diagnostics.md
@@ -35,7 +35,10 @@ The following diagnostics are supported currently:
 - memray:
     Dumps a profile which can be processed with `memray`, which shows which lines and
     functions were responsible for the most allocations at the time of peak RSS of the
     process
-
+- logs:
+    Dumps all logs specific to the task. The captured logs are not guaranteed to use the same formatting as the system's regular log output.
+- debug-logs:
+    Same as the `logs` option, except it exports logs at a DEBUG level whether or not the system is otherwise configured to print DEBUG logs.
 
 ## Memory Logging
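For orientation, a minimal sketch of how the new diagnostics might be enabled and requested. The `TASK_DIAGNOSTICS` setting and the `X-Task-Diagnostics` header come from the feature itself; the endpoint URL, credentials, and the exact header value format below are illustrative assumptions, not part of this patch.

```python
import requests

# Server side (settings.py): opt in to the diagnostics that may be collected, e.g.
#   TASK_DIAGNOSTICS = ["logs", "debug-logs"]

# Client side: ask for task logs on a request that dispatches a task.
# URL, credentials, and the header value format are placeholders.
response = requests.post(
    "https://pulp.example.com/pulp/api/v3/some/task-spawning/endpoint/",
    json={},
    headers={"X-Task-Diagnostics": "logs"},
    auth=("admin", "password"),
)
print(response.json().get("task"))  # href of the dispatched task, if one was spawned
```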
diff --git a/pulpcore/app/contexts.py b/pulpcore/app/contexts.py
index 76737ed155a..01f6da0ee5f 100644
--- a/pulpcore/app/contexts.py
+++ b/pulpcore/app/contexts.py
@@ -8,6 +8,7 @@
 _current_task = ContextVar("current_task", default=None)
 _current_user_func = ContextVar("current_user", default=lambda: None)
 _current_domain = ContextVar("current_domain", default=None)
+x_task_diagnostics_var = ContextVar("x_profile_task")
 
 
 @contextmanager
@@ -44,11 +45,15 @@ def with_domain(domain):
 @contextmanager
 def with_task_context(task):
     with with_domain(task.pulp_domain), with_guid(task.logging_cid), with_user(task.user):
-        token = _current_task.set(task)
+        task_token = _current_task.set(task)
+        # If this task is being spawned by another task, we should inherit the profile options
+        # from the current task.
+        diagnostics_token = x_task_diagnostics_var.set(task.profile_options)
         try:
             yield
         finally:
-            _current_task.reset(token)
+            x_task_diagnostics_var.reset(diagnostics_token)
+            _current_task.reset(task_token)
 
 
 @asynccontextmanager
@@ -59,8 +64,12 @@ def _fetch(task):
 
     domain, user = await _fetch(task)
     with with_domain(domain), with_guid(task.logging_cid), with_user(user):
-        token = _current_task.set(task)
+        task_token = _current_task.set(task)
+        # If this task is being spawned by another task, we should inherit the profile options
+        # from the current task.
+        diagnostics_token = x_task_diagnostics_var.set(task.profile_options)
         try:
             yield
         finally:
-            _current_task.reset(token)
+            x_task_diagnostics_var.reset(diagnostics_token)
+            _current_task.reset(task_token)
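The token set/reset above is the standard `contextvars` pattern; here is a small self-contained sketch (the variable and function names are illustrative, not the real pulpcore objects) of how work dispatched inside the parent task's context observes the parent's profile options:

```python
from contextvars import ContextVar

# Illustrative stand-in for x_task_diagnostics_var.
profile_options_var = ContextVar("profile_options", default=None)


def run_in_task_context(options, body):
    """Set the context variable for the duration of `body`, then restore it."""
    token = profile_options_var.set(options)
    try:
        return body()
    finally:
        profile_options_var.reset(token)


def dispatch_child():
    # Anything dispatched while the parent's context is active can read the
    # parent's options, which is how child tasks inherit them.
    return profile_options_var.get()


print(run_in_task_context(["logs"], dispatch_child))  # ['logs']
print(profile_options_var.get())  # None again once the context exits
```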
diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py
index e653aef0e58..0c3005058ac 100644
--- a/pulpcore/app/settings.py
+++ b/pulpcore/app/settings.py
@@ -260,6 +260,7 @@
     "handlers": {
         "console": {
             "class": "logging.StreamHandler",
+            "level": "INFO",
             "formatter": "simple",
             "filters": ["correlation_id"],
         }
@@ -382,8 +383,11 @@
 # lines and functions, at the time of peak RSS of the task process. This adds significant
 # runtime overhead to the task process, 20-40%. Tweaking code might be warranted for
 # some advanced settings.
+# * "logs" - Dumps the logs specific to each task.
+# * "debug-logs" - Same as "logs" but it exports logs at a DEBUG level, even if the system is
+#   otherwise not configured to print DEBUG logs.
 # NOTE: "memray" and "pyinstrument" require additional packages to be installed on the system.
-TASK_DIAGNOSTICS = []  # ["memory", "pyinstrument", "memray"]
+TASK_DIAGNOSTICS = []  # ["memory", "pyinstrument", "memray", "logs", "debug-logs"]
 
 
 ANALYTICS = True
diff --git a/pulpcore/middleware.py b/pulpcore/middleware.py
index f31966d5ce0..b1a72a5dad0 100644
--- a/pulpcore/middleware.py
+++ b/pulpcore/middleware.py
@@ -1,7 +1,6 @@
 import time
 import re
 
-from contextvars import ContextVar
 from os import environ
 
 from django.http.response import Http404
@@ -10,6 +9,7 @@
 from pulpcore.metrics import init_otel_meter
 
 from pulpcore.app.models import Domain
+from pulpcore.app.contexts import x_task_diagnostics_var
 from pulpcore.app.util import (
     get_worker_name,
     normalize_http_status,
@@ -17,8 +17,6 @@
     set_domain,
 )
 
-x_task_diagnostics_var = ContextVar("x_profile_task")
-
 
 class DomainMiddleware:
     """
diff --git a/pulpcore/plugin/stages/api.py b/pulpcore/plugin/stages/api.py
index ded0e372ee2..3f4156978f3 100644
--- a/pulpcore/plugin/stages/api.py
+++ b/pulpcore/plugin/stages/api.py
@@ -76,7 +76,6 @@ async def run(self):
             content = await self._in_q.get()
             if content is None:
                 break
-            log.debug("%(name)s - next: %(content)s.", {"name": self, "content": content})
             yield content
 
     async def batches(self, minsize=500):
@@ -171,7 +170,6 @@ async def put(self, item):
         if item is None:
             raise ValueError(_("(None) not permitted."))
         await self._out_q.put(item)
-        log.debug("{name} - put: {content}".format(name=self, content=item))
 
     def __str__(self):
         return "[{id}] {name}".format(id=id(self), name=self.__class__.__name__)
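The `pulpcore/tasking/_util.py` changes that follow chain an optional logging decorator around the callable that executes the task, mirroring the existing diagnostics. A toy reduction of that conditional-wrapping pattern (all names here are illustrative, not pulpcore APIs):

```python
def logging_wrapper(func):
    def wrapped(task):
        print(f"capturing logs for {task}")
        return func(task)

    return wrapped


def memory_wrapper(func):
    def wrapped(task):
        print(f"tracking memory for {task}")
        return func(task)

    return wrapped


def execute_with_diagnostics(task, profile_options):
    def run(t):
        print(f"running {t}")

    # Each enabled diagnostic wraps whatever callable is currently set to run the task.
    if "memory" in profile_options:
        run = memory_wrapper(run)
    if "logs" in profile_options or "debug-logs" in profile_options:
        run = logging_wrapper(run)
    run(task)


execute_with_diagnostics("task-123", ["logs", "memory"])
```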
diff --git a/pulpcore/tasking/_util.py b/pulpcore/tasking/_util.py
index 7274d7cb8ce..3982598ca38 100644
--- a/pulpcore/tasking/_util.py
+++ b/pulpcore/tasking/_util.py
@@ -122,6 +122,10 @@ def _execute_task_and_profile(task, profile_options):
     if "memray" in profile_options:
         _execute_task = _memray_diagnostic_decorator(temp_dir, _execute_task)
 
+    is_debug = "debug-logs" in profile_options
+    if "logs" in profile_options or is_debug:
+        _execute_task = _logging_decorator(temp_dir, is_debug, _execute_task)
+
     _execute_task(task)
 
 
@@ -218,6 +222,57 @@ def __memray_diagnostic_decorator(task):
     return __memray_diagnostic_decorator
 
 
+def _logging_decorator(temp_dir, is_debug, func):
+    def __logging_decorator(task):
+        log_file_path = os.path.join(temp_dir, "task_logs.log")
+
+        # Create a file handler that captures all logging levels
+        file_handler = logging.FileHandler(log_file_path, mode="w", encoding="utf-8")
+        file_handler.setLevel(logging.NOTSET)  # Capture all levels
+
+        # Create a formatter for consistent log formatting
+        formatter = logging.Formatter(
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
+        )
+        file_handler.setFormatter(formatter)
+
+        # Get the root logger to capture all logs
+        root_logger = logging.getLogger()
+        original_level = root_logger.level
+
+        try:
+            # Add the handler to the root logger
+            root_logger.addHandler(file_handler)
+
+            if is_debug:
+                # Temporarily lower the root logger level to allow all messages through.
+                # The existing handlers maintain their own levels, so they won't be affected.
+                root_logger.setLevel(logging.NOTSET)
+
+            # Execute the task
+            func(task)
+        finally:
+            # Always restore the original level and remove the handler
+            if is_debug:
+                root_logger.setLevel(original_level)
+            root_logger.removeHandler(file_handler)
+            file_handler.close()
+
+        # Save the log file as a ProfileArtifact
+        artifact = Artifact.init_and_validate(log_file_path)
+        try:
+            # A log file is unlikely to be identical to an existing artifact, but we retain
+            # the same check as the other decorators.
+            artifact.save()
+        except IntegrityError:
+            artifact = Artifact.objects.get(sha256=artifact.sha256)
+
+        ProfileArtifact.objects.get_or_create(artifact=artifact, name="task_logs", task=task)
+        _logger.info("Created task logging diagnostic data.")
+
+    return __logging_decorator
+
+
 def dispatch_scheduled_tasks():
     # Warning, dispatch_scheduled_tasks is not race condition free!
     now = timezone.now()
diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py
index c187bd91957..9d9cb3c7853 100644
--- a/pulpcore/tasking/tasks.py
+++ b/pulpcore/tasking/tasks.py
@@ -21,7 +21,7 @@
     get_domain,
     get_prn,
 )
-from pulpcore.app.contexts import with_task_context, awith_task_context
+from pulpcore.app.contexts import with_task_context, awith_task_context, x_task_diagnostics_var
 from pulpcore.constants import (
     TASK_FINAL_STATES,
     TASK_INCOMPLETE_STATES,
@@ -30,7 +30,6 @@
     TASK_WAKEUP_HANDLE,
     TASK_WAKEUP_UNBLOCK,
 )
-from pulpcore.middleware import x_task_diagnostics_var
 from pulpcore.tasking.kafka import send_task_notification
 
 _logger = logging.getLogger(__name__)
@@ -66,6 +65,7 @@ def _execute_task(task):
     with with_task_context(task):
         task.set_running()
         domain = get_domain()
+
         try:
             log_task_start(task, domain)
             task_function = get_task_function(task)
@@ -336,6 +336,7 @@ def get_task_payload(
     function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock
 ):
+    """Create the arguments used to create a new task."""
     payload = {
         "state": TASK_STATES.WAITING,
         "logging_cid": (get_guid()),
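A small demonstration of why the combination of the new `"level": "INFO"` on the console handler and the temporary `root_logger.setLevel(logging.NOTSET)` in the decorator works: handlers keep their own thresholds, so DEBUG records reach the capture file without also flooding the console. The file path and logger names below are illustrative.

```python
import logging
import os
import tempfile

log_path = os.path.join(tempfile.mkdtemp(), "task_logs.log")

console = logging.StreamHandler()
console.setLevel(logging.INFO)  # mirrors the "level": "INFO" added in settings.py

capture = logging.FileHandler(log_path, mode="w", encoding="utf-8")
capture.setLevel(logging.NOTSET)  # the capture handler accepts everything

root = logging.getLogger()
root.addHandler(console)
root.addHandler(capture)
root.setLevel(logging.NOTSET)  # what the "debug-logs" option does for the task's duration

logging.getLogger("demo").debug("reaches the file, filtered out by the console handler")
logging.getLogger("demo").info("reaches both the console and the file")

capture.close()
print(open(log_path).read())
```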