Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions bot/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from aiogram import Bot, F, Router
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import BufferedInputFile, CallbackQuery, InlineKeyboardMarkup, Message
from aiogram.types import BufferedInputFile, CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message

from bot.container import Services
from bot.formatters import (
Expand Down Expand Up @@ -1266,7 +1266,6 @@ async def admin_trial_list(callback: CallbackQuery, services: Services) -> None:
nav_rows.append(("Назад", f"admin:trial:{page - 1}"))
if has_next:
nav_rows.append(("Дальше", f"admin:trial:{page + 1}"))
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
keyboard: list[list[InlineKeyboardButton]] = []
for req in requests:
keyboard.append([
Expand Down Expand Up @@ -1482,7 +1481,6 @@ async def _show_announcement_batches(callback: CallbackQuery, services: Services


def _simple_nav(rows: list[tuple[str, str]], back_data: str) -> InlineKeyboardMarkup:
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup

keyboard = []
if rows:
Expand Down
40 changes: 22 additions & 18 deletions repositories/announcements.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,36 +206,40 @@ async def refresh_batch_counts(self, announcement_id: int, now: str) -> None:
"""
UPDATE announcement_batches
SET
success_count = (
SELECT COUNT(*) FROM announcement_deliveries
WHERE announcement_id = ? AND status = 'sent'
),
failed_count = (
SELECT COUNT(*) FROM announcement_deliveries
WHERE announcement_id = ? AND status = 'failed'
),
skipped_count = (
SELECT COUNT(*) FROM announcement_deliveries
WHERE announcement_id = ? AND status = 'skipped'
),
success_count = agg.sent,
failed_count = agg.failed,
skipped_count = agg.skipped,
updated_at = ?
FROM (
SELECT
COALESCE(SUM(status = 'sent'), 0) AS sent,
COALESCE(SUM(status = 'failed'), 0) AS failed,
COALESCE(SUM(status = 'skipped'), 0) AS skipped
FROM announcement_deliveries
WHERE announcement_id = ?
) AS agg
WHERE id = ?
""",
(announcement_id, announcement_id, announcement_id, now, announcement_id),
(now, announcement_id, announcement_id),
)
await self.db.commit()

async def delivery_user_ids_by_status(self, announcement_id: int, status: str) -> tuple[int, ...]:
async def delivery_user_ids_grouped(
self, announcement_id: int
) -> dict[str, tuple[int, ...]]:
cursor = await self.db.conn.execute(
"""
SELECT user_id FROM announcement_deliveries
WHERE announcement_id = ? AND status = ?
SELECT user_id, status FROM announcement_deliveries
WHERE announcement_id = ?
ORDER BY user_id ASC
""",
(announcement_id, status),
(announcement_id,),
)
rows = await cursor.fetchall()
return tuple(int(row["user_id"]) for row in rows)
grouped: dict[str, list[int]] = {}
for row in rows:
grouped.setdefault(str(row["status"]), []).append(int(row["user_id"]))
return {status: tuple(ids) for status, ids in grouped.items()}


def _truncate_error(value: str | None, limit: int = 256) -> str | None:
Expand Down
7 changes: 4 additions & 3 deletions services/announcements.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,10 @@ async def _ledger_result(self, announcement_id: int, *, last_seen_id: int | None
batch = await self.announcements.get_batch(announcement_id)
if batch is None:
raise NotFound("Объявление не найдено")
delivered = await self.announcements.delivery_user_ids_by_status(announcement_id, "sent")
failed = await self.announcements.delivery_user_ids_by_status(announcement_id, "failed")
skipped = await self.announcements.delivery_user_ids_by_status(announcement_id, "skipped")
grouped = await self.announcements.delivery_user_ids_grouped(announcement_id)
delivered = grouped.get("sent", ())
failed = grouped.get("failed", ())
skipped = grouped.get("skipped", ())
return AnnouncementResult(
announcement_id=announcement_id,
total=batch.total_count,
Expand Down
5 changes: 5 additions & 0 deletions services/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ def _parse_xray_log_tail(
async def _check_thresholds(self, now: float) -> None:
for key_id in list(self._observations):
unique_ips = self._unique_ips_in_window(key_id, now)
if not self._observations[key_id]:
# All samples aged out of the window; drop the key so the
# observation maps don't accumulate an entry per key seen.
del self._observations[key_id]
self._last_alerted.pop(key_id, None)
if len(unique_ips) < self._min_unique_ips:
continue
last = self._last_alerted.get(key_id, 0.0)
Expand Down
22 changes: 13 additions & 9 deletions services/traffic_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,19 @@ async def refresh_views(self, keys: list[VpnKey]) -> list[KeyTrafficStatsView]:
)
current_stats = await self.stats.list_by_key_ids(key_ids)
views: list[KeyTrafficStatsView] = []
for key in keys:
stats: TrafficStats | None
if key.key_type == VpnKeyType.AWG:
stats = await self._refresh_awg_key(key, current_stats.get(key.id), awg_transfers, awg_error)
elif key.key_type == VpnKeyType.XRAY:
stats = await self._refresh_xray_key(key, current_stats.get(key.id), xray_stats, xray_error)
else:
stats = None
views.append(KeyTrafficStatsView(key=key, owner=owners.get(key.owner_user_id), stats=stats))
# Coalesce the per-key upserts into a single commit. Each upsert's own
# commit() is a no-op inside an explicit transaction, so a batch of N
# keys flushes once instead of issuing N fsyncs (synchronous=FULL).
async with self.stats.db.transaction():
for key in keys:
stats: TrafficStats | None
if key.key_type == VpnKeyType.AWG:
stats = await self._refresh_awg_key(key, current_stats.get(key.id), awg_transfers, awg_error)
elif key.key_type == VpnKeyType.XRAY:
stats = await self._refresh_xray_key(key, current_stats.get(key.id), xray_stats, xray_error)
else:
stats = None
views.append(KeyTrafficStatsView(key=key, owner=owners.get(key.owner_user_id), stats=stats))
return views

async def refresh_all_awg(self) -> None:
Expand Down