diff --git a/docs/API_REFERENCE.md b/docs/API_REFERENCE.md index 17309ec..b6a2107 100644 --- a/docs/API_REFERENCE.md +++ b/docs/API_REFERENCE.md @@ -52,8 +52,8 @@ Block receives a `Langfuse::Config` object with these properties: | `flush_interval` | Integer | No | `10` | Score + trace export interval (s) | | `sample_rate` | Float | No | `1.0` | Trace + trace-linked score sampling rate (`0.0..1.0`) | | `logger` | Logger | No | Auto-detected | Logger instance | -| `tracing_async` | Boolean | No | `true` | ⚠️ Experimental (OTel export mode) | -| `job_queue` | Symbol | No | `:default` | ⚠️ Experimental (not implemented) | +| `tracing_async` | Boolean | No | `true` | ⚠️ Experimental (OTel batch scheduling) | +| `job_queue` | Symbol | No | `:default` | Reserved/no-op for future job integration | | `environment` | String | No | `nil` (or `ENV["LANGFUSE_TRACING_ENVIRONMENT"]`) | Default trace environment | | `release` | String | No | `nil` (or `ENV["LANGFUSE_RELEASE"]` / common CI commit SHA env) | Default release identifier | | `should_export_span` | `#call` | No | `nil` | Span export filter callback | diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index ffbe2fd..4d06e41 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -301,8 +301,8 @@ config.logger = Logger.new(IO::NULL) - **Type:** Boolean - **Default:** `true` -- **Status:** Implemented for OpenTelemetry batching; ActiveJob integration is not implemented -- **Description:** Controls OpenTelemetry export behavior. When `true`, spans are exported in the background on a schedule. When `false`, spans are still batched with a longer schedule delay and are typically flushed explicitly at lifecycle boundaries. +- **Status:** Implemented for OpenTelemetry batch scheduling; ActiveJob integration is not implemented +- **Description:** Controls OpenTelemetry export scheduling. When `true`, spans use the configured `flush_interval` schedule. When `false`, spans still use OpenTelemetry's batch processor with a long schedule delay and are typically flushed explicitly at lifecycle boundaries. ```ruby config.tracing_async = true @@ -314,11 +314,11 @@ config.tracing_async = true - **Type:** Symbol - **Default:** `:default` -- **Status:** Not yet implemented (placeholder) -- **Description:** Future: ActiveJob queue name for async tracing +- **Status:** Reserved/no-op +- **Description:** Reserved for a future ActiveJob integration. It is kept for configuration compatibility and has no runtime effect today. ```ruby -config.job_queue = :langfuse # Placeholder - no effect currently +config.job_queue = :langfuse # Reserved/no-op today ``` **Current Behavior:** No ActiveJob integration yet. Reserved for future implementation. diff --git a/lib/langfuse.rb b/lib/langfuse.rb index 24ecb47..5e5e28f 100644 --- a/lib/langfuse.rb +++ b/lib/langfuse.rb @@ -44,6 +44,7 @@ class UnauthorizedError < ApiError; end require_relative "langfuse/prompt_cache" require_relative "langfuse/prompt_fetch_result" require_relative "langfuse/rails_cache_adapter" +require_relative "langfuse/prompt_cache_coordinator" require_relative "langfuse/cache_warmer" require_relative "langfuse/prompt_cache_events" require_relative "langfuse/api_client" diff --git a/lib/langfuse/api_client.rb b/lib/langfuse/api_client.rb index 7789791..551910f 100644 --- a/lib/langfuse/api_client.rb +++ b/lib/langfuse/api_client.rb @@ -6,6 +6,7 @@ require "json" require "uri" require_relative "prompt_fetch_result" +require_relative "prompt_cache_coordinator" module Langfuse # HTTP client for Langfuse API @@ -25,14 +26,6 @@ module Langfuse class ApiClient # rubocop:disable Metrics/ClassLength include PromptCacheEvents - # Bundles the resolved cache key with the per-call TTL override so private - # prompt-fetch helpers take one arg instead of four. - PromptFetchOptions = Struct.new(:key, :cache_ttl, keyword_init: true) do - def name = key.name - def version = key.version - def label = key.label - end - # @return [String] Langfuse public API key attr_reader :public_key @@ -69,8 +62,12 @@ def initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cac @timeout = timeout @logger = logger || Logger.new($stdout, level: Logger::WARN) @cache = cache - @cache_backend_name = compute_cache_backend_name setup_prompt_cache_events(cache_observer: cache_observer) + @prompt_cache_coordinator = PromptCacheCoordinator.new( + cache: cache, + event_emitter: self, + fetch_prompt: ->(name, version:, label:) { fetch_prompt_from_api(name, version: version, label: label) } + ) end # rubocop:enable Metrics/ParameterLists @@ -105,15 +102,7 @@ def connection(timeout: nil) # puts "#{prompt['name']} (v#{prompt['version']})" # end def list_prompts(page: nil, limit: nil) - with_faraday_error_handling do - params = { page: page, limit: limit }.compact - - response = connection.get("/api/public/v2/prompts", params) - result = handle_response(response) - - # API returns { data: [...], meta: {...} } - result["data"] || [] - end + request(:get, "/api/public/v2/prompts", params: { page: page, limit: limit }.compact)["data"] || [] end # Fetch a prompt from the Langfuse API @@ -148,16 +137,7 @@ def get_prompt(name, version: nil, label: nil, cache_ttl: nil) # @raise [UnauthorizedError] if authentication fails # @raise [ApiError] for other API errors def get_prompt_result(name, version: nil, label: nil, cache_ttl: nil) - validate_prompt_fetch_options!(version, label, cache_ttl) - - options = PromptFetchOptions.new( - key: prompt_cache_key(name, version: version, label: label), - cache_ttl: cache_ttl - ) - return fetch_uncached_prompt_result(options, CacheStatus::DISABLED) if cache.nil? - return fetch_uncached_prompt_result(options, CacheStatus::BYPASS) if cache_ttl&.zero? - - fetch_cached_prompt_result(options) + @prompt_cache_coordinator.get_prompt_result(name, version: version, label: label, cache_ttl: cache_ttl) end # Refresh a prompt from the API, optionally writing through to cache. @@ -173,14 +153,7 @@ def get_prompt_result(name, version: nil, label: nil, cache_ttl: nil) # @raise [UnauthorizedError] if authentication fails # @raise [ApiError] for other API errors def refresh_prompt(name, version: nil, label: nil, cache_ttl: nil) - validate_prompt_fetch_options!(version, label, cache_ttl) - - refresh_prompt_result( - PromptFetchOptions.new( - key: prompt_cache_key(name, version: version, label: label), - cache_ttl: cache_ttl - ) - ) + @prompt_cache_coordinator.refresh_prompt(name, version: version, label: label, cache_ttl: cache_ttl) end # Inspect the logical and generated cache keys for a prompt. @@ -191,15 +164,7 @@ def refresh_prompt(name, version: nil, label: nil, cache_ttl: nil) # @return [PromptCacheKey] Logical and generated cache keys # @raise [ArgumentError] if both version and label are provided def prompt_cache_key(name, version: nil, label: nil) - raise ArgumentError, "Cannot specify both version and label" if version && label - - logical_key = PromptCache.build_key(name, version: version, label: label) - storage_key = if generated_storage_key_cache? - cache.storage_key(logical_key, name: name) - else - logical_key - end - PromptCacheKey.new(name: name, version: version, label: label, logical_key: logical_key, storage_key: storage_key) + @prompt_cache_coordinator.prompt_cache_key(name, version: version, label: label) end # Invalidate one exact logical prompt cache key. @@ -210,13 +175,7 @@ def prompt_cache_key(name, version: nil, label: nil) # @return [PromptCacheKey] The invalidated key # @raise [ArgumentError] if both version and label are provided def invalidate_prompt_cache(name, version: nil, label: nil) - key = prompt_cache_key(name, version: version, label: label) - deleted = cache&.delete(key.storage_key) || false - emit_prompt_cache_event(:delete) { event_payload(key, CacheStatus::MISS, CacheSource::CACHE, deleted: deleted) } - emit_prompt_cache_event(:invalidate) do - event_payload(key, CacheStatus::MISS, CacheSource::CACHE, scope: :exact) - end - key + @prompt_cache_coordinator.invalidate_prompt_cache(name, version: version, label: label) end # Invalidate all cached variants for one prompt name. @@ -224,29 +183,33 @@ def invalidate_prompt_cache(name, version: nil, label: nil) # @param name [String] The prompt name # @return [Integer, nil] New generation, or nil when cache is disabled def invalidate_prompt_cache_by_name(name) - generation = cache&.invalidate_name(name) - payload = { name: name, backend: cache_backend_name, generation: generation, scope: :name } - emit_prompt_cache_event(:invalidate, payload) - generation + @prompt_cache_coordinator.invalidate_prompt_cache_by_name(name) end # Logically clear the whole Langfuse prompt cache namespace. # # @return [Integer, nil] New global generation, or nil when cache is disabled def clear_prompt_cache - generation = cache&.clear_logically - emit_prompt_cache_event(:clear, backend: cache_backend_name, generation: generation) - generation + @prompt_cache_coordinator.clear_prompt_cache end # Return prompt cache statistics. # # @return [Hash] Cache statistics def prompt_cache_stats - return disabled_prompt_cache_stats unless cache + @prompt_cache_coordinator.prompt_cache_stats + end - cache.stats + # Validate the configured prompt cache backend. + # + # @return [Boolean] true when the configured backend is usable + # @raise [ConfigurationError] if the backend is invalid + # rubocop:disable Naming/PredicateMethod + def validate_prompt_cache_backend! + @cache&.validate! + true end + # rubocop:enable Naming/PredicateMethod # Create a new prompt (or new version if prompt with same name exists) # @@ -271,21 +234,12 @@ def prompt_cache_stats # # rubocop:disable Metrics/ParameterLists def create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil) - with_faraday_error_handling do - path = "/api/public/v2/prompts" - payload = { - name: name, - prompt: prompt, - type: type, - config: config, - labels: labels, - tags: tags - } - payload[:commitMessage] = commit_message if commit_message - - response = connection.post(path, payload) - handle_response(response).tap { invalidate_prompt_cache_after_mutation(name) } - end + payload = { + name: name, prompt: prompt, type: type, config: config, + labels: labels, tags: tags, commitMessage: commit_message + }.compact + request(:post, "/api/public/v2/prompts", body: payload) + .tap { @prompt_cache_coordinator.invalidate_after_mutation(name) } end # rubocop:enable Metrics/ParameterLists @@ -309,13 +263,9 @@ def create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commi def update_prompt(name:, version:, labels:) raise ArgumentError, "labels must be an array" unless labels.is_a?(Array) - with_faraday_error_handling do - path = "/api/public/v2/prompts/#{URI.encode_uri_component(name)}/versions/#{version}" - payload = { newLabels: labels } - - response = connection.patch(path, payload) - handle_response(response).tap { invalidate_prompt_cache_after_mutation(name) } - end + path = "/api/public/v2/prompts/#{URI.encode_uri_component(name)}/versions/#{version}" + request(:patch, path, body: { newLabels: labels }) + .tap { @prompt_cache_coordinator.invalidate_after_mutation(name) } end # Send a batch of events to the Langfuse ingestion API @@ -344,13 +294,9 @@ def send_batch(events) raise ArgumentError, "events must be an array" unless events.is_a?(Array) raise ArgumentError, "events array cannot be empty" if events.empty? - path = "/api/public/ingestion" - payload = { batch: events } - - response = connection.post(path, payload) + response = connection.post("/api/public/ingestion", { batch: events }) handle_batch_response(response) rescue Faraday::RetriableResponse => e - # Retry middleware exhausted all retries - handle the final response logger.error("Langfuse batch send failed: Retries exhausted - #{e.response.status}") handle_batch_response(e.response) rescue Faraday::Error => e @@ -374,16 +320,12 @@ def send_batch(events) # api_client.create_dataset_run_item(dataset_item_id: "item-123", run_name: "eval-v1", trace_id: "trace-abc") def create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, observation_id: nil, metadata: nil, run_description: nil) - with_faraday_error_handling do - payload = { datasetItemId: dataset_item_id, runName: run_name } - payload[:traceId] = trace_id if trace_id - payload[:observationId] = observation_id if observation_id - payload[:metadata] = metadata if metadata - payload[:runDescription] = run_description if run_description - - response = connection.post("/api/public/dataset-run-items", payload) - handle_response(response) - end + payload = { + datasetItemId: dataset_item_id, runName: run_name, + traceId: trace_id, observationId: observation_id, + metadata: metadata, runDescription: run_description + }.compact + request(:post, "/api/public/dataset-run-items", body: payload) end # Fetch a dataset run by dataset and run name @@ -395,10 +337,7 @@ def create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, # @raise [UnauthorizedError] if authentication fails # @raise [ApiError] for other API errors def get_dataset_run(dataset_name:, run_name:) - with_faraday_error_handling do - response = connection.get(dataset_run_path(dataset_name: dataset_name, run_name: run_name)) - handle_response(response) - end + request(:get, dataset_run_path(dataset_name: dataset_name, run_name: run_name)) end # List dataset runs in a dataset @@ -410,8 +349,7 @@ def get_dataset_run(dataset_name:, run_name:) # @raise [UnauthorizedError] if authentication fails # @raise [ApiError] for other API errors def list_dataset_runs(dataset_name:, page: nil, limit: nil) - result = list_dataset_runs_paginated(dataset_name: dataset_name, page: page, limit: limit) - result["data"] || [] + list_dataset_runs_paginated(dataset_name: dataset_name, page: page, limit: limit)["data"] || [] end # Full paginated response including "meta" for internal pagination use @@ -419,10 +357,7 @@ def list_dataset_runs(dataset_name:, page: nil, limit: nil) # @api private # @return [Hash] Full response hash with "data" array and "meta" pagination info def list_dataset_runs_paginated(dataset_name:, page: nil, limit: nil) - with_faraday_error_handling do - response = connection.get(dataset_runs_path(dataset_name), build_dataset_runs_params(page: page, limit: limit)) - handle_response(response) - end + request(:get, dataset_runs_path(dataset_name), params: { page: page, limit: limit }.compact) end # Delete a dataset run by name @@ -451,19 +386,16 @@ def delete_dataset_run(dataset_name:, run_name:) # data = api_client.get_projects # project_id = data["data"][0]["id"] def get_projects # rubocop:disable Naming/AccessorMethodName - with_faraday_error_handling do - response = connection.get("/api/public/projects") - handle_response(response) - end + request(:get, "/api/public/projects") end # Shut down the API client and release resources # - # Shuts down the cache if it supports shutdown (e.g., SWR thread pool). + # Shuts down the cache backend's SWR thread pool when present. # # @return [void] def shutdown - cache.shutdown if cache.respond_to?(:shutdown) + @cache&.shutdown end # List traces in the project @@ -493,14 +425,13 @@ def list_traces(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) - result = list_traces_paginated( + list_traces_paginated( page: page, limit: limit, user_id: user_id, name: name, session_id: session_id, from_timestamp: from_timestamp, to_timestamp: to_timestamp, order_by: order_by, tags: tags, version: version, release: release, environment: environment, fields: fields, filter: filter - ) - result["data"] || [] + )["data"] || [] end # rubocop:enable Metrics/ParameterLists @@ -513,17 +444,14 @@ def list_traces_paginated(page: nil, limit: nil, user_id: nil, name: nil, sessio from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) - with_faraday_error_handling do - params = build_traces_params( - page: page, limit: limit, user_id: user_id, name: name, - session_id: session_id, from_timestamp: from_timestamp, - to_timestamp: to_timestamp, order_by: order_by, tags: tags, - version: version, release: release, environment: environment, - fields: fields, filter: filter - ) - response = connection.get("/api/public/traces", params) - handle_response(response) - end + params = build_traces_params( + page: page, limit: limit, user_id: user_id, name: name, + session_id: session_id, from_timestamp: from_timestamp, + to_timestamp: to_timestamp, order_by: order_by, tags: tags, + version: version, release: release, environment: environment, + fields: fields, filter: filter + ) + request(:get, "/api/public/traces", params: params) end # rubocop:enable Metrics/ParameterLists @@ -538,11 +466,7 @@ def list_traces_paginated(page: nil, limit: nil, user_id: nil, name: nil, sessio # @example # trace = api_client.get_trace("trace-uuid-123") def get_trace(id) - with_faraday_error_handling do - encoded_id = URI.encode_uri_component(id) - response = connection.get("/api/public/traces/#{encoded_id}") - handle_response(response) - end + request(:get, "/api/public/traces/#{URI.encode_uri_component(id)}") end # List all datasets in the project @@ -556,13 +480,7 @@ def get_trace(id) # @example # datasets = api_client.list_datasets(page: 1, limit: 10) def list_datasets(page: nil, limit: nil) - with_faraday_error_handling do - params = { page: page, limit: limit }.compact - - response = connection.get("/api/public/v2/datasets", params) - result = handle_response(response) - result["data"] || [] - end + request(:get, "/api/public/v2/datasets", params: { page: page, limit: limit }.compact)["data"] || [] end # Fetch a dataset by name @@ -576,11 +494,7 @@ def list_datasets(page: nil, limit: nil) # @example # data = api_client.get_dataset("my-dataset") def get_dataset(name) - with_faraday_error_handling do - encoded_name = URI.encode_uri_component(name) - response = connection.get("/api/public/v2/datasets/#{encoded_name}") - handle_response(response) - end + request(:get, "/api/public/v2/datasets/#{URI.encode_uri_component(name)}") end # Create a new dataset @@ -595,12 +509,8 @@ def get_dataset(name) # @example # data = api_client.create_dataset(name: "my-dataset", description: "QA evaluation set") def create_dataset(name:, description: nil, metadata: nil) - with_faraday_error_handling do - payload = { name: name, description: description, metadata: metadata }.compact - - response = connection.post("/api/public/v2/datasets", payload) - handle_response(response) - end + request(:post, "/api/public/v2/datasets", + body: { name: name, description: description, metadata: metadata }.compact) end # Create a new dataset item (or upsert if id is provided) @@ -627,16 +537,13 @@ def create_dataset(name:, description: nil, metadata: nil) def create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil, source_trace_id: nil, source_observation_id: nil, status: nil) - with_faraday_error_handling do - payload = build_dataset_item_payload( - dataset_name: dataset_name, input: input, expected_output: expected_output, - metadata: metadata, id: id, source_trace_id: source_trace_id, - source_observation_id: source_observation_id, status: status - ) - - response = connection.post("/api/public/dataset-items", payload) - handle_response(response) - end + payload = { + datasetName: dataset_name, id: id, input: input, + expectedOutput: expected_output, metadata: metadata, + sourceTraceId: source_trace_id, sourceObservationId: source_observation_id, + status: status&.to_s&.upcase + }.compact + request(:post, "/api/public/dataset-items", body: payload) end # rubocop:enable Metrics/ParameterLists @@ -651,11 +558,7 @@ def create_dataset_item(dataset_name:, input: nil, expected_output: nil, # @example # data = api_client.get_dataset_item("item-uuid-123") def get_dataset_item(id) - with_faraday_error_handling do - encoded_id = URI.encode_uri_component(id) - response = connection.get("/api/public/dataset-items/#{encoded_id}") - handle_response(response) - end + request(:get, "/api/public/dataset-items/#{URI.encode_uri_component(id)}") end # List items in a dataset with optional filters @@ -671,13 +574,8 @@ def get_dataset_item(id) # # @example # items = api_client.list_dataset_items(dataset_name: "my-dataset", limit: 50) - def list_dataset_items(dataset_name:, page: nil, limit: nil, - source_trace_id: nil, source_observation_id: nil) - result = list_dataset_items_paginated( - dataset_name: dataset_name, page: page, limit: limit, - source_trace_id: source_trace_id, source_observation_id: source_observation_id - ) - result["data"] || [] + def list_dataset_items(**) + list_dataset_items_paginated(**)["data"] || [] end # Full paginated response including "meta" for internal pagination use @@ -686,15 +584,11 @@ def list_dataset_items(dataset_name:, page: nil, limit: nil, # @return [Hash] Full response hash with "data" array and "meta" pagination info def list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) - with_faraday_error_handling do - params = build_dataset_items_params( - dataset_name: dataset_name, page: page, limit: limit, - source_trace_id: source_trace_id, source_observation_id: source_observation_id - ) - - response = connection.get("/api/public/dataset-items", params) - handle_response(response) - end + params = { + datasetName: dataset_name, page: page, limit: limit, + sourceTraceId: source_trace_id, sourceObservationId: source_observation_id + }.compact + request(:get, "/api/public/dataset-items", params: params) end # Delete a dataset item by ID @@ -708,8 +602,7 @@ def list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, # @example # api_client.delete_dataset_item("item-uuid-123") def delete_dataset_item(id) - encoded_id = URI.encode_uri_component(id) - response = connection.delete("/api/public/dataset-items/#{encoded_id}") + response = connection.delete("/api/public/dataset-items/#{URI.encode_uri_component(id)}") handle_delete_dataset_item_response(response, id) rescue Faraday::RetriableResponse => e logger.error("Faraday error: Retries exhausted - #{e.response.status}") @@ -721,314 +614,42 @@ def delete_dataset_item(id) private - def validate_prompt_fetch_options!(version, label, cache_ttl) - raise ArgumentError, "Cannot specify both version and label" if version && label - return if cache_ttl.nil? - raise ArgumentError, "cache_ttl must be a non-negative Integer" unless cache_ttl.is_a?(Integer) - raise ArgumentError, "cache_ttl must be non-negative" if cache_ttl.negative? + def cache_backend_name + @prompt_cache_coordinator.backend_name end - def fetch_uncached_prompt_result(options, cache_status) - prompt_data = fetch_prompt_for_options(options) - build_prompt_result(options.key, prompt_data, cache_status, CacheSource::API) - end - - def fetch_cached_prompt_result(options) - return fetch_swr_prompt_result(options) if swr_cache_available? - - fetch_non_swr_prompt_result(options) - end - - def fetch_swr_prompt_result(options) - unless generated_storage_key_cache? - prompt_data = fetch_with_swr_cache(options.key.storage_key, options.name, options.version, options.label) - return cache_hit_prompt_result(options.key, prompt_data) - end - - result = fetch_swr_cached_prompt_result(options) - return result if result - - fetch_cache_miss_prompt_result(options, swr_enabled: true, distributed_enabled: false) - end - - def fetch_non_swr_prompt_result(options) - distributed_enabled = distributed_cache_available? - - if !generated_storage_key_cache? && distributed_enabled - prompt_data = fetch_with_distributed_cache(options.key.storage_key, options.name, options.version, - options.label) - return cache_hit_prompt_result(options.key, prompt_data) - end - - cached_data = cache.get(options.key.storage_key) - return cache_hit_prompt_result(options.key, cached_data) if cached_data - - fetch_cache_miss_prompt_result(options, swr_enabled: false, distributed_enabled: distributed_enabled) - end - - def fetch_swr_cached_prompt_result(options) - key = options.key - entry = cache.entry(key.storage_key) if cache.respond_to?(:entry) - return nil unless entry.respond_to?(:fresh?) - return cache_hit_prompt_result(key, entry.data) if entry.fresh? - return nil unless entry.stale? - - emit_prompt_cache_event(:stale_serve) { event_payload(key, CacheStatus::STALE, CacheSource::CACHE) } - schedule_prompt_cache_refresh(options) - build_prompt_result(key, entry.data, CacheStatus::STALE, CacheSource::CACHE) - end - - def cache_hit_prompt_result(key, prompt_data) - emit_prompt_cache_event(:hit) { event_payload(key, CacheStatus::HIT, CacheSource::CACHE) } - build_prompt_result(key, prompt_data, CacheStatus::HIT, CacheSource::CACHE) - end - - def fetch_cache_miss_prompt_result(options, swr_enabled: false, distributed_enabled: nil) - emit_prompt_cache_event(:miss) { event_payload(options.key, CacheStatus::MISS, CacheSource::API) } - distributed_enabled = distributed_cache_available? if distributed_enabled.nil? - - if !swr_enabled && distributed_enabled - fetch_cache_miss_with_lock(options) - else - fetch_cache_miss_directly(options, swr_enabled: swr_enabled) - end - end - - def fetch_cache_miss_with_lock(options) - key = options.key - fetched = false - prompt_data = cache_fetch_with_lock(key.storage_key, options.cache_ttl) do - fetched = true - fetch_prompt_for_options(options) - end - emit_prompt_cache_event(:write) { event_payload(key, CacheStatus::MISS, CacheSource::API) } if fetched - status = fetched ? CacheStatus::MISS : CacheStatus::HIT - source = fetched ? CacheSource::API : CacheSource::CACHE - build_prompt_result(key, prompt_data, status, source) - end - - def fetch_cache_miss_directly(options, swr_enabled: false) - prompt_data = fetch_prompt_for_options(options) - write_prompt_cache(options.key, prompt_data, options.cache_ttl, swr_enabled: swr_enabled) - build_prompt_result(options.key, prompt_data, CacheStatus::MISS, CacheSource::API) - end - - def refresh_prompt_result(options) - key = options.key - emit_prompt_cache_event(:refresh_start) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } - prompt_data = fetch_prompt_for_options(options) - write_refresh_prompt_cache(key, prompt_data, options.cache_ttl) - status = refresh_cache_status(options.cache_ttl) - emit_prompt_cache_event(:refresh_success) { event_payload(key, status, CacheSource::API) } - build_prompt_result(key, prompt_data, status, CacheSource::API) - rescue StandardError => e - emit_prompt_cache_event(:refresh_failure) do - event_payload(key, CacheStatus::REFRESH, CacheSource::API, - error_class: e.class.name, error_message: e.message) - end - raise - end - - def schedule_prompt_cache_refresh(options) - return unless cache.respond_to?(:refresh_async) - - key = options.key - scheduled = cache.refresh_async( - key.storage_key, - ttl: options.cache_ttl, - on_success: ->(_value) { emit_refresh_success_events(key) }, - on_failure: ->(error) { emit_refresh_failure_event(key, error) } - ) { fetch_prompt_for_options(options) } - return unless scheduled - - emit_prompt_cache_event(:refresh_start) { event_payload(key, CacheStatus::STALE, CacheSource::CACHE) } - end - - def fetch_prompt_for_options(options) - fetch_prompt_from_api(options.name, version: options.version, label: options.label) - end - - def emit_refresh_success_events(key) - emit_prompt_cache_event(:refresh_success) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } - emit_prompt_cache_event(:write) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } - end - - def emit_refresh_failure_event(key, error) - emit_prompt_cache_event(:refresh_failure) do - event_payload(key, CacheStatus::STALE, CacheSource::CACHE, - error_class: error.class.name, error_message: error.message) - end - end - - def write_refresh_prompt_cache(key, prompt_data, cache_ttl) - return unless cache - return if cache_ttl&.zero? - - write_prompt_cache(key, prompt_data, cache_ttl, - cache_status: CacheStatus::REFRESH, swr_enabled: swr_cache_available?) - end - - def write_prompt_cache(key, prompt_data, cache_ttl, cache_status: CacheStatus::MISS, swr_enabled: false) - if swr_enabled && cache.respond_to?(:write_with_stale_while_revalidate) - cache.write_with_stale_while_revalidate(key.storage_key, prompt_data, ttl: cache_ttl) - elsif cache_ttl.nil? - cache.set(key.storage_key, prompt_data) - else - cache.set(key.storage_key, prompt_data, ttl: cache_ttl) - end - emit_prompt_cache_event(:write) { event_payload(key, cache_status, CacheSource::API) } - end - - def cache_fetch_with_lock(storage_key, cache_ttl, &) - return cache.fetch_with_lock(storage_key, &) if cache_ttl.nil? - - cache.fetch_with_lock(storage_key, ttl: cache_ttl, &) - end - - def refresh_cache_status(cache_ttl) - return CacheStatus::DISABLED unless cache - return CacheStatus::BYPASS if cache_ttl&.zero? - - CacheStatus::REFRESH - end - - def build_prompt_result(key, prompt_data, cache_status, source) - PromptFetchResult.new( - prompt: prompt_data, - logical_key: key.logical_key, - storage_key: key.storage_key, - cache_status: cache_status, - source: source, - name: prompt_data["name"] || key.name, - version: prompt_data["version"] || key.version, - label: key.resolved_label - ) - end - - attr_reader :cache_backend_name - - def compute_cache_backend_name - return CacheBackend::DISABLED unless cache - return CacheBackend::RAILS if cache.is_a?(RailsCacheAdapter) - return CacheBackend::MEMORY if cache.is_a?(PromptCache) - - cache.class.name - end - - def disabled_prompt_cache_stats - { - backend: CacheBackend::DISABLED, - enabled: false, - current_generation_entries: nil, - orphaned_entries: nil, - total_entries: nil, - unsupported_counts: CacheBackend::UNSUPPORTED_COUNT_KEYS - } - end - - def generated_storage_key_cache? - cache.is_a?(PromptCache) || cache.is_a?(RailsCacheAdapter) - end - - def invalidate_prompt_cache_after_mutation(name) - generation = cache&.invalidate_name(name) - payload = { name: name, backend: cache_backend_name, generation: generation, scope: :name, mutation: true } - emit_prompt_cache_event(:invalidate, payload) - end - - # Check if SWR cache is available - def swr_cache_available? - cache.respond_to?(:swr_enabled?) && cache.swr_enabled? - end - - # Check if distributed cache is available - def distributed_cache_available? - cache.respond_to?(:fetch_with_lock) - end - - # Build payload for create_dataset_item - # rubocop:disable Metrics/ParameterLists - def build_dataset_item_payload(dataset_name:, input:, expected_output:, - metadata:, id:, source_trace_id:, - source_observation_id:, status:) - { datasetName: dataset_name }.tap do |payload| - add_optional_dataset_item_fields(payload, input, expected_output, metadata, id) - add_optional_source_fields(payload, source_trace_id, source_observation_id, status) + # Issue an HTTP request, raise on Faraday errors, parse the response. + # + # @api private + # @param verb [Symbol] HTTP verb (:get, :post, :patch, :delete) + # @param path [String] Request path + # @param params [Hash, nil] Query string params (GET/DELETE) + # @param body [Hash, nil] JSON body (POST/PATCH) + # @return [Hash] Parsed response body + def request(verb, path, params: nil, body: nil) + with_faraday_error_handling do + handle_response(connection.public_send(verb, path, body || params)) end end - # rubocop:enable Metrics/ParameterLists - - def add_optional_dataset_item_fields(payload, input, expected_output, metadata, id) - payload[:id] = id if id - payload[:input] = input if input - payload[:expectedOutput] = expected_output if expected_output - payload[:metadata] = metadata if metadata - end - - def add_optional_source_fields(payload, source_trace_id, source_observation_id, status) - payload[:sourceTraceId] = source_trace_id if source_trace_id - payload[:sourceObservationId] = source_observation_id if source_observation_id - payload[:status] = status.to_s.upcase if status - end - # Build params for list_dataset_items - def build_dataset_items_params(dataset_name:, page:, limit:, - source_trace_id:, source_observation_id:) + def build_traces_params(**options) { - datasetName: dataset_name, - page: page, - limit: limit, - sourceTraceId: source_trace_id, - sourceObservationId: source_observation_id + page: options[:page], limit: options[:limit], userId: options[:user_id], name: options[:name], + sessionId: options[:session_id], + fromTimestamp: options[:from_timestamp]&.iso8601, + toTimestamp: options[:to_timestamp]&.iso8601, + orderBy: options[:order_by], tags: options[:tags], version: options[:version], + release: options[:release], environment: options[:environment], fields: options[:fields], + filter: options[:filter] }.compact end - # Build params for list_dataset_runs - def build_dataset_runs_params(page:, limit:) - { page: page, limit: limit }.compact - end - - # Build endpoint path for dataset runs def dataset_runs_path(dataset_name) - encoded_name = URI.encode_uri_component(dataset_name) - "/api/public/datasets/#{encoded_name}/runs" + "/api/public/datasets/#{URI.encode_uri_component(dataset_name)}/runs" end - # Build endpoint path for a specific dataset run def dataset_run_path(dataset_name:, run_name:) - encoded_run_name = URI.encode_uri_component(run_name) - "#{dataset_runs_path(dataset_name)}/#{encoded_run_name}" - end - - # Build query params for list_traces, mapping snake_case to camelCase - # rubocop:disable Metrics/ParameterLists - def build_traces_params(page:, limit:, user_id:, name:, session_id:, - from_timestamp:, to_timestamp:, order_by:, - tags:, version:, release:, environment:, fields:, filter:) - { - page: page, limit: limit, userId: user_id, name: name, - sessionId: session_id, - fromTimestamp: from_timestamp&.iso8601, - toTimestamp: to_timestamp&.iso8601, - orderBy: order_by, tags: tags, version: version, - release: release, environment: environment, fields: fields, - filter: filter - }.compact - end - # rubocop:enable Metrics/ParameterLists - - # Fetch with SWR cache - def fetch_with_swr_cache(cache_key, name, version, label) - cache.fetch_with_stale_while_revalidate(cache_key) do - fetch_prompt_from_api(name, version: version, label: label) - end - end - - # Fetch with distributed cache (Rails.cache with stampede protection) - def fetch_with_distributed_cache(cache_key, name, version, label) - cache.fetch_with_lock(cache_key) do - fetch_prompt_from_api(name, version: version, label: label) - end + "#{dataset_runs_path(dataset_name)}/#{URI.encode_uri_component(run_name)}" end # Fetch a prompt from the API (without caching) @@ -1041,13 +662,8 @@ def fetch_with_distributed_cache(cache_key, name, version, label) # @raise [UnauthorizedError] if authentication fails # @raise [ApiError] for other API errors def fetch_prompt_from_api(name, version: nil, label: nil) - with_faraday_error_handling do - params = build_prompt_params(version: version, label: label) - path = "/api/public/v2/prompts/#{URI.encode_uri_component(name)}" - - response = connection.get(path, params) - handle_response(response) - end + path = "/api/public/v2/prompts/#{URI.encode_uri_component(name)}" + request(:get, path, params: { version: version, label: label }.compact) end # Build a new Faraday connection @@ -1116,15 +732,6 @@ def user_agent "langfuse-rb/#{Langfuse::VERSION}" end - # Build query parameters for prompt request - # - # @param version [Integer, nil] Optional version number - # @param label [String, nil] Optional label - # @return [Hash] Query parameters - def build_prompt_params(version: nil, label: nil) - { version: version, label: label }.compact - end - # Wrap a block with standard Faraday error handling. # # Catches RetriableResponse (retries exhausted) and generic Faraday errors, diff --git a/lib/langfuse/cache_warmer.rb b/lib/langfuse/cache_warmer.rb index 4a36ca4..7268fda 100644 --- a/lib/langfuse/cache_warmer.rb +++ b/lib/langfuse/cache_warmer.rb @@ -148,25 +148,22 @@ def warm!(prompt_names, versions: {}, labels: {}) # # @return [Boolean] def cache_enabled? - cache = client.api_client.cache - return false if cache.nil? - - cache.ttl&.positive? || false + client.prompt_cache_stats[:enabled] == true end # Get cache statistics (if supported by backend) # # @return [Hash, nil] Cache stats or nil if not supported def cache_stats - cache = client.api_client.cache - return nil unless cache - - stats = {} - stats[:backend] = cache.class.name.split("::").last - stats[:ttl] = cache.ttl if cache.respond_to?(:ttl) - stats[:size] = cache.size if cache.respond_to?(:size) - stats[:max_size] = cache.max_size if cache.respond_to?(:max_size) - stats + stats = client.prompt_cache_stats + return nil unless stats[:enabled] + + { + backend: public_backend_name(stats[:backend]), + ttl: stats[:ttl], + size: stats[:size], + max_size: stats[:max_size] + } end private @@ -213,6 +210,17 @@ def build_prompt_options(name, versions, labels) def record_failure(results, name, error) results[:failed] << { name: name, error: error } end + + def public_backend_name(backend) + case backend + when CacheBackend::MEMORY + "PromptCache" + when CacheBackend::RAILS + "RailsCacheAdapter" + else + backend.to_s + end + end end # Error raised when cache warming fails with warm! diff --git a/lib/langfuse/chat_prompt_client.rb b/lib/langfuse/chat_prompt_client.rb index 9dc3c7c..b516171 100644 --- a/lib/langfuse/chat_prompt_client.rb +++ b/lib/langfuse/chat_prompt_client.rb @@ -28,6 +28,9 @@ class ChatPromptClient # @return [Integer] Prompt version number attr_reader :version + # @return [Array] Raw prompt template (array of role/content message hashes) + attr_reader :prompt + # @return [Array] Labels assigned to this prompt attr_reader :labels @@ -37,9 +40,6 @@ class ChatPromptClient # @return [Hash] Prompt configuration attr_reader :config - # @return [Array] Array of message hashes and placeholder entries - attr_reader :prompt - # @return [String, nil] Optional commit message for this prompt version attr_reader :commit_message diff --git a/lib/langfuse/client.rb b/lib/langfuse/client.rb index 460867a..1a82d17 100644 --- a/lib/langfuse/client.rb +++ b/lib/langfuse/client.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "forwardable" + module Langfuse # Main client for Langfuse SDK # @@ -19,6 +21,8 @@ module Langfuse # # rubocop:disable Metrics/ClassLength class Client + extend Forwardable + # @return [Integer] Default page size when fetching all dataset items DATASET_ITEMS_PAGE_SIZE = 50 @@ -28,6 +32,35 @@ class Client # @return [ApiClient] The underlying API client attr_reader :api_client + # Pure pass-throughs to {ApiClient}. See {ApiClient} for parameter and + # return-value documentation; the public surface here is identical. + # + # @!method list_prompts(page: nil, limit: nil) + # @!method invalidate_prompt_cache(name, version: nil, label: nil) + # @!method invalidate_prompt_cache_by_name(name) + # @!method clear_prompt_cache + # @!method prompt_cache_stats + # @!method prompt_cache_key(name, version: nil, label: nil) + # @!method validate_prompt_cache_backend! + # @!method list_traces(**options) + # @!method get_trace(id) + # @!method list_datasets(page: nil, limit: nil) + # @!method get_dataset_run(dataset_name:, run_name:) + # @!method create_dataset_run_item(**) + def_delegators :api_client, + :list_prompts, + :invalidate_prompt_cache, + :invalidate_prompt_cache_by_name, + :clear_prompt_cache, + :prompt_cache_stats, + :prompt_cache_key, + :validate_prompt_cache_backend!, + :list_traces, + :get_trace, + :list_datasets, + :get_dataset_run, + :create_dataset_run_item + # Initialize a new Langfuse client # # @param config [Config] Configuration object @@ -134,79 +167,6 @@ def refresh_prompt(name, version: nil, label: nil, cache_ttl: nil) build_client_fetch_result(api_result, build_prompt_client(api_result.prompt)) end - # Invalidate one exact logical prompt cache key. - # - # @param name [String] The prompt name - # @param version [Integer, nil] Optional specific version number - # @param label [String, nil] Optional label - # @return [PromptCacheKey] The invalidated key - def invalidate_prompt_cache(name, version: nil, label: nil) - api_client.invalidate_prompt_cache(name, version: version, label: label) - end - - # Invalidate all cached variants for one prompt name. - # - # @param name [String] The prompt name - # @return [Integer, nil] New generation, or nil when cache is disabled - def invalidate_prompt_cache_by_name(name) - api_client.invalidate_prompt_cache_by_name(name) - end - - # Logically clear the whole Langfuse prompt cache namespace. - # - # @return [Integer, nil] New global generation, or nil when cache is disabled - def clear_prompt_cache - api_client.clear_prompt_cache - end - - # Return prompt cache statistics. - # - # @return [Hash] Cache statistics - def prompt_cache_stats - api_client.prompt_cache_stats - end - - # Inspect the logical and generated cache keys for a prompt. - # - # @param name [String] The prompt name - # @param version [Integer, nil] Optional specific version number - # @param label [String, nil] Optional label - # @return [PromptCacheKey] Logical and generated cache keys - def prompt_cache_key(name, version: nil, label: nil) - api_client.prompt_cache_key(name, version: version, label: label) - end - - # Validate the configured prompt cache backend before first prompt fetch. - # - # @return [Boolean] true when the configured backend is usable - # @raise [ConfigurationError] if the backend is invalid - # rubocop:disable Naming/PredicateMethod - def validate_prompt_cache_backend! - api_client.cache&.validate! if api_client.cache.respond_to?(:validate!) - true - end - # rubocop:enable Naming/PredicateMethod - - # List all prompts in the Langfuse project - # - # Fetches a list of all prompt names available in your project. - # Returns metadata only, not full prompt content. - # - # @param page [Integer, nil] Optional page number for pagination - # @param limit [Integer, nil] Optional limit per page - # @return [Array] Array of prompt metadata hashes - # @raise [UnauthorizedError] if authentication fails - # @raise [ApiError] for other API errors - # - # @example - # prompts = client.list_prompts - # prompts.each do |prompt| - # puts "#{prompt['name']} (v#{prompt['version']})" - # end - def list_prompts(page: nil, limit: nil) - api_client.list_prompts(page: page, limit: limit) - end - # Convenience method: fetch and compile a prompt in one call # # This is a shorthand for calling get_prompt followed by compile. @@ -533,49 +493,6 @@ def get_dataset(name) DatasetClient.new(data, client: self) end - # List all datasets in the project - # - # @param page [Integer, nil] Optional page number for pagination - # @param limit [Integer, nil] Optional limit per page - # @return [Array] Array of dataset metadata hashes - # @raise [UnauthorizedError] if authentication fails - # @raise [ApiError] for other API errors - # - # @example - # datasets = client.list_datasets(page: 1, limit: 10) - def list_datasets(page: nil, limit: nil) - api_client.list_datasets(page: page, limit: limit) - end - - # List traces in the project - # - # @param page [Integer, nil] Optional page number for pagination - # @param limit [Integer, nil] Optional limit per page - # @param filters [Hash] Additional filters (user_id, name, session_id, etc.) - # @return [Array] Array of trace hashes - # @raise [UnauthorizedError] if authentication fails - # @raise [ApiError] for other API errors - # - # @example - # traces = client.list_traces(page: 1, limit: 10, name: "my-trace") - def list_traces(page: nil, limit: nil, **filters) - api_client.list_traces(page: page, limit: limit, **filters) - end - - # Fetch a trace by ID - # - # @param id [String] Trace ID - # @return [Hash] The trace data - # @raise [NotFoundError] if the trace is not found - # @raise [UnauthorizedError] if authentication fails - # @raise [ApiError] for other API errors - # - # @example - # trace = client.get_trace("trace-uuid-123") - def get_trace(id) - api_client.get_trace(id) - end - # Create a new dataset item # # @param dataset_name [String] Name of the dataset to add item to (required) @@ -669,39 +586,6 @@ def delete_dataset_item(id) nil end - # Create a dataset run item (link a trace to a dataset item) - # - # @param dataset_item_id [String] Dataset item ID (required) - # @param run_name [String] Run name (required) - # @param trace_id [String, nil] Trace ID - # @param observation_id [String, nil] Observation ID - # @param metadata [Hash, nil] Optional metadata - # @param run_description [String, nil] Optional run description - # @return [Hash] The created dataset run item data - def create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, - observation_id: nil, metadata: nil, run_description: nil) - api_client.create_dataset_run_item( - dataset_item_id: dataset_item_id, - run_name: run_name, - trace_id: trace_id, - observation_id: observation_id, - metadata: metadata, - run_description: run_description - ) - end - - # Fetch a dataset run by dataset and run name - # - # @param dataset_name [String] Dataset name (required) - # @param run_name [String] Run name (required) - # @return [Hash] The dataset run data, including linked run items - # @raise [NotFoundError] if the dataset run is not found - # @raise [UnauthorizedError] if authentication fails - # @raise [ApiError] for other API errors - def get_dataset_run(dataset_name:, run_name:) - api_client.get_dataset_run(dataset_name: dataset_name, run_name: run_name) - end - # List dataset runs for a dataset # # When page is nil (default), auto-paginates to fetch all runs. @@ -877,7 +761,7 @@ def build_fallback_prompt_result(key, fallback:, type:, cache_ttl:, error:) def fallback_cache_status(cache_ttl) return CacheStatus::BYPASS if cache_ttl&.zero? - return CacheStatus::DISABLED unless api_client.cache + return CacheStatus::DISABLED unless api_client.prompt_cache_stats[:enabled] CacheStatus::MISS end @@ -932,21 +816,22 @@ def create_rails_cache_adapter ) end + VALID_PROMPT_TYPES = %i[text chat].freeze + private_constant :VALID_PROMPT_TYPES + # Build the appropriate prompt client based on prompt type # # @param prompt_data [Hash] The prompt data from API # @return [TextPromptClient, ChatPromptClient] # @raise [ApiError] if prompt type is unknown - def build_prompt_client(prompt_data) - type = prompt_data["type"] - - case type + def build_prompt_client(prompt_data, is_fallback: false) + case prompt_data["type"] when "text" - TextPromptClient.new(prompt_data) + TextPromptClient.new(prompt_data, is_fallback: is_fallback) when "chat" - ChatPromptClient.new(prompt_data) + ChatPromptClient.new(prompt_data, is_fallback: is_fallback) else - raise ApiError, "Unknown prompt type: #{type}" + raise ApiError, "Unknown prompt type: #{prompt_data['type']}" end end @@ -959,24 +844,18 @@ def build_prompt_client(prompt_data) # @raise [ArgumentError] if type is invalid def build_fallback_prompt_client(name, fallback, type) validate_prompt_type!(type) - - # Create minimal prompt data structure - prompt_data = { - "name" => name, - "version" => 0, - "type" => type.to_s, - "prompt" => fallback, - "labels" => [], - "tags" => ["fallback"], - "config" => {} - } - - case type - when :text - TextPromptClient.new(prompt_data, is_fallback: true) - when :chat - ChatPromptClient.new(prompt_data, is_fallback: true) - end + build_prompt_client( + { + "name" => name, + "version" => 0, + "type" => type.to_s, + "prompt" => fallback, + "labels" => [], + "tags" => ["fallback"], + "config" => {} + }, + is_fallback: true + ) end # Validate prompt type parameter @@ -984,8 +863,7 @@ def build_fallback_prompt_client(name, fallback, type) # @param type [Symbol] The type to validate # @raise [ArgumentError] if type is invalid def validate_prompt_type!(type) - valid_types = %i[text chat] - return if valid_types.include?(type) + return if VALID_PROMPT_TYPES.include?(type) raise ArgumentError, "Invalid type: #{type}. Must be :text or :chat" end @@ -1015,30 +893,15 @@ def validate_prompt_content!(prompt, type) def normalize_prompt_content(prompt, type) return prompt if type == :text - # Normalize chat messages to use string keys prompt.map do |message| normalized = message.transform_keys(&:to_s) - next placeholder_prompt_content(normalized) if normalized["type"] == ChatPromptClient::PLACEHOLDER_TYPE - - normalize_chat_message_content(normalized) + if normalized["type"] == ChatPromptClient::PLACEHOLDER_TYPE + { "type" => ChatPromptClient::PLACEHOLDER_TYPE, "name" => normalized["name"].to_s } + else + normalized.merge("role" => normalized["role"]&.to_s, "content" => normalized["content"]) + end end end - - # @api private - def placeholder_prompt_content(message) - { - "type" => ChatPromptClient::PLACEHOLDER_TYPE, - "name" => message["name"].to_s - } - end - - # @api private - def normalize_chat_message_content(message) - message.merge( - "role" => message["role"]&.to_s, - "content" => message["content"] - ) - end end # rubocop:enable Metrics/ClassLength end diff --git a/lib/langfuse/config.rb b/lib/langfuse/config.rb index 8c9dac2..3d98c15 100644 --- a/lib/langfuse/config.rb +++ b/lib/langfuse/config.rb @@ -60,7 +60,7 @@ class Config # @return [#call, nil] Observer called for prompt cache events attr_accessor :prompt_cache_observer - # @return [Boolean] Use async processing for traces (requires ActiveJob) + # @return [Boolean] Use OpenTelemetry batch scheduling for trace export attr_accessor :tracing_async # @return [Integer] Number of events to batch before sending @@ -69,7 +69,7 @@ class Config # @return [Integer] Interval in seconds to flush buffered events attr_accessor :flush_interval - # @return [Symbol] ActiveJob queue name for async processing + # @return [Symbol] Reserved no-op queue name for future async job integration attr_accessor :job_queue # @return [String, nil] Default tracing environment applied to new traces/observations diff --git a/lib/langfuse/prompt_cache.rb b/lib/langfuse/prompt_cache.rb index dc6edc8..ad35b2f 100644 --- a/lib/langfuse/prompt_cache.rb +++ b/lib/langfuse/prompt_cache.rb @@ -209,6 +209,9 @@ def stats current_generation_entries: counts.fetch(:current), orphaned_entries: counts.fetch(:orphaned), total_entries: @cache.size, + ttl: ttl, + size: @cache.size, + max_size: max_size, global_generation: @global_generation, unsupported_counts: [] } diff --git a/lib/langfuse/prompt_cache_coordinator.rb b/lib/langfuse/prompt_cache_coordinator.rb new file mode 100644 index 0000000..79efb55 --- /dev/null +++ b/lib/langfuse/prompt_cache_coordinator.rb @@ -0,0 +1,288 @@ +# frozen_string_literal: true + +require_relative "prompt_fetch_result" +require_relative "prompt_cache_events" + +module Langfuse + # Coordinates prompt fetch/cache behavior between the API transport and the + # configured cache backend. Both supported backends ({PromptCache} and + # {RailsCacheAdapter}) provide the full cache + SWR surface; only + # {RailsCacheAdapter} adds distributed-lock fetch, which is the one branch + # the dispatch needs to make. + # + # @api private + class PromptCacheCoordinator # rubocop:disable Metrics/ClassLength + # @param cache [PromptCache, RailsCacheAdapter, nil] Configured cache backend + # @param event_emitter [#emit_prompt_cache_event] Emitter for cache events + # @param fetch_prompt [#call] Callable that fetches prompt data from the API + # @return [PromptCacheCoordinator] + def initialize(cache:, event_emitter:, fetch_prompt:) + @cache = cache + @event_emitter = event_emitter + @fetch_prompt = fetch_prompt + @backend_name = compute_backend_name + end + + # @return [String] Backend identifier reported in events and stats + attr_reader :backend_name + + # Fetch a prompt and include cache metadata. + # + # @param name [String] Prompt name + # @param version [Integer, nil] Optional prompt version + # @param label [String, nil] Optional prompt label + # @param cache_ttl [Integer, nil] Optional TTL override (0 forces a bypass) + # @return [PromptFetchResult] Prompt data plus cache metadata + def get_prompt_result(name, version: nil, label: nil, cache_ttl: nil) + validate_fetch_options!(version, label, cache_ttl) + key = prompt_cache_key(name, version: version, label: label) + + return fetch_uncached(key, CacheStatus::DISABLED) if @cache.nil? + return fetch_uncached(key, CacheStatus::BYPASS) if cache_ttl&.zero? + + fetch_cached(key, cache_ttl) + end + + # Refresh a prompt from the API, optionally writing through to cache. + # + # @param name [String] Prompt name + # @param version [Integer, nil] Optional prompt version + # @param label [String, nil] Optional prompt label + # @param cache_ttl [Integer, nil] Optional TTL override + # @return [PromptFetchResult] Prompt data plus cache metadata + def refresh_prompt(name, version: nil, label: nil, cache_ttl: nil) + validate_fetch_options!(version, label, cache_ttl) + key = prompt_cache_key(name, version: version, label: label) + + emit(:refresh_start) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } + prompt_data = @fetch_prompt.call(name, version: version, label: label) + write_through(key, prompt_data, cache_ttl, status: CacheStatus::REFRESH) if @cache && !cache_ttl&.zero? + status = refresh_status(cache_ttl) + emit(:refresh_success) { event_payload(key, status, CacheSource::API) } + build_result(key, prompt_data, status, CacheSource::API) + rescue StandardError => e + emit(:refresh_failure) do + event_payload(key, CacheStatus::REFRESH, CacheSource::API, + error_class: e.class.name, error_message: e.message) + end + raise + end + + # Inspect the logical and generated cache keys for a prompt. + # + # @param name [String] Prompt name + # @param version [Integer, nil] Optional prompt version + # @param label [String, nil] Optional prompt label + # @return [PromptCacheKey] Logical and generated cache keys + def prompt_cache_key(name, version: nil, label: nil) + raise ArgumentError, "Cannot specify both version and label" if version && label + + logical_key = PromptCache.build_key(name, version: version, label: label) + storage_key = @cache ? @cache.storage_key(logical_key, name: name) : logical_key + PromptCacheKey.new(name: name, version: version, label: label, logical_key: logical_key, storage_key: storage_key) + end + + # Invalidate one exact logical prompt cache key. + # + # @param name [String] Prompt name + # @param version [Integer, nil] Optional prompt version + # @param label [String, nil] Optional prompt label + # @return [PromptCacheKey] Invalidated key + def invalidate_prompt_cache(name, version: nil, label: nil) + key = prompt_cache_key(name, version: version, label: label) + deleted = @cache ? @cache.delete(key.storage_key) : false + emit(:delete) { event_payload(key, CacheStatus::MISS, CacheSource::CACHE, deleted: deleted) } + emit(:invalidate) { event_payload(key, CacheStatus::MISS, CacheSource::CACHE, scope: :exact) } + key + end + + # Invalidate all cached variants for one prompt name. + # + # @param name [String] Prompt name + # @return [Integer, nil] New generation, or nil when caching is disabled + def invalidate_prompt_cache_by_name(name) + emit_name_invalidation(name, mutation: false) + end + + # Invalidate after prompt mutation (create/update). Distinct from manual + # invalidation so observers can tell the two apart. + # + # @param name [String] Prompt name + # @return [Integer, nil] New generation + def invalidate_after_mutation(name) + emit_name_invalidation(name, mutation: true) + end + + # Logically clear the entire prompt cache namespace. + # + # @return [Integer, nil] New global generation, or nil when caching is disabled + def clear_prompt_cache + generation = @cache&.clear_logically + emit(:clear, backend: @backend_name, generation: generation) + generation + end + + # @return [Hash] Prompt cache statistics + def prompt_cache_stats + @cache ? @cache.stats : disabled_stats + end + + private + + def validate_fetch_options!(version, label, cache_ttl) + raise ArgumentError, "Cannot specify both version and label" if version && label + return if cache_ttl.nil? + raise ArgumentError, "cache_ttl must be a non-negative Integer" unless cache_ttl.is_a?(Integer) + raise ArgumentError, "cache_ttl must be non-negative" if cache_ttl.negative? + end + + def fetch_uncached(key, status) + prompt_data = @fetch_prompt.call(key.name, version: key.version, label: key.label) + build_result(key, prompt_data, status, CacheSource::API) + end + + # Single dispatch: SWR > distributed lock > simple get/set. + def fetch_cached(key, cache_ttl) + return fetch_with_swr(key, cache_ttl) if @cache.swr_enabled? + return fetch_with_lock(key, cache_ttl) if @cache.is_a?(RailsCacheAdapter) + + cached = @cache.get(key.storage_key) + return cache_hit(key, cached) if cached + + fetch_and_cache(key, cache_ttl, swr: false) + end + + def fetch_with_swr(key, cache_ttl) + entry = @cache.entry(key.storage_key) + return cache_hit(key, entry.data) if entry.respond_to?(:fresh?) && entry.fresh? + + if entry.respond_to?(:stale?) && entry.stale? + emit(:stale_serve) { event_payload(key, CacheStatus::STALE, CacheSource::CACHE) } + schedule_refresh(key, cache_ttl) + return build_result(key, entry.data, CacheStatus::STALE, CacheSource::CACHE) + end + + fetch_and_cache(key, cache_ttl, swr: true) + end + + def fetch_with_lock(key, cache_ttl) + cached = @cache.get(key.storage_key) + return cache_hit(key, cached) if cached + + emit(:miss) { event_payload(key, CacheStatus::MISS, CacheSource::API) } + fetched = false + prompt_data = @cache.fetch_with_lock(key.storage_key, ttl: cache_ttl) do + fetched = true + @fetch_prompt.call(key.name, version: key.version, label: key.label) + end + emit(:write) { event_payload(key, CacheStatus::MISS, CacheSource::API) } if fetched + status = fetched ? CacheStatus::MISS : CacheStatus::HIT + source = fetched ? CacheSource::API : CacheSource::CACHE + build_result(key, prompt_data, status, source) + end + + def fetch_and_cache(key, cache_ttl, swr:) + emit(:miss) { event_payload(key, CacheStatus::MISS, CacheSource::API) } + prompt_data = @fetch_prompt.call(key.name, version: key.version, label: key.label) + write_through(key, prompt_data, cache_ttl, swr: swr) + build_result(key, prompt_data, CacheStatus::MISS, CacheSource::API) + end + + def write_through(key, prompt_data, cache_ttl, swr: false, status: CacheStatus::MISS) + if swr + @cache.write_with_stale_while_revalidate(key.storage_key, prompt_data, ttl: cache_ttl) + else + @cache.set(key.storage_key, prompt_data, ttl: cache_ttl) + end + emit(:write) { event_payload(key, status, CacheSource::API) } + end + + def cache_hit(key, prompt_data) + emit(:hit) { event_payload(key, CacheStatus::HIT, CacheSource::CACHE) } + build_result(key, prompt_data, CacheStatus::HIT, CacheSource::CACHE) + end + + def schedule_refresh(key, cache_ttl) + scheduled = @cache.refresh_async( + key.storage_key, + ttl: cache_ttl, + on_success: ->(_value) { emit_refresh_success(key) }, + on_failure: ->(error) { emit_refresh_failure(key, error) } + ) { @fetch_prompt.call(key.name, version: key.version, label: key.label) } + emit(:refresh_start) { event_payload(key, CacheStatus::STALE, CacheSource::CACHE) } if scheduled + end + + def emit_refresh_success(key) + emit(:refresh_success) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } + emit(:write) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } + end + + def emit_refresh_failure(key, error) + emit(:refresh_failure) do + event_payload(key, CacheStatus::STALE, CacheSource::CACHE, + error_class: error.class.name, error_message: error.message) + end + end + + def emit_name_invalidation(name, mutation:) + generation = @cache&.invalidate_name(name) + payload = { name: name, backend: @backend_name, generation: generation, scope: :name } + payload[:mutation] = true if mutation + emit(:invalidate, payload) + generation + end + + def refresh_status(cache_ttl) + return CacheStatus::DISABLED unless @cache + return CacheStatus::BYPASS if cache_ttl&.zero? + + CacheStatus::REFRESH + end + + def build_result(key, prompt_data, status, source) + PromptFetchResult.new( + prompt: prompt_data, + logical_key: key.logical_key, + storage_key: key.storage_key, + cache_status: status, + source: source, + name: prompt_data["name"] || key.name, + version: prompt_data["version"] || key.version, + label: key.resolved_label + ) + end + + def emit(event, payload = nil, &) + @event_emitter.emit_prompt_cache_event(event, payload, &) + end + + def event_payload(key, cache_status, source, **extra) + PromptCacheEvents.build_payload( + key, + cache_status: cache_status, + source: source, + backend: @backend_name, + extra: extra + ) + end + + def compute_backend_name + return CacheBackend::DISABLED unless @cache + return CacheBackend::RAILS if @cache.is_a?(RailsCacheAdapter) + return CacheBackend::MEMORY if @cache.is_a?(PromptCache) + + @cache.class.name + end + + def disabled_stats + { + backend: CacheBackend::DISABLED, + enabled: false, + current_generation_entries: nil, + orphaned_entries: nil, + total_entries: nil, + unsupported_counts: CacheBackend::UNSUPPORTED_COUNT_KEYS + } + end + end +end diff --git a/lib/langfuse/prompt_cache_events.rb b/lib/langfuse/prompt_cache_events.rb index 70eb1f2..1e7e419 100644 --- a/lib/langfuse/prompt_cache_events.rb +++ b/lib/langfuse/prompt_cache_events.rb @@ -10,6 +10,29 @@ module PromptCacheEvents # ActiveSupport::Notifications event name used for prompt cache events. PROMPT_CACHE_NOTIFICATION = "prompt_cache.langfuse" + # Build a prompt cache event payload from a key, status, source, and backend. + # Shared by the ApiClient mixin and PromptCacheCoordinator so a payload-shape + # change can't drift between the two emitters. + # + # @param key [PromptCacheKey] Logical and storage cache key + # @param cache_status [Symbol] Cache status + # @param source [Symbol] Event source + # @param backend [String] Backend identifier + # @param extra [Hash] Additional payload fields + # @return [Hash] Event payload + def self.build_payload(key, cache_status:, source:, backend:, extra: {}) + { + name: key.name, + version: key.version, + label: key.resolved_label, + logical_key: key.logical_key, + storage_key: key.storage_key, + backend: backend, + cache_status: cache_status, + source: source + }.merge(extra) + end + # Configure prompt cache event dispatch. Wraps the observer once into a # 1-arg callable so the per-event hot path never re-checks arity. # @@ -56,16 +79,13 @@ def emit_prompt_fallback_event(key, cache_status:, error:) # @api private def event_payload(key, cache_status, source, extra = {}) - { - name: key.name, - version: key.version, - label: key.resolved_label, - logical_key: key.logical_key, - storage_key: key.storage_key, - backend: cache_backend_name, + PromptCacheEvents.build_payload( + key, cache_status: cache_status, - source: source - }.merge(extra) + source: source, + backend: cache_backend_name, + extra: extra + ) end # @api private @@ -100,7 +120,8 @@ def notify_active_support(payload) def wrap_cache_observer(observer) return nil if observer.nil? - if observer.method(:call).arity == 1 + arity = observer.respond_to?(:arity) ? observer.arity : observer.method(:call).arity + if arity == 1 ->(payload) { observer.call(payload) } else ->(payload) { observer.call(payload[:event], payload) } diff --git a/lib/langfuse/rails_cache_adapter.rb b/lib/langfuse/rails_cache_adapter.rb index 305a7d6..7f3b439 100644 --- a/lib/langfuse/rails_cache_adapter.rb +++ b/lib/langfuse/rails_cache_adapter.rb @@ -162,7 +162,10 @@ def stats current_generation_entries: nil, orphaned_entries: nil, total_entries: nil, - global_generation: generation_value(global_generation_key), + ttl: ttl, + size: size, + max_size: nil, + global_generation: safe_generation_value(global_generation_key), unsupported_counts: CacheBackend::UNSUPPORTED_COUNT_KEYS } end @@ -351,6 +354,12 @@ def generation_value(key) end end + def safe_generation_value(key) + return nil unless Rails.cache.respond_to?(:read) + + generation_value(key) + end + def bump_generation(key) incremented = increment_generation(key) if incremented diff --git a/lib/langfuse/score_client.rb b/lib/langfuse/score_client.rb index 6b49ba0..050ed1e 100644 --- a/lib/langfuse/score_client.rb +++ b/lib/langfuse/score_client.rb @@ -216,24 +216,23 @@ def shutdown # @param environment [String, nil] Environment # @param data_type [String] Data type string (NUMERIC, BOOLEAN, CATEGORICAL) # @return [Hash] Event hash - # rubocop:disable Metrics/ParameterLists, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + # rubocop:disable Metrics/ParameterLists def build_score_event(name:, value:, id:, trace_id:, session_id:, observation_id:, comment:, metadata:, environment:, data_type:, dataset_run_id: nil, config_id: nil) body = { id: id || SecureRandom.uuid, name: name, value: value, - dataType: data_type - } - body[:traceId] = trace_id if trace_id - body[:sessionId] = session_id if session_id - body[:observationId] = observation_id if observation_id - body[:comment] = comment if comment - body[:metadata] = metadata if metadata - body[:environment] = environment if environment - body[:datasetRunId] = dataset_run_id if dataset_run_id - body[:configId] = config_id if config_id - + dataType: data_type, + traceId: trace_id, + sessionId: session_id, + observationId: observation_id, + comment: comment, + metadata: metadata, + environment: environment, + datasetRunId: dataset_run_id, + configId: config_id + }.compact { id: SecureRandom.uuid, type: "score-create", @@ -241,7 +240,7 @@ def build_score_event(name:, value:, id:, trace_id:, session_id:, observation_id body: body } end - # rubocop:enable Metrics/ParameterLists, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + # rubocop:enable Metrics/ParameterLists # Normalize and validate score value based on data type # diff --git a/lib/langfuse/text_prompt_client.rb b/lib/langfuse/text_prompt_client.rb index b2e263e..c3fd426 100644 --- a/lib/langfuse/text_prompt_client.rb +++ b/lib/langfuse/text_prompt_client.rb @@ -35,7 +35,7 @@ class TextPromptClient # @return [Hash] Prompt configuration attr_reader :config - # @return [String] Raw prompt template string + # @return [String] Raw prompt template attr_reader :prompt # @return [String, nil] Optional commit message for this prompt version @@ -75,6 +75,7 @@ def type # # @param kwargs [Hash] Variables to substitute in the template (as keyword arguments) # @return [String] The compiled prompt text + # @raise [ArgumentError] if variables cannot be rendered # # @example # text_prompt.compile(name: "Alice", greeting: "Hi") @@ -87,10 +88,6 @@ def compile(**kwargs) private - # Validate prompt data structure - # - # @param prompt_data [Hash] The prompt data to validate - # @raise [ArgumentError] if validation fails def validate_prompt_data!(prompt_data) raise ArgumentError, "prompt_data must be a Hash" unless prompt_data.is_a?(Hash) raise ArgumentError, "prompt_data must include 'prompt' field" unless prompt_data.key?("prompt") diff --git a/lib/tasks/langfuse.rake b/lib/tasks/langfuse.rake index c6a0eff..0285cae 100644 --- a/lib/tasks/langfuse.rake +++ b/lib/tasks/langfuse.rake @@ -141,21 +141,19 @@ namespace :langfuse do task clear_cache: :environment do require "langfuse" - cache = Langfuse.client.api_client.cache + stats = Langfuse.client.prompt_cache_stats - if cache.nil? + unless stats[:enabled] puts "Cache is disabled (cache_ttl = 0)" exit 0 end - if cache.respond_to?(:clear) - cache.clear - puts "Cache cleared successfully! ✓" - puts "Backend: #{Langfuse.configuration.cache_backend}" - else - puts "Cache backend does not support clearing" - exit 1 - end + generation = Langfuse.client.clear_prompt_cache + puts "Prompt cache logically cleared ✓" + puts "Existing backend entries remain until their TTL expires;" + puts "subsequent fetches will miss the old generation and re-populate." + puts "Backend: #{Langfuse.configuration.cache_backend}" + puts "Generation: #{generation}" unless generation.nil? end # Helper method to display warming results diff --git a/spec/langfuse/api_client_spec.rb b/spec/langfuse/api_client_spec.rb index 85eeaf8..7ad7635 100644 --- a/spec/langfuse/api_client_spec.rb +++ b/spec/langfuse/api_client_spec.rb @@ -320,14 +320,9 @@ # rubocop:disable RSpec/MultipleMemoizedHelpers context "with caching enabled" do - let(:cache) { instance_double(Langfuse::PromptCache) } + let(:cache) { Langfuse::PromptCache.new(ttl: 60) } let(:cached_client) do - described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - cache: cache - ) + described_class.new(public_key: public_key, secret_key: secret_key, base_url: base_url, cache: cache) end before do @@ -339,37 +334,22 @@ ) end - it "stores response in cache" do - cache_key = Langfuse::PromptCache.build_key(prompt_name) - - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key).and_return(nil) - expect(cache).to receive(:set).with(cache_key, prompt_response) - + it "stores the response in the cache after a miss" do cached_client.get_prompt(prompt_name) - end - - it "returns cached response on second call" do - cache_key = Langfuse::PromptCache.build_key(prompt_name) - # First call - cache miss - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key).and_return(nil) - expect(cache).to receive(:set).with(cache_key, prompt_response) - first_result = cached_client.get_prompt(prompt_name) + key = cached_client.prompt_cache_key(prompt_name) + expect(cache.get(key.storage_key)).to eq(prompt_response) + end - # Second call - cache hit - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key).and_return(prompt_response) - second_result = cached_client.get_prompt(prompt_name) + it "returns cached data on the second call without an extra API request" do + first = cached_client.get_prompt(prompt_name) + second = cached_client.get_prompt(prompt_name) - expect(second_result).to eq(first_result) + expect(second).to eq(first) + expect(a_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}")).to have_been_made.once end - it "builds correct cache key with version" do + it "fetches a different version separately" do stub_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") .with(query: { version: "2" }) .to_return( @@ -378,18 +358,11 @@ headers: { "Content-Type" => "application/json" } ) - cache_key = Langfuse::PromptCache.build_key(prompt_name, version: 2) - versioned_response = prompt_response.merge("version" => 2) - - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key).and_return(nil) - expect(cache).to receive(:set).with(cache_key, versioned_response) - - cached_client.get_prompt(prompt_name, version: 2) + result = cached_client.get_prompt(prompt_name, version: 2) + expect(result["version"]).to eq(2) end - it "builds correct cache key with label" do + it "fetches a labeled prompt separately" do stub_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") .with(query: { label: "production" }) .to_return( @@ -398,57 +371,35 @@ headers: { "Content-Type" => "application/json" } ) - cache_key = Langfuse::PromptCache.build_key(prompt_name, label: "production") - - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key).and_return(nil) - expect(cache).to receive(:set).with(cache_key, prompt_response) - - cached_client.get_prompt(prompt_name, label: "production") + result = cached_client.get_prompt(prompt_name, label: "production") + expect(result).to eq(prompt_response) end - it "caches different versions separately" do + it "caches different versions under distinct keys" do stub_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") .with(query: { version: "1" }) - .to_return( - status: 200, - body: prompt_response.merge("version" => 1).to_json, - headers: { "Content-Type" => "application/json" } - ) - + .to_return(status: 200, body: prompt_response.merge("version" => 1).to_json, + headers: { "Content-Type" => "application/json" }) stub_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") .with(query: { version: "2" }) - .to_return( - status: 200, - body: prompt_response.merge("version" => 2).to_json, - headers: { "Content-Type" => "application/json" } - ) - - cache_key_v1 = Langfuse::PromptCache.build_key(prompt_name, version: 1) - cache_key_v2 = Langfuse::PromptCache.build_key(prompt_name, version: 2) - v1_response = prompt_response.merge("version" => 1) - v2_response = prompt_response.merge("version" => 2) - - # First call for version 1 - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key_v1).and_return(nil) - expect(cache).to receive(:set).with(cache_key_v1, v1_response) + .to_return(status: 200, body: prompt_response.merge("version" => 2).to_json, + headers: { "Content-Type" => "application/json" }) cached_client.get_prompt(prompt_name, version: 1) - - # First call for version 2 - expect(cache).to receive(:respond_to?).with(:swr_enabled?).and_return(false) - expect(cache).to receive(:respond_to?).with(:fetch_with_lock).and_return(false) - expect(cache).to receive(:get).with(cache_key_v2).and_return(nil) - expect(cache).to receive(:set).with(cache_key_v2, v2_response) - cached_client.get_prompt(prompt_name, version: 2) + cached_client.get_prompt(prompt_name, version: 1) + + expect(a_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") + .with(query: { version: "1" })).to have_been_made.once + expect(a_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") + .with(query: { version: "2" })).to have_been_made.once end end - context "with SWR caching integration" do + # rubocop:enable RSpec/MultipleMemoizedHelpers + + # rubocop:disable RSpec/MultipleMemoizedHelpers + context "with cache backend integration" do let(:logger) { Logger.new($stdout, level: Logger::WARN) } let(:prompt_data) do { @@ -463,329 +414,54 @@ } end - context "with SWR-enabled cache" do - it "uses SWR fetch method when available" do - swr_cache = instance_double(Langfuse::RailsCacheAdapter) - cache_key = "greeting:version:1" - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: swr_cache - ) - - allow(swr_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(true) - allow(swr_cache).to receive(:swr_enabled?) - .and_return(true) - - expect(Langfuse::PromptCache).to receive(:build_key) - .with("greeting", version: 1, label: nil) - .and_return(cache_key) - - expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) - .with(cache_key) - .and_yield - .and_return(prompt_data) - - expect(client).to receive(:fetch_prompt_from_api) - .with("greeting", version: 1, label: nil) - .and_return(prompt_data) - - result = client.get_prompt("greeting", version: 1) - expect(result).to eq(prompt_data) - end - - it "handles cache miss with SWR" do - swr_cache = instance_double(Langfuse::RailsCacheAdapter) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: swr_cache - ) - - allow(swr_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(true) - allow(swr_cache).to receive(:swr_enabled?) - .and_return(true) - - expect(Langfuse::PromptCache).to receive(:build_key) - .with("greeting", version: nil, label: nil) - .and_return("greeting:production") - - expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) - .with("greeting:production") - .and_yield - .and_return(prompt_data) - - stub_request(:get, "#{base_url}/api/public/v2/prompts/greeting") - .to_return( - status: 200, - body: prompt_data.to_json, - headers: { "Content-Type" => "application/json" } - ) - - result = client.get_prompt("greeting") - expect(result).to eq(prompt_data) - end - - it "passes through all prompt parameters to cache key building" do - swr_cache = instance_double(Langfuse::RailsCacheAdapter) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: swr_cache - ) - - allow(swr_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(true) - allow(swr_cache).to receive(:swr_enabled?) - .and_return(true) - - expect(Langfuse::PromptCache).to receive(:build_key) - .with("support-bot", version: nil, label: "staging") - .and_return("support-bot:label:staging") - - expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) - .with("support-bot:label:staging") - .and_return(prompt_data) - - client.get_prompt("support-bot", label: "staging") - end - end - - context "with stampede protection cache (no SWR)" do - it "falls back to stampede protection when SWR not available" do - stampede_cache = instance_double(Langfuse::RailsCacheAdapter) - cache_key = "greeting:version:1" - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: stampede_cache - ) - - allow(stampede_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(false) - allow(stampede_cache).to receive(:respond_to?) - .with(:fetch_with_lock) - .and_return(true) - - expect(Langfuse::PromptCache).to receive(:build_key) - .with("greeting", version: 1, label: nil) - .and_return(cache_key) - - expect(stampede_cache).to receive(:fetch_with_lock) - .with(cache_key) - .and_yield - .and_return(prompt_data) - - expect(client).to receive(:fetch_prompt_from_api) - .with("greeting", version: 1, label: nil) - .and_return(prompt_data) - - result = client.get_prompt("greeting", version: 1) - expect(result).to eq(prompt_data) - end + before do + stub_request(:get, "#{base_url}/api/public/v2/prompts/greeting") + .to_return(status: 200, body: prompt_data.to_json, + headers: { "Content-Type" => "application/json" }) end - context "with simple cache (no SWR, no stampede protection)" do - it "uses simple get/set pattern when advanced caching not available" do - simple_cache = instance_double(Langfuse::PromptCache) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: simple_cache - ) - - allow(simple_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(false) - allow(simple_cache).to receive(:respond_to?) - .with(:fetch_with_lock) - .and_return(false) + it "uses SWR path when the configured backend has SWR enabled" do + swr_cache = Langfuse::PromptCache.new(ttl: 60, stale_ttl: 30) + client = described_class.new(public_key: public_key, secret_key: secret_key, + base_url: base_url, logger: logger, cache: swr_cache) - expect(Langfuse::PromptCache).to receive(:build_key) - .with("greeting", version: nil, label: nil) - .and_return("greeting:production") + client.get_prompt("greeting") + client.get_prompt("greeting") - expect(simple_cache).to receive(:get) - .with("greeting:production") - .and_return(nil) - - expect(client).to receive(:fetch_prompt_from_api) - .with("greeting", version: nil, label: nil) - .and_return(prompt_data) - - expect(simple_cache).to receive(:set) - .with("greeting:production", prompt_data) - - result = client.get_prompt("greeting") - expect(result).to eq(prompt_data) - end - - it "returns cached data when available" do - simple_cache = instance_double(Langfuse::PromptCache) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: simple_cache - ) - - allow(simple_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(false) - allow(simple_cache).to receive(:respond_to?) - .with(:fetch_with_lock) - .and_return(false) - - expect(Langfuse::PromptCache).to receive(:build_key) - .with("greeting", version: nil, label: nil) - .and_return("greeting:production") - - expect(simple_cache).to receive(:get) - .with("greeting:production") - .and_return(prompt_data) - - expect(client).not_to receive(:fetch_prompt_from_api) - expect(simple_cache).not_to receive(:set) - - result = client.get_prompt("greeting") - expect(result).to eq(prompt_data) - end + expect(a_request(:get, "#{base_url}/api/public/v2/prompts/greeting")).to have_been_made.once end - context "with no cache" do - it "fetches directly from API without caching" do - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: nil - ) + it "uses simple get/set when SWR is disabled" do + simple_cache = Langfuse::PromptCache.new(ttl: 60) + client = described_class.new(public_key: public_key, secret_key: secret_key, + base_url: base_url, logger: logger, cache: simple_cache) - expect(client).to receive(:fetch_prompt_from_api) - .with("greeting", version: nil, label: nil) - .and_return(prompt_data) + client.get_prompt("greeting") + client.get_prompt("greeting") - result = client.get_prompt("greeting") - expect(result).to eq(prompt_data) - end + expect(a_request(:get, "#{base_url}/api/public/v2/prompts/greeting")).to have_been_made.once end - context "when detecting cache capabilities" do - it "correctly detects SWR capability" do - swr_cache = instance_double(Langfuse::RailsCacheAdapter) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - cache: swr_cache - ) - - allow(swr_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(true) + it "fetches directly from the API when no cache is configured" do + client = described_class.new(public_key: public_key, secret_key: secret_key, + base_url: base_url, logger: logger, cache: nil) - expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) - allow(swr_cache).to receive_messages(swr_enabled?: true, fetch_with_stale_while_revalidate: prompt_data) - - client.get_prompt("test") - end - - it "falls back when SWR not available but stampede protection is" do - rails_cache = instance_double(Langfuse::RailsCacheAdapter) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - cache: rails_cache - ) - - allow(rails_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(false) - allow(rails_cache).to receive(:respond_to?) - .with(:fetch_with_lock) - .and_return(true) - - expect(rails_cache).to receive(:fetch_with_lock) - allow(rails_cache).to receive(:fetch_with_lock) - .and_return(prompt_data) - - client.get_prompt("test") - end - - it "handles nil cache gracefully" do - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - cache: nil - ) - - expect(client).to receive(:fetch_prompt_from_api) - .and_return(prompt_data) - - result = client.get_prompt("test") - expect(result).to eq(prompt_data) - end + expect(client.get_prompt("greeting")).to eq(prompt_data) end - context "when handling errors with SWR" do - it "propagates API errors when SWR cache fails" do - swr_cache = instance_double(Langfuse::RailsCacheAdapter) - - client = described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - logger: logger, - cache: swr_cache - ) - - allow(swr_cache).to receive(:respond_to?) - .with(:swr_enabled?) - .and_return(true) - allow(swr_cache).to receive(:swr_enabled?) - .and_return(true) - - allow(swr_cache).to receive(:fetch_with_stale_while_revalidate) - .and_yield + it "propagates API errors when the SWR fetch path encounters one" do + swr_cache = Langfuse::PromptCache.new(ttl: 60, stale_ttl: 30) + client = described_class.new(public_key: public_key, secret_key: secret_key, + base_url: base_url, logger: logger, cache: swr_cache) - expect(client).to receive(:fetch_prompt_from_api) - .and_raise(Langfuse::NotFoundError, "Not found") + stub_request(:get, "#{base_url}/api/public/v2/prompts/nonexistent") + .to_return(status: 404, body: { message: "Not found" }.to_json, + headers: { "Content-Type" => "application/json" }) - expect do - client.get_prompt("nonexistent") - end.to raise_error(Langfuse::NotFoundError, "Not found") - end + expect { client.get_prompt("nonexistent") } + .to raise_error(Langfuse::NotFoundError, /Not found/) end end - # rubocop:enable RSpec/MultipleMemoizedHelpers context "with retry middleware configuration" do # NOTE: Direct retry behavior testing is challenging with WebMock due to @@ -910,65 +586,66 @@ end end + # rubocop:enable RSpec/MultipleMemoizedHelpers + # rubocop:disable RSpec/MultipleMemoizedHelpers context "with Rails cache backend (fetch_with_lock)" do - let(:rails_cache) do - # Create a simple object that responds to fetch_with_lock + let(:rails_cache_store) do Class.new do - def respond_to?(method, include_private: false) - method == :fetch_with_lock || super + def initialize + @store = {} + @mutex = Mutex.new + end + + def read(key) + @mutex.synchronize { @store[key] } + end + + def write(key, value, _options = {}) + @mutex.synchronize { @store[key] = value } + end + + def delete(key) + @mutex.synchronize { @store.delete(key) } end - def fetch_with_lock(_key) - result = yield if block_given? - @cached_value ||= result - @cached_value || result + def exist?(key) + @mutex.synchronize { @store.key?(key) } end - def get(_key) - @cached_value + def increment(key, amount = 1, _options = {}) + @mutex.synchronize { @store[key] = (@store[key] || 0) + amount } end - def set(_key, value) - @cached_value = value + def clear + @mutex.synchronize { @store.clear } end end.new end + let(:rails_cache) { Langfuse::RailsCacheAdapter.new(ttl: 60) } let(:rails_cached_client) do described_class.new( - public_key: public_key, - secret_key: secret_key, - base_url: base_url, - cache: rails_cache + public_key: public_key, secret_key: secret_key, base_url: base_url, cache: rails_cache ) end before do - stub_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") - .to_return( - status: 200, - body: prompt_response.to_json, - headers: { "Content-Type" => "application/json" } - ) - end - - it "uses fetch_with_lock for distributed locking" do - cache_key = Langfuse::PromptCache.build_key(prompt_name) - expect(rails_cache).to receive(:fetch_with_lock).with(cache_key).and_call_original + stub_const("Rails", Class.new { class << self; attr_accessor :cache; end }) + Rails.cache = rails_cache_store - result = rails_cached_client.get_prompt(prompt_name) - expect(result).to eq(prompt_response) + stub_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}") + .to_return(status: 200, body: prompt_response.to_json, + headers: { "Content-Type" => "application/json" }) end - it "calls fetch_prompt_from_api within the lock block" do - expect(rails_cached_client).to receive(:fetch_prompt_from_api).with( - prompt_name, - version: nil, - label: nil - ).and_call_original + it "deduplicates concurrent fetches via fetch_with_lock and caches the result" do + first = rails_cached_client.get_prompt(prompt_name) + second = rails_cached_client.get_prompt(prompt_name) - rails_cached_client.get_prompt(prompt_name) + expect(first).to eq(prompt_response) + expect(second).to eq(prompt_response) + expect(a_request(:get, "#{base_url}/api/public/v2/prompts/#{prompt_name}")).to have_been_made.once end end # rubocop:enable RSpec/MultipleMemoizedHelpers @@ -1051,6 +728,22 @@ def call(payload) expect(observer.payloads.first).to include(logical_key: "greeting:production") end + it "supports one-argument proc cache observers" do + payloads = [] + client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: cache, + cache_observer: ->(payload) { payloads << payload } + ) + stub_prompt(prompt_response) + + client.get_prompt_result(prompt_name) + + expect(payloads.map { |payload| payload.fetch(:event) }).to include(:miss, :write) + end + it "does not build cache event payloads when no listeners are active" do client = described_class.new( public_key: public_key, diff --git a/spec/langfuse/prompt_cache_coordinator_spec.rb b/spec/langfuse/prompt_cache_coordinator_spec.rb new file mode 100644 index 0000000..56551d1 --- /dev/null +++ b/spec/langfuse/prompt_cache_coordinator_spec.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Langfuse::PromptCacheCoordinator do + let(:prompt_data) do + { + "name" => "greeting", + "version" => 1, + "type" => "text", + "prompt" => "Hello" + } + end + + let(:events) { [] } + let(:emitter) do + double("cache event emitter").tap do |emitter| + allow(emitter).to receive(:emit_prompt_cache_event) do |event, payload = nil, &block| + events << (payload || block.call).merge(event: event) + end + end + end + let(:fetch_prompt) do + lambda do |name, version:, label:| + prompt_data.merge("name" => name, "version" => version || 1, "label" => label) + end + end + + def build_coordinator(cache) + described_class.new(cache: cache, event_emitter: emitter, fetch_prompt: fetch_prompt) + end + + it "returns disabled and bypass statuses without writing cache" do + disabled = build_coordinator(nil).get_prompt_result("greeting") + bypass = build_coordinator(Langfuse::PromptCache.new(ttl: 60)).get_prompt_result("greeting", cache_ttl: 0) + + expect(disabled.cache_status).to eq(Langfuse::CacheStatus::DISABLED) + expect(disabled.source).to eq(Langfuse::CacheSource::API) + expect(bypass.cache_status).to eq(Langfuse::CacheStatus::BYPASS) + expect(bypass.storage_key).to start_with("g0:") + end + + it "tracks miss, write, and hit through the cache backend" do + cache = Langfuse::PromptCache.new(ttl: 60) + coordinator = build_coordinator(cache) + + miss = coordinator.get_prompt_result("greeting") + hit = coordinator.get_prompt_result("greeting") + + expect(miss.cache_status).to eq(Langfuse::CacheStatus::MISS) + expect(hit.cache_status).to eq(Langfuse::CacheStatus::HIT) + expect(events.map { |event| event[:event] }).to include(:miss, :write, :hit) + expect(coordinator.prompt_cache_stats).to include(backend: "memory", enabled: true) + end + + it "invalidates exact, name, and global scopes using public cache keys" do + cache = Langfuse::PromptCache.new(ttl: 60) + coordinator = build_coordinator(cache) + key = coordinator.invalidate_prompt_cache("greeting", label: "production") + name_generation = coordinator.invalidate_prompt_cache_by_name("greeting") + global_generation = coordinator.clear_prompt_cache + + expect(key.logical_key).to eq("greeting:production") + expect(name_generation).to eq(1) + expect(global_generation).to eq(1) + expect(events.map { |event| event[:event] }).to include(:delete, :invalidate, :clear) + end + + it "validates mutually exclusive version and label plus cache_ttl type" do + coordinator = build_coordinator(Langfuse::PromptCache.new(ttl: 60)) + + expect { coordinator.get_prompt_result("greeting", version: 1, label: "production") } + .to raise_error(ArgumentError, "Cannot specify both version and label") + expect { coordinator.get_prompt_result("greeting", cache_ttl: "60") } + .to raise_error(ArgumentError, "cache_ttl must be a non-negative Integer") + expect { coordinator.get_prompt_result("greeting", cache_ttl: -1) } + .to raise_error(ArgumentError, "cache_ttl must be non-negative") + end +end diff --git a/spec/langfuse/rake_cache_contract_spec.rb b/spec/langfuse/rake_cache_contract_spec.rb new file mode 100644 index 0000000..0e47321 --- /dev/null +++ b/spec/langfuse/rake_cache_contract_spec.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe "langfuse rake cache contract" do + let(:rake_source) { File.read(File.expand_path("../../lib/tasks/langfuse.rake", __dir__)) } + + it "uses public SDK cache APIs" do + expect(rake_source).not_to include("Langfuse.client.api_client.cache") + expect(rake_source).to include("Langfuse.client.clear_prompt_cache") + expect(rake_source).to include("Langfuse::CacheWarmer.new") + end +end