From 385cd9fa8d247d1480a37fbe5f88086cf735723d Mon Sep 17 00:00:00 2001 From: Igor Artemenko Date: Tue, 5 May 2026 19:58:22 +0000 Subject: [PATCH 1/3] Support multiple clients being instantiated --- lib/langfuse.rb | 184 ++--------------------- lib/langfuse/client.rb | 36 ++++- lib/langfuse/observation_methods.rb | 190 ++++++++++++++++++++++++ lib/langfuse/observations.rb | 23 ++- lib/langfuse/otel_setup.rb | 50 +------ lib/langfuse/tracer_provider_factory.rb | 57 +++++++ spec/langfuse/base_observation_spec.rb | 12 +- spec/langfuse/client_spec.rb | 73 +++++++++ spec/langfuse/otel_setup_spec.rb | 4 +- 9 files changed, 395 insertions(+), 234 deletions(-) create mode 100644 lib/langfuse/observation_methods.rb create mode 100644 lib/langfuse/tracer_provider_factory.rb diff --git a/lib/langfuse.rb b/lib/langfuse.rb index 5e5e28f..9a34c8a 100644 --- a/lib/langfuse.rb +++ b/lib/langfuse.rb @@ -57,6 +57,7 @@ class UnauthorizedError < ApiError; end require_relative "langfuse/span_processor" require_relative "langfuse/observations" require_relative "langfuse/trace_id" +require_relative "langfuse/observation_methods" require_relative "langfuse/score_client" require_relative "langfuse/prompt_renderer" require_relative "langfuse/text_prompt_client" @@ -74,8 +75,9 @@ class UnauthorizedError < ApiError; end # rubocop:disable Metrics/ModuleLength module Langfuse - # rubocop:disable Metrics/ClassLength class << self + include ObservationMethods + # @param configuration [Config] the global configuration object attr_writer :configuration @@ -142,7 +144,7 @@ def tracer_provider # at_exit { Langfuse.shutdown } # def shutdown(timeout: 30) - client.shutdown if @client + @client&.shutdown(timeout: timeout) OtelSetup.shutdown(timeout: timeout) end @@ -358,149 +360,18 @@ def reset! @tracing_disabled_warning_emitted = false end - # Creates a new observation (root or child) - # - # This is the module-level factory method that creates observations of any type. - # It can create root observations (when parent_span_context is nil) or child - # observations (when parent_span_context is provided). - # - # @param name [String] Descriptive name for the observation - # @param attrs [Hash, Types::SpanAttributes, Types::GenerationAttributes, nil] Observation attributes - # @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.) - # @param trace_id [String, nil] Optional 32-char lowercase hex trace ID to attach the observation to. - # Mutually exclusive with `parent_span_context`. Use {Langfuse.create_trace_id} to generate one. - # @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context for child observations - # @param start_time [Time, Integer, nil] Optional start time (Time object or Unix timestamp in nanoseconds) - # @param skip_validation [Boolean] Skip validation (for internal use). Defaults to false. - # @return [BaseObservation] The observation wrapper (Span, Generation, or Event) - # @raise [ArgumentError] if an invalid observation type is provided, an invalid `trace_id` is given, - # or both `trace_id` and `parent_span_context` are provided - # - # @example Create root span - # span = Langfuse.start_observation("root-operation", { input: {...} }) - # - # @example Create child generation - # child = Langfuse.start_observation("llm-call", { model: "gpt-4" }, - # as_type: :generation, - # parent_span_context: parent.otel_span.context) - # - # @example Attach to a deterministic trace ID - # trace_id = Langfuse.create_trace_id(seed: "order-123") - # root = Langfuse.start_observation("process-order", trace_id: trace_id) - # rubocop:disable Metrics/ParameterLists - def start_observation(name, attrs = {}, as_type: :span, trace_id: nil, parent_span_context: nil, - start_time: nil, skip_validation: false) - parent_span_context = resolve_trace_context(trace_id, parent_span_context) - type_str = as_type.to_s - validate_observation_type!(as_type, type_str) unless skip_validation - - otel_tracer = otel_tracer() - otel_span = create_otel_span( - name: name, - start_time: start_time, - parent_span_context: parent_span_context, - otel_tracer: otel_tracer - ) - apply_observation_attributes(otel_span, type_str, attrs) - - observation = wrap_otel_span(otel_span, type_str, otel_tracer) - # Events auto-end immediately when created - observation.end if type_str == OBSERVATION_TYPES[:event] - observation - end - # rubocop:enable Metrics/ParameterLists - - # User-facing convenience method for creating root observations - # - # @param name [String] Descriptive name for the observation - # @param attrs [Hash] Observation attributes (optional positional or keyword) - # @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.) - # @param trace_id [String, nil] Optional 32-char lowercase hex trace ID to attach the observation to. - # Use {Langfuse.create_trace_id} to generate one. Forwarded to {.start_observation}. - # @param kwargs [Hash] Additional keyword arguments merged into observation attributes (e.g., input:, output:, metadata:) - # @yield [observation] Optional block that receives the observation object - # @yieldparam observation [BaseObservation] The observation object - # @return [BaseObservation, Object] The observation (or block return value if block given) - # @raise [ArgumentError] if an invalid `trace_id` is provided - # - # @example Block-based API (auto-ends) - # Langfuse.observe("operation") do |obs| - # result = perform_operation - # obs.update(output: result) - # end - # - # @example Stateful API (manual end) - # obs = Langfuse.observe("operation", input: { data: "test" }) - # obs.update(output: { result: "success" }) - # obs.end - def observe(name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block) - merged_attrs = attrs.to_h.merge(kwargs) - observation = start_observation(name, merged_attrs, as_type: as_type, trace_id: trace_id) - return observation unless block - - observation.send(:run_in_context, &block) - end - - # Registry mapping observation type strings to their wrapper classes - OBSERVATION_TYPE_REGISTRY = { - OBSERVATION_TYPES[:generation] => Generation, - OBSERVATION_TYPES[:embedding] => Embedding, - OBSERVATION_TYPES[:event] => Event, - OBSERVATION_TYPES[:agent] => Agent, - OBSERVATION_TYPES[:tool] => Tool, - OBSERVATION_TYPES[:chain] => Chain, - OBSERVATION_TYPES[:retriever] => Retriever, - OBSERVATION_TYPES[:evaluator] => Evaluator, - OBSERVATION_TYPES[:guardrail] => Guardrail, - OBSERVATION_TYPES[:span] => Span - }.freeze - private - # @api private - def resolve_trace_context(trace_id, parent_span_context) - return parent_span_context unless trace_id - raise ArgumentError, "Cannot specify both trace_id and parent_span_context" if parent_span_context - - TraceId.send(:to_span_context, trace_id) - end - - # @api private - def validate_observation_type!(as_type, type_str) - return if valid_observation_type?(as_type) - - valid_types = OBSERVATION_TYPES.values.sort.join(", ") - raise ArgumentError, "Invalid observation type: #{type_str}. Valid types: #{valid_types}" + def observation_tracer + otel_tracer end - # @api private - def apply_observation_attributes(otel_span, type_str, attrs) - # Guard against ended spans — should always be recording here, but safe. - return unless otel_span.recording? - - otel_attrs = OtelAttributes.create_observation_attributes(type_str, attrs.to_h, mask: configuration.mask) - otel_attrs.each { |key, value| otel_span.set_attribute(key, value) } + def observation_mask + configuration.mask end - # Validates that an observation type is valid - # - # Checks if the provided type (symbol or string) matches a valid observation type - # in the OBSERVATION_TYPES constant. - # - # @param type [Symbol, String, Object] The observation type to validate - # @return [Boolean] true if valid, false otherwise - # - # @example - # valid_observation_type?(:span) # => true - # valid_observation_type?("span") # => true - # valid_observation_type?(:invalid) # => false - # valid_observation_type?(nil) # => false - def valid_observation_type?(type) - return false unless type.respond_to?(:to_sym) - - OBSERVATION_TYPES.key?(type.to_sym) - rescue TypeError - false + def observation_client + nil end # Gets the OpenTelemetry tracer for Langfuse @@ -513,40 +384,6 @@ def otel_tracer noop_tracer end - # Creates an OpenTelemetry span (root or child) - # - # @param name [String] Span name - # @param start_time [Time, Integer, nil] Optional start time - # @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context - # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer - # @return [OpenTelemetry::SDK::Trace::Span] The created span - def create_otel_span(name:, otel_tracer:, start_time: nil, parent_span_context: nil) - if parent_span_context - # Create child span with parent context - # Create a non-recording span from the parent context to set in context - parent_span = OpenTelemetry::Trace.non_recording_span(parent_span_context) - parent_context = OpenTelemetry::Trace.context_with_span(parent_span) - OpenTelemetry::Context.with_current(parent_context) do - otel_tracer.start_span(name, start_timestamp: start_time) - end - else - # Create root span - otel_tracer.start_span(name, start_timestamp: start_time) - end - end - - # Wraps an OpenTelemetry span in the appropriate observation class - # - # @param otel_span [OpenTelemetry::SDK::Trace::Span] The OTel span - # @param type_str [String] Observation type string - # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer - # @param attributes [Hash, nil] Optional attributes - # @return [BaseObservation] Appropriate observation wrapper instance - def wrap_otel_span(otel_span, type_str, otel_tracer, attributes: nil) - observation_class = OBSERVATION_TYPE_REGISTRY[type_str] || Span - observation_class.new(otel_span, otel_tracer, attributes: attributes) - end - # rubocop:disable Naming/PredicateMethod def setup_tracing_if_ready return true if OtelSetup.initialized? @@ -588,6 +425,5 @@ def noop_tracer @noop_tracer ||= OpenTelemetry::Trace::TracerProvider.new.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION) end end - # rubocop:enable Metrics/ClassLength end # rubocop:enable Metrics/ModuleLength diff --git a/lib/langfuse/client.rb b/lib/langfuse/client.rb index 1a82d17..be5bcb5 100644 --- a/lib/langfuse/client.rb +++ b/lib/langfuse/client.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "forwardable" +require_relative "tracer_provider_factory" module Langfuse # Main client for Langfuse SDK @@ -22,6 +23,7 @@ module Langfuse # rubocop:disable Metrics/ClassLength class Client extend Forwardable + include ObservationMethods # @return [Integer] Default page size when fetching all dataset items DATASET_ITEMS_PAGE_SIZE = 50 @@ -452,14 +454,34 @@ def flush_scores @score_client.flush end + # Return this client's tracer provider without mutating global OpenTelemetry state + # + # @return [OpenTelemetry::SDK::Trace::TracerProvider] + def tracer_provider + @tracer_provider ||= TracerProviderFactory.build(config) + end + # Shutdown the client and flush any pending scores # # Also shuts down the cache if it supports shutdown (e.g., SWR thread pool). # + # @param timeout [Integer] Timeout in seconds # @return [void] - def shutdown + def shutdown(timeout: 30) + provider = @tracer_provider + @tracer_provider = nil + @score_client.shutdown @api_client.shutdown + provider&.shutdown(timeout: timeout) + end + + # Force flush all pending traces for this client + # + # @param timeout [Integer] Timeout in seconds + # @return [void] + def force_flush(timeout: 30) + @tracer_provider&.force_flush(timeout: timeout) end # Create a new dataset @@ -662,6 +684,18 @@ def run_experiment(name:, task:, data: nil, dataset_name: nil, description: nil, attr_reader :score_client + def observation_tracer + tracer_provider.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION) + end + + def observation_mask + config.mask + end + + def observation_client + self + end + # Build a project-scoped URL, returning nil if project ID is unavailable def project_url(path) pid = project_id diff --git a/lib/langfuse/observation_methods.rb b/lib/langfuse/observation_methods.rb new file mode 100644 index 0000000..a24eb39 --- /dev/null +++ b/lib/langfuse/observation_methods.rb @@ -0,0 +1,190 @@ +# frozen_string_literal: true + +module Langfuse + # Shared public observation API used by the singleton facade and client instances. + # + # @api private + module ObservationMethods + # Creates a new observation (root or child) + # + # This is the module-level factory method that creates observations of any type. + # It can create root observations (when parent_span_context is nil) or child + # observations (when parent_span_context is provided). + # + # @param name [String] Descriptive name for the observation + # @param attrs [Hash, Types::SpanAttributes, Types::GenerationAttributes, nil] Observation attributes + # @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.) + # @param trace_id [String, nil] Optional 32-char lowercase hex trace ID to attach the observation to. + # Mutually exclusive with `parent_span_context`. Use {Langfuse.create_trace_id} to generate one. + # @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context for child observations + # @param start_time [Time, Integer, nil] Optional start time (Time object or Unix timestamp in nanoseconds) + # @param skip_validation [Boolean] Skip validation (for internal use). Defaults to false. + # @return [BaseObservation] The observation wrapper (Span, Generation, or Event) + # @raise [ArgumentError] if an invalid observation type is provided, an invalid `trace_id` is given, + # or both `trace_id` and `parent_span_context` are provided + # + # @example Create root span + # span = Langfuse.start_observation("root-operation", { input: {...} }) + # + # @example Create child generation + # child = Langfuse.start_observation("llm-call", { model: "gpt-4" }, + # as_type: :generation, + # parent_span_context: parent.otel_span.context) + # + # @example Attach to a deterministic trace ID + # trace_id = Langfuse.create_trace_id(seed: "order-123") + # root = Langfuse.start_observation("process-order", trace_id: trace_id) + # rubocop:disable Metrics/ParameterLists + def start_observation(name, attrs = {}, as_type: :span, trace_id: nil, parent_span_context: nil, + start_time: nil, skip_validation: false) + parent_span_context = resolve_trace_context(trace_id, parent_span_context) + type_str = as_type.to_s + validate_observation_type!(as_type, type_str) unless skip_validation + + otel_tracer = observation_tracer + otel_span = create_otel_span( + name: name, + start_time: start_time, + parent_span_context: parent_span_context, + otel_tracer: otel_tracer + ) + apply_observation_attributes(otel_span, type_str, attrs) + + observation = wrap_otel_span(otel_span, type_str, otel_tracer) + # Events auto-end immediately when created + observation.end if type_str == OBSERVATION_TYPES[:event] + observation + end + # rubocop:enable Metrics/ParameterLists + + # User-facing convenience method for creating root observations + # + # @param name [String] Descriptive name for the observation + # @param attrs [Hash] Observation attributes (optional positional or keyword) + # @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.) + # @param trace_id [String, nil] Optional 32-char lowercase hex trace ID to attach the observation to. + # Use {Langfuse.create_trace_id} to generate one. Forwarded to {.start_observation}. + # @param kwargs [Hash] Additional keyword arguments merged into observation attributes (e.g., input:, output:, metadata:) + # @yield [observation] Optional block that receives the observation object + # @yieldparam observation [BaseObservation] The observation object + # @return [BaseObservation, Object] The observation (or block return value if block given) + # @raise [ArgumentError] if an invalid `trace_id` is provided + # + # @example Block-based API (auto-ends) + # Langfuse.observe("operation") do |obs| + # result = perform_operation + # obs.update(output: result) + # end + # + # @example Stateful API (manual end) + # obs = Langfuse.observe("operation", input: { data: "test" }) + # obs.update(output: { result: "success" }) + # obs.end + def observe(name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block) + merged_attrs = attrs.to_h.merge(kwargs) + observation = start_observation(name, merged_attrs, as_type: as_type, trace_id: trace_id) + return observation unless block + + observation.send(:run_in_context, &block) + end + + # Registry mapping observation type strings to their wrapper classes + OBSERVATION_TYPE_REGISTRY = { + OBSERVATION_TYPES[:generation] => Generation, + OBSERVATION_TYPES[:embedding] => Embedding, + OBSERVATION_TYPES[:event] => Event, + OBSERVATION_TYPES[:agent] => Agent, + OBSERVATION_TYPES[:tool] => Tool, + OBSERVATION_TYPES[:chain] => Chain, + OBSERVATION_TYPES[:retriever] => Retriever, + OBSERVATION_TYPES[:evaluator] => Evaluator, + OBSERVATION_TYPES[:guardrail] => Guardrail, + OBSERVATION_TYPES[:span] => Span + }.freeze + + private + + # @api private + def resolve_trace_context(trace_id, parent_span_context) + return parent_span_context unless trace_id + raise ArgumentError, "Cannot specify both trace_id and parent_span_context" if parent_span_context + + TraceId.send(:to_span_context, trace_id) + end + + # @api private + def validate_observation_type!(as_type, type_str) + return if valid_observation_type?(as_type) + + valid_types = OBSERVATION_TYPES.values.sort.join(", ") + raise ArgumentError, "Invalid observation type: #{type_str}. Valid types: #{valid_types}" + end + + # @api private + def apply_observation_attributes(otel_span, type_str, attrs) + # Guard against ended spans — should always be recording here, but safe. + return unless otel_span.recording? + + otel_attrs = OtelAttributes.create_observation_attributes(type_str, attrs.to_h, mask: observation_mask) + otel_attrs.each { |key, value| otel_span.set_attribute(key, value) } + end + + # Validates that an observation type is valid + # + # Checks if the provided type (symbol or string) matches a valid observation type + # in the OBSERVATION_TYPES constant. + # + # @param type [Symbol, String, Object] The observation type to validate + # @return [Boolean] true if valid, false otherwise + # + # @example + # valid_observation_type?(:span) # => true + # valid_observation_type?("span") # => true + # valid_observation_type?(:invalid) # => false + # valid_observation_type?(nil) # => false + def valid_observation_type?(type) + return false unless type.respond_to?(:to_sym) + + OBSERVATION_TYPES.key?(type.to_sym) + rescue TypeError + false + end + + # Creates an OpenTelemetry span (root or child) + # + # @param name [String] Span name + # @param start_time [Time, Integer, nil] Optional start time + # @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context + # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer + # @return [OpenTelemetry::SDK::Trace::Span] The created span + def create_otel_span(name:, otel_tracer:, start_time: nil, parent_span_context: nil) + if parent_span_context + # Create child span with parent context + # Create a non-recording span from the parent context to set in context + parent_span = OpenTelemetry::Trace.non_recording_span(parent_span_context) + parent_context = OpenTelemetry::Trace.context_with_span(parent_span) + OpenTelemetry::Context.with_current(parent_context) do + otel_tracer.start_span(name, start_timestamp: start_time) + end + else + # Create root span + otel_tracer.start_span(name, start_timestamp: start_time) + end + end + + # Wraps an OpenTelemetry span in the appropriate observation class + # + # @param otel_span [OpenTelemetry::SDK::Trace::Span] The OTel span + # @param type_str [String] Observation type string + # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer + # @param attributes [Hash, nil] Optional attributes + # @return [BaseObservation] Appropriate observation wrapper instance + def wrap_otel_span(otel_span, type_str, otel_tracer, attributes: nil) + observation_class = OBSERVATION_TYPE_REGISTRY[type_str] || Span + observation = observation_class.new(otel_span, otel_tracer, attributes: attributes) + client = observation_client + observation.client = client if client + observation + end + end +end diff --git a/lib/langfuse/observations.rb b/lib/langfuse/observations.rb index e5109e0..1f0b15b 100644 --- a/lib/langfuse/observations.rb +++ b/lib/langfuse/observations.rb @@ -67,6 +67,9 @@ class BaseObservation # @return [String] Observation type (e.g., "span", "generation", "event") attr_reader :type + # @return [Client, nil] Client that created this observation + attr_accessor :client + # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, Types::GenerationAttributes, nil] Optional initial attributes @@ -99,7 +102,7 @@ def trace_id # puts "View trace: #{obs.trace_url}" # end def trace_url - Langfuse.client.trace_url(trace_id) + observation_client.trace_url(trace_id) end # Ends the observation span. @@ -118,7 +121,7 @@ def end(end_time: nil) def update_trace(attrs) return self unless @otel_span.recording? - otel_attrs = OtelAttributes.create_trace_attributes(attrs.to_h, mask: Langfuse.configuration.mask) + otel_attrs = OtelAttributes.create_trace_attributes(attrs.to_h, mask: observation_config.mask) otel_attrs.each { |key, value| @otel_span.set_attribute(key, value) } self end @@ -134,7 +137,7 @@ def update_trace(attrs) # @return [BaseObservation, Object] The child observation (or block return value if block given) def start_observation(name, attrs = {}, as_type: :span, &block) # Skip validation so unknown types fall back to Span in the factory. - child = Langfuse.start_observation( + child = observation_client.start_observation( name, attrs, as_type: as_type, @@ -181,7 +184,7 @@ def level=(value) # @param level [String] Log level (debug, default, warning, error) # @return [void] def event(name:, input: nil, level: "default") - masked_input = Masking.apply(input, mask: Langfuse.configuration.mask) + masked_input = Masking.apply(input, mask: observation_config.mask) attributes = { OtelAttributes::OBSERVATION_INPUT => masked_input&.to_json, OtelAttributes::OBSERVATION_LEVEL => level @@ -204,7 +207,7 @@ def current_span # @param data_type [Symbol] one of :numeric, :boolean, :categorical # @return [Hash] created score data from the API def score_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) - Langfuse.create_score( + observation_client.create_score( name: name, value: value, trace_id: trace_id, @@ -228,7 +231,7 @@ def update_observation_attributes(attrs = {}, **kwargs) attrs_hash = attrs.to_h.merge(kwargs) # Use @type instance variable set during initialization - otel_attrs = OtelAttributes.create_observation_attributes(type, attrs_hash, mask: Langfuse.configuration.mask) + otel_attrs = OtelAttributes.create_observation_attributes(type, attrs_hash, mask: observation_config.mask) otel_attrs.each { |key, value| @otel_span.set_attribute(key, value) } end @@ -248,6 +251,14 @@ def normalize_prompt(prompt) private + def observation_client + @client || Langfuse.client + end + + def observation_config + @client&.config || Langfuse.configuration + end + # Runs the block with this observation as the active OTel span, # then ends the span in ensure (events excluded — they auto-end). # @api private diff --git a/lib/langfuse/otel_setup.rb b/lib/langfuse/otel_setup.rb index e4bfaa1..330b9c2 100644 --- a/lib/langfuse/otel_setup.rb +++ b/lib/langfuse/otel_setup.rb @@ -1,12 +1,9 @@ # frozen_string_literal: true -require "opentelemetry/sdk" -require "opentelemetry/exporter/otlp" -require "base64" +require_relative "tracer_provider_factory" module Langfuse # OpenTelemetry initialization and setup for Langfuse tracing. - # rubocop:disable Metrics/ModuleLength module OtelSetup TRACING_CONFIG_FIELDS = %i[ public_key @@ -31,13 +28,12 @@ class << self # @param config [Langfuse::Config] The Langfuse configuration # @return [OpenTelemetry::SDK::Trace::TracerProvider] def setup(config) - validate_tracing_config!(config) return existing_provider_for(config) if initialized? candidate_provider = nil provider = nil created = false - candidate_provider = build_tracer_provider(config) + candidate_provider = TracerProviderFactory.build(config) provider, created = publish_provider(candidate_provider, tracing_config_snapshot(config)) unless created candidate_provider.shutdown(timeout: 30) @@ -126,38 +122,11 @@ def rollback_provider(provider) nil end - def build_tracer_provider(config) - provider = OpenTelemetry::SDK::Trace::TracerProvider.new( - sampler: build_sampler(config.sample_rate) - ) - provider.add_span_processor( - SpanProcessor.new(config: config, exporter: build_exporter(config)) - ) - provider - end - - def build_exporter(config) - OpenTelemetry::Exporter::OTLP::Exporter.new( - endpoint: "#{config.base_url}/api/public/otel/v1/traces", - headers: build_headers(config.public_key, config.secret_key), - compression: "gzip" - ) - end - def log_initialized(config) mode = config.tracing_async ? "async" : "sync" config.logger.info("Langfuse tracing initialized with OpenTelemetry (#{mode} mode)") end - def validate_tracing_config!(config) - raise ConfigurationError, "public_key is required" if blank?(config.public_key) - raise ConfigurationError, "secret_key is required" if blank?(config.secret_key) - raise ConfigurationError, "base_url cannot be empty" if blank?(config.base_url) - return if config.should_export_span.nil? || config.should_export_span.respond_to?(:call) - - raise ConfigurationError, "should_export_span must respond to #call" - end - def tracing_config_snapshot(config) TRACING_CONFIG_FIELDS.to_h { |field| [field, config.public_send(field)] }.freeze end @@ -165,21 +134,6 @@ def tracing_config_snapshot(config) def setup_mutex @setup_mutex ||= Mutex.new end - - def blank?(value) - value.nil? || value.empty? - end - - def build_headers(public_key, secret_key) - credentials = "#{public_key}:#{secret_key}" - encoded = Base64.strict_encode64(credentials) - { "Authorization" => "Basic #{encoded}" } - end - - def build_sampler(sample_rate) - Sampling.build_sampler(sample_rate) || OpenTelemetry::SDK::Trace::Samplers::ALWAYS_ON - end end end - # rubocop:enable Metrics/ModuleLength end diff --git a/lib/langfuse/tracer_provider_factory.rb b/lib/langfuse/tracer_provider_factory.rb new file mode 100644 index 0000000..374b0a1 --- /dev/null +++ b/lib/langfuse/tracer_provider_factory.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +require "opentelemetry/sdk" +require "opentelemetry/exporter/otlp" +require "base64" + +module Langfuse + # Builds Langfuse tracer providers without mutating global OpenTelemetry state. + # + # @api private + module TracerProviderFactory + module_function + + def build(config) + validate_config!(config) + + provider = OpenTelemetry::SDK::Trace::TracerProvider.new( + sampler: build_sampler(config.sample_rate) + ) + provider.add_span_processor( + SpanProcessor.new(config: config, exporter: build_exporter(config)) + ) + provider + end + + def build_exporter(config) + OpenTelemetry::Exporter::OTLP::Exporter.new( + endpoint: "#{config.base_url}/api/public/otel/v1/traces", + headers: build_headers(config.public_key, config.secret_key), + compression: "gzip" + ) + end + + def validate_config!(config) + raise ConfigurationError, "public_key is required" if blank?(config.public_key) + raise ConfigurationError, "secret_key is required" if blank?(config.secret_key) + raise ConfigurationError, "base_url cannot be empty" if blank?(config.base_url) + return if config.should_export_span.nil? || config.should_export_span.respond_to?(:call) + + raise ConfigurationError, "should_export_span must respond to #call" + end + + def blank?(value) + value.nil? || value.empty? + end + + def build_headers(public_key, secret_key) + credentials = "#{public_key}:#{secret_key}" + encoded = Base64.strict_encode64(credentials) + { "Authorization" => "Basic #{encoded}" } + end + + def build_sampler(sample_rate) + Sampling.build_sampler(sample_rate) || OpenTelemetry::SDK::Trace::Samplers::ALWAYS_ON + end + end +end diff --git a/spec/langfuse/base_observation_spec.rb b/spec/langfuse/base_observation_spec.rb index 62c97fb..35513ad 100644 --- a/spec/langfuse/base_observation_spec.rb +++ b/spec/langfuse/base_observation_spec.rb @@ -722,8 +722,11 @@ def version end describe "#score_trace" do - it "delegates to Langfuse.create_score with trace_id" do - expect(Langfuse).to receive(:create_score).with( + it "delegates to the observation client with trace_id" do + mock_client = instance_double(Langfuse::Client) + allow(Langfuse).to receive(:client).and_return(mock_client) + + expect(mock_client).to receive(:create_score).with( name: "quality", value: 0.9, trace_id: observation.trace_id, @@ -738,7 +741,10 @@ def version end it "defaults data_type to :numeric" do - expect(Langfuse).to receive(:create_score).with( + mock_client = instance_double(Langfuse::Client) + allow(Langfuse).to receive(:client).and_return(mock_client) + + expect(mock_client).to receive(:create_score).with( hash_including(data_type: :numeric) ) observation.score_trace(name: "score", value: 1.0) diff --git a/spec/langfuse/client_spec.rb b/spec/langfuse/client_spec.rb index 6903f0d..85243cb 100644 --- a/spec/langfuse/client_spec.rb +++ b/spec/langfuse/client_spec.rb @@ -240,6 +240,79 @@ def self.cache end end + describe "#observe" do + let(:client) { described_class.new(valid_config) } + + it "creates observations using a client-owned tracer provider" do + original_global_provider = OpenTelemetry.tracer_provider + + observation = client.observe("client-span", input: { data: "test" }) + + expect(observation).to be_a(Langfuse::Span) + expect(observation.client).to eq(client) + expect(client.tracer_provider).not_to be_nil + expect(OpenTelemetry.tracer_provider).to eq(original_global_provider) + end + + it "creates children on the same client" do + root = client.observe("root") + child = root.start_observation("child") + + expect(child.client).to eq(client) + expect(child.trace_id).to eq(root.trace_id) + end + + it "uses the client config mask for observation updates" do + valid_config.mask = ->(data:) { data.is_a?(Hash) ? { masked: true } : data } + observation = client.observe("masked") + + observation.update(input: { secret: "value" }) + + input = observation.otel_span.to_span_data.attributes["langfuse.observation.input"] + expect(JSON.parse(input)).to eq({ "masked" => true }) + end + + it "supports custom trace IDs" do + trace_id = Langfuse.create_trace_id(seed: "client-trace") + captured = nil + + client.observe("root", trace_id: trace_id) { |obs| captured = obs.trace_id } + + expect(captured).to eq(trace_id) + end + end + + describe "client-owned tracing" do + def tracing_config(base_url, public_key) + Langfuse::Config.new do |config| + config.public_key = public_key + config.secret_key = "sk_test" + config.base_url = base_url + config.tracing_async = false + config.batch_size = 10 + config.flush_interval = 1 + end + end + + it "exports separate clients to their own Langfuse endpoints" do + first_url = "https://first.langfuse.test" + second_url = "https://second.langfuse.test" + first_client = described_class.new(tracing_config(first_url, "pk_first")) + second_client = described_class.new(tracing_config(second_url, "pk_second")) + + first_client.observe("first") { |span| span.update(output: "ok") } + second_client.observe("second") { |span| span.update(output: "ok") } + first_client.force_flush(timeout: 1) + second_client.force_flush(timeout: 1) + + expect(WebMock).to have_requested(:post, "#{first_url}/api/public/otel/v1/traces").at_least_once + expect(WebMock).to have_requested(:post, "#{second_url}/api/public/otel/v1/traces").at_least_once + ensure + first_client&.shutdown(timeout: 1) + second_client&.shutdown(timeout: 1) + end + end + describe "#get_prompt" do let(:client) { described_class.new(valid_config) } let(:base_url) { valid_config.base_url } diff --git a/spec/langfuse/otel_setup_spec.rb b/spec/langfuse/otel_setup_spec.rb index 7f76798..89ec3ef 100644 --- a/spec/langfuse/otel_setup_spec.rb +++ b/spec/langfuse/otel_setup_spec.rb @@ -19,7 +19,7 @@ before do described_class.shutdown(timeout: 1) if described_class.initialized? - allow(described_class).to receive(:build_exporter).and_return(exporter) + allow(Langfuse::TracerProviderFactory).to receive(:build_exporter).and_return(exporter) end after do @@ -70,10 +70,10 @@ existing_provider = instance_double(OpenTelemetry::SDK::Trace::TracerProvider) allow(described_class).to receive_messages( - build_tracer_provider: candidate_provider, publish_provider: [existing_provider, false], existing_provider_for: existing_provider ) + allow(Langfuse::TracerProviderFactory).to receive(:build).and_return(candidate_provider) expect(candidate_provider).to receive(:shutdown).with(timeout: 30) expect(described_class.setup(config)).to equal(existing_provider) From 5ebf38563cb4179b280385f9b4623a0d03d7233c Mon Sep 17 00:00:00 2001 From: Igor Artemenko Date: Wed, 6 May 2026 22:56:05 +0000 Subject: [PATCH 2/3] Replace other singleton usages --- lib/langfuse/dataset_item_client.rb | 3 ++- lib/langfuse/experiment_runner.rb | 5 +++-- lib/langfuse/traced_execution.rb | 5 +++-- spec/langfuse/dataset_item_client_spec.rb | 18 ++++++++++++++---- spec/langfuse/experiment_runner_spec.rb | 19 +++++++++++++++---- spec/langfuse/traced_execution_spec.rb | 11 +++++++++++ 6 files changed, 48 insertions(+), 13 deletions(-) diff --git a/lib/langfuse/dataset_item_client.rb b/lib/langfuse/dataset_item_client.rb index c8df97a..73c832b 100644 --- a/lib/langfuse/dataset_item_client.rb +++ b/lib/langfuse/dataset_item_client.rb @@ -117,7 +117,7 @@ def run(run_name:, run_description: nil, run_metadata: nil, &block) raise ArgumentError, "block is required" unless block output, trace_id, observation_id, task_error = execute_in_trace(run_name, run_metadata, &block) - Langfuse.force_flush(timeout: FLUSH_TIMEOUT) + @client.force_flush(timeout: FLUSH_TIMEOUT) link(trace_id: trace_id, observation_id: observation_id, run_name: run_name, run_description: run_description, metadata: run_metadata) @@ -130,6 +130,7 @@ def run(run_name:, run_description: nil, run_metadata: nil, &block) def execute_in_trace(run_name, run_metadata, &block) TracedExecution.call( + client: @client, trace_name: "dataset-run-#{run_name}", input: @input, metadata: run_metadata || {}, diff --git a/lib/langfuse/experiment_runner.rb b/lib/langfuse/experiment_runner.rb index c931d97..882ca27 100644 --- a/lib/langfuse/experiment_runner.rb +++ b/lib/langfuse/experiment_runner.rb @@ -32,7 +32,7 @@ def initialize(client:, name:, items:, task:, evaluators: [], run_evaluators: [] @metadata = metadata || {} @description = description @run_name = run_name || "#{name} - #{Time.now.utc.iso8601}" - @logger = Langfuse.configuration.logger + @logger = client.config.logger @dataset_run_id = nil @dataset_id = nil end @@ -75,6 +75,7 @@ def process_item(item, index) def run_task_in_trace(item) TracedExecution.call( + client: @client, trace_name: "experiment-#{@name}", input: item.input, metadata: @metadata, @@ -156,7 +157,7 @@ def link_to_dataset_run(item, trace_id, observation_id) def flush_all @client.flush_scores - Langfuse.force_flush(timeout: FLUSH_TIMEOUT) + @client.force_flush(timeout: FLUSH_TIMEOUT) end # Wraps raw hashes into ExperimentItem; passes DatasetItemClient through unchanged. diff --git a/lib/langfuse/traced_execution.rb b/lib/langfuse/traced_execution.rb index 9bc9182..34ac300 100644 --- a/lib/langfuse/traced_execution.rb +++ b/lib/langfuse/traced_execution.rb @@ -11,19 +11,20 @@ module Langfuse module TracedExecution # Execute a task proc within a traced observe block. # + # @param client [Client] client used to create the observation # @param trace_name [String] name for the observe span # @param input [Object] input set on the trace # @param metadata [Hash] metadata set on the trace # @param task [Proc] the callable to execute — receives the span # @yield [span, trace_id] optional pre-task hook (e.g., dataset run linking) # @return [Array<(Object, String, String, StandardError | nil)>] output, trace_id, observation_id, error - def self.call(trace_name:, input:, task:, metadata: {}) + def self.call(client:, trace_name:, input:, task:, metadata: {}) output = nil trace_id = nil observation_id = nil task_error = nil - Langfuse.observe(trace_name) do |span| + client.observe(trace_name) do |span| trace_id = span.trace_id observation_id = span.id span.update_trace(input: input, metadata: metadata) diff --git a/spec/langfuse/dataset_item_client_spec.rb b/spec/langfuse/dataset_item_client_spec.rb index 5d093c7..40b2f58 100644 --- a/spec/langfuse/dataset_item_client_spec.rb +++ b/spec/langfuse/dataset_item_client_spec.rb @@ -197,10 +197,20 @@ describe "#run" do let(:mock_client) { instance_double(Langfuse::Client) } let(:item_client) { described_class.new(item_data, client: mock_client) } + let(:span) do + instance_double( + Langfuse::Span, + trace_id: "a" * 32, + id: "b" * 16, + update_trace: nil, + update: nil + ) + end before do allow(mock_client).to receive(:create_dataset_run_item) - allow(Langfuse).to receive(:force_flush) + allow(mock_client).to receive(:force_flush) + allow(mock_client).to receive(:observe).and_yield(span) end it "raises ArgumentError when no block given" do @@ -220,7 +230,7 @@ yielded_span = span "output" end - expect(yielded_span).to be_a(Langfuse::BaseObservation) + expect(yielded_span).to eq(span) end it "returns the block output" do @@ -241,7 +251,7 @@ it "passes observation_id in link call" do expect(mock_client).to receive(:create_dataset_run_item).with( hash_including( - observation_id: a_string_matching(/\A[0-9a-f]{16}\z/) + observation_id: "b" * 16 ) ) item_client.run(run_name: "test") { "output" } @@ -249,7 +259,7 @@ it "calls force_flush before linking" do call_order = [] - allow(Langfuse).to receive(:force_flush) { call_order << :flush } + allow(mock_client).to receive(:force_flush) { call_order << :flush } allow(mock_client).to receive(:create_dataset_run_item) { call_order << :link } item_client.run(run_name: "test") { "output" } expect(call_order).to eq(%i[flush link]) diff --git a/spec/langfuse/experiment_runner_spec.rb b/spec/langfuse/experiment_runner_spec.rb index 33001ab..727e91c 100644 --- a/spec/langfuse/experiment_runner_spec.rb +++ b/spec/langfuse/experiment_runner_spec.rb @@ -2,15 +2,26 @@ RSpec.describe Langfuse::ExperimentRunner do let(:mock_client) { instance_double(Langfuse::Client) } + let(:config) { instance_double(Langfuse::Config, logger: logger) } let(:logger) { instance_double(Logger, warn: nil, error: nil, info: nil, debug: nil) } + let(:span) do + instance_double( + Langfuse::Span, + trace_id: "a" * 32, + id: "b" * 16, + update_trace: nil, + update: nil + ) + end before do + allow(mock_client).to receive(:config).and_return(config) allow(mock_client).to receive(:create_dataset_run_item) allow(mock_client).to receive(:create_score) allow(mock_client).to receive(:flush_scores) + allow(mock_client).to receive(:force_flush) allow(mock_client).to receive(:dataset_run_url) - allow(Langfuse).to receive(:force_flush) - allow(Langfuse.configuration).to receive(:logger).and_return(logger) + allow(mock_client).to receive(:observe).and_yield(span) end describe "#execute" do @@ -813,7 +824,7 @@ runner.execute # Only the final flush_all call, not once per item - expect(Langfuse).to have_received(:force_flush).once + expect(mock_client).to have_received(:force_flush).once end it "flushes again after run evaluators produce results" do @@ -827,7 +838,7 @@ # Once for post-items flush_all, once for post-run-evaluators flush_all expect(mock_client).to have_received(:flush_scores).twice - expect(Langfuse).to have_received(:force_flush).twice + expect(mock_client).to have_received(:force_flush).twice end end diff --git a/spec/langfuse/traced_execution_spec.rb b/spec/langfuse/traced_execution_spec.rb index f886f80..6f1a6e4 100644 --- a/spec/langfuse/traced_execution_spec.rb +++ b/spec/langfuse/traced_execution_spec.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true RSpec.describe Langfuse::TracedExecution do + let(:client) { Langfuse } + before do allow(Langfuse).to receive(:force_flush) end @@ -8,6 +10,7 @@ describe ".call" do it "returns output, trace_id, observation_id, and nil error on success" do output, trace_id, observation_id, error = described_class.call( + client: client, trace_name: "test-trace", input: { q: "hello" }, task: ->(_span) { "result" } @@ -23,6 +26,7 @@ it "captures task errors without re-raising" do output, trace_id, observation_id, error = described_class.call( + client: client, trace_name: "test-trace", input: {}, task: ->(_span) { raise StandardError, "boom" } @@ -38,6 +42,7 @@ it "marks the span with error state on task failure" do span_captured = nil described_class.call( + client: client, trace_name: "test-trace", input: {}, task: lambda { |span| @@ -52,6 +57,7 @@ it "passes the span to the task" do received_span = nil described_class.call( + client: client, trace_name: "test-trace", input: {}, task: ->(span) { received_span = span } @@ -63,6 +69,7 @@ it "sets input and metadata on the trace" do span_captured = nil described_class.call( + client: client, trace_name: "test-trace", input: { question: "What?" }, metadata: { run: "v1" }, @@ -75,6 +82,7 @@ it "defaults metadata to empty hash" do # Should not raise when metadata is omitted output, _trace_id, _observation_id, error = described_class.call( + client: client, trace_name: "test-trace", input: {}, task: ->(_span) { "ok" } @@ -89,6 +97,7 @@ yielded_trace_id = nil described_class.call( + client: client, trace_name: "test-trace", input: {}, task: ->(_span) { "result" } @@ -106,6 +115,7 @@ call_order = [] described_class.call( + client: client, trace_name: "test-trace", input: {}, task: lambda { |_span| @@ -121,6 +131,7 @@ it "still captures task error when pre-task hook is given" do _output, _trace_id, _observation_id, error = described_class.call( + client: client, trace_name: "test-trace", input: {}, task: ->(_span) { raise StandardError, "task failed" } From af0f5a7e97c8a5c75a1fafe16a2921efa4d488b8 Mon Sep 17 00:00:00 2001 From: Igor Artemenko Date: Thu, 7 May 2026 13:15:34 +0000 Subject: [PATCH 3/3] Add documentation --- docs/CONFIGURATION.md | 47 +++++++++++++++++++++++++++++++++++++++++++ docs/README.md | 2 +- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 4d06e41..3c8a598 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -16,6 +16,53 @@ end Call this once at application startup (Rails initializer, boot script, etc.). +## Multiple Clients + +For most applications, the singleton client is the simplest option: + +```ruby +Langfuse.configure do |config| + config.public_key = ENV["LANGFUSE_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_SECRET_KEY"] +end + +client = Langfuse.client +``` + +If the singleton is not enough, create independent `Langfuse::Client` instances with their own `Langfuse::Config` objects. This is useful when one process has multiple application components that should report to different Langfuse projects, such as a "Meeting Transcription" project and a "Note Summarization" project. + +```ruby +meeting_transcription_config = Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_MEETING_TRANSCRIPTION_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_MEETING_TRANSCRIPTION_SECRET_KEY"] + config.base_url = ENV["LANGFUSE_MEETING_TRANSCRIPTION_BASE_URL"] +end + +note_summarization_config = Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_NOTE_SUMMARIZATION_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_NOTE_SUMMARIZATION_SECRET_KEY"] + config.base_url = ENV["LANGFUSE_NOTE_SUMMARIZATION_BASE_URL"] +end + +meeting_transcription_client = Langfuse::Client.new(meeting_transcription_config) +note_summarization_client = Langfuse::Client.new(note_summarization_config) +``` + +Use the client instance for client-specific work: + +```ruby +meeting_transcription_client.observe("meeting-transcription") do |obs| + prompt = meeting_transcription_client.get_prompt("transcribe-meeting") + obs.update(input: prompt.compile(meeting_id: meeting.id)) +end + +note_summarization_client.create_score( + name: "manual-review", + value: 1, + trace_id: "0123456789abcdef0123456789abcdef" +) +``` + ## Tracing Ownership This is the part people get wrong. diff --git a/docs/README.md b/docs/README.md index 7522069..0e506c9 100644 --- a/docs/README.md +++ b/docs/README.md @@ -26,7 +26,7 @@ This is the consumer hub. Start here unless you are already looking for a specif ### Production Hardening -- **[Configuration](CONFIGURATION.md)** — Config surface, tracing ownership, export filtering, environment defaults +- **[Configuration](CONFIGURATION.md)** — Config surface, multiple clients, tracing ownership, export filtering, environment defaults - **[Caching](CACHING.md)** — Prompt cache backends, stale-while-revalidate, cache warming - **[Error Handling](ERROR_HANDLING.md)** — Failure modes, retry boundaries, debugging - **[Migration Guide](MIGRATION.md)** — Move hardcoded prompts into Langfuse-managed prompts without breaking runtime behavior