Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def _initialize_instance(
self._media_upload_queue: Queue[Any] = Queue(100_000)
self._media_manager = MediaManager(
api_client=self.api,
httpx_client=self.httpx_client,
media_upload_queue=self._media_upload_queue,
max_retries=3,
)
Expand Down
10 changes: 6 additions & 4 deletions langfuse/_task_manager/media_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Callable, Optional, TypeVar, cast

import backoff
import requests
import httpx
from typing_extensions import ParamSpec

from langfuse._client.environment_variables import LANGFUSE_MEDIA_UPLOAD_ENABLED
Expand All @@ -29,10 +29,12 @@ def __init__(
self,
*,
api_client: FernLangfuse,
httpx_client: httpx.Client,
media_upload_queue: Queue,
max_retries: Optional[int] = 3,
):
self._api_client = api_client
self._httpx_client = httpx_client
self._queue = media_upload_queue
self._max_retries = max_retries
self._enabled = os.environ.get(
Expand Down Expand Up @@ -256,10 +258,10 @@ def _process_upload_media_job(

upload_start_time = time.time()
upload_response = self._request_with_backoff(
requests.put,
self._httpx_client.put,
upload_url,
headers=headers,
data=data["content_bytes"],
content=data["content_bytes"],
)
Comment on lines 260 to 265
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted in #1536, _request_with_backoff will never really retry here, since requests.put/httpx.Client.put does not raise a HTTP Status Error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upload_time_ms = int((time.time() - upload_start_time) * 1000)

Expand Down Expand Up @@ -288,7 +290,7 @@ def _should_give_up(e: Exception) -> bool:
and 400 <= e.status_code < 500
and e.status_code != 429
)
if isinstance(e, requests.exceptions.RequestException):
if isinstance(e, httpx.HTTPStatusError):
return (
e.response is not None
and e.response.status_code < 500
Expand Down
21 changes: 16 additions & 5 deletions langfuse/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, TypeVar, cast

import requests
import httpx

if TYPE_CHECKING:
from langfuse._client.client import Langfuse
Expand Down Expand Up @@ -284,6 +284,11 @@ def traverse(obj: Any, depth: int) -> Any:

result = obj
reference_string_to_media_content = {}
httpx_client = (
langfuse_client._resources.httpx_client
if langfuse_client._resources is not None
else None
)

for reference_string in reference_string_matches:
try:
Expand All @@ -293,11 +298,17 @@ def traverse(obj: Any, depth: int) -> Any:
media_data = langfuse_client.api.media.get(
parsed_media_reference["media_id"]
)
media_content = requests.get(
media_data.url, timeout=content_fetch_timeout_seconds
media_content = (
httpx_client.get(
media_data.url,
timeout=content_fetch_timeout_seconds,
)
if httpx_client is not None
else httpx.get(
media_data.url, timeout=content_fetch_timeout_seconds
)
)
if not media_content.ok:
raise Exception("Failed to fetch media content")
media_content.raise_for_status()

base64_media_content = base64.b64encode(
media_content.content
Expand Down
Loading
Loading