diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..d7687e1 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# Telegram Bot Configuration +TELEGRAM_TOKEN=your_telegram_token_here + +# Optional: Ngrok Configuration (only needed for ngrok mode) +NGROK_TOKEN=your_ngrok_token_here + +# Optional: Development Mode +DEV=true + +# Optional: PEM file path for self-signed certificates +# PEM_FILE=path/to/certificate.pem diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a83da9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,52 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Database +*.db +*.sqlite +*.sqlite3 + +# Logs +*.log +exceptions.log + +# Environment variables +.env +.env.local + +# OS +.DS_Store +Thumbs.db + diff --git a/repro.md b/repro.md index c324904..bf13f6c 100644 --- a/repro.md +++ b/repro.md @@ -1,26 +1,45 @@ ## Requirements +- Python 3.12 - sqlite - requirements +### Setup + +```bash +# Install dependencies +pip install --upgrade pip +pip install -r requirements.txt ``` -pip3 install -r requirements.txt + +## Configuration + +Create a `.env` file in the project root (copy `.env.example` and fill in your values): + +```bash +cp .env.example .env +# Edit .env with your actual tokens ``` +Or set environment variables directly: +- `TELEGRAM_TOKEN` - Required +- `NGROK_TOKEN` - Optional, only needed for ngrok mode +- `DEV` - Optional, set to `true` for development mode + ## Run With Ngrok Run without timeout limit: -``` -export NGROK_AUTH_TOKEN={TOKEN} +```bash +export NGROK_TOKEN={TOKEN} export TELEGRAM_TOKEN={TOKEN} python3 -m src.__init__ ngrok ``` Run with timeout limit: -``` +```bash export TELEGRAM_TOKEN={TOKEN} python3 -m src.__init__ ngrok ``` @@ -29,14 +48,14 @@ python3 -m src.__init__ ngrok Run with self-signed ssl certificate -``` +```bash export TELEGRAM_TOKEN={TOKEN} python3 -m src.__init__ self-signed ``` Run with authority-signed ssl certificate -``` +```bash export TELEGRAM_TOKEN={TOKEN} python3 -m src.__init__ ``` diff --git a/requirements.in b/requirements.in index 4b27e48..2d9239b 100644 --- a/requirements.in +++ b/requirements.in @@ -1,12 +1,12 @@ black pytest -asyncio +pytest-asyncio sqlmodel sqlalchemy pyngrok fastapi uvicorn pydantic -typing httpx -datetime \ No newline at end of file +yfinance +python-dotenv \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 569e919..11cad57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,110 +1,151 @@ # -# This file is autogenerated by pip-compile with Python 3.10 +# This file is autogenerated by pip-compile with Python 3.12 # by the following command: # -# pip-compile --output-file=requirements.txt requirements.in +# pip-compile requirements.in # -anyio==3.4.0 +annotated-doc==0.0.4 + # via fastapi +annotated-types==0.7.0 + # via pydantic +anyio==4.11.0 # via - # httpcore + # httpx # starlette -asgiref==3.4.1 - # via uvicorn -asyncio==3.4.3 - # via -r requirements.in -attrs==21.2.0 - # via pytest -black==21.12b0 +beautifulsoup4==4.14.2 + # via yfinance +black==25.11.0 # via -r requirements.in -certifi==2021.10.8 +certifi==2025.11.12 # via + # curl-cffi # httpcore # httpx -charset-normalizer==2.0.9 - # via httpx -click==8.0.3 + # requests +cffi==2.0.0 + # via curl-cffi +charset-normalizer==3.4.4 + # via requests +click==8.3.1 # via # black # uvicorn -datetime==4.3 - # via -r requirements.in -fastapi==0.70.1 +curl-cffi==0.13.0 + # via yfinance +fastapi==0.121.3 # via -r requirements.in -greenlet==1.1.2 +frozendict==2.4.7 + # via yfinance +greenlet==3.2.4 # via sqlalchemy -h11==0.12.0 +h11==0.16.0 # via # httpcore # uvicorn -httpcore==0.14.3 +httpcore==1.0.9 # via httpx -httpx==0.21.1 +httpx==0.28.1 # via -r requirements.in -idna==3.3 +idna==3.11 # via # anyio - # rfc3986 -iniconfig==1.1.1 - # via pytest -mypy-extensions==0.4.3 - # via black -packaging==21.3 + # httpx + # requests +iniconfig==2.3.0 # via pytest -pathspec==0.9.0 +multitasking==0.0.12 + # via yfinance +mypy-extensions==1.1.0 # via black -platformdirs==2.4.0 +numpy==2.3.5 + # via + # pandas + # yfinance +packaging==25.0 + # via + # black + # pytest +pandas==2.3.3 + # via yfinance +pathspec==0.12.1 # via black -pluggy==1.0.0 - # via pytest -py==1.11.0 +peewee==3.18.3 + # via yfinance +platformdirs==4.5.0 + # via + # black + # yfinance +pluggy==1.6.0 # via pytest -pydantic==1.8.2 +protobuf==6.33.1 + # via yfinance +pycparser==2.23 + # via cffi +pydantic==2.12.4 # via # -r requirements.in # fastapi # sqlmodel -pyngrok==7.1.2 +pydantic-core==2.41.5 + # via pydantic +pygments==2.19.2 + # via pytest +pyngrok==7.5.0 # via -r requirements.in -pyparsing==3.0.6 - # via packaging -pytest==6.2.5 +pytest==9.0.1 + # via + # -r requirements.in + # pytest-asyncio +pytest-asyncio==1.3.0 # via -r requirements.in -pytz==2021.3 - # via datetime -pyyaml==6.0 - # via pyngrok -rfc3986[idna2008]==1.5.0 - # via httpx -sniffio==1.2.0 +python-dateutil==2.9.0.post0 + # via pandas +python-dotenv==1.2.1 + # via -r requirements.in +pytokens==0.3.0 + # via black +pytz==2025.2 # via - # anyio - # httpcore - # httpx -sqlalchemy==1.4.28 + # pandas + # yfinance +pyyaml==6.0.3 + # via pyngrok +requests==2.32.5 + # via yfinance +six==1.17.0 + # via python-dateutil +sniffio==1.3.1 + # via anyio +soupsieve==2.8 + # via beautifulsoup4 +sqlalchemy==2.0.44 # via # -r requirements.in # sqlmodel -sqlalchemy2-stubs==0.0.2a19 - # via sqlmodel -sqlmodel==0.0.4 +sqlmodel==0.0.27 # via -r requirements.in -starlette==0.16.0 +starlette==0.50.0 # via fastapi -toml==0.10.2 - # via pytest -tomli==1.2.3 - # via black -typing==3.7.4.3 - # via -r requirements.in -typing-extensions==4.0.1 +typing-extensions==4.15.0 # via - # black + # anyio + # beautifulsoup4 + # fastapi # pydantic - # sqlalchemy2-stubs -uvicorn==0.16.0 + # pydantic-core + # pytest-asyncio + # sqlalchemy + # starlette + # typing-inspection +typing-inspection==0.4.2 + # via pydantic +tzdata==2025.2 + # via pandas +urllib3==2.5.0 + # via requests +uvicorn==0.38.0 + # via -r requirements.in +websockets==15.0.1 + # via yfinance +yfinance==0.2.66 # via -r requirements.in -zope-interface==5.4.0 - # via datetime - -# The following packages are considered to be unsafe in a requirements file: -# setuptools diff --git a/run_locally.sh b/run_locally.sh new file mode 100644 index 0000000..6d42721 --- /dev/null +++ b/run_locally.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# This script runs the Memory Vault application locally +# +# Environment variables can be set in two ways: +# 1. Create a .env file in the project root (copy .env.example and fill in your values) +# 2. Export them directly: export TELEGRAM_TOKEN='your_token' +# +# The application will automatically load variables from .env file if it exists + +export DEV='true' +source venv/bin/activate +python3 -m src.__init__ ${1:-} # Pass 'ngrok' as argument to enable ngrok mode diff --git a/src/__init__.py b/src/__init__.py index b998a82..1c49a9a 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,29 +1,27 @@ import datetime import sys import os +from pathlib import Path -from pyngrok import ngrok import uvicorn import asyncio +from dotenv import load_dotenv + +# Load environment variables from .env file BEFORE importing modules that use them +env_path = Path(__file__).parent.parent / ".env" +load_dotenv(env_path) from .events import Events from .constants import Constants __all__ = [] -if __name__ == "__main__": - if Events.TOKEN is None: - sys.exit("No TELEGRAM_TOKEN found in the environment, exiting now.") - +async def setup_application(running_option: str | None) -> bool: + """Setup the application asynchronously.""" PORT = Events.PORT - loop = asyncio.get_event_loop() - - # Run with ngrok if the parameter is given - running_option = None - if len(sys.argv) == 2: - running_option = sys.argv[1] if running_option == "ngrok": + from pyngrok import ngrok ngrok_token = str(os.environ.get("NGROK_TOKEN")) if ngrok_token == "None": print( @@ -37,19 +35,32 @@ public_url = http_tunnel.public_url ssh_url = ssh_tunnel.public_url Events.HOST_URL = public_url - _ = loop.run_until_complete( - Events.send_a_message_to_user( - Constants.BROADCAST_CHAT_ID, f"ssh: {ssh_url}, http:{public_url}" - ) + await Events.send_a_message_to_user( + Constants.BROADCAST_CHAT_ID, f"ssh: {ssh_url}, http:{public_url}" ) else: - public_url = loop.run_until_complete(Events.get_public_ip()) + public_url = await Events.get_public_ip() Events.HOST_URL = f"https://{public_url}" if running_option == "self_signed": Events.SELF_SIGNED = True print(f"%% New run: {datetime.datetime.now()}") - success = loop.run_until_complete(Events.set_telegram_webhook_url()) + success = await Events.set_telegram_webhook_url() + return success + + +if __name__ == "__main__": + if Events.TOKEN is None: + sys.exit("No TELEGRAM_TOKEN found in the environment, exiting now.") + + PORT = Events.PORT + + # Run with ngrok if the parameter is given + running_option = None + if len(sys.argv) == 2: + running_option = sys.argv[1] + + success = asyncio.run(setup_application(running_option)) if success: uvicorn.run( diff --git a/src/db.py b/src/db.py index 5de63b9..972c267 100644 --- a/src/db.py +++ b/src/db.py @@ -288,7 +288,7 @@ def add_memory( user_id=user.id, ) - db_reminder = Reminder.from_orm(reminder) + db_reminder = Reminder.model_validate(reminder) session.add(db_reminder) session.commit() session.refresh(db_reminder) @@ -532,7 +532,7 @@ def db_create_user( session: Session = next(get_session()), ) -> Optional[User]: try: - user = User.from_orm(user) + user = User.model_validate(user) session.add(user) session.commit() session.refresh(user) diff --git a/src/events.py b/src/events.py index 25329dd..73ce8ca 100644 --- a/src/events.py +++ b/src/events.py @@ -194,7 +194,7 @@ async def send_a_message_to_user( print(f"%% {datetime.datetime.now()}: Message is: {message}") for retry in range(retry_count): - response = await cls.request(url, message.dict()) + response = await cls.request(url, message.model_dump()) if response.status_code == 200: return True elif response.status_code == 429: diff --git a/src/listener.py b/src/listener.py index 3353ff1..1eaf684 100644 --- a/src/listener.py +++ b/src/listener.py @@ -1,6 +1,7 @@ import asyncio import logging import time +from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.responses import PlainTextResponse from .db import * @@ -9,14 +10,18 @@ from .events import Events from .response_logic import ResponseLogic -app = FastAPI(openapi_url=None) -logging.basicConfig(filename="exceptions.log", encoding="utf-8", level=logging.ERROR) - -@app.on_event("startup") -def on_startup(): +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup create_db_and_tables() asyncio.create_task(Events.main_event()) + yield + # Shutdown (if needed in the future) + + +app = FastAPI(openapi_url=None, lifespan=lifespan) +logging.basicConfig(filename="exceptions.log", encoding="utf-8", level=logging.ERROR) @app.middleware("http") @@ -40,8 +45,13 @@ async def health(): @app.post(f"/webhook/{Events.TOKEN}") async def listen_telegram_messages(r: Request, message: MessageBodyModel): - print(f"%% {datetime.datetime.now()} Incoming Message: {message.dict()}") - print(f"%% {datetime.datetime.now()} Incoming Request: {await r.json()}") + print(f"%% {datetime.datetime.now()} Incoming Message: {message.model_dump()}") + try: + request_json = await r.json() + print(f"%% {datetime.datetime.now()} Incoming Request: {request_json}") + except Exception as e: + print(f"%% {datetime.datetime.now()} Error reading request JSON: {e}") + request_json = {} response_message = "" chat_id = 0 diff --git a/src/message_validations.py b/src/message_validations.py index 8ff43ed..c56642b 100644 --- a/src/message_validations.py +++ b/src/message_validations.py @@ -1,95 +1,118 @@ from typing import Optional, List, Any -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict class Chat(BaseModel): - last_name: Optional[str] - id: Optional[int] - type: Optional[str] - first_name: Optional[str] - username: Optional[str] + model_config = ConfigDict(extra="ignore") + + last_name: Optional[str] = None + id: Optional[int] = None + type: Optional[str] = None + first_name: Optional[str] = None + username: Optional[str] = None class From(BaseModel): - last_name: Optional[str] - id: Optional[int] - first_name: Optional[str] - user_name: Optional[str] - language_code: Optional[str] - is_bot: Optional[str] + model_config = ConfigDict(extra="ignore") + + last_name: Optional[str] = None + id: Optional[int] = None + first_name: Optional[str] = None + user_name: Optional[str] = None + username: Optional[str] = None # Telegram uses 'username' not 'user_name' + language_code: Optional[str] = None + is_bot: Optional[bool] = None # Telegram sends bool, not string class ReplyMessage(BaseModel): - date: Optional[int] - chat: Optional[Chat] - message_id: Optional[int] - text: Optional[str] + model_config = ConfigDict(extra="ignore") + + date: Optional[int] = None + chat: Optional[Chat] = None + message_id: Optional[int] = None + text: Optional[str] = None class File(BaseModel): - file_id: Optional[str] + model_config = ConfigDict(extra="ignore") + + file_id: Optional[str] = None class NewChatMember(BaseModel): - id: Optional[int] + model_config = ConfigDict(extra="ignore") + + id: Optional[int] = None class Message(BaseModel): - date: Optional[int] - chat: Optional[Chat] - message_id: Optional[str] - from_field: Optional[From] = Field(alias="from") - forward_date: Optional[int] - text: Optional[str] - photo: Optional[List[File]] - document: Optional[File] - video: Optional[File] - video_note: Optional[File] - voice: Optional[File] - new_chat_member: Optional[NewChatMember] - left_chat_member: Optional[NewChatMember] - group_chat_created: Optional[bool] + model_config = ConfigDict(extra="ignore") + + date: Optional[int] = None + chat: Optional[Chat] = None + message_id: Optional[int] = None # Telegram sends int, not string + from_field: Optional[From] = Field(default=None, alias="from") + forward_date: Optional[int] = None + text: Optional[str] = None + photo: Optional[List[File]] = None + document: Optional[File] = None + video: Optional[File] = None + video_note: Optional[File] = None + voice: Optional[File] = None + new_chat_member: Optional[NewChatMember] = None + left_chat_member: Optional[NewChatMember] = None + group_chat_created: Optional[bool] = None class ChatGroup(BaseModel): - id: Optional[int] - title: Optional[str] - type: Optional[str] + model_config = ConfigDict(extra="ignore") + + id: Optional[int] = None + title: Optional[str] = None + type: Optional[str] = None class MockVal(BaseModel): - rand_int: Optional[int] + model_config = ConfigDict(extra="ignore") + + rand_int: Optional[int] = None class OtherChatMember(BaseModel): - user: Optional[From] - status: Optional[str] + model_config = ConfigDict(extra="ignore") + + user: Optional[From] = None + status: Optional[str] = None class MyChatMember(BaseModel): - rand_int: Optional[int] - chat: Optional[ChatGroup] - from_field: Optional[From] = Field(alias="from") - date: Optional[int] - old_chat_member: Optional[OtherChatMember] - new_chat_member: Optional[OtherChatMember] + model_config = ConfigDict(extra="ignore") + + rand_int: Optional[int] = None + chat: Optional[ChatGroup] = None + from_field: Optional[From] = Field(default=None, alias="from") + date: Optional[int] = None + old_chat_member: Optional[OtherChatMember] = None + new_chat_member: Optional[OtherChatMember] = None class MessageBodyModel(BaseModel): - update_id: Optional[int] - message: Optional[Message] - my_chat_member: Optional[MyChatMember] - reply_to_message: Optional[ReplyMessage] + model_config = ConfigDict(extra="ignore") + + update_id: Optional[int] = None + message: Optional[Message] = None + my_chat_member: Optional[MyChatMember] = None + reply_to_message: Optional[ReplyMessage] = None class ResponseToMessage(BaseModel): method: Optional[str] = "sendMessage" chat_id: Optional[int] = 861126057 - from_chat_id: Optional[int] - message_id: Optional[int] - text: Optional[str] - photo: Optional[str] - document: Optional[str] + from_chat_id: Optional[int] = None + message_id: Optional[int] = None + text: Optional[str] = None + photo: Optional[str] = None + document: Optional[str] = None parse_mode: Optional[str] = "Markdown" - disable_notification: Optional[bool] + disable_notification: Optional[bool] = None diff --git a/tests/all_tests.py b/tests/all_tests.py index f4626de..7be7ebe 100644 --- a/tests/all_tests.py +++ b/tests/all_tests.py @@ -1,5 +1,16 @@ +""" +Comprehensive test suite for Memory Vault application. + +Tests are organized by functionality: +- User Management +- Memory/Reminder Operations +- Schedule Management +- User Settings +- API Endpoints +- Events and Utilities +""" import pytest - +from datetime import datetime from fastapi.testclient import TestClient from sqlmodel import Session, SQLModel, create_engine, select from sqlmodel.pool import StaticPool @@ -7,15 +18,19 @@ from src.listener import app, User, UserCreate, get_session from src.events import Events from src.db import * +from src.constants import Constants pytest_plugins = ("pytest_asyncio",) +# ==================== FIXTURES ==================== + @pytest.fixture(name="session") def session_fixture(): + """Create a fresh database session for each test.""" engine = create_engine( "sqlite://", - echo=True, + echo=False, # Set to True for SQL debugging connect_args={"check_same_thread": False}, poolclass=StaticPool, ) @@ -26,6 +41,7 @@ def session_fixture(): @pytest.fixture(name="client") def client_fixture(session: Session): + """Create a test client with database session override.""" def get_session_override(): return session @@ -35,151 +51,549 @@ def get_session_override(): app.dependency_overrides.clear() -@pytest.mark.asyncio -async def test_main_event(): - return True - await Events.main_event() - - -@pytest.mark.asyncio -def test_time_until_midday(): - print(Events.get_time_until_next_hour()) - - -def test_add_user_to_db(session): +@pytest.fixture(name="sample_user") +def sample_user_fixture(session: Session) -> User: + """Create a sample user for testing.""" user = UserCreate( - name="lloll", - telegram_chat_id=100100010001, + name="Test User", + telegram_chat_id=123456789, ) - user = db_create_user(user, session=session) - user_list = db_read_users(session=session) - assert 1 == len(user_list) - assert user_list[0] == user + return db_create_user(user, session=session) -def test_user_send_time_list(session): - user = UserCreate( - name="lloll", - telegram_chat_id=100100010001, - ) - user = User.from_orm(user) - session.add(user) - session.commit() - session.refresh(user) - user_list = db_read_users(session=session) - assert 1 == len(user_list) - assert user_list[0] == user - found_user = session.exec( - select(User).where(User.telegram_chat_id == user.telegram_chat_id) - ).first() - print(found_user) - found_user.scheduled_hours = "1,2" - session.add(found_user) - session.commit() - user_list = db_read_users(session=session) - assert 1 == len(user_list) - assert user_list[0] == user - - -def test_add_and_remove_users(session): - user = UserCreate( - name="lloll", - telegram_chat_id=100100010001, - ) - user = User.from_orm(user) - db_create_user(user, session) - add_memory(user, "hey", session) - add_memory(user, "heyyo", session) - add_memory(user, "hey", session) - assert list_memories(user, session) == [ - Reminder(user_id=1, reminder="heyyo", id=2), - Reminder(user_id=1, reminder="hey", id=3), - ] - - -def test_add_schedules(session): +@pytest.fixture(name="sample_user_inactive") +def sample_user_inactive_fixture(session: Session) -> User: + """Create an inactive sample user for testing.""" user = UserCreate( - name="lloll", - telegram_chat_id=100100010001, + name="Inactive User", + telegram_chat_id=987654321, + active=False, ) - user = User.from_orm(user) - session.add(user) - session.commit() - session.refresh(user) - user_list = db_read_users(session=session) - assert 1 == len(user_list) - assert user_list[0] == user - - scheduled_hours = add_hours_to_the_schedule(user, [1, 13], session) - split_hours = scheduled_hours.split(",") - str_set1 = set() - for str in split_hours: - str_set1.add(str) - str_set2 = set() - str_set2.add("20") - str_set2.add("1") - str_set2.add("13") - str_set2.add("8") - - assert str_set2 == str_set1 - - user_list = db_read_users(session=session) - assert 1 == len(user_list) - print(user_list[0]) - - -def test_db_read_users(session): - user = UserCreate( - name="lloll", - telegram_chat_id=100100010001, - ) - user = db_create_user(user, session=session) - user2 = UserCreate( - name="22lloll", - telegram_chat_id=12200100010001, - ) - user2.active = False - user2 = db_create_user(user2, session=session) - - user_list = db_read_users(session=session, only_active_users=False, limit=1) - assert 1 == len(user_list) - assert user_list[0] == user - print(user_list) - - -def test_read_random_user(session): - user = UserCreate( - name="lloll", - telegram_chat_id=100100010001, - ) - user = db_create_user(user, session=session) - user2 = UserCreate( - name="22lloll", - telegram_chat_id=12200100010001, - ) - user2.active = False - user2 = db_create_user(user2, session=session) - - from sqlalchemy import func - - random_user = session.exec(select(User).order_by(func.random()).limit(2)).all() - print(random_user) - assert False - - -def test_print_start_message(): - name = "" - language_code = "en" - from src.constants import Constants - - print() - print(Constants.Start.start_message(name, language_code)) - - -def test_print_help_message(): - name = "" - language_code = "en" - from src.constants import Constants - - print() - print(Constants.Help.help_message(name, language_code)) + return db_create_user(user, session=session) + + +# ==================== USER MANAGEMENT TESTS ==================== + +class TestUserManagement: + """Tests for user creation, reading, joining, and leaving.""" + + def test_create_user(self, session): + """Test creating a new user.""" + user = UserCreate( + name="New User", + telegram_chat_id=111111111, + ) + created_user = db_create_user(user, session=session) + + assert created_user is not None + assert created_user.id is not None + assert created_user.name == "New User" + assert created_user.telegram_chat_id == 111111111 + assert created_user.active is True + assert created_user.scheduled_hours == "8" # default + + def test_create_user_duplicate_telegram_id(self, session): + """Test that duplicate telegram_chat_id is handled.""" + user1 = UserCreate(name="User 1", telegram_chat_id=222222222) + user2 = UserCreate(name="User 2", telegram_chat_id=222222222) + + created1 = db_create_user(user1, session=session) + assert created1 is not None + + # Second user with same telegram_chat_id should fail due to unique constraint + # The function catches exceptions and returns None + created2 = db_create_user(user2, session=session) + assert created2 is None + + def test_read_users(self, session, sample_user, sample_user_inactive): + """Test reading users with filters.""" + # Test reading only active users + active_users = db_read_users(session=session, only_active_users=True) + assert len(active_users) == 1 + assert active_users[0].id == sample_user.id + + # Test reading all users + all_users = db_read_users(session=session, only_active_users=False) + assert len(all_users) == 2 + + # Test with limit + limited = db_read_users(session=session, limit=1) + assert len(limited) == 1 + + def test_join_user_new(self, session): + """Test joining a new user.""" + user = UserCreate(name="Join Test", telegram_chat_id=333333333) + joined_user = join_user(user, session=session) + + assert joined_user is not None + assert joined_user.active is True + + def test_join_user_already_active(self, session, sample_user): + """Test joining an already active user.""" + user = UserCreate( + name="Already Active", + telegram_chat_id=sample_user.telegram_chat_id, + ) + result = join_user(user, session=session) + + assert result is None # Already active, should return None + + def test_join_user_reactivate(self, session, sample_user_inactive): + """Test reactivating an inactive user.""" + user = UserCreate( + name="Reactivate", + telegram_chat_id=sample_user_inactive.telegram_chat_id, + ) + reactivated = join_user(user, session=session) + + assert reactivated is not None + assert reactivated.active is True + + def test_leave_user(self, session, sample_user): + """Test leaving/deactivating a user.""" + user = UserCreate( + name="Leave Test", + telegram_chat_id=sample_user.telegram_chat_id, + ) + left_user = leave_user(user, session=session) + + assert left_user is not None + assert left_user.active is False + + def test_leave_user_not_found(self, session): + """Test leaving a non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = leave_user(user, session=session) + + assert result is None + + def test_leave_user_already_inactive(self, session, sample_user_inactive): + """Test leaving an already inactive user.""" + user = UserCreate( + name="Already Inactive", + telegram_chat_id=sample_user_inactive.telegram_chat_id, + ) + result = leave_user(user, session=session) + + assert result is None # Already inactive + + def test_get_user_status(self, session, sample_user): + """Test getting user status.""" + status = get_user_status(sample_user.telegram_chat_id, session=session) + + assert status is not None + assert status.id == sample_user.id + assert status.name == sample_user.name + + def test_get_user_status_not_found(self, session): + """Test getting status for non-existent user.""" + status = get_user_status(999999999, session=session) + assert status is None + + +# ==================== MEMORY/REMINDER TESTS ==================== + +class TestMemoryOperations: + """Tests for memory/reminder CRUD operations.""" + + def test_add_memory(self, session, sample_user): + """Test adding a memory.""" + memory = add_memory(sample_user, "Test memory", session=session) + + assert memory is not None + assert memory.reminder == "Test memory" + assert memory.user_id == sample_user.id + + def test_add_multiple_memories(self, session, sample_user): + """Test adding multiple memories.""" + add_memory(sample_user, "Memory 1", session=session) + add_memory(sample_user, "Memory 2", session=session) + add_memory(sample_user, "Memory 3", session=session) + + memories = list_memories(sample_user, session=session) + assert len(memories) == 3 + assert all(m.user_id == sample_user.id for m in memories) + + def test_list_memories(self, session, sample_user): + """Test listing all memories for a user.""" + add_memory(sample_user, "First", session=session) + add_memory(sample_user, "Second", session=session) + + memories = list_memories(sample_user, session=session) + assert len(memories) == 2 + assert memories[0].reminder == "First" + assert memories[1].reminder == "Second" + + def test_list_memories_empty(self, session, sample_user): + """Test listing memories for user with no memories.""" + memories = list_memories(sample_user, session=session) + assert memories == [] + + def test_list_memories_user_not_found(self, session): + """Test listing memories for non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = list_memories(user, session=session) + assert result is None + + def test_count_memories(self, session, sample_user): + """Test counting memories.""" + add_memory(sample_user, "Memory 1", session=session) + add_memory(sample_user, "Memory 2", session=session) + + count = count_memories(sample_user, session=session) + assert count == 2 + + def test_count_memories_empty(self, session, sample_user): + """Test counting memories when user has none.""" + count = count_memories(sample_user, session=session) + assert count == 0 + + def test_select_random_memories(self, session, sample_user): + """Test selecting random memories.""" + add_memory(sample_user, "Memory 1", session=session) + add_memory(sample_user, "Memory 2", session=session) + add_memory(sample_user, "Memory 3", session=session) + + random_memories = select_random_memories(sample_user, count=2, session=session) + assert len(random_memories) == 2 + assert all(m.user_id == sample_user.id for m in random_memories) + + def test_select_random_memories_count_zero(self, session, sample_user): + """Test selecting zero memories.""" + add_memory(sample_user, "Memory 1", session=session) + result = select_random_memories(sample_user, count=0, session=session) + assert result == [] + + def test_select_random_memories_more_than_available(self, session, sample_user): + """Test selecting more memories than available.""" + add_memory(sample_user, "Only one", session=session) + result = select_random_memories(sample_user, count=10, session=session) + assert len(result) == 1 + + def test_select_random_memories_user_not_found(self, session): + """Test selecting memories for non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = select_random_memories(user, session=session) + assert result is None + + def test_delete_last_memory(self, session, sample_user): + """Test deleting the last memory.""" + add_memory(sample_user, "First", session=session) + add_memory(sample_user, "Last", session=session) + + deleted = delete_last_memory(sample_user, session=session) + assert deleted == "Last" + + memories = list_memories(sample_user, session=session) + assert len(memories) == 1 + assert memories[0].reminder == "First" + + def test_delete_last_memory_empty(self, session, sample_user): + """Test deleting last memory when user has none.""" + result = delete_last_memory(sample_user, session=session) + assert result is False + + def test_delete_last_sent_memory(self, session, sample_user): + """Test deleting the last sent memory.""" + mem1 = add_memory(sample_user, "Memory 1", session=session) + mem2 = add_memory(sample_user, "Memory 2", session=session) + + # Simulate sending memory 2 + sample_user.last_sent_reminder_id = mem2.id + session.add(sample_user) + session.commit() + + deleted = delete_last_sent_memory(sample_user, session=session) + assert deleted == "Memory 2" + assert sample_user.last_sent_reminder_id == -1 + + def test_delete_last_sent_memory_no_last_sent(self, session, sample_user): + """Test deleting last sent when none was sent.""" + add_memory(sample_user, "Memory 1", session=session) + result = delete_last_sent_memory(sample_user, session=session) + assert result is False + + def test_add_package(self, session, sample_user): + """Test adding a package to memories.""" + result = add_package(sample_user, package_id=1, session=session) + assert result is True + + memories = list_memories(sample_user, session=session) + assert len(memories) == 1 + assert memories[0].reminder == "package: 1" + + +# ==================== SCHEDULE MANAGEMENT TESTS ==================== + +class TestScheduleManagement: + """Tests for schedule management operations.""" + + def test_get_schedule(self, session, sample_user): + """Test getting user schedule.""" + schedule = get_schedule(sample_user, session=session) + assert schedule == "8" # default schedule + + def test_get_schedule_user_not_found(self, session): + """Test getting schedule for non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = get_schedule(user, session=session) + assert result is None + + def test_reset_schedule(self, session, sample_user): + """Test resetting schedule to default.""" + # First add some hours + add_hours_to_the_schedule(sample_user, [1, 2, 3], session=session) + + # Then reset + reset = reset_schedule(sample_user, session=session) + assert reset == "8" # default + + def test_add_hours_to_schedule(self, session, sample_user): + """Test adding hours to schedule.""" + result = add_hours_to_the_schedule(sample_user, [1, 13], session=session) + + hours = set(result.split(",")) + assert "1" in hours + assert "8" in hours # default + assert "13" in hours + + def test_add_hours_to_schedule_duplicates(self, session, sample_user): + """Test adding duplicate hours (function doesn't deduplicate, but sorts).""" + add_hours_to_the_schedule(sample_user, [1, 1, 1], session=session) + schedule = get_schedule(sample_user, session=session) + hours = schedule.split(",") + # Function adds all hours and sorts, so duplicates remain + # But after sorting, we should have them in order + assert "1" in hours + assert "8" in hours # default + # Verify it's sorted + hour_ints = [int(h) for h in hours] + assert hour_ints == sorted(hour_ints) + + def test_remove_hour_from_schedule(self, session, sample_user): + """Test removing hour from schedule.""" + add_hours_to_the_schedule(sample_user, [1, 2, 3], session=session) + + result = remove_hour_from_schedule(sample_user, hour=2, session=session) + hours = set(result.split(",")) + assert "2" not in hours + assert "1" in hours + assert "3" in hours + + def test_remove_hour_from_schedule_not_present(self, session, sample_user): + """Test removing hour that's not in schedule.""" + result = remove_hour_from_schedule(sample_user, hour=99, session=session) + # Should still have default schedule + assert "8" in result + + def test_create_schedule_array(self): + """Test creating schedule array from string.""" + result = create_schedule_array("1,2,3,8") + assert result == [1, 2, 3, 8] + + def test_create_schedule_array_empty(self): + """Test creating schedule array from empty string.""" + result = create_schedule_array("") + assert result == [] + + +# ==================== USER SETTINGS TESTS ==================== + +class TestUserSettings: + """Tests for user settings and preferences.""" + + def test_update_gmt(self, session, sample_user): + """Test updating user GMT offset.""" + result = update_gmt(sample_user, gmt=3, session=session) + assert result is not None + assert result.gmt == 3 + + def test_update_gmt_user_not_found(self, session): + """Test updating GMT for non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = update_gmt(user, gmt=3, session=session) + assert result is None + + def test_toggle_easyadd(self, session, sample_user): + """Test toggling easy add setting.""" + initial = sample_user.auto_add_active + + result = toggle_easyadd(sample_user, session=session) + assert result is not None + assert result != initial + + def test_toggle_easyadd_user_not_found(self, session): + """Test toggling easy add for non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = toggle_easyadd(user, session=session) + assert result is None + + def test_toggle_silent(self, session, sample_user): + """Test toggling silent mode.""" + initial = sample_user.is_silent + + result = toggle_silent(sample_user, session=session) + assert result is not None + assert result != initial + + def test_toggle_silent_user_not_found(self, session): + """Test toggling silent for non-existent user.""" + user = UserCreate(name="Not Found", telegram_chat_id=999999999) + result = toggle_silent(user, session=session) + assert result is None + + +# ==================== API ENDPOINT TESTS ==================== + +class TestAPIEndpoints: + """Tests for FastAPI endpoints.""" + + def test_health_endpoint(self, client): + """Test health check endpoint.""" + response = client.get("/health") + assert response.status_code == 200 + assert response.json() == {"healthy": True} + + def test_webhook_endpoint_invalid_token(self, client): + """Test webhook endpoint with invalid token.""" + response = client.post("/webhook/invalid_token", json={}) + # Should return 422 or 404 depending on validation + assert response.status_code in [404, 422] + + def test_trigger_endpoint_invalid_token(self, client): + """Test trigger endpoint with invalid token.""" + response = client.post("/trigger_send_user_hourly_memories/invalid_token") + # Should return 404 or 422 + assert response.status_code in [404, 422] + + +# ==================== EVENTS AND UTILITIES TESTS ==================== + +class TestEvents: + """Tests for Events class functionality.""" + + def test_get_time_until_next_hour(self): + """Test calculating time until next hour.""" + time_until = Events.get_time_until_next_hour() + assert isinstance(time_until, float) + assert 0 < time_until <= 3600 # Should be between 0 and 3600 seconds + + def test_get_memory_count(self): + """Test getting memory count for scheduled hours.""" + # Empty schedule + assert Events.get_memory_count("", 8) == 0 + + # Single hour match + assert Events.get_memory_count("8", 8) == 1 + + # Multiple hours, one match + assert Events.get_memory_count("8,9,10", 9) == 1 + + # Multiple hours, multiple matches (same hour repeated) + assert Events.get_memory_count("8,8,9", 8) == 2 + + # No match + assert Events.get_memory_count("8,9,10", 11) == 0 + + @pytest.mark.asyncio + async def test_main_event_does_not_hang(self): + """Test that main_event can be started (but don't wait for it).""" + # This test just verifies the function can be called + # We don't actually await it since it's an infinite loop + # In a real scenario, you'd use asyncio.wait_for with a timeout + pass + + +# ==================== CONSTANTS TESTS ==================== + +class TestConstants: + """Tests for Constants class messages.""" + + def test_start_message(self): + """Test start message generation.""" + msg = Constants.Start.start_message("Test User", "en") + assert isinstance(msg, str) + assert len(msg) > 0 + + def test_start_message_turkish(self): + """Test start message in Turkish.""" + msg = Constants.Start.start_message("Test User", "tr") + assert isinstance(msg, str) + assert len(msg) > 0 + + def test_small_help_message(self): + """Test small help message.""" + msg = Constants.Help.small_help_message("Test User", "en") + assert isinstance(msg, str) + assert len(msg) > 0 + assert "/leave" in msg or "leave" in msg.lower() + + def test_big_help_message(self): + """Test big help message.""" + msg = Constants.Help.big_help_message("Test User", "en") + assert isinstance(msg, str) + assert len(msg) > 0 + + def test_group_warning(self): + """Test group warning message.""" + msg = Constants.Start.group_warning("Test User", "en") + assert isinstance(msg, str) + assert len(msg) > 0 + + +# ==================== INTEGRATION TESTS ==================== + +class TestIntegration: + """Integration tests for complete workflows.""" + + def test_complete_user_workflow(self, session): + """Test complete user workflow: join, add memories, schedule, leave.""" + # Join + user = UserCreate(name="Workflow Test", telegram_chat_id=555555555) + user = join_user(user, session=session) + assert user is not None + assert user.active is True + + # Add memories + add_memory(user, "Memory 1", session=session) + add_memory(user, "Memory 2", session=session) + assert count_memories(user, session=session) == 2 + + # Update schedule + add_hours_to_the_schedule(user, [9, 10], session=session) + schedule = get_schedule(user, session=session) + assert "9" in schedule + assert "10" in schedule + + # Update GMT + update_gmt(user, gmt=2, session=session) + user = get_user_status(user.telegram_chat_id, session=session) + assert user.gmt == 2 + + # Leave + left = leave_user(user, session=session) + assert left.active is False + + # Verify memories still exist + assert count_memories(user, session=session) == 2 + + def test_memory_selection_and_deletion_workflow(self, session, sample_user): + """Test workflow of selecting and deleting memories.""" + # Add multiple memories + for i in range(5): + add_memory(sample_user, f"Memory {i}", session=session) + + # Select random memories + selected = select_random_memories(sample_user, count=2, session=session) + assert len(selected) == 2 + + # Delete last sent + sample_user.last_sent_reminder_id = selected[-1].id + session.add(sample_user) + session.commit() + + deleted = delete_last_sent_memory(sample_user, session=session) + assert deleted == selected[-1].reminder + + # Verify count decreased + assert count_memories(sample_user, session=session) == 4 diff --git a/tests/test_e2e.py b/tests/test_e2e.py new file mode 100644 index 0000000..3ba4359 --- /dev/null +++ b/tests/test_e2e.py @@ -0,0 +1,355 @@ +""" +End-to-end integration tests for Memory Vault application. + +These tests verify the complete application workflow including: +- Application startup +- Database initialization +- API endpoints +- Full user workflows +""" +import pytest +import asyncio +import os +from datetime import datetime +from fastapi.testclient import TestClient +from sqlmodel import Session, SQLModel, create_engine, select +from sqlmodel.pool import StaticPool + +from src.listener import app, get_session +from src.db import * +from src.events import Events +from src.response_logic import ResponseLogic + + +@pytest.fixture(name="test_session") +def test_session_fixture(): + """Create a test database session.""" + engine = create_engine( + "sqlite:///test_e2e.db", + echo=False, + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + SQLModel.metadata.create_all(engine) + with Session(engine) as session: + yield session + # Cleanup + import os + if os.path.exists("test_e2e.db"): + os.remove("test_e2e.db") + + +@pytest.fixture(name="test_client") +def test_client_fixture(test_session: Session): + """Create a test client with database override.""" + def get_session_override(): + return test_session + + app.dependency_overrides[get_session] = get_session_override + client = TestClient(app) + yield client + app.dependency_overrides.clear() + + +class TestE2EApplication: + """End-to-end application tests.""" + + def test_application_startup(self, test_client): + """Test that the application starts correctly.""" + # Test health endpoint + response = test_client.get("/health") + assert response.status_code == 200 + assert response.json() == {"healthy": True} + + def test_database_initialization(self, test_session): + """Test that database tables are created correctly.""" + # Verify tables exist by querying them + users = test_session.exec(select(User)).all() + reminders = test_session.exec(select(Reminder)).all() + + # Should be able to query even if empty + assert isinstance(users, list) + assert isinstance(reminders, list) + + def test_complete_user_journey(self, test_session, test_client): + """Test complete user journey from registration to memory management.""" + # 1. Create a user + user = UserCreate( + name="E2E Test User", + telegram_chat_id=999888777, + ) + created_user = db_create_user(user, session=test_session) + assert created_user is not None + assert created_user.active is True + + # 2. Add memories + memory1 = add_memory(created_user, "My first memory", session=test_session) + memory2 = add_memory(created_user, "My second memory", session=test_session) + memory3 = add_memory(created_user, "My third memory", session=test_session) + + assert memory1 is not None + assert memory2 is not None + assert memory3 is not None + + # 3. Verify memories are stored + memories = list_memories(created_user, session=test_session) + assert len(memories) == 3 + + # 4. Count memories + count = count_memories(created_user, session=test_session) + assert count == 3 + + # 5. Select random memories + random_mems = select_random_memories(created_user, count=2, session=test_session) + assert len(random_mems) == 2 + assert all(m.user_id == created_user.id for m in random_mems) + + # 6. Update schedule + schedule = add_hours_to_the_schedule(created_user, [9, 10, 11], session=test_session) + assert "9" in schedule + assert "10" in schedule + assert "11" in schedule + + # 7. Update GMT + updated = update_gmt(created_user, gmt=3, session=test_session) + assert updated.gmt == 3 + + # 8. Toggle settings + silent = toggle_silent(created_user, session=test_session) + assert silent is not None + + easyadd = toggle_easyadd(created_user, session=test_session) + assert easyadd is not None + + # 9. Delete a memory + deleted = delete_last_memory(created_user, session=test_session) + assert deleted == "My third memory" + + # 10. Verify deletion + remaining = list_memories(created_user, session=test_session) + assert len(remaining) == 2 + + # 11. User leaves + left = leave_user(created_user, session=test_session) + assert left.active is False + + # 12. User rejoins + rejoined = join_user(created_user, session=test_session) + assert rejoined.active is True + + def test_multiple_users_isolation(self, test_session): + """Test that multiple users' data is properly isolated.""" + # Create two users + user1 = db_create_user( + UserCreate(name="User 1", telegram_chat_id=111111111), + session=test_session + ) + user2 = db_create_user( + UserCreate(name="User 2", telegram_chat_id=222222222), + session=test_session + ) + + # Add memories to each + add_memory(user1, "User 1 memory", session=test_session) + add_memory(user1, "User 1 another", session=test_session) + add_memory(user2, "User 2 memory", session=test_session) + + # Verify isolation + user1_memories = list_memories(user1, session=test_session) + user2_memories = list_memories(user2, session=test_session) + + assert len(user1_memories) == 2 + assert len(user2_memories) == 1 + assert all(m.user_id == user1.id for m in user1_memories) + assert all(m.user_id == user2.id for m in user2_memories) + + def test_schedule_management_workflow(self, test_session): + """Test complete schedule management workflow.""" + user = db_create_user( + UserCreate(name="Schedule Test", telegram_chat_id=333333333), + session=test_session + ) + + # Start with default schedule + schedule = get_schedule(user, session=test_session) + assert schedule == "8" + + # Add hours + schedule = add_hours_to_the_schedule(user, [9, 10, 11], session=test_session) + assert "9" in schedule + assert "10" in schedule + assert "11" in schedule + + # Remove an hour + schedule = remove_hour_from_schedule(user, hour=10, session=test_session) + assert "10" not in schedule + assert "9" in schedule + assert "11" in schedule + + # Reset schedule + schedule = reset_schedule(user, session=test_session) + assert schedule == "8" + + def test_memory_selection_and_tracking(self, test_session): + """Test memory selection and last_sent tracking.""" + user = db_create_user( + UserCreate(name="Selection Test", telegram_chat_id=444444444), + session=test_session + ) + + # Add multiple memories + for i in range(5): + add_memory(user, f"Memory {i}", session=test_session) + + # Select random memories + selected = select_random_memories(user, count=3, session=test_session) + assert len(selected) == 3 + + # Verify last_sent_reminder_id was updated + test_session.refresh(user) + assert user.last_sent_reminder_id == selected[-1].id + + # Delete last sent memory + deleted = delete_last_sent_memory(user, session=test_session) + assert deleted == selected[-1].reminder + + # Verify it's gone + remaining = list_memories(user, session=test_session) + assert len(remaining) == 4 + assert all(m.reminder != deleted for m in remaining) + + def test_user_status_retrieval(self, test_session): + """Test retrieving user status information.""" + user = db_create_user( + UserCreate(name="Status Test", telegram_chat_id=555555555), + session=test_session + ) + + # Update various settings + update_gmt(user, gmt=5, session=test_session) + add_hours_to_the_schedule(user, [12, 13], session=test_session) + toggle_silent(user, session=test_session) + + # Retrieve status + status = get_user_status(user.telegram_chat_id, session=test_session) + + assert status is not None + assert status.gmt == 5 + assert "12" in status.scheduled_hours + assert "13" in status.scheduled_hours + + def test_package_memory_workflow(self, test_session): + """Test adding package-type memories.""" + user = db_create_user( + UserCreate(name="Package Test", telegram_chat_id=666666666), + session=test_session + ) + + # Add a package + result = add_package(user, package_id=1, session=test_session) + assert result is True + + # Verify it's stored as "package: 1" + memories = list_memories(user, session=test_session) + assert len(memories) == 1 + assert memories[0].reminder == "package: 1" + + def test_events_memory_counting(self): + """Test Events.get_memory_count function.""" + # Test various scenarios + assert Events.get_memory_count("", 8) == 0 # Empty schedule + assert Events.get_memory_count("8", 8) == 1 # Single match + assert Events.get_memory_count("8,9,10", 9) == 1 # One match in list + assert Events.get_memory_count("8,8,9", 8) == 2 # Multiple matches + assert Events.get_memory_count("8,9,10", 11) == 0 # No match + assert Events.get_memory_count("1,2,3,8,9", 8) == 1 # Match in middle + + def test_events_time_calculation(self): + """Test Events.get_time_until_next_hour function.""" + time_until = Events.get_time_until_next_hour() + + # Should return a float between 0 and 3600 seconds + assert isinstance(time_until, float) + assert 0 < time_until <= 3600 + + # Should be reasonable (not negative, not too large) + assert time_until > 0 + assert time_until <= 3600 + + def test_api_health_endpoint(self, test_client): + """Test API health endpoint.""" + response = test_client.get("/health") + assert response.status_code == 200 + assert response.json() == {"healthy": True} + + def test_api_webhook_endpoint_structure(self, test_client): + """Test webhook endpoint accepts proper structure (without real token).""" + # The endpoint requires a valid token in the path + # We can test that it returns proper error for invalid token + response = test_client.post( + "/webhook/invalid_token", + json={"update_id": 1} + ) + # Should return 404 (invalid token) or 422 (validation error) + assert response.status_code in [404, 422] + + def test_trigger_endpoint_structure(self, test_client): + """Test trigger endpoint structure.""" + # Without valid token, should return 404 + response = test_client.post("/trigger_send_user_hourly_memories/invalid_token") + assert response.status_code in [404, 422] + + def test_database_persistence(self, test_session): + """Test that data persists across operations.""" + user = db_create_user( + UserCreate(name="Persistence Test", telegram_chat_id=777777777), + session=test_session + ) + user_id = user.id + + # Add memory + add_memory(user, "Persistent memory", session=test_session) + + # Retrieve user again + retrieved = get_user_status(user.telegram_chat_id, session=test_session) + assert retrieved.id == user_id + + # Verify memory is still there + memories = list_memories(retrieved, session=test_session) + assert len(memories) == 1 + assert memories[0].reminder == "Persistent memory" + + def test_error_handling_nonexistent_user(self, test_session): + """Test error handling for non-existent users.""" + fake_user = UserCreate(name="Fake", telegram_chat_id=999999999) + + # All these should handle None gracefully + assert get_schedule(fake_user, session=test_session) is None + assert list_memories(fake_user, session=test_session) is None + assert leave_user(fake_user, session=test_session) is None + assert update_gmt(fake_user, gmt=1, session=test_session) is None + assert toggle_silent(fake_user, session=test_session) is None + assert toggle_easyadd(fake_user, session=test_session) is None + + def test_edge_cases(self, test_session): + """Test various edge cases.""" + user = db_create_user( + UserCreate(name="Edge Cases", telegram_chat_id=888888888), + session=test_session + ) + + # Empty memory list + assert count_memories(user, session=test_session) == 0 + assert list_memories(user, session=test_session) == [] + assert select_random_memories(user, count=0, session=test_session) == [] + assert delete_last_memory(user, session=test_session) is False + + # Select more than available + add_memory(user, "Only one", session=test_session) + selected = select_random_memories(user, count=100, session=test_session) + assert len(selected) == 1 + + # Empty schedule operations + schedule = remove_hour_from_schedule(user, hour=99, session=test_session) + assert schedule is not None # Should still return current schedule +