Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/application/use_cases/comment_use_cases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import datetime
from typing import List, Optional
from src.domain.entities.comment import Comment
from src.domain.exceptions import EntityNotFound, CommentNotFound
from src.domain.repositories.comment_repository import CommentRepository
from src.application.use_cases.log_use_cases import LogUseCase

class CreateCommentUseCase:
def __init__(self, comment_repository: CommentRepository, log_use_case: LogUseCase):
self.comment_repository = comment_repository
self.log_use_case = log_use_case

async def execute(self, user_id: int, content: str) -> Comment:
comment = Comment(post_id=None, user_id=user_id, content=content)
result = await self.comment_repository.create(comment)

if result:
await self.log_use_case.log_action(user_id=user_id, action="create_comment",
timestamp=datetime.datetime.utcnow().isoformat())
return result

class GetCommentUseCase:
def __init__(self, comment_repository: CommentRepository):
self.comment_repository = comment_repository

async def execute(self,
page: int = 1,
limit: int = 10,
sort: str = "created_at",
order: str = "desc",
) -> list[Comment]:
result = await self.comment_repository.get_with_pagination(
page=page,
limit=limit,
sort=sort,
order=order)
if not result or not result.get("comments"):
raise CommentNotFound(f"Comment not found")
return result

class GetAllCommentsUseCase:
def __init__(self, comment_repository: CommentRepository):
self.comment_repository = comment_repository

async def execute(self) -> List[Comment]:
comments = await self.comment_repository.get_all_comments()
return comments

class UpdateCommentUseCase:
def __init__(self, comment_repository: CommentRepository, log_use_case: LogUseCase):
self.comment_repository = comment_repository
self.log_use_case = log_use_case

async def execute(self, post_id: int, content: Optional[str] = None) -> Comment:
existing_comment = await self.comment_repository.get_by_id(post_id=post_id)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут и везде - если есть две записи в бд, которые не должны существовать отдельно друг от друга (в данном случае обновление комментария и лог об этом) то это нужно оборачивать в транзакцию

if not existing_comment:
raise EntityNotFound(f"Entity not found")

if content:
existing_comment.content = content
await self.log_use_case.log_action(user_id=existing_comment.user_id, action="update_comment",
timestamp=datetime.datetime.utcnow().isoformat())

updated_comment = await self.comment_repository.update(existing_comment)
if not updated_comment:
raise EntityNotFound(f"Entity not found")
return updated_comment
17 changes: 17 additions & 0 deletions src/application/use_cases/log_use_cases.py
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Юзкейсы - это бизнес-логика приложения. Обычно логирование к бизнес-логике не относится, но у тебя тут как-будто аудитлог, то есть бизнесовое требование сохранять все действия в сервисе. В таком случае лучше это назвать либо HistoryRecord, либо Event, либо AuditLog, чтобы можно было различать с системным логированием.

Также факт логирования у тебя не происходит отдельно сам по себе, поэтому это лучше вынести в сервисный слой

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Спасибо, учту)

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import datetime
from typing import Optional

from src.domain.entities.log import LogEntry
from src.domain.repositories.log_repository import LogRepository

class LogUseCase:
def __init__(self, log_repository: LogRepository):
self.log_repository = log_repository

async def log_action(self, user_id: Optional[int], action: str, timestamp: Optional[str] = None) -> None:
log = LogEntry(
user_id=user_id,
action=action,
timestamp=timestamp or datetime.datetime.utcnow().isoformat()
)
await self.log_repository.create(log)
11 changes: 11 additions & 0 deletions src/domain/entities/comment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Comment:
post_id: Optional[int]
user_id: Optional[int]
content: str
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
9 changes: 9 additions & 0 deletions src/domain/entities/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class LogEntry:
user_id: int
action: str
timestamp: Optional[datetime]
4 changes: 4 additions & 0 deletions src/domain/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ class EntityAlreadyExists(DomainException):
class ValidationError(DomainException):
pass


class CommentNotFound(DomainException):
pass

30 changes: 30 additions & 0 deletions src/domain/repositories/comment_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from typing import List, Optional
from src.domain.entities.comment import Comment

class CommentRepository(ABC):
@abstractmethod
async def create(self, comment: Comment) -> Optional[Comment]:
pass

@abstractmethod
async def get_by_id(self, post_id: int) -> Optional[Comment]:
pass

@abstractmethod
async def get_all_comments(self) -> List[Comment]:
pass

@abstractmethod
async def get_with_pagination(self,
page: int,
limit: int,
sort: str,
order: str
) -> List[Comment]:
pass


@abstractmethod
async def update(self, comment: Comment) -> Optional[Comment]:
pass
9 changes: 9 additions & 0 deletions src/domain/repositories/log_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from src.domain.entities.log import LogEntry

class LogRepository(ABC):

@abstractmethod
async def create(self, log: LogEntry):
pass

32 changes: 21 additions & 11 deletions src/infrastructure/database/connection.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,36 @@
import os
import asyncpg
import logging
from typing import Optional

from src.infrastructure.config import settings

logger = logging.getLogger(__name__)


class DatabaseConnection:
def __init__(self):
self.pool: Optional[asyncpg.Pool] = None
self.dsn = os.getenv('DATABASE_URL')

