Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"pydantic[email]>=2.10.6",
"django-ninja>=1.3.0",
"pyjwt>=2.10.1",
"pika>=1.3.2",
]

[dependency-groups]
Expand Down
96 changes: 96 additions & 0 deletions src/api/services/external/RabbitMQService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import atexit
import threading
from typing import Annotated
from functools import partial
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor

import pika
from pika import spec
from pika.channel import Channel
from pika.adapters.blocking_connection import BlockingChannel

from src.env import rabbitmq_config
from src.utils.svcs import Service
from src.utils.logger import Logger


@Service()
class RabbitMQService:
def __init__(self, logger: Annotated[Logger, "RabbitMQService"]) -> None:
self.logger = logger
self.__params = pika.ConnectionParameters(rabbitmq_config["host"])
self.__connection = None
self.__callbacks: dict[str, Callable] = {}

self.__executor = ThreadPoolExecutor()

with threading.RLock():
atexit.register(self.close)

def __connect(self, fn: str) -> tuple[pika.BlockingConnection, BlockingChannel]:
if self.__connection is None or self.__connection.is_closed:
Comment thread
CaptainAril marked this conversation as resolved.
self.__connection = pika.BlockingConnection(self.__params)
self.logger.info(f"Connected to RabbitMQ, from {fn}")
return self.__connection, self.__connection.channel()

def __publish_message(self, queue_name: str, message: str) -> None:
connection, channel = self.__connect("publish")

channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(
exchange="",
routing_key=queue_name,
body=message.encode(),
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)
self.logger.debug(f"Published message to queue {queue_name}")
channel.close()
connection.close()

def publish(self, queue_name: str, message: str) -> None:
self.__executor.submit(self.__publish_message, queue_name, message)

def __process_message(
self,
ch: Channel,
method: spec.Basic.Deliver,
properties: spec.BasicProperties,
body: bytes,
queue_name: str,
) -> None:
try:
self.__callbacks[queue_name](body.decode())
except Exception as e:
self.logger.error(f"Error processing message: {e}")
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)

def __consume(self, queue_name: str) -> None:
connection, channel = self.__connect("consume")
Comment thread
CaptainAril marked this conversation as resolved.

channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
queue=queue_name,
on_message_callback=partial(self.__process_message, queue_name=queue_name),
exclusive=True,
)
channel.start_consuming()
channel.close()
connection.close()

def register_callback(
self, queue_name: str, callback: Callable[[str], None]
) -> None:
if queue_name in self.__callbacks:
if self.__callbacks[queue_name] != callback:
self.__callbacks[queue_name] = callback
return

self.__callbacks[queue_name] = callback
self.__executor.submit(self.__consume, queue_name)

def close(self) -> None:
self.__executor.shutdown(wait=False)
Empty file.
6 changes: 6 additions & 0 deletions src/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class OTP(TypedDict):
lifetime: int


class RabbitMQ(TypedDict):
host: str


env = Env()

app: App = {
Expand Down Expand Up @@ -100,4 +104,6 @@ class OTP(TypedDict):

otp: OTP = {"lifetime": get_env_int("OTP_LIFETIME")}

rabbitmq_config: RabbitMQ = {"host": get_env_str("RABBITMQ_HOST")}

__all__ = ["app", "cache", "db", "env", "jwt_config", "log", "otp"]
11 changes: 11 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading