Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,53 @@ end

Call this once at application startup (Rails initializer, boot script, etc.).

## Multiple Clients

For most applications, the singleton client is the simplest option:

```ruby
Langfuse.configure do |config|
config.public_key = ENV["LANGFUSE_PUBLIC_KEY"]
config.secret_key = ENV["LANGFUSE_SECRET_KEY"]
end

client = Langfuse.client
```

If the singleton is not enough, create independent `Langfuse::Client` instances with their own `Langfuse::Config` objects. This is useful when one process has multiple application components that should report to different Langfuse projects, such as a "Meeting Transcription" project and a "Note Summarization" project.

```ruby
meeting_transcription_config = Langfuse::Config.new do |config|
config.public_key = ENV["LANGFUSE_MEETING_TRANSCRIPTION_PUBLIC_KEY"]
config.secret_key = ENV["LANGFUSE_MEETING_TRANSCRIPTION_SECRET_KEY"]
config.base_url = ENV["LANGFUSE_MEETING_TRANSCRIPTION_BASE_URL"]
end

note_summarization_config = Langfuse::Config.new do |config|
config.public_key = ENV["LANGFUSE_NOTE_SUMMARIZATION_PUBLIC_KEY"]
config.secret_key = ENV["LANGFUSE_NOTE_SUMMARIZATION_SECRET_KEY"]
config.base_url = ENV["LANGFUSE_NOTE_SUMMARIZATION_BASE_URL"]
end

meeting_transcription_client = Langfuse::Client.new(meeting_transcription_config)
note_summarization_client = Langfuse::Client.new(note_summarization_config)
```

Use the client instance for client-specific work:

```ruby
meeting_transcription_client.observe("meeting-transcription") do |obs|
prompt = meeting_transcription_client.get_prompt("transcribe-meeting")
obs.update(input: prompt.compile(meeting_id: meeting.id))
end

note_summarization_client.create_score(
name: "manual-review",
value: 1,
trace_id: "0123456789abcdef0123456789abcdef"
)
```

## Tracing Ownership

This is the part people get wrong.
Expand Down
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ This is the consumer hub. Start here unless you are already looking for a specif

### Production Hardening

- **[Configuration](CONFIGURATION.md)** — Config surface, tracing ownership, export filtering, environment defaults
- **[Configuration](CONFIGURATION.md)** — Config surface, multiple clients, tracing ownership, export filtering, environment defaults
- **[Caching](CACHING.md)** — Prompt cache backends, stale-while-revalidate, cache warming
- **[Error Handling](ERROR_HANDLING.md)** — Failure modes, retry boundaries, debugging
- **[Migration Guide](MIGRATION.md)** — Move hardcoded prompts into Langfuse-managed prompts without breaking runtime behavior
Expand Down
184 changes: 10 additions & 174 deletions lib/langfuse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class UnauthorizedError < ApiError; end
require_relative "langfuse/span_processor"
require_relative "langfuse/observations"
require_relative "langfuse/trace_id"
require_relative "langfuse/observation_methods"
require_relative "langfuse/score_client"
require_relative "langfuse/prompt_renderer"
require_relative "langfuse/text_prompt_client"
Expand All @@ -74,8 +75,9 @@ class UnauthorizedError < ApiError; end

# rubocop:disable Metrics/ModuleLength
module Langfuse
# rubocop:disable Metrics/ClassLength
class << self
include ObservationMethods

# @param configuration [Config] the global configuration object
attr_writer :configuration

Expand Down Expand Up @@ -142,7 +144,7 @@ def tracer_provider
# at_exit { Langfuse.shutdown }
#
def shutdown(timeout: 30)
client.shutdown if @client
@client&.shutdown(timeout: timeout)
OtelSetup.shutdown(timeout: timeout)
end

Expand Down Expand Up @@ -358,149 +360,18 @@ def reset!
@tracing_disabled_warning_emitted = false
end

# Creates a new observation (root or child)
#
# This is the module-level factory method that creates observations of any type.
# It can create root observations (when parent_span_context is nil) or child
# observations (when parent_span_context is provided).
#
# @param name [String] Descriptive name for the observation
# @param attrs [Hash, Types::SpanAttributes, Types::GenerationAttributes, nil] Observation attributes
# @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.)
# @param trace_id [String, nil] Optional 32-char lowercase hex trace ID to attach the observation to.
# Mutually exclusive with `parent_span_context`. Use {Langfuse.create_trace_id} to generate one.
# @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context for child observations
# @param start_time [Time, Integer, nil] Optional start time (Time object or Unix timestamp in nanoseconds)
# @param skip_validation [Boolean] Skip validation (for internal use). Defaults to false.
# @return [BaseObservation] The observation wrapper (Span, Generation, or Event)
# @raise [ArgumentError] if an invalid observation type is provided, an invalid `trace_id` is given,
# or both `trace_id` and `parent_span_context` are provided
#
# @example Create root span
# span = Langfuse.start_observation("root-operation", { input: {...} })
#
# @example Create child generation
# child = Langfuse.start_observation("llm-call", { model: "gpt-4" },
# as_type: :generation,
# parent_span_context: parent.otel_span.context)
#
# @example Attach to a deterministic trace ID
# trace_id = Langfuse.create_trace_id(seed: "order-123")
# root = Langfuse.start_observation("process-order", trace_id: trace_id)
# rubocop:disable Metrics/ParameterLists
def start_observation(name, attrs = {}, as_type: :span, trace_id: nil, parent_span_context: nil,
start_time: nil, skip_validation: false)
parent_span_context = resolve_trace_context(trace_id, parent_span_context)
type_str = as_type.to_s
validate_observation_type!(as_type, type_str) unless skip_validation

otel_tracer = otel_tracer()
otel_span = create_otel_span(
name: name,
start_time: start_time,
parent_span_context: parent_span_context,
otel_tracer: otel_tracer
)
apply_observation_attributes(otel_span, type_str, attrs)

observation = wrap_otel_span(otel_span, type_str, otel_tracer)
# Events auto-end immediately when created
observation.end if type_str == OBSERVATION_TYPES[:event]
observation
end
# rubocop:enable Metrics/ParameterLists

# User-facing convenience method for creating root observations
#
# @param name [String] Descriptive name for the observation
# @param attrs [Hash] Observation attributes (optional positional or keyword)
# @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.)
# @param trace_id [String, nil] Optional 32-char lowercase hex trace ID to attach the observation to.
# Use {Langfuse.create_trace_id} to generate one. Forwarded to {.start_observation}.
# @param kwargs [Hash] Additional keyword arguments merged into observation attributes (e.g., input:, output:, metadata:)
# @yield [observation] Optional block that receives the observation object
# @yieldparam observation [BaseObservation] The observation object
# @return [BaseObservation, Object] The observation (or block return value if block given)
# @raise [ArgumentError] if an invalid `trace_id` is provided
#
# @example Block-based API (auto-ends)
# Langfuse.observe("operation") do |obs|
# result = perform_operation
# obs.update(output: result)
# end
#
# @example Stateful API (manual end)
# obs = Langfuse.observe("operation", input: { data: "test" })
# obs.update(output: { result: "success" })
# obs.end
def observe(name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block)
merged_attrs = attrs.to_h.merge(kwargs)
observation = start_observation(name, merged_attrs, as_type: as_type, trace_id: trace_id)
return observation unless block

observation.send(:run_in_context, &block)
end

# Registry mapping observation type strings to their wrapper classes
OBSERVATION_TYPE_REGISTRY = {
OBSERVATION_TYPES[:generation] => Generation,
OBSERVATION_TYPES[:embedding] => Embedding,
OBSERVATION_TYPES[:event] => Event,
OBSERVATION_TYPES[:agent] => Agent,
OBSERVATION_TYPES[:tool] => Tool,
OBSERVATION_TYPES[:chain] => Chain,
OBSERVATION_TYPES[:retriever] => Retriever,
OBSERVATION_TYPES[:evaluator] => Evaluator,
OBSERVATION_TYPES[:guardrail] => Guardrail,
OBSERVATION_TYPES[:span] => Span
}.freeze

private

# @api private
def resolve_trace_context(trace_id, parent_span_context)
return parent_span_context unless trace_id
raise ArgumentError, "Cannot specify both trace_id and parent_span_context" if parent_span_context

TraceId.send(:to_span_context, trace_id)
end

# @api private
def validate_observation_type!(as_type, type_str)
return if valid_observation_type?(as_type)

valid_types = OBSERVATION_TYPES.values.sort.join(", ")
raise ArgumentError, "Invalid observation type: #{type_str}. Valid types: #{valid_types}"
def observation_tracer
otel_tracer
end

# @api private
def apply_observation_attributes(otel_span, type_str, attrs)
# Guard against ended spans — should always be recording here, but safe.
return unless otel_span.recording?

otel_attrs = OtelAttributes.create_observation_attributes(type_str, attrs.to_h, mask: configuration.mask)
otel_attrs.each { |key, value| otel_span.set_attribute(key, value) }
def observation_mask
configuration.mask
end

# Validates that an observation type is valid
#
# Checks if the provided type (symbol or string) matches a valid observation type
# in the OBSERVATION_TYPES constant.
#
# @param type [Symbol, String, Object] The observation type to validate
# @return [Boolean] true if valid, false otherwise
#
# @example
# valid_observation_type?(:span) # => true
# valid_observation_type?("span") # => true
# valid_observation_type?(:invalid) # => false
# valid_observation_type?(nil) # => false
def valid_observation_type?(type)
return false unless type.respond_to?(:to_sym)

OBSERVATION_TYPES.key?(type.to_sym)
rescue TypeError
false
def observation_client
nil
end

# Gets the OpenTelemetry tracer for Langfuse
Expand All @@ -513,40 +384,6 @@ def otel_tracer
noop_tracer
end

# Creates an OpenTelemetry span (root or child)
#
# @param name [String] Span name
# @param start_time [Time, Integer, nil] Optional start time
# @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context
# @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer
# @return [OpenTelemetry::SDK::Trace::Span] The created span
def create_otel_span(name:, otel_tracer:, start_time: nil, parent_span_context: nil)
if parent_span_context
# Create child span with parent context
# Create a non-recording span from the parent context to set in context
parent_span = OpenTelemetry::Trace.non_recording_span(parent_span_context)
parent_context = OpenTelemetry::Trace.context_with_span(parent_span)
OpenTelemetry::Context.with_current(parent_context) do
otel_tracer.start_span(name, start_timestamp: start_time)
end
else
# Create root span
otel_tracer.start_span(name, start_timestamp: start_time)
end
end

# Wraps an OpenTelemetry span in the appropriate observation class
#
# @param otel_span [OpenTelemetry::SDK::Trace::Span] The OTel span
# @param type_str [String] Observation type string
# @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer
# @param attributes [Hash, nil] Optional attributes
# @return [BaseObservation] Appropriate observation wrapper instance
def wrap_otel_span(otel_span, type_str, otel_tracer, attributes: nil)
observation_class = OBSERVATION_TYPE_REGISTRY[type_str] || Span
observation_class.new(otel_span, otel_tracer, attributes: attributes)
end

# rubocop:disable Naming/PredicateMethod
def setup_tracing_if_ready
return true if OtelSetup.initialized?
Expand Down Expand Up @@ -588,6 +425,5 @@ def noop_tracer
@noop_tracer ||= OpenTelemetry::Trace::TracerProvider.new.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION)
end
end
# rubocop:enable Metrics/ClassLength
end
# rubocop:enable Metrics/ModuleLength
36 changes: 35 additions & 1 deletion lib/langfuse/client.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "forwardable"
require_relative "tracer_provider_factory"

module Langfuse
# Main client for Langfuse SDK
Expand All @@ -22,6 +23,7 @@ module Langfuse
# rubocop:disable Metrics/ClassLength
class Client
extend Forwardable
include ObservationMethods

# @return [Integer] Default page size when fetching all dataset items
DATASET_ITEMS_PAGE_SIZE = 50
Expand Down Expand Up @@ -452,14 +454,34 @@ def flush_scores
@score_client.flush
end

# Return this client's tracer provider without mutating global OpenTelemetry state
#
# @return [OpenTelemetry::SDK::Trace::TracerProvider]
def tracer_provider
@tracer_provider ||= TracerProviderFactory.build(config)
end

# Shutdown the client and flush any pending scores
#
# Also shuts down the cache if it supports shutdown (e.g., SWR thread pool).
#
# @param timeout [Integer] Timeout in seconds
# @return [void]
def shutdown
def shutdown(timeout: 30)
provider = @tracer_provider
@tracer_provider = nil

@score_client.shutdown
@api_client.shutdown
provider&.shutdown(timeout: timeout)
end

# Force flush all pending traces for this client
#
# @param timeout [Integer] Timeout in seconds
# @return [void]
def force_flush(timeout: 30)
@tracer_provider&.force_flush(timeout: timeout)
end

# Create a new dataset
Expand Down Expand Up @@ -662,6 +684,18 @@ def run_experiment(name:, task:, data: nil, dataset_name: nil, description: nil,

attr_reader :score_client

def observation_tracer
tracer_provider.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION)
end

def observation_mask
config.mask
end

def observation_client
self
end

# Build a project-scoped URL, returning nil if project ID is unavailable
def project_url(path)
pid = project_id
Expand Down
3 changes: 2 additions & 1 deletion lib/langfuse/dataset_item_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def run(run_name:, run_description: nil, run_metadata: nil, &block)
raise ArgumentError, "block is required" unless block

output, trace_id, observation_id, task_error = execute_in_trace(run_name, run_metadata, &block)
Langfuse.force_flush(timeout: FLUSH_TIMEOUT)
@client.force_flush(timeout: FLUSH_TIMEOUT)

link(trace_id: trace_id, observation_id: observation_id, run_name: run_name,
run_description: run_description, metadata: run_metadata)
Expand All @@ -130,6 +130,7 @@ def run(run_name:, run_description: nil, run_metadata: nil, &block)

def execute_in_trace(run_name, run_metadata, &block)
TracedExecution.call(
client: @client,
trace_name: "dataset-run-#{run_name}",
input: @input,
metadata: run_metadata || {},
Expand Down
Loading