async def connect(self):
if not self.pool:
self.pool = await asyncpg.create_pool(
host=settings.database_host,
port=settings.database_port,
database=settings.database_name,
user=settings.database_user,
password=settings.database_password,
min_size=1,
max_size=20,
timeout=30.0,
command_timeout=60.0,
)
try:
logger.info("Попытка подключения к БД")
self.pool = await asyncpg.create_pool(
host=settings.database_host,
port=settings.database_port,
database=settings.database_name,
user=settings.database_user,
password=settings.database_password,
min_size=1,
max_size=20,
timeout=30.0,
command_timeout=60.0,
)
logger.info("Подключение к БД успешно установлено")
except (ConnectionError, OSError, Exception) as e:
logger.error(f"Ошибка подключения к БД: {e}")

async def disconnect(self):
if self.pool:
Expand Down
25 changes: 19 additions & 6 deletions src/infrastructure/database/migration_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import asyncpg
from pathlib import Path
from src.infrastructure.config import settings
import platform


class MigrationRunner:
def __init__(self, migrations_dir: str = "src/infrastructure/database/migrations"):
def __init__(self, migrations_dir: str = None):
base_dir = Path(__file__).resolve().parent
if migrations_dir is None:
migrations_dir = base_dir / "migrations"
self.migrations_dir = Path(migrations_dir)
self.migrations_dir.mkdir(parents=True, exist_ok=True)

Expand All @@ -23,20 +27,24 @@ async def _get_applied_migrations(self, conn):

async def _get_pending_migrations(self, conn):
applied = await self._get_applied_migrations(conn)
all_migrations = sorted([
f.stem for f in self.migrations_dir.glob("*.sql")
])
all_migrations = sorted([f.stem for f in self.migrations_dir.glob("*.sql")])
print(f"🔍 Found migrations: {all_migrations}")
return [m for m in all_migrations if m not in applied]

async def migrate(self):
print(
f"🔌 Connecting to {settings.database_name}@"
f"{settings.database_host}:{settings.database_port}"
f" as {settings.database_user}")
conn = await asyncpg.connect(
host=settings.database_host,
port=settings.database_port,
database=settings.database_name,
user=settings.database_user,
password=settings.database_password,
)

db_name = await conn.fetchval("SELECT current_database()")
print(f"📡 Connected to database: {db_name}")
try:
await self._ensure_migrations_table(conn)
pending = await self._get_pending_migrations(conn)
Expand Down Expand Up @@ -76,7 +84,10 @@ async def status(self):
user=settings.database_user,
password=settings.database_password,
)


db_name = await conn.fetchval("SELECT current_database()")
print(f"📡 Connected to database: {db_name}")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image


try:
await self._ensure_migrations_table(conn)
applied = await self._get_applied_migrations(conn)
Expand Down Expand Up @@ -112,6 +123,8 @@ async def main():
else:
await runner.migrate()

if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

if __name__ == "__main__":
asyncio.run(main())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE if not exists comments (
post_id serial PRIMARY KEY,
user_id INTEGER references users(id) ON DELETE CASCADE,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE if not exists logs (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
action VARCHAR(255) NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
97 changes: 97 additions & 0 deletions src/infrastructure/repositories/postgres_comment_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from typing import Optional, Any, Coroutine

from src.domain.entities.comment import Comment
from src.domain.repositories.comment_repository import CommentRepository
from src.infrastructure.database.connection import DatabaseConnection

class PostgresCommentRepository(CommentRepository):
def __init__(self, db: DatabaseConnection):
self.db = db

def _map_row_to_comment(self, row) -> Comment | None:
if not row:
return None
return Comment(
post_id=row['post_id'],
user_id=row['user_id'],
content=row['content'],
created_at=row['created_at'],
updated_at=row['updated_at']
)

async def create(self, comment: Comment) -> Comment:
row = await self.db.fetchrow(
"""
insert into comments (user_id, content)
values ($1, $2)
returning post_id, user_id, content, created_at, updated_at
""",
comment.user_id, comment.content
)
return self._map_row_to_comment(row)

async def get_by_id(self, post_id: int) -> Optional[Comment]:
row = await self.db.fetchrow(
"""
select post_id, user_id, content, created_at, updated_at
from comments
where post_id = $1
""",
post_id
)
return self._map_row_to_comment(row)

async def get_all_comments(self) -> list[Comment]:
rows = await self.db.fetch(
"""
select post_id, user_id, content, created_at, updated_at
from comments
"""
)
return [self._map_row_to_comment(row) for row in rows]

async def update(self, comment: Comment) -> Optional[Comment]:
row = await self.db.fetchrow(
"""
update comments
set content = $1, updated_at = current_timestamp
where post_id = $2
returning post_id, user_id, content, created_at, updated_at
""",
comment.content, comment.post_id
)
return self._map_row_to_comment(row)

async def get_with_pagination(self,
page: int = 1,
limit: int = 10,
sort: str = "created_at",
order: str = "desc"
) -> dict[str, list[Comment | None] | int | Any]:
offset = (page - 1) * limit

if sort not in {"created_at", "updated_at"}:
sort = "created_at"
if order.lower() not in {"asc", "desc"}:
order = "desc"

query = f"""
SELECT post_id, user_id, content, created_at, updated_at
FROM comments
ORDER BY {sort} {order}
LIMIT $1 offset $2
"""
rows = await self.db.fetch(
query,
limit, offset
)

total_row = await self.db.fetchrow("SELECT COUNT(*) AS total FROM comments")
total = total_row["total"] if total_row else 0

comments = [self._map_row_to_comment(row) for row in rows]

return {
"comments": comments, "total": total,
"page": page, "limit": limit
}
Loading