forked from Vexa-ai/vexa
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvexa_client.py
More file actions
327 lines (271 loc) · 12.9 KB
/
vexa_client.py
File metadata and controls
327 lines (271 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# vexa_client.py
import requests
from typing import Optional, List, Dict, Any
import os
from urllib.parse import urljoin
import time # Import time for sleep
import re # Import re for parsing meeting ID
# Default Base URL (can be overridden)
DEFAULT_BASE_URL = "http://localhost:8056"
class VexaClientError(Exception):
"""Custom exception for Vexa client errors."""
pass
class VexaClient:
"""
A Python client for interacting with the Vexa API Gateway.
"""
def __init__(self,
base_url: str = DEFAULT_BASE_URL,
api_key: Optional[str] = None,
admin_key: Optional[str] = None):
"""
Initializes the Vexa API client.
Args:
base_url: The base URL of the Vexa API Gateway.
api_key: The API key for regular user operations (X-API-Key).
admin_key: The API key for administrative operations (X-Admin-API-Key).
"""
# Ensure base_url is a string
if not isinstance(base_url, str):
base_url = str(base_url)
self.base_url = base_url
self._api_key = api_key
self._admin_key = admin_key
self._session = requests.Session()
def _get_headers(self, api_type: str = 'user') -> Dict[str, str]:
"""Prepares headers for the request based on API type."""
headers = {"Content-Type": "application/json"}
if api_type == 'admin':
if not self._admin_key:
raise VexaClientError("Admin API key is required for this operation but was not provided.")
headers["X-Admin-API-Key"] = self._admin_key
elif api_type == 'user':
if not self._api_key:
raise VexaClientError("User API key is required for this operation but was not provided.")
headers["X-API-Key"] = self._api_key
else:
raise ValueError("Invalid api_type specified. Use 'user' or 'admin'.")
return headers
def _request(self,
method: str,
path: str,
api_type: str = 'user',
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None) -> Any:
"""
Internal helper method to make requests to the API gateway.
Args:
method: HTTP method (e.g., 'GET', 'POST', 'DELETE').
path: API endpoint path (e.g., '/bots').
api_type: Type of API key required ('user' or 'admin').
params: Optional dictionary of query parameters.
json_data: Optional dictionary for the JSON request body.
Returns:
The JSON response from the API.
Raises:
VexaClientError: If the required API key is missing.
requests.exceptions.RequestException: For connection or other request errors.
requests.exceptions.HTTPError: For non-2xx status codes.
"""
url = urljoin(self.base_url, path)
headers = self._get_headers(api_type)
# Debug output - print URL and headers for troubleshooting
print(f"\nDEBUG: Making {method} request to {url}")
print(f"DEBUG: Headers: {headers}")
print(f"DEBUG: Params: {params}")
print(f"DEBUG: JSON data: {json_data}")
try:
response = self._session.request(
method=method,
url=url,
headers=headers,
params=params,
json=json_data
)
# Debug response
print(f"DEBUG: Response status: {response.status_code}")
print(f"DEBUG: Response headers: {dict(response.headers)}")
try:
print(f"DEBUG: Response content: {response.text[:500]}...")
except:
print(f"DEBUG: Could not display response content")
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
# Handle cases where response might be empty (e.g., 204 No Content)
if response.status_code == 204:
return None
return response.json()
except requests.exceptions.JSONDecodeError:
raise VexaClientError(f"Failed to decode JSON response from {method} {url}. Status: {response.status_code}, Body: {response.text}")
except requests.exceptions.HTTPError as e:
# Attempt to include API error details if available
try:
error_details = e.response.json()
detail_msg = error_details.get('detail', e.response.text)
except requests.exceptions.JSONDecodeError:
detail_msg = e.response.text
raise VexaClientError(f"HTTP Error {e.response.status_code} for {method} {url}: {detail_msg}") from e
except requests.exceptions.RequestException as e:
raise VexaClientError(f"Request failed for {method} {url}: {e}") from e
# --- Bot Management ---
def request_bot(self, platform: str, native_meeting_id: str, bot_name: Optional[str] = None, language: Optional[str] = None, task: Optional[str] = None) -> Dict[str, Any]:
"""
Requests a new bot to join a meeting using platform and native ID.
Args:
platform: Platform identifier (e.g., 'google_meet', 'zoom').
native_meeting_id: The platform-specific meeting identifier.
bot_name: Optional name for the bot in the meeting.
language: Optional language code for transcription (e.g., 'en', 'es').
task: Optional transcription task ('transcribe' or 'translate').
Returns:
Dictionary representing the created/updated Meeting object.
"""
payload = {
"platform": platform,
"native_meeting_id": native_meeting_id
}
if bot_name:
payload["bot_name"] = bot_name
if language:
payload["language"] = language
if task:
payload["task"] = task
return self._request("POST", "/bots", api_type='user', json_data=payload)
def stop_bot(self, platform: str, native_meeting_id: str) -> Dict[str, str]:
"""
Requests a running bot to stop for a specific meeting using platform and native ID.
The API returns a 202 Accepted response immediately while the stop happens in the background.
Args:
platform: Platform identifier (e.g., 'google_meet', 'zoom').
native_meeting_id: The platform-specific meeting identifier.
Returns:
A dictionary containing a confirmation message (e.g., {"message": "..."}).
"""
path = f"/bots/{platform}/{native_meeting_id}"
# _request handles 202 status and returns the JSON body
return self._request("DELETE", path, api_type='user')
def update_bot_config(self, platform: str, native_meeting_id: str, language: Optional[str] = None, task: Optional[str] = None) -> Dict[str, Any]:
"""
Updates the configuration (language, task) for an active bot.
The API returns a 202 Accepted response immediately while the command is sent.
Args:
platform: Platform identifier (e.g., 'google_meet').
native_meeting_id: The platform-specific meeting identifier.
language: Optional new language code (e.g., 'en', 'es'). Pass None to not update.
task: Optional new task ('transcribe' or 'translate'). Pass None to not update.
Returns:
A dictionary containing a confirmation message (e.g., {"message": "..."}).
"""
path = f"/bots/{platform}/{native_meeting_id}/config"
payload = {}
if language is not None:
payload["language"] = language
if task is not None:
payload["task"] = task
if not payload: # Check if there's anything to update
raise VexaClientError("No configuration updates provided (language or task must be specified).")
# _request handles 202 status and returns the JSON body
return self._request("PUT", path, api_type='user', json_data=payload)
def get_running_bots_status(self) -> List[Dict[str, Any]]:
"""
Retrieves the status of running bot containers for the authenticated user.
Returns:
List of dictionaries, each representing the status of a running bot container.
"""
response = self._request("GET", "/bots/status", api_type='user')
# The API returns a dict {"running_bots": [...]}, extract the list.
return response.get("running_bots", [])
# --- Transcriptions ---
def get_meetings(self) -> List[Dict[str, Any]]:
"""
Retrieves the list of meetings initiated by the user associated with the API key.
Returns:
List of dictionaries, each representing a Meeting object.
"""
response = self._request("GET", "/meetings", api_type='user')
# The API returns a dict {"meetings": [...]}, extract the list.
return response.get("meetings", [])
def get_transcript(self, platform: str, native_meeting_id: str) -> Dict[str, Any]:
"""
Retrieves the transcript for a specific meeting using platform and native ID.
Args:
platform: Platform identifier (e.g., 'google_meet', 'zoom').
native_meeting_id: The platform-specific meeting identifier.
Returns:
Dictionary containing meeting details and transcript segments.
"""
path = f"/transcripts/{platform}/{native_meeting_id}"
return self._request("GET", path, api_type='user')
# --- Admin: User Management ---
def create_user(self,
email: str,
name: Optional[str] = None,
image_url: Optional[str] = None,
max_concurrent_bots: Optional[int] = None
) -> Dict[str, Any]:
"""
Creates a new user (Admin Only).
Args:
email: The email address for the new user.
name: Optional name for the user.
image_url: Optional URL for the user's image.
max_concurrent_bots: Optional maximum number of concurrent bots allowed (defaults server-side if None).
Returns:
Dictionary representing the created User object.
"""
payload = {"email": email}
if name:
payload["name"] = name
if image_url:
payload["image_url"] = image_url
if max_concurrent_bots is not None:
payload["max_concurrent_bots"] = max_concurrent_bots
return self._request("POST", "/admin/users", api_type='admin', json_data=payload)
def list_users(self, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
"""
Lists users in the system (Admin Only).
Args:
skip: Number of users to skip (for pagination).
limit: Maximum number of users to return (for pagination).
Returns:
A list of dictionaries, each representing a User object.
"""
params = {"skip": skip, "limit": limit}
return self._request("GET", "/admin/users", api_type='admin', params=params)
def update_user(self,
user_id: int,
name: Optional[str] = None,
image_url: Optional[str] = None,
max_concurrent_bots: Optional[int] = None
) -> Dict[str, Any]:
"""
Updates specific fields for an existing user (Admin Only).
Only include parameters for the fields you want to change.
Args:
user_id: The ID of the user to update.
name: Optional new name for the user.
image_url: Optional new URL for the user's image.
max_concurrent_bots: Optional new maximum number of concurrent bots.
Returns:
Dictionary representing the updated User object.
"""
payload = {}
if name is not None:
payload["name"] = name
if image_url is not None:
payload["image_url"] = image_url
if max_concurrent_bots is not None:
payload["max_concurrent_bots"] = max_concurrent_bots
if not payload: # Check if any update fields were provided
raise VexaClientError("No update fields provided for update_user.")
path = f"/admin/users/{user_id}"
return self._request("PATCH", path, api_type='admin', json_data=payload)
# --- Admin: Token Management ---
def create_token(self, user_id: int) -> Dict[str, Any]:
"""
Generates a new API token for a specific user (Admin Only).
Args:
user_id: The ID of the user for whom to create the token.
Returns:
Dictionary representing the created APIToken object.
"""
return self._request("POST", f"/admin/users/{user_id}/tokens", api_type='admin')