forked from Vexa-ai/vexa
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
291 lines (254 loc) · 12.3 KB
/
main.py
File metadata and controls
291 lines (254 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import uvicorn
from fastapi import FastAPI, Request, Response, HTTPException, status, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.utils import get_openapi
from fastapi.security import APIKeyHeader
import httpx
import os
from dotenv import load_dotenv
import json # For request body processing
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional
# Import schemas for documentation
from shared_models.schemas import (
MeetingCreate, MeetingResponse, MeetingListResponse, # Updated/Added Schemas
TranscriptionResponse, TranscriptionSegment,
UserCreate, UserResponse, TokenResponse, UserDetailResponse, # Admin Schemas
ErrorResponse,
Platform, # Import Platform enum for path parameters
BotStatusResponse # ADDED: Import response model for documentation
)
load_dotenv()
# Configuration from environment variables
ADMIN_API_URL = os.getenv("ADMIN_API_URL", "http://admin-api:8001")
BOT_MANAGER_URL = os.getenv("BOT_MANAGER_URL", "http://bot-manager:8080")
TRANSCRIPTION_COLLECTOR_URL = os.getenv("TRANSCRIPTION_COLLECTOR_URL", "http://transcription-collector:8000")
# Response Models
# class BotResponseModel(BaseModel): ...
# class MeetingModel(BaseModel): ...
# class MeetingsResponseModel(BaseModel): ...
# class TranscriptSegmentModel(BaseModel): ...
# class TranscriptResponseModel(BaseModel): ...
# class UserModel(BaseModel): ...
# class TokenModel(BaseModel): ...
# Security Schemes for OpenAPI
api_key_scheme = APIKeyHeader(name="X-API-Key", description="API Key for client operations", auto_error=False)
admin_api_key_scheme = APIKeyHeader(name="X-Admin-API-Key", description="API Key for admin operations", auto_error=False)
app = FastAPI(
title="Vexa API Gateway",
description="""
**Main entry point for the Vexa platform APIs.**
Provides access to:
- Bot Management (Starting/Stopping transcription bots)
- Transcription Retrieval
- User & Token Administration (Admin only)
## Authentication
Two types of API keys are used:
1. **`X-API-Key`**: Required for all regular client operations (e.g., managing bots, getting transcripts). Obtain your key from an administrator.
2. **`X-Admin-API-Key`**: Required *only* for administrative endpoints (prefixed with `/admin`). This key is configured server-side.
Include the appropriate header in your requests.
""",
version="1.2.0", # Incremented version
contact={
"name": "Vexa Support",
"url": "https://vexa.io/support", # Placeholder URL
"email": "support@vexa.io", # Placeholder Email
},
license_info={
"name": "Proprietary",
},
# Include security schemes in OpenAPI spec
# Note: Applying them globally or per-route is done below
)
# Custom OpenAPI Schema
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
# Generate basic schema first, without components
openapi_schema = get_openapi(
title=app.title,
version=app.version,
description=app.description,
routes=app.routes,
contact=app.contact,
license_info=app.license_info,
)
# Manually add security schemes to the schema
if "components" not in openapi_schema:
openapi_schema["components"] = {}
# Add securitySchemes component
openapi_schema["components"]["securitySchemes"] = {
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "API Key for client operations"
},
"AdminApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-Admin-API-Key",
"description": "API Key for admin operations"
}
}
# Optional: Add global security requirement
# openapi_schema["security"] = [{"ApiKeyAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- HTTP Client ---
# Use a single client instance for connection pooling
@app.on_event("startup")
async def startup_event():
app.state.http_client = httpx.AsyncClient()
@app.on_event("shutdown")
async def shutdown_event():
await app.state.http_client.aclose()
# --- Helper for Forwarding ---
async def forward_request(client: httpx.AsyncClient, method: str, url: str, request: Request) -> Response:
# Copy original headers, converting to a standard dict
# Exclude host, content-length, transfer-encoding as they are handled by httpx/server
excluded_headers = {"host", "content-length", "transfer-encoding"}
headers = {k.lower(): v for k, v in request.headers.items() if k.lower() not in excluded_headers}
# Debug logging for original request headers
print(f"DEBUG: Original request headers: {dict(request.headers)}")
# Determine target service based on URL path prefix
is_admin_request = url.startswith(f"{ADMIN_API_URL}/admin")
# Forward appropriate auth header if present
if is_admin_request:
admin_key = request.headers.get("x-admin-api-key")
if admin_key:
headers["x-admin-api-key"] = admin_key
print(f"DEBUG: Forwarding x-admin-api-key header")
else:
print(f"DEBUG: No x-admin-api-key header found in request")
else:
# Forward client API key for bot-manager and transcription-collector
client_key = request.headers.get("x-api-key")
if client_key:
headers["x-api-key"] = client_key
print(f"DEBUG: Forwarding x-api-key header: {client_key[:5]}...")
else:
print(f"DEBUG: No x-api-key header found in request. Headers: {dict(request.headers)}")
# Debug logging for forwarded headers
print(f"DEBUG: Forwarded headers: {headers}")
content = await request.body()
try:
print(f"DEBUG: Forwarding {method} request to {url}")
resp = await client.request(method, url, headers=headers, content=content)
print(f"DEBUG: Response from {url}: status={resp.status_code}")
# Return downstream response directly (including headers, status code)
return Response(content=resp.content, status_code=resp.status_code, headers=dict(resp.headers))
except httpx.RequestError as exc:
print(f"DEBUG: Request error: {exc}")
raise HTTPException(status_code=503, detail=f"Service unavailable: {exc}")
# --- Root Endpoint ---
@app.get("/", tags=["General"], summary="API Gateway Root")
async def root():
"""Provides a welcome message for the Vexa API Gateway."""
return {"message": "Welcome to the Vexa API Gateway"}
# --- Bot Manager Routes ---
@app.post("/bots",
tags=["Bot Management"],
summary="Request a new bot to join a meeting",
description="Creates a new meeting record and launches a bot instance based on platform and native meeting ID.",
# response_model=MeetingResponse, # Response comes from downstream, keep commented
status_code=status.HTTP_201_CREATED,
dependencies=[Depends(api_key_scheme)],
# Explicitly define the request body schema for OpenAPI documentation
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"schema": MeetingCreate.schema()
}
},
"required": True,
"description": "Specify the meeting platform, native ID, and optional bot name."
},
})
# Function signature remains generic for forwarding
async def request_bot_proxy(request: Request, body: Dict[str, Any]):
"""Forward request to Bot Manager to start a bot."""
url = f"{BOT_MANAGER_URL}/bots"
# forward_request handles reading and passing the body from the original request
return await forward_request(app.state.http_client, "POST", url, request)
@app.delete("/bots/{platform}/{native_meeting_id}",
tags=["Bot Management"],
summary="Stop a bot for a specific meeting",
description="Stops the bot container associated with the specified platform and native meeting ID. Requires ownership via API key.",
response_model=MeetingResponse,
dependencies=[Depends(api_key_scheme)])
async def stop_bot_proxy(platform: Platform, native_meeting_id: str, request: Request):
"""Forward request to Bot Manager to stop a bot."""
url = f"{BOT_MANAGER_URL}/bots/{platform.value}/{native_meeting_id}"
return await forward_request(app.state.http_client, "DELETE", url, request)
# --- ADD Route for PUT /bots/.../config ---
@app.put("/bots/{platform}/{native_meeting_id}/config",
tags=["Bot Management"],
summary="Update configuration for an active bot",
description="Updates the language and/or task for an active bot. Sends command via Bot Manager.",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(api_key_scheme)])
# Need to accept request body for PUT
async def update_bot_config_proxy(platform: Platform, native_meeting_id: str, request: Request, body: Dict[str, Any]):
"""Forward request to Bot Manager to update bot config."""
url = f"{BOT_MANAGER_URL}/bots/{platform.value}/{native_meeting_id}/config"
# forward_request handles reading and passing the body from the original request
return await forward_request(app.state.http_client, "PUT", url, request)
# -------------------------------------------
# --- ADD Route for GET /bots/status ---
@app.get("/bots/status",
tags=["Bot Management"],
summary="Get status of running bots for the user",
description="Retrieves a list of currently running bot containers associated with the authenticated user.",
response_model=BotStatusResponse, # Document expected response
dependencies=[Depends(api_key_scheme)])
async def get_bots_status_proxy(request: Request):
"""Forward request to Bot Manager to get running bot status."""
url = f"{BOT_MANAGER_URL}/bots/status"
return await forward_request(app.state.http_client, "GET", url, request)
# --- END Route for GET /bots/status ---
# --- Transcription Collector Routes ---
@app.get("/meetings",
tags=["Transcriptions"],
summary="Get list of user's meetings",
description="Returns a list of all meetings initiated by the user associated with the API key.",
response_model=MeetingListResponse,
dependencies=[Depends(api_key_scheme)])
async def get_meetings_proxy(request: Request):
"""Forward request to Transcription Collector to get meetings."""
url = f"{TRANSCRIPTION_COLLECTOR_URL}/meetings"
return await forward_request(app.state.http_client, "GET", url, request)
@app.get("/transcripts/{platform}/{native_meeting_id}",
tags=["Transcriptions"],
summary="Get transcript for a specific meeting",
description="Retrieves the transcript segments for a meeting specified by its platform and native ID.",
response_model=TranscriptionResponse,
dependencies=[Depends(api_key_scheme)])
async def get_transcript_proxy(platform: Platform, native_meeting_id: str, request: Request):
"""Forward request to Transcription Collector to get a transcript."""
url = f"{TRANSCRIPTION_COLLECTOR_URL}/transcripts/{platform.value}/{native_meeting_id}"
return await forward_request(app.state.http_client, "GET", url, request)
# --- Admin API Routes ---
@app.api_route("/admin/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
tags=["Administration"],
summary="Forward admin requests",
description="Forwards requests prefixed with `/admin` to the Admin API service. Requires `X-Admin-API-Key`.",
dependencies=[Depends(admin_api_key_scheme)])
async def forward_admin_request(request: Request, path: str):
"""Generic forwarder for all admin endpoints."""
admin_path = f"/admin/{path}"
url = f"{ADMIN_API_URL}{admin_path}"
return await forward_request(app.state.http_client, request.method, url, request)
# --- Main Execution ---
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)