diff --git a/.example.env b/.example.env index 34c3833..7417c57 100644 --- a/.example.env +++ b/.example.env @@ -6,6 +6,5 @@ MISTRAL_API_KEY=votre-clé-mistral # TELEGRAM BOT TELEGRAM_BOT_TOKEN=votre-token-bot-telegram -TELEGRAM_CHAT_ID=votre-id-chat-telegram -TELEGRAM_API_URL=https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN} +TELEGRAM_API_URL=https://api.telegram.org/bot WEBHOOK_URL=https://6cc5-102-64-223-197.ngrok-free.app/webhook \ No newline at end of file diff --git a/.gitignore b/.gitignore index 22480a6..3bba28f 100644 --- a/.gitignore +++ b/.gitignore @@ -139,6 +139,8 @@ celerybeat.pid # Environments .env +.prod.env +.dev.env .venv env/ venv/ diff --git a/Jenkinsfile b/Jenkinsfile index 3963f71..b596d58 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -8,7 +8,8 @@ pipeline { environment { // Define environment variables here BOT_NAME = 'awesome-bot' - // BOT_TOKEN = credentials('telegram-bot-token') + TELEGRAM_BOT_TOKEN = credentials('telegram-bot-token') + MISTRAL_API_KEY = credentials('mistral-api-key') } stages { @@ -22,7 +23,7 @@ pipeline { stage('Environment variable injection') { steps { script { - withCredentials([file(credentialsId: 'matbradiouf-chatbot-env-file', variable: 'ENV_FILE')]) { + withCredentials([file(credentialsId: 'bradlab-chatbot-env-file', variable: 'ENV_FILE')]) { // Load the environment variables from the file echo "Loading environment variables from ${ENV_FILE}" sh "cat ${ENV_FILE} > .env" @@ -53,16 +54,77 @@ pipeline { } stage('Deploy') { + when { + anyOf { + branch 'bradlab' + branch 'dev' + branch 'preprod' + branch 'prod' + } + } steps { script { // Add your deployment commands here echo "Deploying the project..." - sh "make deploy env=${BRANCH_NAME}" + withCredentials([ + string(credentialsId: 'telegram-bot-token', variable: 'TELEGRAM_BOT_TOKEN'), + string(credentialsId: 'mistral-api-key', variable: 'MISTRAL_API_KEY') + ]) { + sh """ + make deploy env=${BRANCH_NAME} \ + TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN} \ + MISTRAL_API_KEY=${MISTRAL_API_KEY} + """ + } + } + } + } + + stage('Configure Webhook') { + when { + anyOf { + branch 'bradlab' + branch 'dev' + branch 'preprod' + branch 'prod' + } + } + steps { + script { + // Get the API URL from CloudFormation outputs + def apiUrl = sh( + script: """ + aws cloudformation describe-stacks \ + --stack-name multi-stack-${BRANCH_NAME} \ + --region eu-west-3 \ + --query "Stacks[0].Outputs[?OutputKey=='ApiUrl'].OutputValue" \ + --output text + """, + returnStdout: true + ).trim() + + // Configure the webhook + withCredentials([string(credentialsId: 'telegram-bot-token', variable: 'TELEGRAM_BOT_TOKEN')]) { + sh """ + curl -X POST "${apiUrl}set-webhook" \\ + -H "Authorization: Bearer ${TELEGRAM_BOT_TOKEN}" \\ + -H "Content-Type: application/json" \\ + -d '{ "url": "${apiUrl}webhook" }' + """ + } } } } stage('Test endpoint'){ + when { + anyOf { + branch 'bradlab' + branch 'dev' + branch 'preprod' + branch 'prod' + } + } steps { script { // Add your endpoint testing commands here diff --git a/Makefile b/Makefile index cea7c38..65a96a5 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,6 @@ # by default, we settle down in this region AWS_REGION ?= eu-west-3 -AWS_PROFILE ?= "esgis_profile" clean: rm -rf venv diff --git a/README.md b/README.md index 1020c41..397e00b 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,9 @@ Une API de chatbot développée avec FastAPI, conçue pour être déployée sur DYNAMO_TABLE=votre-table-dynamo AWS_PROFILE=votre-profil-aws MISTRAL_API_KEY=votre-clé-mistral + TELEGRAM_BOT_TOKEN=votre-token-bot-telegram + TELEGRAM_API_URL=https://api.telegram.org/bot + WEBHOOK_URL=votre-api-url/webhook ``` ## ▶️ Exécution diff --git a/infrastructure/template.yaml b/infrastructure/template.yaml index 9db70e3..8cbedd1 100644 --- a/infrastructure/template.yaml +++ b/infrastructure/template.yaml @@ -9,11 +9,42 @@ Parameters: Description: Environment name for the application dev/staging/production Type: String AllowedValues: - - matbradiouf + - bradlab - dev - preprod - prod - Default: matbradiouf + Default: bradlab +############################################################################### + + AWSRegionName: + Description: AWS region where resources will be deployed (e.g. eu-west-3) + Type: String + Default: eu-west-3 + DynamoTable: + Description: Name of the DynamoDB table used to persist chatbot conversations + Type: String + Default: "" + NoEcho: true + AwsProfile: + Description: AWS CLI profile name used for deployment (for local/devops usage) + Type: String + Default: esgis_profile + MistralApiKey: + Description: API key for accessing the Mistral AI service + Type: String + Default: "" + NoEcho: true + TelegramBotToken: + Description: Telegram bot token for authenticating with the Telegram API + Type: String + Default: "" + NoEcho: true + ApiWehookUrl: + Type: String + Default: "" + TelegramApiBotUrl: + Type: String + Default: https://api.telegram.org/bot ############################################################################### Resources: ############################################################################### @@ -23,7 +54,7 @@ Resources: TableName: !Sub "chatbot-dbtable-${EnvironmentName}" # Définition des attributs utilisés comme clés pour la table principale et les GSI AttributeDefinitions: - - AttributeName: "id" + - AttributeName: "chat_id" AttributeType: "S" - AttributeName: "timestamp" AttributeType: "S" @@ -31,7 +62,7 @@ Resources: AttributeType: "S" # Schéma de la clé primaire de la table principale KeySchema: - - AttributeName: "id" + - AttributeName: "chat_id" KeyType: "HASH" - AttributeName: "timestamp" KeyType: "RANGE" @@ -51,7 +82,7 @@ Resources: Projection: ProjectionType: "INCLUDE" NonKeyAttributes: # Attributs à projeter en plus des clés de l'index et de la table - - "id" + - "chat_id" - "message_id" - "role" - "text" @@ -67,6 +98,33 @@ Resources: CodeUri: ../ Handler: src/main.handler Runtime: python3.12 + Environment: + Variables: + ENV_NAME: !Ref EnvironmentName + AWS_REGION_NAME: !Ref AWSRegionName + DYNAMO_TABLE: !Ref DynamoDBTable + MISTRAL_API_KEY: !Ref MistralApiKey + TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken + TELEGRAM_API_URL: !Ref TelegramApiBotUrl + WEBHOOK_URL: !Ref ApiWehookUrl + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref DynamoDBTable + - Statement: + - Effect: Allow + Action: + - dynamodb:GetItem + - dynamodb:DeleteItem + - dynamodb:PutItem + - dynamodb:Query + - dynamodb:UpdateItem + - dynamodb:BatchWriteItem + - dynamodb:BatchGetItem + - dynamodb:DescribeTable + - dynamodb:ConditionCheckItem + - apigateway:POST + - apigateway:GET + Resource: "*" Events: Api: Type: HttpApi @@ -83,6 +141,6 @@ Outputs: DynamoDBTableName: Value: !Ref DynamoDBTable ApiUrl: - Description: URL of your API + Description: URL of our API Value: - Fn::Sub: 'https://${Api}.execute-api.${AWS::Region}.${AWS::URLSuffix}/' \ No newline at end of file + Fn::Sub: 'https://${Api}.execute-api.${AWS::Region}.${AWS::URLSuffix}/' diff --git a/requirements.txt b/requirements.txt index 7780859..990e6c9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ mangum httpx mistralai python-telegram-bot +slowapi diff --git a/src/dynamodb_repository.py b/src/dynamodb_repository.py index 9365ddc..6a58306 100644 --- a/src/dynamodb_repository.py +++ b/src/dynamodb_repository.py @@ -1,6 +1,5 @@ import boto3 -import datetime -import asyncio +import datetime, asyncio, uuid from botocore.exceptions import ClientError from boto3.dynamodb.conditions import Key, Attr @@ -44,9 +43,20 @@ async def save_message( Sauvegarde un message (utilisateur ou bot) dans la table DynamoDB. """ try: - + id = str(uuid.uuid4()) # Génère un UUID + # item = { + # "id": {"S": str(id)}, + # "chat_id": {"S": str(chat_id)}, + # "timestamp": {"S": datetime.datetime.now(datetime.timezone.utc).isoformat()}, + # "message_id": {"S": str(message_id)}, + # "user_id": {"S": str(user_id)}, + # "user_name": {"S": user_name}, + # "text": {"S": text}, + # "role": {"S": role}, + # } item = { - 'id': str(chat_id), + 'id': id, + 'chat_id': str(chat_id), 'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(), 'message_id': str(message_id), 'user_id': str(user_id), @@ -55,14 +65,18 @@ async def save_message( 'role': role, } if ai_model: - item['ai_model'] = ai_model + item["ai_model"] = str(ai_model) + # item["ai_model"] = {"S": ai_model} + + # Utils.insert_data(item) # Exécute l'opération put_item (synchrone) dans un thread séparé - await asyncio.to_thread(self.table.put_item, Item=item) - Utils.log_info(f"Message enregistré dans DynamoDB: chat_id={chat_id}, role={role}") + # await asyncio.to_thread(self.table.put_item, Item=item) + return True except Exception as e: Utils.log_error(f"Erreur lors de l'enregistrement dans DynamoDB: {e}") # L'erreur n'est pas levée pour ne pas interrompre le flux du bot + raise e async def get_chat_history(self, chat_id: int, limit: int = 100) -> list[dict]: """ @@ -70,14 +84,15 @@ async def get_chat_history(self, chat_id: int, limit: int = 100) -> list[dict]: Retourne une liste de dictionnaires représentant les messages. """ try: + # return Utils.get_chat_history(chat_id=chat_id, limit=limit) response = await asyncio.to_thread( self.table.query, - KeyConditionExpression=Key('id').eq(str(chat_id)), + KeyConditionExpression=Key('chat_id').eq(str(chat_id)), Limit=limit, ScanIndexForward=True # True pour tri ascendant (du plus ancien au plus récent) ) Utils.log_info(f"Historique du chat {chat_id} récupéré. Messages trouvés: {len(response.get('Items', []))}") - return response.get('Items', []) + # return response.get('Items', []) except ClientError as e: error_code = e.response['Error']['Code'] Utils.log_error(f"Erreur DynamoDB lors de la récupération de l'historique: {error_code} - {e}") @@ -94,6 +109,7 @@ async def get_user_messages_by_date_range(self, user_id: int, start_timestamp: s """ gsi_name = "UserIndex" try: + # return Utils.get_user_chats(user_id, start_timestamp, end_timestamp, limit) query_params = { 'IndexName': gsi_name, 'KeyConditionExpression': Key('user_id').eq(str(user_id)) & Key('timestamp').between(start_timestamp, end_timestamp), diff --git a/src/main.py b/src/main.py index f4f5a22..ec4fb95 100644 --- a/src/main.py +++ b/src/main.py @@ -1,21 +1,29 @@ -from fastapi import FastAPI, Request, HTTPException, Query +from fastapi import FastAPI, Request, HTTPException, Header, Query, status from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from mangum import Mangum +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded +from slowapi.middleware import SlowAPIMiddleware import datetime from mistralai import Mistral -import asyncio +from pydantic import BaseModel, HttpUrl +from typing import Optional +from .config import env_vars + +from .telegram_handler import telegram_handler from .dynamodb_repository import dynamodb_repo # Importe les fonctions de traitement Telegram -from .telegram_handler import ( - setup_ptb_handlers, - configure_telegram_webhook, - process_telegram_update, - shutdown_ptb -) +# from .telegram_handler import ( +# setup_ptb_handlers, +# configure_telegram_webhook, +# process_telegram_update, +# shutdown_ptb +# ) from .config import env_vars @@ -26,18 +34,20 @@ model = "mistral-small-latest" client = Mistral(api_key=api_key) +class WebhookRequest(BaseModel): + url: HttpUrl @asynccontextmanager async def app_lifespan(application: FastAPI): Utils.log_info("Application KOZ API démarrée.") - await setup_ptb_handlers() - asyncio.create_task(configure_telegram_webhook()) # <-- C'est la source probable du problème + await telegram_handler.setup_ptb_handlers() + # asyncio.create_task(configure_telegram_webhook()) yield # L'application est maintenant prête à recevoir des requêtes Utils.log_info("Application KOZ API arrêtée.") - await shutdown_ptb() + await telegram_handler.shutdown_ptb() app = FastAPI( @@ -55,6 +65,17 @@ async def app_lifespan(application: FastAPI): allow_headers=["*"], ) +# Initialise le rate limiter (par IP) +limiter = Limiter(key_func=get_remote_address) +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) +app.add_middleware(SlowAPIMiddleware) + +@app.middleware("http") +@limiter.limit("100/minute") +async def global_rate_limit(request: Request, call_next): + return await call_next(request) + @app.get("/") async def root(): @@ -78,7 +99,7 @@ async def chat(question: str): ) Utils.log_info(chat_response) response = { - "id": { + "chat_id": { "S": f"{chat_response.id}", }, "question": { @@ -89,22 +110,37 @@ async def chat(question: str): } } Utils.insert_data(response) - await dynamodb_repo.save_message( - "7d6d6416cff4477082d884dbc1d50254", - "7d6d6416cff4477082d884dbc1d51293", - "mybot_id1354", - "toto_machin", - "user", - chat_response.choices[0].message.content, - "mistral-large-latest" - ) return response +# Modèle pour la réponse +class WebhookResponse(BaseModel): + status: str + webhook_set_to: HttpUrl + +@app.post("/set-webhook", response_model=WebhookResponse, include_in_schema=False) +async def set_webhook( + payload: WebhookRequest, + authorization: Optional[str] = Header(None, description="Bearer token for authentication") +): + if authorization is None or not authorization.startswith("Bearer "): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing or invalid Authorization header") + + token = authorization.split("Bearer ")[-1] + + # Vérifie que le token correspond à celui attendu (à adapter selon ton besoin) + if token != env_vars.TELEGRAM_BOT_TOKEN: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token") + + Utils.log_warning(f"START == webhook == CONFIG: {payload.url}") + await telegram_handler.configure_telegram_webhook(payload.url) + return WebhookResponse(status="ok", webhook_set_to=payload.url) + @app.post("/webhook", description="Endpoint pour recevoir les mises à jour ou changement dans le bot Telegram") async def telegram_webhook(request: Request): try: update_json = await request.json() - await process_telegram_update(update_json) + await telegram_handler.process_telegram_update(update_json) + return {"status": "ok"} except Exception as e: Utils.log_error(f"Erreur de traitement du webhook: {e}") diff --git a/src/telegram_handler copy.py b/src/telegram_handler copy.py new file mode 100644 index 0000000..891ea41 --- /dev/null +++ b/src/telegram_handler copy.py @@ -0,0 +1,203 @@ +from telegram import Update, Bot, ReplyKeyboardMarkup +from telegram.ext import Application, MessageHandler, filters, CommandHandler, ContextTypes +from mistralai import Mistral +import asyncio + +from .dynamodb_repository import dynamodb_repo +from .config import env_vars +from .utils import Utils + +api_key = env_vars.MISTRAL_API_KEY +# Récupérer les jetons depuis les variables d'environnement +TELEGRAM_BOT_TOKEN = env_vars.TELEGRAM_BOT_TOKEN +MISTRAL_API_KEY = env_vars.MISTRAL_API_KEY +# API_WEBHOOK_URL = f"{env_vars.TELEGRAM_API_URL}{TELEGRAM_BOT_TOKEN}/setWebhook?url={env_vars.WEBHOOK_URL}" +MISTRAL_MODEL = "mistral-large-latest" + +if not TELEGRAM_BOT_TOKEN: + raise ValueError("TELEGRAM_BOT_TOKEN n'est pas défini.") +if not MISTRAL_API_KEY: + raise ValueError("MISTRAL_API_KEY n'est pas défini.") + +# Initialisation du client MistralAI +mistral_client = Mistral(api_key=MISTRAL_API_KEY) + +# Initialisation de l'application Python-Telegram-Bot +ptb_app = Application.builder().token(TELEGRAM_BOT_TOKEN).updater(None).build() + +async def start_command(update: Update, context): + try: + # Gère la commande /start. + user = update.message.from_user + user_message = update.message.text + chat_id = update.message.chat_id + message_id = update.message.message_id + user_name = user.full_name or user.username or "N/A" + Utils.log_warning(f"===== Start command Initializing ====== {TELEGRAM_BOT_TOKEN}") + + # Menu clavier + reply_markup = ReplyKeyboardMarkup( + [["/start", "/help"]], + resize_keyboard=True + ) + + await dynamodb_repo.save_message( + chat_id=chat_id, + message_id=message_id, + user_id=str(user.id), + user_name=user_name, + text=user_message, + role="user" + ) + response_text = ( + f"Hello {user_name}! How can I assist you today? Let's have a friendly conversation. Here are a few suggestions for how we can proceed:\n\n" + "• You can ask me a question about a topic you're interested in.\n" + "• We can play a word game, like word association or 20 questions.\n" + "• You can share something about yourself, and I'll do my best to relate.\n" + "• We can discuss a recent event or trending topic.\n\n" + "How would you like to begin?" + ) + # await update.message.reply_text() + bot_message = await ptb_app.bot.send_message(chat_id=chat_id, text=response_text, reply_markup=reply_markup) + # await dynamodb_repo.save_message(chat_id, bot_message.message_id, ptb_app.bot.id, ptb_app.bot.username, response_text, "bot") + await dynamodb_repo.save_message( + chat_id=chat_id, + message_id=bot_message.message_id, + user_id=str(ptb_app.bot.id), + user_name=ptb_app.bot.username, + text=response_text, + role="bot" + ) + except Exception as e: + Utils.log_error(f"Start command error ==== {e}") + + +async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): + try: + if update.message is None: + return + message = update.message + if not message: + return # Ignore les événements sans message (ex : edits, joins) + + user = message.from_user + if user is None or user.is_bot: + return # Ignore les messages envoyés par des bots, y compris lui-même + + chat_id = str(message.chat_id) + message_id = str(message.message_id) + user_name = user.username or f"{user.first_name} {user.last_name or ''}" + user_message = message.text + + # (1) Enregistrement du message utilisateur + await dynamodb_repo.save_message( + chat_id=chat_id, + message_id=message_id, + user_id=str(user.id), + user_name=user_name, + text=user_message, + role="user" + ) + + # (2) Appel à Mistral + Utils.log_warning(f"MISTRAL ==== {user_message} ") + chat_response = mistral_client.chat.complete( + model=MISTRAL_MODEL, + messages=[ + { + "role": "user", + "content": user_message, + }, + ] + ) + bot_reply = chat_response.choices[0].message.content + + # (3) Envoi de la réponse du bot + bot_message = await context.bot.send_message( + chat_id=chat_id, + text=bot_reply + ) + + # (4) Enregistrement du message du bot (mais on précise bien le role) + await dynamodb_repo.save_message( + chat_id=chat_id, + message_id=str(bot_message.message_id), + user_id=str(context.bot.id), + user_name=context.bot.username or "bot", + text=bot_reply, + role="bot" + ) + + except Exception as e: + Utils.log_error(f"[handle_message] Erreur: {e}") + +async def help_command(update: Update, context): + try: + chat_id = update.message.chat_id + response_text = ( + "Voici les commandes disponibles :\n" + "/start - Afficher le menu principal\n" + "/help - Afficher l'aide\n" + "/clear - Effacer la conversation" + ) + await ptb_app.bot.send_message(chat_id=chat_id, text=response_text) + except Exception as e: + Utils.log_error(f"Help command error ==== {e}") + +async def clear_command(update: Update, context): + try: + chat_id = update.message.chat_id + response_text = "La conversation a été effacée" + await ptb_app.bot.send_message(chat_id=chat_id, text=response_text) + # Ici tu peux ajouter la logique pour effacer l'historique si besoin + except Exception as e: + Utils.log_error(f"Clear command error ==== {e}") + + +async def _error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: + Utils.log_error(f"== Unhandled Telegram exception: {context.error}") + + +async def setup_ptb_handlers(): + try: + # Configure les handlers de l'application Python-Telegram-Bot + ptb_app.add_handler(CommandHandler("start", start_command)) + ptb_app.add_handler(CommandHandler("help", help_command)) + ptb_app.add_handler(CommandHandler("clear", clear_command)) + Utils.log_warning("Handle first message ====") + ptb_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) + ptb_app.add_error_handler(_error_handler) + await ptb_app.initialize() + Utils.log_warning("Handlers Telegram initialisés.") + except Exception as e: + Utils.log_error(f"Erreur lors de la configuration des handlers Telegram : {e}") + +async def configure_telegram_webhook(webhook_url: str): + api_webhook_url = f"{env_vars.TELEGRAM_API_URL}{TELEGRAM_BOT_TOKEN}/setWebhook?url={webhook_url}" + if not api_webhook_url: + Utils.log_error("WEBHOOK_URL non défini. Le webhook ne sera pas configuré automatiquement.") + return + + bot = Bot(TELEGRAM_BOT_TOKEN) + try: + current_webhook = await bot.get_webhook_info() + Utils.log_warning(f"=== TELEGRAM to connect : \n NEW : {webhook_url} \n OLD: {current_webhook.url}") + if current_webhook.url != api_webhook_url: + try: + await bot.set_webhook(url=api_webhook_url) + Utils.log_warning(f"Webhook Telegram configuré sur : {webhook_url}") + except Exception as bot_error: + Utils.log_error(f"Erreur configuration du webhook url : {e}") + else: + Utils.log_warning("Webhook déjà configuré, aucune modification nécessaire.") + except Exception as e: + Utils.log_error(f"Erreur lors de la configuration du webhook Telegram : {webhook_url} {e}") + +async def process_telegram_update(update_json: dict): + update = Update.de_json(update_json, ptb_app.bot) + await ptb_app.process_update(update) + +async def shutdown_ptb(): + # Arrête l'application Python-Telegram-Bot. + await ptb_app.shutdown() + Utils.log_warning("Application Telegram arrêtée.") \ No newline at end of file diff --git a/src/telegram_handler.py b/src/telegram_handler.py index 6b2addb..ca4532e 100644 --- a/src/telegram_handler.py +++ b/src/telegram_handler.py @@ -1,60 +1,74 @@ -from telegram import Update, Bot -from telegram.ext import Application, MessageHandler, filters, CommandHandler +from telegram import Update, Bot, ReplyKeyboardMarkup +from telegram.ext import Application, MessageHandler, filters, CommandHandler, ContextTypes from mistralai import Mistral from .dynamodb_repository import dynamodb_repo from .config import env_vars from .utils import Utils -api_key = env_vars.MISTRAL_API_KEY -# Récupérer les jetons depuis les variables d'environnement -TELEGRAM_BOT_TOKEN = env_vars.TELEGRAM_BOT_TOKEN -MISTRAL_API_KEY = env_vars.MISTRAL_API_KEY -# API_WEBHOOK_URL = f"{env_vars.TELEGRAM_API_URL}/setWebhook?url={env_vars.WEBHOOK_URL}" -API_WEBHOOK_URL = env_vars.WEBHOOK_URL -MISTRAL_MODEL = "mistral-large-latest" - -if not TELEGRAM_BOT_TOKEN: - raise ValueError("TELEGRAM_BOT_TOKEN n'est pas défini.") -if not MISTRAL_API_KEY: - raise ValueError("MISTRAL_API_KEY n'est pas défini.") - -# Initialisation du client MistralAI -mistral_client = Mistral(api_key=MISTRAL_API_KEY) - -# Initialisation de l'application Python-Telegram-Bot -ptb_app = Application.builder().token(TELEGRAM_BOT_TOKEN).updater(None).build() - -async def start_command(update: Update, context): - # Gère la commande /start. - user = update.message.from_user - user_message = update.message.text - chat_id = update.message.chat_id - message_id = update.message.message_id - user_name = user.full_name or user.username or "N/A" - await dynamodb_repo.save_message(chat_id, message_id, user.id, user_name, "user", user_message) - response_text = f"Bonjour {user_name} ! Je suis Koz votre bot intelligent de causerie. Posez-moi une question !" - await update.message.reply_text() - bot_message = await ptb_app.bot.send_message(chat_id=chat_id, text=response_text) - await dynamodb_repo.save_message(chat_id, bot_message.message_id, ptb_app.bot.id, ptb_app.bot.username, "bot", response_text) - - -async def handle_message(update: Update, context): - # Traite tous les messages texte et utilise MistralAI pour répondre. - try: - if update.message and update.message.text: +class TelegramHandler: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(TelegramHandler, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + self.api_key = env_vars.MISTRAL_API_KEY + self.TELEGRAM_BOT_TOKEN = env_vars.TELEGRAM_BOT_TOKEN + self.MISTRAL_API_KEY = env_vars.MISTRAL_API_KEY + self.MISTRAL_MODEL = "mistral-large-latest" + + if not self.TELEGRAM_BOT_TOKEN: + raise ValueError("TELEGRAM_BOT_TOKEN n'est pas défini.") + if not self.MISTRAL_API_KEY: + raise ValueError("MISTRAL_API_KEY n'est pas défini.") + + self.mistral_client = Mistral(api_key=self.MISTRAL_API_KEY) + self.ptb_app = Application.builder().token(self.TELEGRAM_BOT_TOKEN).updater(None).build() + self._initialized = True + # self.setup_ptb_handlers() + + + async def start_command(self, update: Update, context): + try: user = update.message.from_user - user_message = update.message.text chat_id = update.message.chat_id - message_id = update.message.message_id user_name = user.full_name or user.username or "N/A" - - # Enregistrer le message utilisateur via le repository - await dynamodb_repo.save_message(chat_id, message_id, user.id, user_name, "user", user_message) - try: - chat_response = mistral_client.chat.complete( - model=MISTRAL_MODEL, + reply_markup = ReplyKeyboardMarkup( + [["/start", "/help", "/clear"]], + resize_keyboard=True + ) + + response_text = ( + f"Bonjour {user_name} !\n" + "Bienvenue sur le bot KOZ.\n" + "Utilisez le menu ci-dessous pour commencer :" + ) + await self.ptb_app.bot.send_message(chat_id=chat_id, text=response_text, reply_markup=reply_markup) + except Exception as e: + Utils.log_error(f"Start command error ==== {e}") + + async def handle_message(self, update: Update, context): + try: + if update.message and update.message.text: + user = update.message.from_user + user_message = update.message.text + chat_id = update.message.chat_id + message_id = update.message.message_id + user_name = user.full_name or user.username or "N/A" + + Utils.log_warning(f"KOZ_MSG ======== {user_name} - {user_message}") + await dynamodb_repo.save_message(chat_id, message_id, user.id, user_name, user_message, "user") + + Utils.log_warning(f"GET MISTRAL RESPONSE ======") + chat_response = self.mistral_client.chat.complete( + model=self.MISTRAL_MODEL, messages=[ { "role": "user", @@ -62,57 +76,92 @@ async def handle_message(update: Update, context): }, ] ) - - response_text = chat_response.choices[0].message.content - bot_message = await ptb_app.bot.send_message(chat_id=chat_id, text=response_text) - # Enregistrer la reponse du bot - await dynamodb_repo.save_message( - chat_id, - bot_message.message_id, - ptb_app.bot.id, - ptb_app.bot.username, - "bot", - response_text, - MISTRAL_MODEL - ) - except Exception as e: - Utils.log_info(f"Erreur lors de l'interaction avec MistralAI ou Telegram: {e}") - error_response = "Désolé, une erreur est survenue lors du traitement de votre demande." - # Enregistrer le message d'erreur du bot via le dépôt - bot_message = await ptb_app.bot.send_message(chat_id=chat_id, text=error_response) - await dynamodb_repo.save_message(chat_id, bot_message.message_id, ptb_app.bot.id, ptb_app.bot.username, "bot", error_response) - except Exception as e: - Utils.log_error("Traitement du message échoué.") - -async def setup_ptb_handlers(): - try: - # Configure les handlers de l'application Python-Telegram-Bot - ptb_app.add_handler(CommandHandler("start", start_command)) - ptb_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) - await ptb_app.initialize() - Utils.log_info("Handlers Telegram initialisés.") - except Exception as e: - Utils.log_error(f"Erreur lors de la configuration des handlers Telegram : {e}") - -async def configure_telegram_webhook(): - # Configure le webhook Telegram avec l'URL définie. - if not API_WEBHOOK_URL: - Utils.log_error("WEBHOOK_URL non défini. Le webhook ne sera pas configuré automatiquement.") - return - - bot = Bot(TELEGRAM_BOT_TOKEN) - try: - await bot.set_webhook(url=API_WEBHOOK_URL) - Utils.log_info(f"Webhook Telegram configuré sur : {env_vars.WEBHOOK_URL}") - Utils.log_info(f"Webhook Telegram API : {API_WEBHOOK_URL}") - except Exception as e: - Utils.log_error(f"Erreur lors de la configuration du webhook Telegram : {e}") - -async def process_telegram_update(update_json: dict): - update = Update.de_json(update_json, ptb_app.bot) - await ptb_app.process_update(update) - -async def shutdown_ptb(): - # Arrête l'application Python-Telegram-Bot. - await ptb_app.shutdown() - Utils.log_warning("Application Telegram arrêtée.") \ No newline at end of file + Utils.log_warning(f"MISTRAL RESPONDED ====== ") + + if chat_response: + response_text = chat_response.choices[0].message.content + + bot_message = await self.ptb_app.bot.send_message(chat_id=chat_id, text=response_text) + + await dynamodb_repo.save_message( + chat_id, + bot_message.message_id, + self.ptb_app.bot.id, + self.ptb_app.bot.username, + response_text, + "bot", + self.MISTRAL_MODEL + ) + except Exception as e: + error_response = "Désolé, une erreur est survenue lors du traitement de votre demande." + await self.ptb_app.bot.send_message(chat_id=chat_id, text=error_response) + Utils.log_error(f"[handle_message] Erreur: {e}") + + async def help_command(self, update: Update, context): + try: + chat_id = update.message.chat_id + response_text = ( + "Voici les commandes disponibles :\n" + "/start - Afficher le menu principal\n" + "/help - Afficher l'aide\n" + "/clear - Effacer la conversation" + ) + await self.ptb_app.bot.send_message(chat_id=chat_id, text=response_text) + except Exception as e: + Utils.log_error(f"Help command error ==== {e}") + + async def clear_command(self, update: Update, context): + try: + chat_id = update.message.chat_id + response_text = "La conversation a été effacée" + await self.ptb_app.bot.send_message(chat_id=chat_id, text=response_text) + except Exception as e: + Utils.log_error(f"Clear command error ==== {e}") + + async def _error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: + Utils.log_error(f"== Unhandled Telegram exception: {context.error}") + + async def setup_ptb_handlers(self): + try: + self.ptb_app.add_handler(CommandHandler("start", self.start_command)) + self.ptb_app.add_handler(CommandHandler("help", self.help_command)) + self.ptb_app.add_handler(CommandHandler("clear", self.clear_command)) + Utils.log_warning("Handle first message ====") + self.ptb_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) + self.ptb_app.add_error_handler(self._error_handler) + await self.ptb_app.initialize() + Utils.log_warning("Handlers Telegram initialisés.") + except Exception as e: + Utils.log_error(f"Erreur lors de la configuration des handlers Telegram : {e}") + + async def configure_telegram_webhook(self, webhook_url: str): + api_webhook_url = f"{env_vars.TELEGRAM_API_URL}{self.TELEGRAM_BOT_TOKEN}/setWebhook?url={webhook_url}" + if not api_webhook_url: + Utils.log_error("WEBHOOK_URL non défini. Le webhook ne sera pas configuré automatiquement.") + return + + bot = Bot(self.TELEGRAM_BOT_TOKEN) + try: + current_webhook = await bot.get_webhook_info() + Utils.log_warning(f"=== TELEGRAM to connect : \n NEW : {webhook_url} \n OLD: {current_webhook.url}") + if current_webhook.url != api_webhook_url: + try: + await bot.set_webhook(url=api_webhook_url) + Utils.log_warning(f"Webhook Telegram configuré sur : {webhook_url}") + except Exception as bot_error: + Utils.log_error(f"Erreur configuration du webhook url : {bot_error}") + else: + Utils.log_warning("Webhook déjà configuré, aucune modification nécessaire.") + except Exception as e: + Utils.log_error(f"Erreur lors de la configuration du webhook Telegram : {webhook_url} {e}") + + async def process_telegram_update(self, update_json: dict): + update = Update.de_json(update_json, self.ptb_app.bot) + await self.ptb_app.process_update(update) + + async def shutdown_ptb(self): + await self.ptb_app.shutdown() + Utils.log_warning("Application Telegram arrêtée.") + +# Singleton instance +telegram_handler = TelegramHandler() diff --git a/src/utils.py b/src/utils.py index 105e40b..06dc4fa 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,14 +1,15 @@ -from datetime import datetime import json -from uuid import uuid4 import logging from typing import List +from boto3.dynamodb.conditions import Key, Attr import boto3 from src.config import env_vars ## Simple edit +logging.basicConfig() +logger = logging.getLogger("chatbot") class Utils: @@ -19,54 +20,114 @@ def log_info(message): """_summary_ Log a simple info message """ - logging.getLogger("uvicorn.error").info(msg=f"==> {message}") + logger.info(msg=f"==> {message}") @staticmethod def log_warning(message): """_summary_ Log a simple warning message """ - logging.getLogger("uvicorn.error").warning(msg=f"==> {message}") + logger.warning(msg=f"==> {message}") @staticmethod def log_debug(message): """_summary_ Log a debug message """ - logging.getLogger("uvicorn.error").debug(msg=f"==> {message}") + logger.debug(msg=f"==> {message}") @staticmethod def log_error(message): """_summary_ Log an error message """ - logging.getLogger("uvicorn.error").error(msg=f"==> {message}") + logger.error(msg=f"==> {message}") @staticmethod def log_list(elements: List[any]): if elements: - logging.getLogger("uvicorn.error").info( + logger.info( msg=f"Displaying all the {len(elements)} elements of the list" ) for i in range(len(elements)): - logging.getLogger("uvicorn.error").info( + logger.info( msg=f"##### {i} ==> {json.dumps(elements[i], indent=4)}" ) @staticmethod def get_logger(): - return logging.getLogger("uvicorn.error") + return logger @staticmethod def get_session(): return boto3.Session( - region_name=env_vars.AWS_REGION_NAME, profile_name=env_vars.AWS_PROFILE + region_name=env_vars.AWS_REGION_NAME, env_name=env_vars.ENV_NAME ) + + @staticmethod + def get_dynamo_resource() -> boto3.resource: + # In Lambda, use the role credentials + Utils.log_info(f"Initialisation de la connexion DynamoDB à la table: {env_vars.DYNAMO_TABLE} en région: {env_vars.AWS_REGION_NAME}") + return boto3.resource("dynamodb", region_name=env_vars.AWS_REGION_NAME) + + @staticmethod def insert_data(item): - dynamo_client = boto3.client("dynamodb", region_name=env_vars.AWS_REGION_NAME) - dynamo_client.put_item( - TableName=env_vars.DYNAMO_TABLE, - Item=item, - ) + try: + dynamo_client = boto3.client("dynamodb", region_name=env_vars.AWS_REGION_NAME) + dynamo_client.put_item( + TableName=env_vars.DYNAMO_TABLE, + Item=item, + ) + return True + except Exception as e: + logger.error(msg=f"Erreur lors de l'insertion dans DynamoDB: {str(e)}") + raise e + + @staticmethod + def get_chat_history(chat_id: str, limit: int = 100) -> list[dict]: + try: + dynamo_resource = Utils.get_dynamo_resource() + table = dynamo_resource.Table(env_vars.DYNAMO_TABLE) + + response = table.query( + KeyConditionExpression=Key('chat_id').eq(str(chat_id)), + Limit=limit, + ScanIndexForward=True, # Trier par timestamp croissant + ) + + items: list[dict] = response.get("Items", []) + logger.info(msg=f"Historique du chat {chat_id} récupéré. Messages trouvés: {len(items)}") + return items + except Exception as e: + logger.error(msg=f"Erreur : chat history in DynamoDB: {str(e)}") + raise e + + @staticmethod + def get_user_chats(user_id: str, start_timestamp: str, end_timestamp: str, limit: int = 100) -> list[dict]: + try: + dynamo_resource = Utils.get_dynamo_resource() + table = dynamo_resource.Table(env_vars.DYNAMO_TABLE) + + # Utilise une requête sur l'index avec le user_id + query_params = { + 'IndexName': "UserIndex", + 'KeyConditionExpression': Key('user_id').eq(str(user_id)) & Key('timestamp').between(start_timestamp, end_timestamp), + 'Limit': limit, + 'ScanIndexForward': False # Du plus récents au plus anciens + } + + items = [] + response = table.query(**query_params) + items.extend(response.get('Items', [])) + + while 'LastEvaluatedKey' in response: + query_params['ExclusiveStartKey'] = response['LastEvaluatedKey'] + response = table.query(**query_params) + items.extend(response.get('Items', [])) + + return items + except Exception as e: + logger.error(msg=f"Erreur : chat history in DynamoDB: {str(e)}") + raise e diff --git a/tests/test_main.py b/tests/test_main.py index 5455da6..17b1d94 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,46 +9,34 @@ def mock_env_vars(): """ Mocke les variables d'environnement requises par l'application. - Utilise scope="module" et autouse=True pour que le mocking s'applique - à tous les tests de ce module et soit setup/teardown une seule fois. """ - # Crée une instance mockée de Settings mock_settings_instance = MagicMock(spec=Settings) mock_settings_instance.TELEGRAM_BOT_TOKEN = "mock_telegram_token_for_tests" mock_settings_instance.MISTRAL_API_KEY = "mock_mistral_api_key_for_tests" mock_settings_instance.WEBHOOK_URL = "http://mock.webhook.url/webhook_for_tests" - mock_settings_instance.TELEGRAM_API_URL = "https://api.telegram.org" # Ajoutez toutes les vars nécessaires + mock_settings_instance.TELEGRAM_API_URL = "https://api.telegram.org" with patch('src.config.get_settings', return_value=mock_settings_instance): - # Ici, en mockant get_settings, on contourne complètement la lecture du .env. yield @pytest.fixture(scope="module", autouse=True) -def mock_telegram_handler_module(): +def mock_telegram_handler_singleton(): """ - Mocke complètement le module 'src.telegram_handler' pour éviter - son exécution réelle lors de l'importation de 'main.py'. + Mocke le singleton telegram_handler pour éviter l'exécution réelle. """ - # Crée un mock pour le module telegram_handler. - # Nous pourrions ajouter des attributs ou méthodes mockés si main.py - # appelait des choses spécifiques du handler directement (ex: handler.process_update) mock_handler = MagicMock() - - # Mocker les fonctions/objets que main.py importe depuis telegram_handler.py - # Ces noms doivent correspondre exactement à ce qui est importé dans main.py - mock_handler.setup_ptb_handlers = AsyncMock(return_value=None) # async function, so it will be awaited - mock_handler.configure_telegram_webhook = AsyncMock(return_value=None) # async function - mock_handler.process_telegram_update = AsyncMock(return_value=None) # async function - mock_handler.shutdown_ptb = AsyncMock(return_value=None) # async function - - with patch.dict('sys.modules', {'src.telegram_handler': mock_handler}): - yield # L'application va maintenant importer notre mock_handler - + # Mock les méthodes async attendues sur le singleton + mock_handler.setup_ptb_handlers = AsyncMock(return_value=None) + mock_handler.configure_telegram_webhook = AsyncMock(return_value=None) + mock_handler.process_telegram_update = AsyncMock(return_value=None) + mock_handler.shutdown_ptb = AsyncMock(return_value=None) + + # Patch le singleton dans le module src.telegram_handler + with patch('src.telegram_handler.telegram_handler', mock_handler): + yield @pytest.fixture(scope="module") def client(): - # C'est ici que src.main est importé, et donc src.telegram_handler - # sera importé comme notre mock_handler. from src.main import app with TestClient(app) as c: yield c @@ -57,7 +45,7 @@ def test_read_main(client): response = client.get("/") assert response.status_code == 200 assert response.json() == {"msg": "Hello World. Welcome to KOZ API"} - + def test_read_prompt(client): response = client.get("/prompt") assert response.status_code == 404 @@ -68,49 +56,18 @@ def test_noread_prompt(client): assert response.status_code == 404 assert response.json() != {"msg": "Hola", "response": ""} - -# calculator.py -class Calculator: - def add(self, a, b): - return a + b - - def divide(self, a, b): - if b == 0: - raise ValueError("Division by zero is not allowed") - return a / b - - - -@pytest.fixture -def calculator(): - return Calculator() - - -def test_add_positive_numbers(calculator): - result = calculator.add(2, 3) - assert result == 5 - - -def test_add_negative_numbers(calculator): - result = calculator.add(-1, -4) - assert result == -5 - - -def test_add_zero(calculator): - result = calculator.add(10, 0) - assert result == 10 - - -def test_divide_valid_numbers(calculator): - result = calculator.divide(10, 2) - assert result == 5.0 - - -def test_divide_by_zero(calculator): - with pytest.raises(ValueError, match="Division by zero is not allowed"): - calculator.divide(10, 0) - - -def test_divide_negative_numbers(calculator): - result = calculator.divide(-10, 2) - assert result == -5.0 +def test_webhook_post(client): + # Simule un update Telegram minimal + update_json = { + "update_id": 123456789, + "message": { + "message_id": 1, + "from": {"id": 123, "is_bot": False, "first_name": "Test"}, + "chat": {"id": 123, "type": "private"}, + "date": 1680000000, + "text": "Bonjour" + } + } + response = client.post("/webhook", json=update_json) + assert response.status_code == 200 + assert response.json() == {"status": "ok"} diff --git a/tests/test_telegram_handler.py b/tests/test_telegram_handler.py index 2ea8503..6fa67c7 100644 --- a/tests/test_telegram_handler.py +++ b/tests/test_telegram_handler.py @@ -1,12 +1,10 @@ import pytest from unittest.mock import AsyncMock, patch, MagicMock -import os -import datetime # --- Fixtures Pytest pour le Mocking --- @pytest.fixture(scope="module", autouse=True) -def mock_env_vars(): +def test_mock_env_vars(): # Import ici pour garantir que le patch est effectif avant toute utilisation from src.config import Settings mock_settings_instance = MagicMock(spec=Settings)