Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/langfuse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ class UnauthorizedError < ApiError; end
end

require_relative "langfuse/config"
require_relative "langfuse/cache_constants"
require_relative "langfuse/prompt_cache"
require_relative "langfuse/prompt_fetch_result"
require_relative "langfuse/rails_cache_adapter"
require_relative "langfuse/cache_warmer"
require_relative "langfuse/prompt_cache_events"
require_relative "langfuse/api_client"
require_relative "langfuse/span_filter"
require_relative "langfuse/sampling"
Expand Down
256 changes: 112 additions & 144 deletions lib/langfuse/api_client.rb

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions lib/langfuse/cache_constants.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module Langfuse
# Symbol constants for prompt cache event payloads.
# Producers (ApiClient, PromptFetchResult) and consumers (observers,
# ActiveSupport::Notifications subscribers) share these definitions so a
# rename in one place can't silently desync from the other.
module CacheStatus
HIT = :hit
MISS = :miss
STALE = :stale
REFRESH = :refresh
BYPASS = :bypass
DISABLED = :disabled
end

module CacheSource
CACHE = :cache
API = :api
FALLBACK = :fallback
end

module CacheBackend
MEMORY = "memory"
RAILS = "rails"
DISABLED = "disabled"

# Stat keys backend implementations may not be able to compute. Surfaced in
# `#stats[:unsupported_counts]` so callers can distinguish "0" from "unknown".
UNSUPPORTED_COUNT_KEYS = %i[current_generation_entries orphaned_entries total_entries].freeze
end
end
40 changes: 13 additions & 27 deletions lib/langfuse/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ def get_prompt_result(name, version: nil, label: nil, fallback: nil, type: nil,

# Log warning and return fallback
config.logger.warn("Langfuse API error for prompt '#{name}': #{e.message}. Using fallback.")
build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl: cache_ttl, error: e)
key = api_client.prompt_cache_key(name, version: version, label: label)
build_fallback_prompt_result(key, fallback: fallback, type: type, cache_ttl: cache_ttl, error: e)
end

# Refresh a prompt from the API, optionally writing through to cache.
Expand Down Expand Up @@ -858,42 +859,27 @@ def build_client_fetch_result(api_result, prompt_client)
)
end

# rubocop:disable Metrics/ParameterLists
def build_fallback_prompt_result(name, version, label, fallback, type, cache_ttl:, error:)
prompt_client = build_fallback_prompt_client(name, fallback, type)
key = api_client.prompt_cache_key(name, version: version, label: label)
api_client.emit_prompt_cache_event(
:fallback,
fallback_event_payload(key, cache_ttl, error)
)
def build_fallback_prompt_result(key, fallback:, type:, cache_ttl:, error:)
prompt_client = build_fallback_prompt_client(key.name, fallback, type)
cache_status = fallback_cache_status(cache_ttl)
api_client.emit_prompt_fallback_event(key, cache_status: cache_status, error: error)
PromptFetchResult.new(
prompt: prompt_client,
logical_key: key.logical_key,
storage_key: key.storage_key,
cache_status: fallback_cache_status(cache_ttl),
source: :fallback,
name: name,
version: version || prompt_client.version,
cache_status: cache_status,
source: CacheSource::FALLBACK,
name: key.name,
version: key.version || prompt_client.version,
label: key.resolved_label
)
end
# rubocop:enable Metrics/ParameterLists

def fallback_event_payload(key, cache_ttl, error)
api_client.prompt_event_payload(
key,
fallback_cache_status(cache_ttl),
:fallback,
error_class: error.class.name,
error_message: error.message
)
end

def fallback_cache_status(cache_ttl)
return :bypass if cache_ttl&.zero?
return :disabled unless api_client.cache
return CacheStatus::BYPASS if cache_ttl&.zero?
return CacheStatus::DISABLED unless api_client.cache

:miss
CacheStatus::MISS
end

# Check if caching is enabled in configuration
Expand Down
33 changes: 23 additions & 10 deletions lib/langfuse/prompt_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ module Langfuse
class PromptCache
include StaleWhileRevalidate

# Caps the per-name generation map. Without a cap, long-lived processes
# that invalidate across many distinct prompts grow it unboundedly; LRU
# eviction keeps the working set live and lets cold names go.
MAX_NAME_GENERATIONS = 1024

