diff --git a/bot/handlers/admin.py b/bot/handlers/admin.py index 0d289b0..ddcf81c 100644 --- a/bot/handlers/admin.py +++ b/bot/handlers/admin.py @@ -6,7 +6,7 @@ from aiogram import Bot, F, Router from aiogram.filters import Command from aiogram.fsm.context import FSMContext -from aiogram.types import BufferedInputFile, CallbackQuery, InlineKeyboardMarkup, Message +from aiogram.types import BufferedInputFile, CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message from bot.container import Services from bot.formatters import ( @@ -1266,7 +1266,6 @@ async def admin_trial_list(callback: CallbackQuery, services: Services) -> None: nav_rows.append(("Назад", f"admin:trial:{page - 1}")) if has_next: nav_rows.append(("Дальше", f"admin:trial:{page + 1}")) - from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup keyboard: list[list[InlineKeyboardButton]] = [] for req in requests: keyboard.append([ @@ -1482,7 +1481,6 @@ async def _show_announcement_batches(callback: CallbackQuery, services: Services def _simple_nav(rows: list[tuple[str, str]], back_data: str) -> InlineKeyboardMarkup: - from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup keyboard = [] if rows: diff --git a/repositories/announcements.py b/repositories/announcements.py index 2ec19da..52c083f 100644 --- a/repositories/announcements.py +++ b/repositories/announcements.py @@ -206,36 +206,40 @@ async def refresh_batch_counts(self, announcement_id: int, now: str) -> None: """ UPDATE announcement_batches SET - success_count = ( - SELECT COUNT(*) FROM announcement_deliveries - WHERE announcement_id = ? AND status = 'sent' - ), - failed_count = ( - SELECT COUNT(*) FROM announcement_deliveries - WHERE announcement_id = ? AND status = 'failed' - ), - skipped_count = ( - SELECT COUNT(*) FROM announcement_deliveries - WHERE announcement_id = ? AND status = 'skipped' - ), + success_count = agg.sent, + failed_count = agg.failed, + skipped_count = agg.skipped, updated_at = ? + FROM ( + SELECT + COALESCE(SUM(status = 'sent'), 0) AS sent, + COALESCE(SUM(status = 'failed'), 0) AS failed, + COALESCE(SUM(status = 'skipped'), 0) AS skipped + FROM announcement_deliveries + WHERE announcement_id = ? + ) AS agg WHERE id = ? """, - (announcement_id, announcement_id, announcement_id, now, announcement_id), + (now, announcement_id, announcement_id), ) await self.db.commit() - async def delivery_user_ids_by_status(self, announcement_id: int, status: str) -> tuple[int, ...]: + async def delivery_user_ids_grouped( + self, announcement_id: int + ) -> dict[str, tuple[int, ...]]: cursor = await self.db.conn.execute( """ - SELECT user_id FROM announcement_deliveries - WHERE announcement_id = ? AND status = ? + SELECT user_id, status FROM announcement_deliveries + WHERE announcement_id = ? ORDER BY user_id ASC """, - (announcement_id, status), + (announcement_id,), ) rows = await cursor.fetchall() - return tuple(int(row["user_id"]) for row in rows) + grouped: dict[str, list[int]] = {} + for row in rows: + grouped.setdefault(str(row["status"]), []).append(int(row["user_id"])) + return {status: tuple(ids) for status, ids in grouped.items()} def _truncate_error(value: str | None, limit: int = 256) -> str | None: diff --git a/services/announcements.py b/services/announcements.py index 8d5e12c..ce78374 100644 --- a/services/announcements.py +++ b/services/announcements.py @@ -354,9 +354,10 @@ async def _ledger_result(self, announcement_id: int, *, last_seen_id: int | None batch = await self.announcements.get_batch(announcement_id) if batch is None: raise NotFound("Объявление не найдено") - delivered = await self.announcements.delivery_user_ids_by_status(announcement_id, "sent") - failed = await self.announcements.delivery_user_ids_by_status(announcement_id, "failed") - skipped = await self.announcements.delivery_user_ids_by_status(announcement_id, "skipped") + grouped = await self.announcements.delivery_user_ids_grouped(announcement_id) + delivered = grouped.get("sent", ()) + failed = grouped.get("failed", ()) + skipped = grouped.get("skipped", ()) return AnnouncementResult( announcement_id=announcement_id, total=batch.total_count, diff --git a/services/anomaly_detection.py b/services/anomaly_detection.py index 38513b7..d4caf5c 100644 --- a/services/anomaly_detection.py +++ b/services/anomaly_detection.py @@ -148,6 +148,11 @@ def _parse_xray_log_tail( async def _check_thresholds(self, now: float) -> None: for key_id in list(self._observations): unique_ips = self._unique_ips_in_window(key_id, now) + if not self._observations[key_id]: + # All samples aged out of the window; drop the key so the + # observation maps don't accumulate an entry per key seen. + del self._observations[key_id] + self._last_alerted.pop(key_id, None) if len(unique_ips) < self._min_unique_ips: continue last = self._last_alerted.get(key_id, 0.0) diff --git a/services/traffic_stats.py b/services/traffic_stats.py index 88eca30..8a82d11 100644 --- a/services/traffic_stats.py +++ b/services/traffic_stats.py @@ -74,15 +74,19 @@ async def refresh_views(self, keys: list[VpnKey]) -> list[KeyTrafficStatsView]: ) current_stats = await self.stats.list_by_key_ids(key_ids) views: list[KeyTrafficStatsView] = [] - for key in keys: - stats: TrafficStats | None - if key.key_type == VpnKeyType.AWG: - stats = await self._refresh_awg_key(key, current_stats.get(key.id), awg_transfers, awg_error) - elif key.key_type == VpnKeyType.XRAY: - stats = await self._refresh_xray_key(key, current_stats.get(key.id), xray_stats, xray_error) - else: - stats = None - views.append(KeyTrafficStatsView(key=key, owner=owners.get(key.owner_user_id), stats=stats)) + # Coalesce the per-key upserts into a single commit. Each upsert's own + # commit() is a no-op inside an explicit transaction, so a batch of N + # keys flushes once instead of issuing N fsyncs (synchronous=FULL). + async with self.stats.db.transaction(): + for key in keys: + stats: TrafficStats | None + if key.key_type == VpnKeyType.AWG: + stats = await self._refresh_awg_key(key, current_stats.get(key.id), awg_transfers, awg_error) + elif key.key_type == VpnKeyType.XRAY: + stats = await self._refresh_xray_key(key, current_stats.get(key.id), xray_stats, xray_error) + else: + stats = None + views.append(KeyTrafficStatsView(key=key, owner=owners.get(key.owner_user_id), stats=stats)) return views async def refresh_all_awg(self) -> None: