From f98a58e47df442ed9548b6d99b56176b27b5f213 Mon Sep 17 00:00:00 2001 From: "Ifechukwu T. Ogidi" Date: Sun, 13 Apr 2025 20:08:01 +0100 Subject: [PATCH] Rabbit MQ Prototype --- pyproject.toml | 1 + src/api/services/external/RabbitMQService.py | 96 ++++++++++++++++++++ src/api/services/external/__init__.py | 0 src/env.py | 6 ++ uv.lock | 11 +++ 5 files changed, 114 insertions(+) create mode 100644 src/api/services/external/RabbitMQService.py create mode 100644 src/api/services/external/__init__.py diff --git a/pyproject.toml b/pyproject.toml index 0bfd5dc..b96fdd5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "pydantic[email]>=2.10.6", "django-ninja>=1.3.0", "pyjwt>=2.10.1", + "pika>=1.3.2", ] [dependency-groups] diff --git a/src/api/services/external/RabbitMQService.py b/src/api/services/external/RabbitMQService.py new file mode 100644 index 0000000..e03a69e --- /dev/null +++ b/src/api/services/external/RabbitMQService.py @@ -0,0 +1,96 @@ +import atexit +import threading +from typing import Annotated +from functools import partial +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor + +import pika +from pika import spec +from pika.channel import Channel +from pika.adapters.blocking_connection import BlockingChannel + +from src.env import rabbitmq_config +from src.utils.svcs import Service +from src.utils.logger import Logger + + +@Service() +class RabbitMQService: + def __init__(self, logger: Annotated[Logger, "RabbitMQService"]) -> None: + self.logger = logger + self.__params = pika.ConnectionParameters(rabbitmq_config["host"]) + self.__connection = None + self.__callbacks: dict[str, Callable] = {} + + self.__executor = ThreadPoolExecutor() + + with threading.RLock(): + atexit.register(self.close) + + def __connect(self, fn: str) -> tuple[pika.BlockingConnection, BlockingChannel]: + if self.__connection is None or self.__connection.is_closed: + self.__connection = pika.BlockingConnection(self.__params) + self.logger.info(f"Connected to RabbitMQ, from {fn}") + return self.__connection, self.__connection.channel() + + def __publish_message(self, queue_name: str, message: str) -> None: + connection, channel = self.__connect("publish") + + channel.queue_declare(queue=queue_name, durable=True) + channel.basic_publish( + exchange="", + routing_key=queue_name, + body=message.encode(), + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + self.logger.debug(f"Published message to queue {queue_name}") + channel.close() + connection.close() + + def publish(self, queue_name: str, message: str) -> None: + self.__executor.submit(self.__publish_message, queue_name, message) + + def __process_message( + self, + ch: Channel, + method: spec.Basic.Deliver, + properties: spec.BasicProperties, + body: bytes, + queue_name: str, + ) -> None: + try: + self.__callbacks[queue_name](body.decode()) + except Exception as e: + self.logger.error(f"Error processing message: {e}") + finally: + ch.basic_ack(delivery_tag=method.delivery_tag) + + def __consume(self, queue_name: str) -> None: + connection, channel = self.__connect("consume") + + channel.queue_declare(queue=queue_name, durable=True) + channel.basic_qos(prefetch_count=1) + + channel.basic_consume( + queue=queue_name, + on_message_callback=partial(self.__process_message, queue_name=queue_name), + exclusive=True, + ) + channel.start_consuming() + channel.close() + connection.close() + + def register_callback( + self, queue_name: str, callback: Callable[[str], None] + ) -> None: + if queue_name in self.__callbacks: + if self.__callbacks[queue_name] != callback: + self.__callbacks[queue_name] = callback + return + + self.__callbacks[queue_name] = callback + self.__executor.submit(self.__consume, queue_name) + + def close(self) -> None: + self.__executor.shutdown(wait=False) diff --git a/src/api/services/external/__init__.py b/src/api/services/external/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/env.py b/src/env.py index 4dbdd1f..7135338 100644 --- a/src/env.py +++ b/src/env.py @@ -50,6 +50,10 @@ class OTP(TypedDict): lifetime: int +class RabbitMQ(TypedDict): + host: str + + env = Env() app: App = { @@ -100,4 +104,6 @@ class OTP(TypedDict): otp: OTP = {"lifetime": get_env_int("OTP_LIFETIME")} +rabbitmq_config: RabbitMQ = {"host": get_env_str("RABBITMQ_HOST")} + __all__ = ["app", "cache", "db", "env", "jwt_config", "log", "otp"] diff --git a/uv.lock b/uv.lock index 68efb0b..2840d23 100644 --- a/uv.lock +++ b/uv.lock @@ -99,6 +99,7 @@ dependencies = [ { name = "django-ninja" }, { name = "faker" }, { name = "hiredis" }, + { name = "pika" }, { name = "psycopg2-binary" }, { name = "pydantic", extra = ["email"] }, { name = "pyjwt" }, @@ -126,6 +127,7 @@ requires-dist = [ { name = "django-ninja", specifier = ">=1.3.0" }, { name = "faker", specifier = ">=36.1.1" }, { name = "hiredis", specifier = ">=3.1.0" }, + { name = "pika", specifier = ">=1.3.2" }, { name = "psycopg2-binary", specifier = ">=2.9.10" }, { name = "pydantic", extras = ["email"], specifier = ">=2.10.6" }, { name = "pyjwt", specifier = ">=2.10.1" }, @@ -371,6 +373,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f4/5c/cab444afaa387dceac8debb817b52fd00596efcd2d54506c27311c6fe6a8/nodejs_wheel_binaries-22.14.0-py2.py3-none-win_arm64.whl", hash = "sha256:fd59c8e9a202221e316febe1624a1ae3b42775b7fb27737bf12ec79565983eaf", size = 36206637 }, ] +[[package]] +name = "pika" +version = "1.3.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/db/db/d4102f356af18f316c67f2cead8ece307f731dd63140e2c71f170ddacf9b/pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f", size = 145029 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/f3/f412836ec714d36f0f4ab581b84c491e3f42c6b5b97a6c6ed1817f3c16d0/pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f", size = 155415 }, +] + [[package]] name = "platformdirs" version = "4.3.7"