# Cache entry with data and expiration time
#
# Supports stale-while-revalidate pattern:
Expand Down Expand Up @@ -82,7 +87,8 @@ def initialize(ttl: 60, max_size: 1000, stale_ttl: 0, refresh_threads: 5, logger
@logger = logger
@cache = {}
@global_generation = 0
@name_generations = Hash.new(0)
@name_generations = {}
@name_generation_counter = 0
@monitor = Monitor.new
@locks = {} # Track locks for in-memory locking
initialize_swr(refresh_threads: refresh_threads) if swr_enabled?
Expand Down Expand Up @@ -121,13 +127,12 @@ def set(key, value, ttl: nil, stale_ttl: nil)
@monitor.synchronize do
# Evict oldest entry if at max size
evict_oldest if @cache.size >= max_size

now = Time.now
# TTL math is inlined (not extracted to a helper) to keep this hot path
# allocation-free apart from the CacheEntry below.
effective_ttl = ttl.nil? ? self.ttl : ttl
effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl
fresh_until = now + effective_ttl
stale_until = fresh_until + effective_stale_ttl
@cache[key] = CacheEntry.new(value, fresh_until, stale_until)
fresh_until = Time.now + effective_ttl
@cache[key] = CacheEntry.new(value, fresh_until, fresh_until + effective_stale_ttl)
value
end
end
Expand Down Expand Up @@ -162,11 +167,19 @@ def clear_logically

# Logically invalidate every cache variant for one prompt name.
#
# Generations come from a monotonic global counter, not a per-name counter,
# so an evicted name re-entering the map can't reuse a generation value
# that's still embedded in a stale @cache entry.
#
# @param name [String] Prompt name
# @return [Integer] New name generation
def invalidate_name(name)
@monitor.synchronize do
@name_generations[name.to_s] += 1
name_str = name.to_s
@name_generations.delete(name_str)
@name_generations.shift if @name_generations.size >= MAX_NAME_GENERATIONS
@name_generation_counter += 1
@name_generations[name_str] = @name_generation_counter
end
end

Expand All @@ -181,7 +194,7 @@ def storage_key(logical_key, name:)
logical_key,
name: name,
global_generation: @global_generation,
name_generation: @name_generations[name.to_s]
name_generation: @name_generations.fetch(name.to_s, 0)
)
end
end
Expand All @@ -191,7 +204,7 @@ def stats
@monitor.synchronize do
counts = count_entries_by_generation
{
backend: "memory",
backend: CacheBackend::MEMORY,
enabled: true,
current_generation_entries: counts.fetch(:current),
orphaned_entries: counts.fetch(:orphaned),
Expand Down Expand Up @@ -346,7 +359,7 @@ def current_generation_key?(key)
global = Integer(parts[0][1..])
name = Base64.urlsafe_decode64(parts[1][1..])
name_generation = Integer(parts[2])
global == @global_generation && name_generation == @name_generations[name]
global == @global_generation && name_generation == @name_generations.fetch(name, 0)
rescue ArgumentError
false
end
Expand Down
110 changes: 110 additions & 0 deletions lib/langfuse/prompt_cache_events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# frozen_string_literal: true

module Langfuse
# Prompt cache event emission for ApiClient.
#
# Includers must expose:
# - `cache_backend_name` — used in {#event_payload} to tag the cache backend
# - `logger` — used to warn on observer/notifier failures
module PromptCacheEvents
# ActiveSupport::Notifications event name used for prompt cache events.
PROMPT_CACHE_NOTIFICATION = "prompt_cache.langfuse"

# Configure prompt cache event dispatch. Wraps the observer once into a
# 1-arg callable so the per-event hot path never re-checks arity.
#
# @param cache_observer [#call, nil] Optional observer
# @return [void]
def setup_prompt_cache_events(cache_observer:)
@cache_observer_callable = wrap_cache_observer(cache_observer)
@active_support_notifications = defined?(ActiveSupport::Notifications) ? ActiveSupport::Notifications : nil
end

# Emit a prompt cache event to configured hooks. Accepts an eager payload
# hash or a block that builds one. The block is only evaluated when at
# least one listener is active, avoiding hash allocations on the hot path.
#
# @param event [Symbol] Event name
# @param payload [Hash, nil] Event payload (omit when passing a block)
# @yieldreturn [Hash] Lazily constructed payload
# @return [void]
def emit_prompt_cache_event(event, payload = nil)
observer_callable = @cache_observer_callable
as_listening = active_support_listening?
return if observer_callable.nil? && !as_listening

payload ||= block_given? ? yield : {}
normalized_payload = payload.merge(event: event.to_sym)
notify_cache_observer(normalized_payload) if observer_callable
notify_active_support(normalized_payload) if as_listening
end

# Emit a fallback event for a prompt fetch that fell back to caller-provided content.
#
# @param key [PromptCacheKey] Logical and storage cache key
# @param cache_status [Symbol] Cache status to report
# @param error [StandardError] The error that triggered the fallback
# @return [void]
def emit_prompt_fallback_event(key, cache_status:, error:)
emit_prompt_cache_event(:fallback) do
event_payload(key, cache_status, CacheSource::FALLBACK,
error_class: error.class.name, error_message: error.message)
end
end

private

# @api private
def event_payload(key, cache_status, source, extra = {})
{
name: key.name,
version: key.version,
label: key.resolved_label,
logical_key: key.logical_key,
storage_key: key.storage_key,
backend: cache_backend_name,
cache_status: cache_status,
source: source
}.merge(extra)
end

# @api private
def notify_cache_observer(payload)
@cache_observer_callable.call(payload)
rescue StandardError => e
logger.warn("Langfuse prompt cache observer failed: #{e.class} - #{e.message}")
end

# @api private
def active_support_listening?
return false unless @active_support_notifications

notifier = @active_support_notifications.notifier
# Defensive: notifier stand-ins (test fakes, AS::Notifications forks,
# very old AS versions) may not implement listening?. Assume they're
# listening so we still attempt to instrument; notify_active_support
# rescues failures.
return true unless notifier.respond_to?(:listening?)

notifier.listening?(PROMPT_CACHE_NOTIFICATION)
end

# @api private
def notify_active_support(payload)
@active_support_notifications.instrument(PROMPT_CACHE_NOTIFICATION, payload)
rescue StandardError => e
logger.warn("Langfuse ActiveSupport cache notification failed: #{e.class} - #{e.message}")
end

# @api private
def wrap_cache_observer(observer)
return nil if observer.nil?

if observer.method(:call).arity == 1
->(payload) { observer.call(payload) }
else
->(payload) { observer.call(payload[:event], payload) }
end
end
end
end
9 changes: 1 addition & 8 deletions lib/langfuse/prompt_fetch_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,9 @@ def initialize(prompt:, logical_key:, storage_key:, cache_status:, source:, name
end
# rubocop:enable Metrics/ParameterLists

# Compatibility alias for callers that already use "cache key" language.
#
# @return [String] Stable logical cache identity
def cache_key
logical_key
end

# @return [Boolean] Whether this result used caller-provided fallback content
def fallback?
source == :fallback || (prompt.respond_to?(:is_fallback) && prompt.is_fallback)
source == CacheSource::FALLBACK
end

# @return [Hash] Result metadata as a hash
Expand Down
10 changes: 6 additions & 4 deletions lib/langfuse/rails_cache_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def initialize(ttl: 60, namespace: "langfuse", lock_timeout: 10, stale_ttl: 0, r

@ttl = ttl
@namespace = namespace
@namespace_prefix = "#{namespace}:"
@lock_timeout = lock_timeout
@stale_ttl = stale_ttl
@logger = logger
Expand Down Expand Up @@ -84,7 +85,8 @@ def entry(key)
# @param value [Object] Value to cache
# @return [Object] The cached value
def set(key, value, ttl: nil, stale_ttl: nil)
# Calculate expiration: use total_ttl if SWR enabled, otherwise just ttl
# Total ttl when SWR is enabled, otherwise just ttl. Inlined (not pushed
# to a shared helper) to keep this hot write path allocation-free.
effective_ttl = ttl.nil? ? self.ttl : ttl
effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl
expires_in = swr_enabled? ? effective_ttl + effective_stale_ttl : effective_ttl
Expand Down Expand Up @@ -155,13 +157,13 @@ def size
# @return [Hash] Prompt cache statistics
def stats
{
backend: "rails",
backend: CacheBackend::RAILS,
enabled: true,
current_generation_entries: nil,
orphaned_entries: nil,
total_entries: nil,
global_generation: generation_value(global_generation_key),
unsupported_counts: %i[current_generation_entries orphaned_entries total_entries]
unsupported_counts: CacheBackend::UNSUPPORTED_COUNT_KEYS
}
end

Expand Down Expand Up @@ -327,7 +329,7 @@ def wait_for_cache(key)
# @param key [String] Original cache key
# @return [String] Namespaced cache key
def namespaced_key(key)
key.start_with?("#{namespace}:") ? key : "#{namespace}:#{key}"
key.start_with?(@namespace_prefix) ? key : "#{@namespace_prefix}#{key}"
end

def global_generation_key
Expand Down
10 changes: 4 additions & 6 deletions lib/langfuse/stale_while_revalidate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,13 @@ def fetch_and_cache(key, ttl: nil, stale_ttl: nil, &block)
# @param value [Object] Value to cache
# @return [Object] The cached value
def set_cache_entry(key, value, ttl: nil, stale_ttl: nil)
now = Time.now
# TTL math is inlined (not extracted to a helper) to keep this hot write
# path allocation-free apart from the CacheEntry below.
effective_ttl = ttl.nil? ? self.ttl : ttl
effective_stale_ttl = stale_ttl.nil? ? self.stale_ttl : stale_ttl
fresh_until = now + effective_ttl
stale_until = fresh_until + effective_stale_ttl
entry = PromptCache::CacheEntry.new(value, fresh_until, stale_until)

fresh_until = Time.now + effective_ttl
entry = PromptCache::CacheEntry.new(value, fresh_until, fresh_until + effective_stale_ttl)
cache_set(key, entry, ttl: effective_ttl + effective_stale_ttl)

value
end

Expand Down
Loading
Loading