-
Notifications
You must be signed in to change notification settings - Fork 1
Add comments microservice #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| import datetime | ||
| from typing import List, Optional | ||
| from src.domain.entities.comment import Comment | ||
| from src.domain.exceptions import EntityNotFound, CommentNotFound | ||
| from src.domain.repositories.comment_repository import CommentRepository | ||
| from src.application.use_cases.log_use_cases import LogUseCase | ||
|
|
||
| class CreateCommentUseCase: | ||
| def __init__(self, comment_repository: CommentRepository, log_use_case: LogUseCase): | ||
| self.comment_repository = comment_repository | ||
| self.log_use_case = log_use_case | ||
|
|
||
| async def execute(self, user_id: int, content: str) -> Comment: | ||
| comment = Comment(post_id=None, user_id=user_id, content=content) | ||
| result = await self.comment_repository.create(comment) | ||
|
|
||
| if result: | ||
| await self.log_use_case.log_action(user_id=user_id, action="create_comment", | ||
| timestamp=datetime.datetime.utcnow().isoformat()) | ||
| return result | ||
|
|
||
| class GetCommentUseCase: | ||
| def __init__(self, comment_repository: CommentRepository): | ||
| self.comment_repository = comment_repository | ||
|
|
||
| async def execute(self, | ||
| page: int = 1, | ||
| limit: int = 10, | ||
| sort: str = "created_at", | ||
| order: str = "desc", | ||
| ) -> list[Comment]: | ||
| result = await self.comment_repository.get_with_pagination( | ||
| page=page, | ||
| limit=limit, | ||
| sort=sort, | ||
| order=order) | ||
| if not result or not result.get("comments"): | ||
| raise CommentNotFound(f"Comment not found") | ||
| return result | ||
|
|
||
| class GetAllCommentsUseCase: | ||
| def __init__(self, comment_repository: CommentRepository): | ||
| self.comment_repository = comment_repository | ||
|
|
||
| async def execute(self) -> List[Comment]: | ||
| comments = await self.comment_repository.get_all_comments() | ||
| return comments | ||
|
|
||
| class UpdateCommentUseCase: | ||
| def __init__(self, comment_repository: CommentRepository, log_use_case: LogUseCase): | ||
| self.comment_repository = comment_repository | ||
| self.log_use_case = log_use_case | ||
|
|
||
| async def execute(self, post_id: int, content: Optional[str] = None) -> Comment: | ||
| existing_comment = await self.comment_repository.get_by_id(post_id=post_id) | ||
| if not existing_comment: | ||
| raise EntityNotFound(f"Entity not found") | ||
|
|
||
| if content: | ||
| existing_comment.content = content | ||
| await self.log_use_case.log_action(user_id=existing_comment.user_id, action="update_comment", | ||
| timestamp=datetime.datetime.utcnow().isoformat()) | ||
|
|
||
| updated_comment = await self.comment_repository.update(existing_comment) | ||
| if not updated_comment: | ||
| raise EntityNotFound(f"Entity not found") | ||
| return updated_comment | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Юзкейсы - это бизнес-логика приложения. Обычно логирование к бизнес-логике не относится, но у тебя тут как-будто аудитлог, то есть бизнесовое требование сохранять все действия в сервисе. В таком случае лучше это назвать либо HistoryRecord, либо Event, либо AuditLog, чтобы можно было различать с системным логированием. Также факт логирования у тебя не происходит отдельно сам по себе, поэтому это лучше вынести в сервисный слой
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Спасибо, учту) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| import datetime | ||
| from typing import Optional | ||
|
|
||
| from src.domain.entities.log import LogEntry | ||
| from src.domain.repositories.log_repository import LogRepository | ||
|
|
||
| class LogUseCase: | ||
| def __init__(self, log_repository: LogRepository): | ||
| self.log_repository = log_repository | ||
|
|
||
| async def log_action(self, user_id: Optional[int], action: str, timestamp: Optional[str] = None) -> None: | ||
| log = LogEntry( | ||
| user_id=user_id, | ||
| action=action, | ||
| timestamp=timestamp or datetime.datetime.utcnow().isoformat() | ||
| ) | ||
| await self.log_repository.create(log) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| from dataclasses import dataclass | ||
| from datetime import datetime | ||
| from typing import Optional | ||
|
|
||
| @dataclass | ||
| class Comment: | ||
| post_id: Optional[int] | ||
| user_id: Optional[int] | ||
| content: str | ||
| created_at: Optional[datetime] = None | ||
| updated_at: Optional[datetime] = None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| from dataclasses import dataclass | ||
| from datetime import datetime | ||
| from typing import Optional | ||
|
|
||
| @dataclass | ||
| class LogEntry: | ||
| user_id: int | ||
| action: str | ||
| timestamp: Optional[datetime] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| from abc import ABC, abstractmethod | ||
| from typing import List, Optional | ||
| from src.domain.entities.comment import Comment | ||
|
|
||
| class CommentRepository(ABC): | ||
| @abstractmethod | ||
| async def create(self, comment: Comment) -> Optional[Comment]: | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| async def get_by_id(self, post_id: int) -> Optional[Comment]: | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| async def get_all_comments(self) -> List[Comment]: | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| async def get_with_pagination(self, | ||
| page: int, | ||
| limit: int, | ||
| sort: str, | ||
| order: str | ||
| ) -> List[Comment]: | ||
| pass | ||
|
|
||
|
|
||
| @abstractmethod | ||
| async def update(self, comment: Comment) -> Optional[Comment]: | ||
| pass |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| from abc import ABC, abstractmethod | ||
| from src.domain.entities.log import LogEntry | ||
|
|
||
| class LogRepository(ABC): | ||
|
|
||
| @abstractmethod | ||
| async def create(self, log: LogEntry): | ||
| pass | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,10 +2,14 @@ | |
| import asyncpg | ||
| from pathlib import Path | ||
| from src.infrastructure.config import settings | ||
| import platform | ||
|
|
||
|
|
||
| class MigrationRunner: | ||
| def __init__(self, migrations_dir: str = "src/infrastructure/database/migrations"): | ||
| def __init__(self, migrations_dir: str = None): | ||
| base_dir = Path(__file__).resolve().parent | ||
| if migrations_dir is None: | ||
| migrations_dir = base_dir / "migrations" | ||
| self.migrations_dir = Path(migrations_dir) | ||
| self.migrations_dir.mkdir(parents=True, exist_ok=True) | ||
|
|
||
|
|
@@ -23,20 +27,24 @@ async def _get_applied_migrations(self, conn): | |
|
|
||
| async def _get_pending_migrations(self, conn): | ||
| applied = await self._get_applied_migrations(conn) | ||
| all_migrations = sorted([ | ||
| f.stem for f in self.migrations_dir.glob("*.sql") | ||
| ]) | ||
| all_migrations = sorted([f.stem for f in self.migrations_dir.glob("*.sql")]) | ||
| print(f"🔍 Found migrations: {all_migrations}") | ||
| return [m for m in all_migrations if m not in applied] | ||
|
|
||
| async def migrate(self): | ||
| print( | ||
| f"🔌 Connecting to {settings.database_name}@" | ||
| f"{settings.database_host}:{settings.database_port}" | ||
| f" as {settings.database_user}") | ||
| conn = await asyncpg.connect( | ||
| host=settings.database_host, | ||
| port=settings.database_port, | ||
| database=settings.database_name, | ||
| user=settings.database_user, | ||
| password=settings.database_password, | ||
| ) | ||
|
|
||
| db_name = await conn.fetchval("SELECT current_database()") | ||
| print(f"📡 Connected to database: {db_name}") | ||
| try: | ||
| await self._ensure_migrations_table(conn) | ||
| pending = await self._get_pending_migrations(conn) | ||
|
|
@@ -76,7 +84,10 @@ async def status(self): | |
| user=settings.database_user, | ||
| password=settings.database_password, | ||
| ) | ||
|
|
||
|
|
||
| db_name = await conn.fetchval("SELECT current_database()") | ||
| print(f"📡 Connected to database: {db_name}") | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| try: | ||
| await self._ensure_migrations_table(conn) | ||
| applied = await self._get_applied_migrations(conn) | ||
|
|
@@ -112,6 +123,8 @@ async def main(): | |
| else: | ||
| await runner.migrate() | ||
|
|
||
| if platform.system() == "Windows": | ||
| asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) | ||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| CREATE TABLE if not exists comments ( | ||
| post_id serial PRIMARY KEY, | ||
| user_id INTEGER references users(id) ON DELETE CASCADE, | ||
| content TEXT NOT NULL, | ||
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | ||
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | ||
| ); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| CREATE TABLE if not exists logs ( | ||
| id SERIAL PRIMARY KEY, | ||
| user_id INTEGER REFERENCES users(id) ON DELETE CASCADE, | ||
| action VARCHAR(255) NOT NULL, | ||
| timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP | ||
| ); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| from typing import Optional, Any, Coroutine | ||
|
|
||
| from src.domain.entities.comment import Comment | ||
| from src.domain.repositories.comment_repository import CommentRepository | ||
| from src.infrastructure.database.connection import DatabaseConnection | ||
|
|
||
| class PostgresCommentRepository(CommentRepository): | ||
| def __init__(self, db: DatabaseConnection): | ||
| self.db = db | ||
|
|
||
| def _map_row_to_comment(self, row) -> Comment | None: | ||
| if not row: | ||
| return None | ||
| return Comment( | ||
| post_id=row['post_id'], | ||
| user_id=row['user_id'], | ||
| content=row['content'], | ||
| created_at=row['created_at'], | ||
| updated_at=row['updated_at'] | ||
| ) | ||
|
|
||
| async def create(self, comment: Comment) -> Comment: | ||
| row = await self.db.fetchrow( | ||
| """ | ||
| insert into comments (user_id, content) | ||
| values ($1, $2) | ||
| returning post_id, user_id, content, created_at, updated_at | ||
| """, | ||
| comment.user_id, comment.content | ||
| ) | ||
| return self._map_row_to_comment(row) | ||
|
|
||
| async def get_by_id(self, post_id: int) -> Optional[Comment]: | ||
| row = await self.db.fetchrow( | ||
| """ | ||
| select post_id, user_id, content, created_at, updated_at | ||
| from comments | ||
| where post_id = $1 | ||
| """, | ||
| post_id | ||
| ) | ||
| return self._map_row_to_comment(row) | ||
|
|
||
| async def get_all_comments(self) -> list[Comment]: | ||
| rows = await self.db.fetch( | ||
| """ | ||
| select post_id, user_id, content, created_at, updated_at | ||
| from comments | ||
| """ | ||
| ) | ||
| return [self._map_row_to_comment(row) for row in rows] | ||
|
|
||
| async def update(self, comment: Comment) -> Optional[Comment]: | ||
| row = await self.db.fetchrow( | ||
| """ | ||
| update comments | ||
| set content = $1, updated_at = current_timestamp | ||
| where post_id = $2 | ||
| returning post_id, user_id, content, created_at, updated_at | ||
| """, | ||
| comment.content, comment.post_id | ||
| ) | ||
| return self._map_row_to_comment(row) | ||
|
|
||
| async def get_with_pagination(self, | ||
| page: int = 1, | ||
| limit: int = 10, | ||
| sort: str = "created_at", | ||
| order: str = "desc" | ||
| ) -> dict[str, list[Comment | None] | int | Any]: | ||
| offset = (page - 1) * limit | ||
|
|
||
| if sort not in {"created_at", "updated_at"}: | ||
| sort = "created_at" | ||
| if order.lower() not in {"asc", "desc"}: | ||
| order = "desc" | ||
|
|
||
| query = f""" | ||
| SELECT post_id, user_id, content, created_at, updated_at | ||
| FROM comments | ||
| ORDER BY {sort} {order} | ||
| LIMIT $1 offset $2 | ||
| """ | ||
| rows = await self.db.fetch( | ||
| query, | ||
| limit, offset | ||
| ) | ||
|
|
||
| total_row = await self.db.fetchrow("SELECT COUNT(*) AS total FROM comments") | ||
| total = total_row["total"] if total_row else 0 | ||
|
|
||
| comments = [self._map_row_to_comment(row) for row in rows] | ||
|
|
||
| return { | ||
| "comments": comments, "total": total, | ||
| "page": page, "limit": limit | ||
| } |

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
тут и везде - если есть две записи в бд, которые не должны существовать отдельно друг от друга (в данном случае обновление комментария и лог об этом) то это нужно оборачивать в транзакцию