From eb1837352f6d9a9b9a806a5ca251b66e2835e93f Mon Sep 17 00:00:00 2001 From: kadekillary Date: Tue, 5 May 2026 01:29:59 -0600 Subject: [PATCH 1/3] perf(prompts): streamline prompt cache event dispatch --- lib/langfuse/api_client.rb | 74 ++++++++++++++++++++--------- lib/langfuse/client.rb | 18 ++----- lib/langfuse/rails_cache_adapter.rb | 3 +- spec/langfuse/api_client_spec.rb | 53 +++++++++++++++++++++ 4 files changed, 110 insertions(+), 38 deletions(-) diff --git a/lib/langfuse/api_client.rb b/lib/langfuse/api_client.rb index 67f861b..d548101 100644 --- a/lib/langfuse/api_client.rb +++ b/lib/langfuse/api_client.rb @@ -63,6 +63,7 @@ def initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cac @logger = logger || Logger.new($stdout, level: Logger::WARN) @cache = cache @cache_observer = cache_observer + @cache_observer_arity = cache_observer&.method(:call)&.arity @cache_backend_name = compute_cache_backend_name @active_support_notifications = defined?(ActiveSupport::Notifications) ? ActiveSupport::Notifications : nil end @@ -199,8 +200,8 @@ def prompt_cache_key(name, version: nil, label: nil) def invalidate_prompt_cache(name, version: nil, label: nil) key = prompt_cache_key(name, version: version, label: label) deleted = cache&.delete(key.storage_key) || false - emit_prompt_cache_event(:delete, event_payload(key, :miss, :cache, deleted: deleted)) - emit_prompt_cache_event(:invalidate, event_payload(key, :miss, :cache, scope: :exact)) + emit_prompt_cache_event(:delete) { event_payload(key, :miss, :cache, deleted: deleted) } + emit_prompt_cache_event(:invalidate) { event_payload(key, :miss, :cache, scope: :exact) } key end @@ -233,14 +234,19 @@ def prompt_cache_stats cache.stats end - # Emit a prompt cache event to configured hooks. + # Emit a prompt cache event to configured hooks. Accepts an eager payload + # hash or a block that builds one. The block is only evaluated when at + # least one listener is active, avoiding hash allocations on the hot path. # # @param event [Symbol] Event name - # @param payload [Hash] Event payload + # @param payload [Hash, nil] Event payload (omit when passing a block) + # @yieldreturn [Hash] Lazily constructed payload # @return [void] - def emit_prompt_cache_event(event, payload) - return if @cache_observer.nil? && @active_support_notifications.nil? + def emit_prompt_cache_event(event, payload = nil) + return unless prompt_cache_listeners_active? + payload = yield if payload.nil? && block_given? + payload ||= {} normalized_payload = payload.merge(event: event.to_sym) notify_cache_observer(event, normalized_payload) notify_active_support(normalized_payload) @@ -258,6 +264,18 @@ def prompt_event_payload(key, cache_status, source, extra = {}) event_payload(key, cache_status, source, extra) end + # Emit a fallback event for a prompt fetch that fell back to caller-provided content. + # + # @param key [PromptCacheKey] Logical and storage cache key + # @param cache_status [Symbol] Cache status to report + # @param error [StandardError] The error that triggered the fallback + # @return [void] + def emit_prompt_fallback_event(key, cache_status:, error:) + emit_prompt_cache_event(:fallback) do + event_payload(key, cache_status, :fallback, error_class: error.class.name, error_message: error.message) + end + end + # Create a new prompt (or new version if prompt with same name exists) # # @param name [String] The prompt name @@ -790,18 +808,18 @@ def fetch_swr_cached_prompt_result(key, version, label, cache_ttl) return nil unless entry.stale? - emit_prompt_cache_event(:stale_serve, event_payload(key, :stale, :cache)) + emit_prompt_cache_event(:stale_serve) { event_payload(key, :stale, :cache) } schedule_prompt_cache_refresh(key, version, label, cache_ttl) build_prompt_result(key, entry.data, :stale, :cache) end def cache_hit_prompt_result(key, prompt_data) - emit_prompt_cache_event(:hit, event_payload(key, :hit, :cache)) + emit_prompt_cache_event(:hit) { event_payload(key, :hit, :cache) } build_prompt_result(key, prompt_data, :hit, :cache) end def fetch_cache_miss_prompt_result(key, version, label, cache_ttl, swr_enabled: false, distributed_enabled: nil) - emit_prompt_cache_event(:miss, event_payload(key, :miss, :api)) + emit_prompt_cache_event(:miss) { event_payload(key, :miss, :api) } distributed_enabled = distributed_cache_available? if distributed_enabled.nil? if !swr_enabled && distributed_enabled @@ -817,7 +835,7 @@ def fetch_cache_miss_with_lock(key, version, label, cache_ttl) fetched = true fetch_prompt_from_api(key.name, version: version, label: label) end - emit_prompt_cache_event(:write, event_payload(key, :miss, :api)) if fetched + emit_prompt_cache_event(:write) { event_payload(key, :miss, :api) } if fetched status = fetched ? :miss : :hit source = fetched ? :api : :cache build_prompt_result(key, prompt_data, status, source) @@ -830,14 +848,15 @@ def fetch_cache_miss_directly(key, version, label, cache_ttl, swr_enabled: false end def refresh_prompt_result(key, version, label, cache_ttl) - emit_prompt_cache_event(:refresh_start, event_payload(key, :refresh, :api)) + emit_prompt_cache_event(:refresh_start) { event_payload(key, :refresh, :api) } prompt_data = fetch_prompt_from_api(key.name, version: version, label: label) write_refresh_prompt_cache(key, prompt_data, cache_ttl) - emit_prompt_cache_event(:refresh_success, event_payload(key, refresh_cache_status(cache_ttl), :api)) + emit_prompt_cache_event(:refresh_success) { event_payload(key, refresh_cache_status(cache_ttl), :api) } build_prompt_result(key, prompt_data, refresh_cache_status(cache_ttl), :api) rescue StandardError => e - payload = event_payload(key, :refresh, :api, error_class: e.class.name, error_message: e.message) - emit_prompt_cache_event(:refresh_failure, payload) + emit_prompt_cache_event(:refresh_failure) do + event_payload(key, :refresh, :api, error_class: e.class.name, error_message: e.message) + end raise end @@ -850,17 +869,18 @@ def schedule_prompt_cache_refresh(key, version, label, cache_ttl) on_success: ->(_value) { emit_refresh_success_events(key) }, on_failure: ->(error) { emit_refresh_failure_event(key, error) } ) { fetch_prompt_from_api(key.name, version: version, label: label) } - emit_prompt_cache_event(:refresh_start, event_payload(key, :stale, :cache)) if scheduled + emit_prompt_cache_event(:refresh_start) { event_payload(key, :stale, :cache) } if scheduled end def emit_refresh_success_events(key) - emit_prompt_cache_event(:refresh_success, event_payload(key, :refresh, :api)) - emit_prompt_cache_event(:write, event_payload(key, :refresh, :api)) + emit_prompt_cache_event(:refresh_success) { event_payload(key, :refresh, :api) } + emit_prompt_cache_event(:write) { event_payload(key, :refresh, :api) } end def emit_refresh_failure_event(key, error) - payload = event_payload(key, :stale, :cache, error_class: error.class.name, error_message: error.message) - emit_prompt_cache_event(:refresh_failure, payload) + emit_prompt_cache_event(:refresh_failure) do + event_payload(key, :stale, :cache, error_class: error.class.name, error_message: error.message) + end end def write_refresh_prompt_cache(key, prompt_data, cache_ttl) @@ -878,7 +898,7 @@ def write_prompt_cache(key, prompt_data, cache_ttl, cache_status: :miss, swr_ena else cache.set(key.storage_key, prompt_data, ttl: cache_ttl) end - emit_prompt_cache_event(:write, event_payload(key, cache_status, :api)) + emit_prompt_cache_event(:write) { event_payload(key, cache_status, :api) } end def cache_fetch_with_lock(storage_key, cache_ttl, &) @@ -951,15 +971,25 @@ def invalidate_prompt_cache_after_mutation(name) emit_prompt_cache_event(:invalidate, payload) end + def prompt_cache_listeners_active? + !cache_observer.nil? || active_support_listening? + end + def notify_cache_observer(event, payload) return unless cache_observer - observer_arity = cache_observer.respond_to?(:arity) ? cache_observer.arity : 2 - observer_arity == 1 ? cache_observer.call(payload) : cache_observer.call(event, payload) + @cache_observer_arity == 1 ? cache_observer.call(payload) : cache_observer.call(event, payload) rescue StandardError => e logger.warn("Langfuse prompt cache observer failed: #{e.class} - #{e.message}") end + def active_support_listening? + return false unless @active_support_notifications + + notifier = @active_support_notifications.notifier + notifier.respond_to?(:listening?) ? notifier.listening?("prompt_cache.langfuse") : true + end + def notify_active_support(payload) return unless @active_support_notifications diff --git a/lib/langfuse/client.rb b/lib/langfuse/client.rb index 7602d8e..a3332be 100644 --- a/lib/langfuse/client.rb +++ b/lib/langfuse/client.rb @@ -862,15 +862,13 @@ def build_client_fetch_result(api_result, prompt_client) def build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl:, error:) prompt_client = build_fallback_prompt_client(name, fallback, type) key = api_client.prompt_cache_key(name, version: version, label: label) - api_client.emit_prompt_cache_event( - :fallback, - fallback_event_payload(key, cache_ttl, error) - ) + cache_status = fallback_cache_status(cache_ttl) + api_client.emit_prompt_fallback_event(key, cache_status: cache_status, error: error) PromptFetchResult.new( prompt: prompt_client, logical_key: key.logical_key, storage_key: key.storage_key, - cache_status: fallback_cache_status(cache_ttl), + cache_status: cache_status, source: :fallback, name: name, version: version || prompt_client.version, @@ -879,16 +877,6 @@ def build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl end # rubocop:enable Metrics/ParameterLists - def fallback_event_payload(key, cache_ttl, error) - api_client.prompt_event_payload( - key, - fallback_cache_status(cache_ttl), - :fallback, - error_class: error.class.name, - error_message: error.message - ) - end - def fallback_cache_status(cache_ttl) return :bypass if cache_ttl&.zero? return :disabled unless api_client.cache diff --git a/lib/langfuse/rails_cache_adapter.rb b/lib/langfuse/rails_cache_adapter.rb index 9986e7f..d2511d8 100644 --- a/lib/langfuse/rails_cache_adapter.rb +++ b/lib/langfuse/rails_cache_adapter.rb @@ -54,6 +54,7 @@ def initialize(ttl: 60, namespace: "langfuse", lock_timeout: 10, stale_ttl: 0, r @ttl = ttl @namespace = namespace + @namespace_prefix = "#{namespace}:" @lock_timeout = lock_timeout @stale_ttl = stale_ttl @logger = logger @@ -327,7 +328,7 @@ def wait_for_cache(key) # @param key [String] Original cache key # @return [String] Namespaced cache key def namespaced_key(key) - key.start_with?("#{namespace}:") ? key : "#{namespace}:#{key}" + key.start_with?(@namespace_prefix) ? key : "#{@namespace_prefix}#{key}" end def global_generation_key diff --git a/spec/langfuse/api_client_spec.rb b/spec/langfuse/api_client_spec.rb index 4e74873..d7fa78c 100644 --- a/spec/langfuse/api_client_spec.rb +++ b/spec/langfuse/api_client_spec.rb @@ -1030,6 +1030,59 @@ def set(_key, value) expect(a_request(:get, prompt_url)).to have_been_made.once end + it "supports one-argument callable object cache observers" do + observer = Struct.new(:payloads) do + def call(payload) + payloads << payload + end + end.new([]) + client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: cache, + cache_observer: observer + ) + stub_prompt(prompt_response) + + client.get_prompt_result(prompt_name) + + expect(observer.payloads.map { |payload| payload.fetch(:event) }).to include(:miss, :write) + expect(observer.payloads.first).to include(logical_key: "greeting:production") + end + + it "does not build cache event payloads when no listeners are active" do + client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: cache + ) + built = false + + client.emit_prompt_cache_event(:hit) do + built = true + {} + end + + expect(built).to be(false) + end + + it "emits event-only payloads when no explicit payload is provided" do + events = [] + client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: cache, + cache_observer: ->(event, payload) { events << [event, payload] } + ) + + client.emit_prompt_cache_event(:manual) + + expect(events).to eq([[:manual, { event: :manual }]]) + end + it "bypasses cache reads and writes when cache_ttl is zero" do stub_request(:get, prompt_url) .to_return( From e79e4f27c7a0ecd051b9eada754ba197a57bec3b Mon Sep 17 00:00:00 2001 From: kadekillary Date: Tue, 5 May 2026 03:54:46 -0600 Subject: [PATCH 2/3] refactor(prompts): tighten cache event dispatch and generation safety --- lib/langfuse.rb | 2 + lib/langfuse/api_client.rb | 278 ++++++++++--------------- lib/langfuse/cache_constants.rb | 32 +++ lib/langfuse/client.rb | 22 +- lib/langfuse/prompt_cache.rb | 33 +-- lib/langfuse/prompt_cache_events.rb | 110 ++++++++++ lib/langfuse/prompt_fetch_result.rb | 9 +- lib/langfuse/rails_cache_adapter.rb | 11 +- lib/langfuse/stale_while_revalidate.rb | 32 ++- spec/langfuse/api_client_spec.rb | 37 ++++ spec/langfuse/prompt_cache_spec.rb | 41 ++++ 11 files changed, 391 insertions(+), 216 deletions(-) create mode 100644 lib/langfuse/cache_constants.rb create mode 100644 lib/langfuse/prompt_cache_events.rb diff --git a/lib/langfuse.rb b/lib/langfuse.rb index 20515e6..24ecb47 100644 --- a/lib/langfuse.rb +++ b/lib/langfuse.rb @@ -40,10 +40,12 @@ class UnauthorizedError < ApiError; end end require_relative "langfuse/config" +require_relative "langfuse/cache_constants" require_relative "langfuse/prompt_cache" require_relative "langfuse/prompt_fetch_result" require_relative "langfuse/rails_cache_adapter" require_relative "langfuse/cache_warmer" +require_relative "langfuse/prompt_cache_events" require_relative "langfuse/api_client" require_relative "langfuse/span_filter" require_relative "langfuse/sampling" diff --git a/lib/langfuse/api_client.rb b/lib/langfuse/api_client.rb index d548101..7789791 100644 --- a/lib/langfuse/api_client.rb +++ b/lib/langfuse/api_client.rb @@ -23,6 +23,16 @@ module Langfuse # ) # class ApiClient # rubocop:disable Metrics/ClassLength + include PromptCacheEvents + + # Bundles the resolved cache key with the per-call TTL override so private + # prompt-fetch helpers take one arg instead of four. + PromptFetchOptions = Struct.new(:key, :cache_ttl, keyword_init: true) do + def name = key.name + def version = key.version + def label = key.label + end + # @return [String] Langfuse public API key attr_reader :public_key @@ -41,9 +51,6 @@ class ApiClient # rubocop:disable Metrics/ClassLength # @return [PromptCache, RailsCacheAdapter, nil] Optional cache for prompt responses attr_reader :cache - # @return [#call, nil] Optional observer for prompt cache events - attr_reader :cache_observer - # Initialize a new API client # # @param public_key [String] Langfuse public API key @@ -62,10 +69,8 @@ def initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cac @timeout = timeout @logger = logger || Logger.new($stdout, level: Logger::WARN) @cache = cache - @cache_observer = cache_observer - @cache_observer_arity = cache_observer&.method(:call)&.arity @cache_backend_name = compute_cache_backend_name - @active_support_notifications = defined?(ActiveSupport::Notifications) ? ActiveSupport::Notifications : nil + setup_prompt_cache_events(cache_observer: cache_observer) end # rubocop:enable Metrics/ParameterLists @@ -145,11 +150,14 @@ def get_prompt(name, version: nil, label: nil, cache_ttl: nil) def get_prompt_result(name, version: nil, label: nil, cache_ttl: nil) validate_prompt_fetch_options!(version, label, cache_ttl) - key = prompt_cache_key(name, version: version, label: label) - return fetch_uncached_prompt_result(key, version, label, :disabled) if cache.nil? - return fetch_uncached_prompt_result(key, version, label, :bypass) if cache_ttl&.zero? + options = PromptFetchOptions.new( + key: prompt_cache_key(name, version: version, label: label), + cache_ttl: cache_ttl + ) + return fetch_uncached_prompt_result(options, CacheStatus::DISABLED) if cache.nil? + return fetch_uncached_prompt_result(options, CacheStatus::BYPASS) if cache_ttl&.zero? - fetch_cached_prompt_result(key, version, label, cache_ttl) + fetch_cached_prompt_result(options) end # Refresh a prompt from the API, optionally writing through to cache. @@ -167,8 +175,12 @@ def get_prompt_result(name, version: nil, label: nil, cache_ttl: nil) def refresh_prompt(name, version: nil, label: nil, cache_ttl: nil) validate_prompt_fetch_options!(version, label, cache_ttl) - key = prompt_cache_key(name, version: version, label: label) - refresh_prompt_result(key, version, label, cache_ttl) + refresh_prompt_result( + PromptFetchOptions.new( + key: prompt_cache_key(name, version: version, label: label), + cache_ttl: cache_ttl + ) + ) end # Inspect the logical and generated cache keys for a prompt. @@ -200,8 +212,10 @@ def prompt_cache_key(name, version: nil, label: nil) def invalidate_prompt_cache(name, version: nil, label: nil) key = prompt_cache_key(name, version: version, label: label) deleted = cache&.delete(key.storage_key) || false - emit_prompt_cache_event(:delete) { event_payload(key, :miss, :cache, deleted: deleted) } - emit_prompt_cache_event(:invalidate) { event_payload(key, :miss, :cache, scope: :exact) } + emit_prompt_cache_event(:delete) { event_payload(key, CacheStatus::MISS, CacheSource::CACHE, deleted: deleted) } + emit_prompt_cache_event(:invalidate) do + event_payload(key, CacheStatus::MISS, CacheSource::CACHE, scope: :exact) + end key end @@ -234,48 +248,6 @@ def prompt_cache_stats cache.stats end - # Emit a prompt cache event to configured hooks. Accepts an eager payload - # hash or a block that builds one. The block is only evaluated when at - # least one listener is active, avoiding hash allocations on the hot path. - # - # @param event [Symbol] Event name - # @param payload [Hash, nil] Event payload (omit when passing a block) - # @yieldreturn [Hash] Lazily constructed payload - # @return [void] - def emit_prompt_cache_event(event, payload = nil) - return unless prompt_cache_listeners_active? - - payload = yield if payload.nil? && block_given? - payload ||= {} - normalized_payload = payload.merge(event: event.to_sym) - notify_cache_observer(event, normalized_payload) - notify_active_support(normalized_payload) - end - - # Build a prompt cache event payload for one logical key. - # - # @api private - # @param key [PromptCacheKey] Logical and storage cache key - # @param cache_status [Symbol] Cache status (:hit, :miss, :stale, :refresh, :bypass, :disabled) - # @param source [Symbol] Prompt source (:cache, :api, :fallback) - # @param extra [Hash] Extra fields to merge into the payload - # @return [Hash] Event payload - def prompt_event_payload(key, cache_status, source, extra = {}) - event_payload(key, cache_status, source, extra) - end - - # Emit a fallback event for a prompt fetch that fell back to caller-provided content. - # - # @param key [PromptCacheKey] Logical and storage cache key - # @param cache_status [Symbol] Cache status to report - # @param error [StandardError] The error that triggered the fallback - # @return [void] - def emit_prompt_fallback_event(key, cache_status:, error:) - emit_prompt_cache_event(:fallback) do - event_payload(key, cache_status, :fallback, error_class: error.class.name, error_message: error.message) - end - end - # Create a new prompt (or new version if prompt with same name exists) # # @param name [String] The prompt name @@ -751,135 +723,140 @@ def delete_dataset_item(id) def validate_prompt_fetch_options!(version, label, cache_ttl) raise ArgumentError, "Cannot specify both version and label" if version && label - return if cache_ttl.nil? || (cache_ttl.respond_to?(:negative?) && !cache_ttl.negative?) - - raise ArgumentError, "cache_ttl must be non-negative" + return if cache_ttl.nil? + raise ArgumentError, "cache_ttl must be a non-negative Integer" unless cache_ttl.is_a?(Integer) + raise ArgumentError, "cache_ttl must be non-negative" if cache_ttl.negative? end - def fetch_uncached_prompt_result(key, version, label, cache_status) - prompt_data = fetch_prompt_from_api(key.name, version: version, label: label) - build_prompt_result(key, prompt_data, cache_status, :api) + def fetch_uncached_prompt_result(options, cache_status) + prompt_data = fetch_prompt_for_options(options) + build_prompt_result(options.key, prompt_data, cache_status, CacheSource::API) end - def fetch_cached_prompt_result(key, version, label, cache_ttl) - swr_enabled = swr_cache_available? - return fetch_swr_prompt_result(key, version, label, cache_ttl) if swr_enabled + def fetch_cached_prompt_result(options) + return fetch_swr_prompt_result(options) if swr_cache_available? - fetch_non_swr_prompt_result(key, version, label, cache_ttl) + fetch_non_swr_prompt_result(options) end - def fetch_swr_prompt_result(key, version, label, cache_ttl) + def fetch_swr_prompt_result(options) unless generated_storage_key_cache? - prompt_data = fetch_with_swr_cache(key.storage_key, key.name, version, label) - return cache_hit_prompt_result(key, prompt_data) + prompt_data = fetch_with_swr_cache(options.key.storage_key, options.name, options.version, options.label) + return cache_hit_prompt_result(options.key, prompt_data) end - result = fetch_swr_cached_prompt_result(key, version, label, cache_ttl) + result = fetch_swr_cached_prompt_result(options) return result if result - fetch_cache_miss_prompt_result(key, version, label, cache_ttl, swr_enabled: true, distributed_enabled: false) + fetch_cache_miss_prompt_result(options, swr_enabled: true, distributed_enabled: false) end - def fetch_non_swr_prompt_result(key, version, label, cache_ttl) + def fetch_non_swr_prompt_result(options) distributed_enabled = distributed_cache_available? if !generated_storage_key_cache? && distributed_enabled - prompt_data = fetch_with_distributed_cache(key.storage_key, key.name, version, label) - return cache_hit_prompt_result(key, prompt_data) + prompt_data = fetch_with_distributed_cache(options.key.storage_key, options.name, options.version, + options.label) + return cache_hit_prompt_result(options.key, prompt_data) end - cached_data = cache.get(key.storage_key) - return cache_hit_prompt_result(key, cached_data) if cached_data + cached_data = cache.get(options.key.storage_key) + return cache_hit_prompt_result(options.key, cached_data) if cached_data - fetch_cache_miss_prompt_result( - key, - version, - label, - cache_ttl, - swr_enabled: false, - distributed_enabled: distributed_enabled - ) + fetch_cache_miss_prompt_result(options, swr_enabled: false, distributed_enabled: distributed_enabled) end - def fetch_swr_cached_prompt_result(key, version, label, cache_ttl) + def fetch_swr_cached_prompt_result(options) + key = options.key entry = cache.entry(key.storage_key) if cache.respond_to?(:entry) return nil unless entry.respond_to?(:fresh?) return cache_hit_prompt_result(key, entry.data) if entry.fresh? - return nil unless entry.stale? - emit_prompt_cache_event(:stale_serve) { event_payload(key, :stale, :cache) } - schedule_prompt_cache_refresh(key, version, label, cache_ttl) - build_prompt_result(key, entry.data, :stale, :cache) + emit_prompt_cache_event(:stale_serve) { event_payload(key, CacheStatus::STALE, CacheSource::CACHE) } + schedule_prompt_cache_refresh(options) + build_prompt_result(key, entry.data, CacheStatus::STALE, CacheSource::CACHE) end def cache_hit_prompt_result(key, prompt_data) - emit_prompt_cache_event(:hit) { event_payload(key, :hit, :cache) } - build_prompt_result(key, prompt_data, :hit, :cache) + emit_prompt_cache_event(:hit) { event_payload(key, CacheStatus::HIT, CacheSource::CACHE) } + build_prompt_result(key, prompt_data, CacheStatus::HIT, CacheSource::CACHE) end - def fetch_cache_miss_prompt_result(key, version, label, cache_ttl, swr_enabled: false, distributed_enabled: nil) - emit_prompt_cache_event(:miss) { event_payload(key, :miss, :api) } + def fetch_cache_miss_prompt_result(options, swr_enabled: false, distributed_enabled: nil) + emit_prompt_cache_event(:miss) { event_payload(options.key, CacheStatus::MISS, CacheSource::API) } distributed_enabled = distributed_cache_available? if distributed_enabled.nil? if !swr_enabled && distributed_enabled - fetch_cache_miss_with_lock(key, version, label, cache_ttl) + fetch_cache_miss_with_lock(options) else - fetch_cache_miss_directly(key, version, label, cache_ttl, swr_enabled: swr_enabled) + fetch_cache_miss_directly(options, swr_enabled: swr_enabled) end end - def fetch_cache_miss_with_lock(key, version, label, cache_ttl) + def fetch_cache_miss_with_lock(options) + key = options.key fetched = false - prompt_data = cache_fetch_with_lock(key.storage_key, cache_ttl) do + prompt_data = cache_fetch_with_lock(key.storage_key, options.cache_ttl) do fetched = true - fetch_prompt_from_api(key.name, version: version, label: label) + fetch_prompt_for_options(options) end - emit_prompt_cache_event(:write) { event_payload(key, :miss, :api) } if fetched - status = fetched ? :miss : :hit - source = fetched ? :api : :cache + emit_prompt_cache_event(:write) { event_payload(key, CacheStatus::MISS, CacheSource::API) } if fetched + status = fetched ? CacheStatus::MISS : CacheStatus::HIT + source = fetched ? CacheSource::API : CacheSource::CACHE build_prompt_result(key, prompt_data, status, source) end - def fetch_cache_miss_directly(key, version, label, cache_ttl, swr_enabled: false) - prompt_data = fetch_prompt_from_api(key.name, version: version, label: label) - write_prompt_cache(key, prompt_data, cache_ttl, swr_enabled: swr_enabled) - build_prompt_result(key, prompt_data, :miss, :api) + def fetch_cache_miss_directly(options, swr_enabled: false) + prompt_data = fetch_prompt_for_options(options) + write_prompt_cache(options.key, prompt_data, options.cache_ttl, swr_enabled: swr_enabled) + build_prompt_result(options.key, prompt_data, CacheStatus::MISS, CacheSource::API) end - def refresh_prompt_result(key, version, label, cache_ttl) - emit_prompt_cache_event(:refresh_start) { event_payload(key, :refresh, :api) } - prompt_data = fetch_prompt_from_api(key.name, version: version, label: label) - write_refresh_prompt_cache(key, prompt_data, cache_ttl) - emit_prompt_cache_event(:refresh_success) { event_payload(key, refresh_cache_status(cache_ttl), :api) } - build_prompt_result(key, prompt_data, refresh_cache_status(cache_ttl), :api) + def refresh_prompt_result(options) + key = options.key + emit_prompt_cache_event(:refresh_start) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } + prompt_data = fetch_prompt_for_options(options) + write_refresh_prompt_cache(key, prompt_data, options.cache_ttl) + status = refresh_cache_status(options.cache_ttl) + emit_prompt_cache_event(:refresh_success) { event_payload(key, status, CacheSource::API) } + build_prompt_result(key, prompt_data, status, CacheSource::API) rescue StandardError => e emit_prompt_cache_event(:refresh_failure) do - event_payload(key, :refresh, :api, error_class: e.class.name, error_message: e.message) + event_payload(key, CacheStatus::REFRESH, CacheSource::API, + error_class: e.class.name, error_message: e.message) end raise end - def schedule_prompt_cache_refresh(key, version, label, cache_ttl) + def schedule_prompt_cache_refresh(options) return unless cache.respond_to?(:refresh_async) + key = options.key scheduled = cache.refresh_async( key.storage_key, - ttl: cache_ttl, + ttl: options.cache_ttl, on_success: ->(_value) { emit_refresh_success_events(key) }, on_failure: ->(error) { emit_refresh_failure_event(key, error) } - ) { fetch_prompt_from_api(key.name, version: version, label: label) } - emit_prompt_cache_event(:refresh_start) { event_payload(key, :stale, :cache) } if scheduled + ) { fetch_prompt_for_options(options) } + return unless scheduled + + emit_prompt_cache_event(:refresh_start) { event_payload(key, CacheStatus::STALE, CacheSource::CACHE) } + end + + def fetch_prompt_for_options(options) + fetch_prompt_from_api(options.name, version: options.version, label: options.label) end def emit_refresh_success_events(key) - emit_prompt_cache_event(:refresh_success) { event_payload(key, :refresh, :api) } - emit_prompt_cache_event(:write) { event_payload(key, :refresh, :api) } + emit_prompt_cache_event(:refresh_success) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } + emit_prompt_cache_event(:write) { event_payload(key, CacheStatus::REFRESH, CacheSource::API) } end def emit_refresh_failure_event(key, error) emit_prompt_cache_event(:refresh_failure) do - event_payload(key, :stale, :cache, error_class: error.class.name, error_message: error.message) + event_payload(key, CacheStatus::STALE, CacheSource::CACHE, + error_class: error.class.name, error_message: error.message) end end @@ -887,10 +864,11 @@ def write_refresh_prompt_cache(key, prompt_data, cache_ttl) return unless cache return if cache_ttl&.zero? - write_prompt_cache(key, prompt_data, cache_ttl, cache_status: :refresh, swr_enabled: swr_cache_available?) + write_prompt_cache(key, prompt_data, cache_ttl, + cache_status: CacheStatus::REFRESH, swr_enabled: swr_cache_available?) end - def write_prompt_cache(key, prompt_data, cache_ttl, cache_status: :miss, swr_enabled: false) + def write_prompt_cache(key, prompt_data, cache_ttl, cache_status: CacheStatus::MISS, swr_enabled: false) if swr_enabled && cache.respond_to?(:write_with_stale_while_revalidate) cache.write_with_stale_while_revalidate(key.storage_key, prompt_data, ttl: cache_ttl) elsif cache_ttl.nil? @@ -898,7 +876,7 @@ def write_prompt_cache(key, prompt_data, cache_ttl, cache_status: :miss, swr_ena else cache.set(key.storage_key, prompt_data, ttl: cache_ttl) end - emit_prompt_cache_event(:write) { event_payload(key, cache_status, :api) } + emit_prompt_cache_event(:write) { event_payload(key, cache_status, CacheSource::API) } end def cache_fetch_with_lock(storage_key, cache_ttl, &) @@ -908,10 +886,10 @@ def cache_fetch_with_lock(storage_key, cache_ttl, &) end def refresh_cache_status(cache_ttl) - return :disabled unless cache - return :bypass if cache_ttl&.zero? + return CacheStatus::DISABLED unless cache + return CacheStatus::BYPASS if cache_ttl&.zero? - :refresh + CacheStatus::REFRESH end def build_prompt_result(key, prompt_data, cache_status, source) @@ -927,37 +905,24 @@ def build_prompt_result(key, prompt_data, cache_status, source) ) end - def event_payload(key, cache_status, source, extra = {}) - { - name: key.name, - version: key.version, - label: key.resolved_label, - logical_key: key.logical_key, - storage_key: key.storage_key, - backend: cache_backend_name, - cache_status: cache_status, - source: source - }.merge(extra) - end - attr_reader :cache_backend_name def compute_cache_backend_name - return "disabled" unless cache - return "rails" if cache.is_a?(RailsCacheAdapter) - return "memory" if cache.is_a?(PromptCache) + return CacheBackend::DISABLED unless cache + return CacheBackend::RAILS if cache.is_a?(RailsCacheAdapter) + return CacheBackend::MEMORY if cache.is_a?(PromptCache) cache.class.name end def disabled_prompt_cache_stats { - backend: "disabled", + backend: CacheBackend::DISABLED, enabled: false, current_generation_entries: nil, orphaned_entries: nil, total_entries: nil, - unsupported_counts: %i[current_generation_entries orphaned_entries total_entries] + unsupported_counts: CacheBackend::UNSUPPORTED_COUNT_KEYS } end @@ -971,33 +936,6 @@ def invalidate_prompt_cache_after_mutation(name) emit_prompt_cache_event(:invalidate, payload) end - def prompt_cache_listeners_active? - !cache_observer.nil? || active_support_listening? - end - - def notify_cache_observer(event, payload) - return unless cache_observer - - @cache_observer_arity == 1 ? cache_observer.call(payload) : cache_observer.call(event, payload) - rescue StandardError => e - logger.warn("Langfuse prompt cache observer failed: #{e.class} - #{e.message}") - end - - def active_support_listening? - return false unless @active_support_notifications - - notifier = @active_support_notifications.notifier - notifier.respond_to?(:listening?) ? notifier.listening?("prompt_cache.langfuse") : true - end - - def notify_active_support(payload) - return unless @active_support_notifications - - @active_support_notifications.instrument("prompt_cache.langfuse", payload) - rescue StandardError => e - logger.warn("Langfuse ActiveSupport cache notification failed: #{e.class} - #{e.message}") - end - # Check if SWR cache is available def swr_cache_available? cache.respond_to?(:swr_enabled?) && cache.swr_enabled? diff --git a/lib/langfuse/cache_constants.rb b/lib/langfuse/cache_constants.rb new file mode 100644 index 0000000..f3d8615 --- /dev/null +++ b/lib/langfuse/cache_constants.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Langfuse + # Symbol constants for prompt cache event payloads. + # Producers (ApiClient, PromptFetchResult) and consumers (observers, + # ActiveSupport::Notifications subscribers) share these definitions so a + # rename in one place can't silently desync from the other. + module CacheStatus + HIT = :hit + MISS = :miss + STALE = :stale + REFRESH = :refresh + BYPASS = :bypass + DISABLED = :disabled + end + + module CacheSource + CACHE = :cache + API = :api + FALLBACK = :fallback + end + + module CacheBackend + MEMORY = "memory" + RAILS = "rails" + DISABLED = "disabled" + + # Stat keys backend implementations may not be able to compute. Surfaced in + # `#stats[:unsupported_counts]` so callers can distinguish "0" from "unknown". + UNSUPPORTED_COUNT_KEYS = %i[current_generation_entries orphaned_entries total_entries].freeze + end +end diff --git a/lib/langfuse/client.rb b/lib/langfuse/client.rb index a3332be..460867a 100644 --- a/lib/langfuse/client.rb +++ b/lib/langfuse/client.rb @@ -114,7 +114,8 @@ def get_prompt_result(name, version: nil, label: nil, fallback: nil, type: nil, # Log warning and return fallback config.logger.warn("Langfuse API error for prompt '#{name}': #{e.message}. Using fallback.") - build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl: cache_ttl, error: e) + key = api_client.prompt_cache_key(name, version: version, label: label) + build_fallback_prompt_result(key, fallback: fallback, type: type, cache_ttl: cache_ttl, error: e) end # Refresh a prompt from the API, optionally writing through to cache. @@ -858,10 +859,8 @@ def build_client_fetch_result(api_result, prompt_client) ) end - # rubocop:disable Metrics/ParameterLists - def build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl:, error:) - prompt_client = build_fallback_prompt_client(name, fallback, type) - key = api_client.prompt_cache_key(name, version: version, label: label) + def build_fallback_prompt_result(key, fallback:, type:, cache_ttl:, error:) + prompt_client = build_fallback_prompt_client(key.name, fallback, type) cache_status = fallback_cache_status(cache_ttl) api_client.emit_prompt_fallback_event(key, cache_status: cache_status, error: error) PromptFetchResult.new( @@ -869,19 +868,18 @@ def build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl logical_key: key.logical_key, storage_key: key.storage_key, cache_status: cache_status, - source: :fallback, - name: name, - version: version || prompt_client.version, + source: CacheSource::FALLBACK, + name: key.name, + version: key.version || prompt_client.version, label: key.resolved_label ) end - # rubocop:enable Metrics/ParameterLists def fallback_cache_status(cache_ttl) - return :bypass if cache_ttl&.zero? - return :disabled unless api_client.cache + return CacheStatus::BYPASS if cache_ttl&.zero? + return CacheStatus::DISABLED unless api_client.cache - :miss + CacheStatus::MISS end # Check if caching is enabled in configuration diff --git a/lib/langfuse/prompt_cache.rb b/lib/langfuse/prompt_cache.rb index 011a122..eb84bdb 100644 --- a/lib/langfuse/prompt_cache.rb +++ b/lib/langfuse/prompt_cache.rb @@ -19,6 +19,11 @@ module Langfuse class PromptCache include StaleWhileRevalidate + # Caps the per-name generation map. Without a cap, long-lived processes + # that invalidate across many distinct prompts grow it unboundedly; LRU + # eviction keeps the working set live and lets cold names go. + MAX_NAME_GENERATIONS = 1024 + # Cache entry with data and expiration time # # Supports stale-while-revalidate pattern: @@ -82,7 +87,8 @@ def initialize(ttl: 60, max_size: 1000, stale_ttl: 0, refresh_threads: 5, logger @logger = logger @cache = {} @global_generation = 0 - @name_generations = Hash.new(0) + @name_generations = {} + @name_generation_counter = 0 @monitor = Monitor.new @locks = {} # Track locks for in-memory locking initialize_swr(refresh_threads: refresh_threads) if swr_enabled? @@ -121,13 +127,8 @@ def set(key, value, ttl: nil, stale_ttl: nil) @monitor.synchronize do # Evict oldest entry if at max size evict_oldest if @cache.size >= max_size - - now = Time.now - effective_ttl = ttl.nil? ? self.ttl : ttl - effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl - fresh_until = now + effective_ttl - stale_until = fresh_until + effective_stale_ttl - @cache[key] = CacheEntry.new(value, fresh_until, stale_until) + window = compute_window(ttl: ttl, stale_ttl: stale_ttl) + @cache[key] = CacheEntry.new(value, window.fresh_until, window.stale_until) value end end @@ -162,11 +163,19 @@ def clear_logically # Logically invalidate every cache variant for one prompt name. # + # Generations come from a monotonic global counter, not a per-name counter, + # so an evicted name re-entering the map can't reuse a generation value + # that's still embedded in a stale @cache entry. + # # @param name [String] Prompt name # @return [Integer] New name generation def invalidate_name(name) @monitor.synchronize do - @name_generations[name.to_s] += 1 + name_str = name.to_s + @name_generations.delete(name_str) + @name_generations.shift if @name_generations.size >= MAX_NAME_GENERATIONS + @name_generation_counter += 1 + @name_generations[name_str] = @name_generation_counter end end @@ -181,7 +190,7 @@ def storage_key(logical_key, name:) logical_key, name: name, global_generation: @global_generation, - name_generation: @name_generations[name.to_s] + name_generation: @name_generations.fetch(name.to_s, 0) ) end end @@ -191,7 +200,7 @@ def stats @monitor.synchronize do counts = count_entries_by_generation { - backend: "memory", + backend: CacheBackend::MEMORY, enabled: true, current_generation_entries: counts.fetch(:current), orphaned_entries: counts.fetch(:orphaned), @@ -346,7 +355,7 @@ def current_generation_key?(key) global = Integer(parts[0][1..]) name = Base64.urlsafe_decode64(parts[1][1..]) name_generation = Integer(parts[2]) - global == @global_generation && name_generation == @name_generations[name] + global == @global_generation && name_generation == @name_generations.fetch(name, 0) rescue ArgumentError false end diff --git a/lib/langfuse/prompt_cache_events.rb b/lib/langfuse/prompt_cache_events.rb new file mode 100644 index 0000000..70eb1f2 --- /dev/null +++ b/lib/langfuse/prompt_cache_events.rb @@ -0,0 +1,110 @@ +# frozen_string_literal: true + +module Langfuse + # Prompt cache event emission for ApiClient. + # + # Includers must expose: + # - `cache_backend_name` — used in {#event_payload} to tag the cache backend + # - `logger` — used to warn on observer/notifier failures + module PromptCacheEvents + # ActiveSupport::Notifications event name used for prompt cache events. + PROMPT_CACHE_NOTIFICATION = "prompt_cache.langfuse" + + # Configure prompt cache event dispatch. Wraps the observer once into a + # 1-arg callable so the per-event hot path never re-checks arity. + # + # @param cache_observer [#call, nil] Optional observer + # @return [void] + def setup_prompt_cache_events(cache_observer:) + @cache_observer_callable = wrap_cache_observer(cache_observer) + @active_support_notifications = defined?(ActiveSupport::Notifications) ? ActiveSupport::Notifications : nil + end + + # Emit a prompt cache event to configured hooks. Accepts an eager payload + # hash or a block that builds one. The block is only evaluated when at + # least one listener is active, avoiding hash allocations on the hot path. + # + # @param event [Symbol] Event name + # @param payload [Hash, nil] Event payload (omit when passing a block) + # @yieldreturn [Hash] Lazily constructed payload + # @return [void] + def emit_prompt_cache_event(event, payload = nil) + observer_callable = @cache_observer_callable + as_listening = active_support_listening? + return if observer_callable.nil? && !as_listening + + payload ||= block_given? ? yield : {} + normalized_payload = payload.merge(event: event.to_sym) + notify_cache_observer(normalized_payload) if observer_callable + notify_active_support(normalized_payload) if as_listening + end + + # Emit a fallback event for a prompt fetch that fell back to caller-provided content. + # + # @param key [PromptCacheKey] Logical and storage cache key + # @param cache_status [Symbol] Cache status to report + # @param error [StandardError] The error that triggered the fallback + # @return [void] + def emit_prompt_fallback_event(key, cache_status:, error:) + emit_prompt_cache_event(:fallback) do + event_payload(key, cache_status, CacheSource::FALLBACK, + error_class: error.class.name, error_message: error.message) + end + end + + private + + # @api private + def event_payload(key, cache_status, source, extra = {}) + { + name: key.name, + version: key.version, + label: key.resolved_label, + logical_key: key.logical_key, + storage_key: key.storage_key, + backend: cache_backend_name, + cache_status: cache_status, + source: source + }.merge(extra) + end + + # @api private + def notify_cache_observer(payload) + @cache_observer_callable.call(payload) + rescue StandardError => e + logger.warn("Langfuse prompt cache observer failed: #{e.class} - #{e.message}") + end + + # @api private + def active_support_listening? + return false unless @active_support_notifications + + notifier = @active_support_notifications.notifier + # Defensive: notifier stand-ins (test fakes, AS::Notifications forks, + # very old AS versions) may not implement listening?. Assume they're + # listening so we still attempt to instrument; notify_active_support + # rescues failures. + return true unless notifier.respond_to?(:listening?) + + notifier.listening?(PROMPT_CACHE_NOTIFICATION) + end + + # @api private + def notify_active_support(payload) + @active_support_notifications.instrument(PROMPT_CACHE_NOTIFICATION, payload) + rescue StandardError => e + logger.warn("Langfuse ActiveSupport cache notification failed: #{e.class} - #{e.message}") + end + + # @api private + def wrap_cache_observer(observer) + return nil if observer.nil? + + if observer.method(:call).arity == 1 + ->(payload) { observer.call(payload) } + else + ->(payload) { observer.call(payload[:event], payload) } + end + end + end +end diff --git a/lib/langfuse/prompt_fetch_result.rb b/lib/langfuse/prompt_fetch_result.rb index 3ef5021..6e11592 100644 --- a/lib/langfuse/prompt_fetch_result.rb +++ b/lib/langfuse/prompt_fetch_result.rb @@ -49,16 +49,9 @@ def initialize(prompt:, logical_key:, storage_key:, cache_status:, source:, name end # rubocop:enable Metrics/ParameterLists - # Compatibility alias for callers that already use "cache key" language. - # - # @return [String] Stable logical cache identity - def cache_key - logical_key - end - # @return [Boolean] Whether this result used caller-provided fallback content def fallback? - source == :fallback || (prompt.respond_to?(:is_fallback) && prompt.is_fallback) + source == CacheSource::FALLBACK end # @return [Hash] Result metadata as a hash diff --git a/lib/langfuse/rails_cache_adapter.rb b/lib/langfuse/rails_cache_adapter.rb index d2511d8..2d359a4 100644 --- a/lib/langfuse/rails_cache_adapter.rb +++ b/lib/langfuse/rails_cache_adapter.rb @@ -85,10 +85,9 @@ def entry(key) # @param value [Object] Value to cache # @return [Object] The cached value def set(key, value, ttl: nil, stale_ttl: nil) - # Calculate expiration: use total_ttl if SWR enabled, otherwise just ttl - effective_ttl = ttl.nil? ? self.ttl : ttl - effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl - expires_in = swr_enabled? ? effective_ttl + effective_stale_ttl : effective_ttl + # Use total_ttl if SWR enabled, otherwise just ttl + window = compute_window(ttl: ttl, stale_ttl: stale_ttl) + expires_in = swr_enabled? ? window.total_ttl : window.ttl Rails.cache.write(namespaced_key(key), value, expires_in:) value end @@ -156,13 +155,13 @@ def size # @return [Hash] Prompt cache statistics def stats { - backend: "rails", + backend: CacheBackend::RAILS, enabled: true, current_generation_entries: nil, orphaned_entries: nil, total_entries: nil, global_generation: generation_value(global_generation_key), - unsupported_counts: %i[current_generation_entries orphaned_entries total_entries] + unsupported_counts: CacheBackend::UNSUPPORTED_COUNT_KEYS } end diff --git a/lib/langfuse/stale_while_revalidate.rb b/lib/langfuse/stale_while_revalidate.rb index 11afa03..74582e8 100644 --- a/lib/langfuse/stale_while_revalidate.rb +++ b/lib/langfuse/stale_while_revalidate.rb @@ -43,6 +43,10 @@ module Langfuse # end # rubocop:disable Metrics/ModuleLength module StaleWhileRevalidate + # Resolved freshness window for one cache write. Struct (not Hash) so the + # hot write path doesn't allocate + rehash a Symbol-keyed Hash per call. + TtlWindow = Struct.new(:ttl, :stale_ttl, :total_ttl, :fresh_until, :stale_until) + # Initialize SWR infrastructure # # Must be called by including class after setting @stale_ttl, @ttl, and @logger. @@ -211,16 +215,28 @@ def fetch_and_cache(key, ttl: nil, stale_ttl: nil, &block) # @param value [Object] Value to cache # @return [Object] The cached value def set_cache_entry(key, value, ttl: nil, stale_ttl: nil) - now = Time.now + window = compute_window(ttl: ttl, stale_ttl: stale_ttl) + entry = PromptCache::CacheEntry.new(value, window.fresh_until, window.stale_until) + cache_set(key, entry, ttl: window.total_ttl) + value + end + + # Resolve effective TTLs and the resulting fresh/stale absolute timestamps. + # + # @param ttl [Integer, nil] Per-call TTL override + # @param stale_ttl [Integer, nil] Per-call stale TTL override + # @return [TtlWindow] + def compute_window(ttl: nil, stale_ttl: nil) effective_ttl = ttl.nil? ? self.ttl : ttl effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl - fresh_until = now + effective_ttl - stale_until = fresh_until + effective_stale_ttl - entry = PromptCache::CacheEntry.new(value, fresh_until, stale_until) - - cache_set(key, entry, ttl: effective_ttl + effective_stale_ttl) - - value + fresh_until = Time.now + effective_ttl + TtlWindow.new( + effective_ttl, + effective_stale_ttl, + effective_ttl + effective_stale_ttl, + fresh_until, + fresh_until + effective_stale_ttl + ) end # Build a lock key for fetch operations diff --git a/spec/langfuse/api_client_spec.rb b/spec/langfuse/api_client_spec.rb index d7fa78c..85eeaf8 100644 --- a/spec/langfuse/api_client_spec.rb +++ b/spec/langfuse/api_client_spec.rb @@ -1083,6 +1083,43 @@ def call(payload) expect(events).to eq([[:manual, { event: :manual }]]) end + it "skips ActiveSupport instrument when no subscriber exists for the notification name" do + client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: cache, + cache_observer: ->(_event, _payload) {} # listener present so emit isn't fully short-circuited + ) + notifier = double(listening?: false) + notifications = Module.new + notifications.define_singleton_method(:notifier) { notifier } + notifications.define_singleton_method(:instrument) { |*_| raise "instrument should not be called" } + client.instance_variable_set(:@active_support_notifications, notifications) + + expect { client.emit_prompt_cache_event(:hit, { foo: :bar }) }.not_to raise_error + end + + it "instruments ActiveSupport when a subscriber listens for the notification name" do + received = [] + client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: cache + ) + notifier = double(listening?: true) + notifications = Module.new + notifications.define_singleton_method(:notifier) { notifier } + notifications.define_singleton_method(:instrument) { |name, payload| received << [name, payload] } + client.instance_variable_set(:@active_support_notifications, notifications) + + client.emit_prompt_cache_event(:hit, { foo: :bar }) + + expect(received).to eq([[Langfuse::PromptCacheEvents::PROMPT_CACHE_NOTIFICATION, + { foo: :bar, event: :hit }]]) + end + it "bypasses cache reads and writes when cache_ttl is zero" do stub_request(:get, prompt_url) .to_return( diff --git a/spec/langfuse/prompt_cache_spec.rb b/spec/langfuse/prompt_cache_spec.rb index 484a7e5..0e3e248 100644 --- a/spec/langfuse/prompt_cache_spec.rb +++ b/spec/langfuse/prompt_cache_spec.rb @@ -274,6 +274,47 @@ ) end + it "evicts least-recently-invalidated names once the generation map is full" do + stub_const("#{described_class}::MAX_NAME_GENERATIONS", 2) + + cache.invalidate_name("oldest") # counter -> 1 + cache.invalidate_name("middle") # counter -> 2 + cache.invalidate_name("newest") # counter -> 3, evicts "oldest" + + logical = described_class.build_key("oldest") + generation_in_key = cache.storage_key(logical, name: "oldest").split(":")[2].to_i + expect(generation_in_key).to eq(0) # missing from map after eviction -> default 0 + + cache.invalidate_name("middle") # counter -> 4, refreshes "middle" (LRU) + cache.invalidate_name("newer-still") # counter -> 5, evicts "newest" + + preserved = cache.storage_key(described_class.build_key("middle"), name: "middle").split(":")[2].to_i + expect(preserved).to eq(4) # middle's last invalidation, not collidable with any past generation + end + + it "never reuses a generation value across an evict/re-introduce cycle for the same name" do + stub_const("#{described_class}::MAX_NAME_GENERATIONS", 2) + + cache.invalidate_name("X") # counter -> 1 + orphan_key = cache.storage_key(described_class.build_key("X"), name: "X") + cache.set(orphan_key, { "stale" => true }) + + # Evict X by inserting two more names past the cap. + cache.invalidate_name("filler1") # counter -> 2 + cache.invalidate_name("filler2") # counter -> 3, evicts "X" + + # Re-introduce X. With a per-name counter this would reset to gen 1 and + # collide with the orphan; the global counter guarantees a fresh value. + cache.invalidate_name("X") # counter -> 4 + fresh_key = cache.storage_key(described_class.build_key("X"), name: "X") + + expect(fresh_key).not_to eq(orphan_key) + expect(cache.get(fresh_key)).to be_nil + # Orphan is unreachable through the current key; it lingers under its old + # storage key only until TTL/eviction reclaims it. + expect(fresh_key.split(":")[2].to_i).to eq(4) + end + it "deletes one generated storage key without touching sibling names" do greeting_key = cache.storage_key(described_class.build_key("greeting"), name: "greeting") sibling_key = cache.storage_key(described_class.build_key("greeting-extra"), name: "greeting-extra") From 30d0668dcc3768bc2b7b882979b67caa0a60206f Mon Sep 17 00:00:00 2001 From: kadekillary Date: Tue, 5 May 2026 04:08:22 -0600 Subject: [PATCH 3/3] perf(prompts): inline TTL math on hot cache write paths Addresses Copilot review on #90: compute_window allocated a TtlWindow on every cache set even when callers only needed two timestamps (or one integer). Inline the math in PromptCache#set, RailsCacheAdapter#set, and StaleWhileRevalidate#set_cache_entry; drop compute_window and TtlWindow. --- lib/langfuse/prompt_cache.rb | 8 ++++++-- lib/langfuse/rails_cache_adapter.rb | 8 +++++--- lib/langfuse/stale_while_revalidate.rb | 28 +++++--------------------- 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/lib/langfuse/prompt_cache.rb b/lib/langfuse/prompt_cache.rb index eb84bdb..dc6edc8 100644 --- a/lib/langfuse/prompt_cache.rb +++ b/lib/langfuse/prompt_cache.rb @@ -127,8 +127,12 @@ def set(key, value, ttl: nil, stale_ttl: nil) @monitor.synchronize do # Evict oldest entry if at max size evict_oldest if @cache.size >= max_size - window = compute_window(ttl: ttl, stale_ttl: stale_ttl) - @cache[key] = CacheEntry.new(value, window.fresh_until, window.stale_until) + # TTL math is inlined (not extracted to a helper) to keep this hot path + # allocation-free apart from the CacheEntry below. + effective_ttl = ttl.nil? ? self.ttl : ttl + effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl + fresh_until = Time.now + effective_ttl + @cache[key] = CacheEntry.new(value, fresh_until, fresh_until + effective_stale_ttl) value end end diff --git a/lib/langfuse/rails_cache_adapter.rb b/lib/langfuse/rails_cache_adapter.rb index 2d359a4..305a7d6 100644 --- a/lib/langfuse/rails_cache_adapter.rb +++ b/lib/langfuse/rails_cache_adapter.rb @@ -85,9 +85,11 @@ def entry(key) # @param value [Object] Value to cache # @return [Object] The cached value def set(key, value, ttl: nil, stale_ttl: nil) - # Use total_ttl if SWR enabled, otherwise just ttl - window = compute_window(ttl: ttl, stale_ttl: stale_ttl) - expires_in = swr_enabled? ? window.total_ttl : window.ttl + # Total ttl when SWR is enabled, otherwise just ttl. Inlined (not pushed + # to a shared helper) to keep this hot write path allocation-free. + effective_ttl = ttl.nil? ? self.ttl : ttl + effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl + expires_in = swr_enabled? ? effective_ttl + effective_stale_ttl : effective_ttl Rails.cache.write(namespaced_key(key), value, expires_in:) value end diff --git a/lib/langfuse/stale_while_revalidate.rb b/lib/langfuse/stale_while_revalidate.rb index 74582e8..580f607 100644 --- a/lib/langfuse/stale_while_revalidate.rb +++ b/lib/langfuse/stale_while_revalidate.rb @@ -43,10 +43,6 @@ module Langfuse # end # rubocop:disable Metrics/ModuleLength module StaleWhileRevalidate - # Resolved freshness window for one cache write. Struct (not Hash) so the - # hot write path doesn't allocate + rehash a Symbol-keyed Hash per call. - TtlWindow = Struct.new(:ttl, :stale_ttl, :total_ttl, :fresh_until, :stale_until) - # Initialize SWR infrastructure # # Must be called by including class after setting @stale_ttl, @ttl, and @logger. @@ -215,28 +211,14 @@ def fetch_and_cache(key, ttl: nil, stale_ttl: nil, &block) # @param value [Object] Value to cache # @return [Object] The cached value def set_cache_entry(key, value, ttl: nil, stale_ttl: nil) - window = compute_window(ttl: ttl, stale_ttl: stale_ttl) - entry = PromptCache::CacheEntry.new(value, window.fresh_until, window.stale_until) - cache_set(key, entry, ttl: window.total_ttl) - value - end - - # Resolve effective TTLs and the resulting fresh/stale absolute timestamps. - # - # @param ttl [Integer, nil] Per-call TTL override - # @param stale_ttl [Integer, nil] Per-call stale TTL override - # @return [TtlWindow] - def compute_window(ttl: nil, stale_ttl: nil) + # TTL math is inlined (not extracted to a helper) to keep this hot write + # path allocation-free apart from the CacheEntry below. effective_ttl = ttl.nil? ? self.ttl : ttl effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl fresh_until = Time.now + effective_ttl - TtlWindow.new( - effective_ttl, - effective_stale_ttl, - effective_ttl + effective_stale_ttl, - fresh_until, - fresh_until + effective_stale_ttl - ) + entry = PromptCache::CacheEntry.new(value, fresh_until, fresh_until + effective_stale_ttl) + cache_set(key, entry, ttl: effective_ttl + effective_stale_ttl) + value end # Build a lock key for fetch operations