From 7de1d1cfcf00cba461b42b29b37adc21460fc515 Mon Sep 17 00:00:00 2001 From: Mastermind-U Date: Thu, 13 Feb 2025 12:28:54 +0300 Subject: [PATCH 1/2] add: implement task utility for scheduled tasks management --- app/extra/scripts/task_base.py | 58 ++++++++++++++++++++++++++++++++++ app/schedule.py | 27 ++++++++-------- 2 files changed, 71 insertions(+), 14 deletions(-) create mode 100644 app/extra/scripts/task_base.py diff --git a/app/extra/scripts/task_base.py b/app/extra/scripts/task_base.py new file mode 100644 index 000000000..adb52b1bb --- /dev/null +++ b/app/extra/scripts/task_base.py @@ -0,0 +1,58 @@ +"""Task utils. + +Copyright (c) 2024 MultiFactor +License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE +""" +from __future__ import annotations + +from functools import wraps +from typing import Any, Awaitable, Callable, Coroutine, Generic, TypeVar + +from redis.asyncio import Redis + +T = TypeVar("T", bound=Callable[..., Awaitable | Coroutine]) + + +class Task(Generic[T]): + """Task.""" + + def __init__( + self, + f: T, + repeat: float, + one_time: bool, + global_: bool, + ) -> None: + """Init. + + :param T f: function + :param int repeat: repeat time + :param bool one_time: flag to run task only once + :param bool global_: flag to run task in lock mode + """ + self.f = f + self.repeat = repeat + self.one_time = one_time + self.global_ = global_ + + async def __call__(self, storage: Redis) -> None: + """Call.""" + if self.global_: + async with storage.lock(self.f.__name__): + await self.f() + else: + await self.f() + + +def task_metadata( + repeat: float, + one_time: bool = False, + global_: bool = False, +) -> Callable[[T], Task[T]]: + """Decorate a Task.""" + def decorator(f: T) -> Task[T]: + @wraps(f) + def wrapper(*args: Any, **kwargs: Any) -> Task[T]: + return Task(f, repeat, one_time, global_) + return wrapper + return decorator diff --git a/app/schedule.py b/app/schedule.py index 34255de70..9e45273b4 100644 --- a/app/schedule.py +++ b/app/schedule.py @@ -11,24 +11,23 @@ from extra.scripts.check_ldap_principal import check_ldap_principal from extra.scripts.krb_pass_sync import read_and_save_krb_pwds from extra.scripts.principal_block_user_sync import principal_block_sync +from extra.scripts.task_base import Task from extra.scripts.uac_sync import disable_accounts from extra.scripts.update_krb5_config import update_krb5_config from ioc import MainProvider from ldap_protocol.dependency import resolve_deps -task_type: TypeAlias = Callable[..., Coroutine] - -_TASKS: set[tuple[task_type, float]] = { - (read_and_save_krb_pwds, 1.5), - (disable_accounts, 600.0), - (principal_block_sync, 60.0), - (check_ldap_principal, -1.0), - (update_krb5_config, -1.0), +_TASKS: set[Task] = { + read_and_save_krb_pwds, + disable_accounts, + principal_block_sync, + check_ldap_principal, + update_krb5_config, } async def _schedule( - task: task_type, + task: Task, wait: float, container: AsyncContainer, ) -> None: @@ -38,14 +37,14 @@ async def _schedule( :param AsyncContainer container: container :param float wait: time to wait after execution """ - logger.info("Registered: {}", task.__name__) + logger.info("Registered: {}", task.f.__name__) while True: async with container(scope=Scope.REQUEST) as ctnr: - handler = await resolve_deps(func=task, container=ctnr) + handler = await resolve_deps(func=task.f, container=ctnr) await handler() # NOTE: one-time tasks - if wait < 0: + if wait < 0.0: break await asyncio.sleep(wait) @@ -59,8 +58,8 @@ async def runner(settings: Settings) -> None: ) async with asyncio.TaskGroup() as tg: - for task, timeout in _TASKS: - tg.create_task(_schedule(task, timeout, container)) + for task in _TASKS: + tg.create_task(_schedule(task, task.repeat, container)) def _run() -> None: uvloop.run(runner(settings)) From c6a8865fc5af46bafb87a3f5712b3327bdb27e2c Mon Sep 17 00:00:00 2001 From: Mastermind-U Date: Thu, 13 Feb 2025 12:29:04 +0300 Subject: [PATCH 2/2] add: integrate task metadata for scheduled task management in various scripts --- app/extra/scripts/check_ldap_principal.py | 3 +++ app/extra/scripts/krb_pass_sync.py | 3 +++ app/extra/scripts/principal_block_user_sync.py | 3 +++ app/extra/scripts/uac_sync.py | 3 +++ app/extra/scripts/update_krb5_config.py | 3 +++ 5 files changed, 15 insertions(+) diff --git a/app/extra/scripts/check_ldap_principal.py b/app/extra/scripts/check_ldap_principal.py index cc13bc4dc..5a334545e 100644 --- a/app/extra/scripts/check_ldap_principal.py +++ b/app/extra/scripts/check_ldap_principal.py @@ -17,7 +17,10 @@ ) from ldap_protocol.utils.queries import get_base_directories +from .task_base import task_metadata + +@task_metadata(repeat=0, one_time=True, global_=True) async def check_ldap_principal( kadmin: AbstractKadmin, session: AsyncSession, diff --git a/app/extra/scripts/krb_pass_sync.py b/app/extra/scripts/krb_pass_sync.py index 93bc0f853..04a6f0860 100644 --- a/app/extra/scripts/krb_pass_sync.py +++ b/app/extra/scripts/krb_pass_sync.py @@ -15,10 +15,13 @@ from models import User from security import get_password_hash +from .task_base import task_metadata + _LOCK_FILE = ".lock" _PATH = "/var/spool/krb5-sync" +@task_metadata(repeat=1.5) async def read_and_save_krb_pwds(session: AsyncSession) -> None: """Process file queue with lock. diff --git a/app/extra/scripts/principal_block_user_sync.py b/app/extra/scripts/principal_block_user_sync.py index 5e56dd06b..8711cd4ad 100644 --- a/app/extra/scripts/principal_block_user_sync.py +++ b/app/extra/scripts/principal_block_user_sync.py @@ -21,7 +21,10 @@ ) from models import Attribute, Directory, User +from .task_base import task_metadata + +@task_metadata(repeat=60.0, global_=True) async def principal_block_sync( session: AsyncSession, settings: Settings, ) -> None: diff --git a/app/extra/scripts/uac_sync.py b/app/extra/scripts/uac_sync.py index 57383e0bd..14515dfcb 100644 --- a/app/extra/scripts/uac_sync.py +++ b/app/extra/scripts/uac_sync.py @@ -14,7 +14,10 @@ from ldap_protocol.utils.queries import add_lock_and_expire_attributes from models import Attribute, User +from .task_base import task_metadata + +@task_metadata(repeat=600.0, global_=True) async def disable_accounts( session: AsyncSession, kadmin: AbstractKadmin, settings: Settings, ) -> None: diff --git a/app/extra/scripts/update_krb5_config.py b/app/extra/scripts/update_krb5_config.py index 0d41fc3ea..ef34ac3ac 100644 --- a/app/extra/scripts/update_krb5_config.py +++ b/app/extra/scripts/update_krb5_config.py @@ -10,7 +10,10 @@ from ldap_protocol.kerberos import AbstractKadmin from ldap_protocol.utils.queries import get_base_directories +from .task_base import task_metadata + +@task_metadata(repeat=0.0, one_time=True, global_=True) async def update_krb5_config( kadmin: AbstractKadmin, session: AsyncSession,