diff --git a/docs/API_REFERENCE.md b/docs/API_REFERENCE.md index b6a2107..a3ba71f 100644 --- a/docs/API_REFERENCE.md +++ b/docs/API_REFERENCE.md @@ -125,6 +125,8 @@ Langfuse.tracer_provider # => OpenTelemetry::SDK::Trace::TracerProvider `Langfuse.configure` does not call this for you. This is the explicit global-install seam. If you also want another OpenTelemetry backend or custom propagation, that remains application-owned setup. +`Langfuse::OtelSetup` remains as a deprecated compatibility wrapper around the global `Langfuse.tracer_provider`, `Langfuse.force_flush`, and `Langfuse.shutdown` APIs. New code should not call it; explicit clients should use `Langfuse::Client.new(config).tracer_provider`. + **Example:** ```ruby @@ -207,6 +209,21 @@ client = Langfuse.client prompt = client.get_prompt("greeting") ``` +### `Langfuse::Client.new` + +Create an explicit client. Use this when one Ruby process needs more than one Langfuse project. + +```ruby +client = Langfuse::Client.new( + Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_SECRET_KEY"] + end +) +``` + +Explicit clients own their own API client, score queue, prompt cache, and tracer provider. Observations created by an explicit client keep that owner for child observations, masking, trace URLs, scores, `force_flush`, and `shutdown`. + ## Prompt Management ### `Client#get_prompt` @@ -671,6 +688,28 @@ obs.update(output: { result: "done" }) obs.end ``` +### `Client#observe` + +Explicit-client tracing equivalent to `Langfuse.observe`. + +```ruby +client.observe("operation", { input: "data" }) do |obs| + child = obs.start_observation("child") + child.end +end +``` + +Use this instead of `Langfuse.observe` when the trace belongs to an explicit client. Children created from the returned observation stay on the same client. + +### `Client#start_observation` + +Explicit-client equivalent to `Langfuse.start_observation`. + +```ruby +obs = client.start_observation("operation", { input: "data" }) +obs.end +``` + ### `BaseObservation` Returned by `observe` in stateful mode or passed to block. @@ -684,6 +723,7 @@ Returned by `observe` in stateful mode or passed to block. | `trace_url` | `String` or `nil` | URL to Langfuse UI, if project lookup succeeds | | `otel_span` | OpenTelemetry::SDK::Trace::Span | Underlying OTel span | | `type` | String | Observation type | +| `client` | Langfuse::Client | Client that owns follow-up operations | **Methods:** @@ -1001,7 +1041,8 @@ Langfuse.client.flush_scores ### Module-Level Scoring -Convenience methods delegating to `Langfuse.client`: +Convenience methods. Active scoring routes through the client that owns the current Langfuse observation, +falling back to `Langfuse.client` only outside Langfuse-owned observations: ```ruby Langfuse.create_score(name: "quality", value: 0.85, trace_id: "abc") @@ -1546,7 +1587,7 @@ Langfuse.shutdown ### `Langfuse.force_flush` -Force flush all pending data. +Force flush pending traces for the singleton client. **Signature:** @@ -1560,6 +1601,8 @@ Langfuse.force_flush(timeout: 30) Langfuse.force_flush(timeout: 10) ``` +Use `client.force_flush(timeout: 10)` for traces created through an explicit client. + ## See Also - [GETTING_STARTED.md](GETTING_STARTED.md) - Quick start guide diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 58aedaa..32b721b 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -227,7 +227,7 @@ span.end **Key Components:** - **BaseObservation** - Base class for all observation types -- **OtelSetup** - Initializes OpenTelemetry SDK with OTLP exporter +- **Client tracer provider** - Owns isolated OpenTelemetry export for each client - **SpanProcessor** - Propagates trace-level attributes to child spans - **OtelAttributes** - Converts Langfuse attributes to OpenTelemetry format diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 4d06e41..26a42c0 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -23,7 +23,7 @@ This is the part people get wrong. - `Langfuse.configure` stores configuration only. - Module-level tracing initializes lazily on first use. - Langfuse tracing is isolated by default. -- `Langfuse.tracer_provider` is the explicit seam for installing Langfuse as the global OpenTelemetry provider. +- `Langfuse.tracer_provider` returns the singleton client's provider for explicit global OpenTelemetry installation. - `should_export_span` only runs on spans handled by Langfuse's provider. - Filtering is not the fix for ambient-span overcapture. Isolation is. - Langfuse does not auto-configure a second OpenTelemetry backend or any multi-export pipeline for you. @@ -50,6 +50,49 @@ OpenTelemetry.tracer_provider = Langfuse.tracer_provider If you also want propagation or another OpenTelemetry backend, configure those in your application. Langfuse does not infer or install them. +## Multiple Clients + +Use explicit `Langfuse::Client` instances when one Ruby process sends data to multiple Langfuse projects. The module-level APIs are only a facade over `Langfuse.client`; explicit clients own their own API client, score queue, prompt cache, and tracer provider. + +```ruby +primary = Langfuse.client + +project_a_config = Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_PROJECT_A_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_PROJECT_A_SECRET_KEY"] +end + +project_b_config = Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_PROJECT_B_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_PROJECT_B_SECRET_KEY"] +end + +project_a = Langfuse::Client.new(project_a_config) +project_b = Langfuse::Client.new(project_b_config) + +project_a.observe("project-a-workflow") do |root| + root.start_observation("project-a-child") +end + +project_b.observe("project-b-workflow") do |root| + root.start_observation("project-b-child") +end + +project_a.force_flush +project_b.force_flush +``` + +The owner is sticky. A root observation created by `project_a.observe` creates children through `project_a`, uses `project_a.config.mask`, generates trace URLs through `project_a`, and scores through `project_a`. It does not fall back to `Langfuse.client`. + +Use lifecycle methods on the same client that created the work: + +```ruby +project_a.force_flush(timeout: 10) +project_a.shutdown(timeout: 30) +``` + +`Langfuse.force_flush` and `Langfuse.shutdown` apply to the singleton client only. + ## All Configuration Options ### Required @@ -401,12 +444,12 @@ There are three states worth documenting. - `Langfuse.configure` does not mutate `OpenTelemetry.tracer_provider` - `Langfuse.configure` does not mutate `OpenTelemetry.propagation` -- `Langfuse.observe(...)` uses Langfuse's internal tracer provider once tracing is ready +- `Langfuse.observe(...)` uses the singleton client's internal tracer provider once tracing is ready - if `public_key`, `secret_key`, or `base_url` are missing, module-level tracing falls back to a no-op tracer and logs one warning ### Explicit Global Install with `Langfuse.tracer_provider` -If you want Langfuse to own the global OpenTelemetry provider, install it explicitly: +If you want the singleton Langfuse client to own the global OpenTelemetry provider, install it explicitly: ```ruby require "opentelemetry/trace/propagation/trace_context" diff --git a/docs/SCORING.md b/docs/SCORING.md index ce0b121..347a12d 100644 --- a/docs/SCORING.md +++ b/docs/SCORING.md @@ -145,6 +145,8 @@ end ``` This is useful when you don't have the observation ID but want to score from within the traced block. +Inside a Langfuse observation, module-level scoring uses the client that owns that observation. That keeps +explicit-client traces from accidentally sending scores through the singleton client. ### Scoring Active Traces @@ -165,6 +167,10 @@ Langfuse.observe("user-request") do |span| end ``` +When called from a raw OpenTelemetry span that was not created by Langfuse, module-level active scoring still +falls back to `Langfuse.client` for this release and emits a deprecation warning. Prefer +`client.score_active_trace` or `client.score_active_observation` for raw OTel spans so the score owner is explicit. + ## Complete Examples ### User Feedback (Thumbs Up/Down) diff --git a/docs/TRACING.md b/docs/TRACING.md index c46a323..ad9233d 100644 --- a/docs/TRACING.md +++ b/docs/TRACING.md @@ -8,7 +8,39 @@ This guide is about the tracing behavior the SDK actually implements today. If y - Child observations create nested spans inside that trace. - `:generation` is the right type for model calls because it carries model-specific fields like `model`, `usage_details`, and `cost_details`. - `:event` is a point-in-time observation with no duration. -- `Langfuse.configure` stores configuration only. Module-level tracing uses Langfuse's internal tracer provider when tracing is ready. +- `Langfuse.configure` stores configuration only. Module-level tracing uses the singleton client's internal tracer provider when tracing is ready. + +## Singleton vs Explicit Clients + +`Langfuse.observe(...)` is shorthand for `Langfuse.client.observe(...)`. That is the right default when your process sends traces to one Langfuse project. + +When one Ruby process needs multiple Langfuse projects, build explicit clients and call tracing APIs on those clients: + +```ruby +project_a = Langfuse::Client.new( + Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_PROJECT_A_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_PROJECT_A_SECRET_KEY"] + end +) + +project_b = Langfuse::Client.new( + Langfuse::Config.new do |config| + config.public_key = ENV["LANGFUSE_PROJECT_B_PUBLIC_KEY"] + config.secret_key = ENV["LANGFUSE_PROJECT_B_SECRET_KEY"] + end +) + +project_a.observe("ingest-document") do |root| + root.start_observation("extract-text") +end + +project_b.observe("support-answer") do |root| + root.start_observation("openai-chat", as_type: :generation) +end +``` + +Observations keep their owner. A child created from `project_a` stays on `project_a` for tracing, masking, trace URLs, scores, flushing, and shutdown. Do not mix `Langfuse.observe` into an explicit-client trace unless you intentionally want the singleton client's project. ## Start with a Root Observation @@ -217,14 +249,14 @@ This is the default behavior: - `Langfuse.configure` does not mutate `OpenTelemetry.tracer_provider` - `Langfuse.configure` does not mutate `OpenTelemetry.propagation` -- `Langfuse.observe(...)` uses Langfuse's internal tracer provider once tracing is configured +- `Langfuse.observe(...)` uses the singleton client's internal tracer provider once tracing is configured - if tracing config is incomplete, module-level tracing falls back to a no-op tracer and logs one warning This is why ambient spans from some unrelated global OpenTelemetry provider are not exported to Langfuse by default. ### 2. Explicit Global Install with `Langfuse.tracer_provider` -If you want Langfuse to own the global OpenTelemetry provider, install it explicitly: +If you want the singleton Langfuse client to own the global OpenTelemetry provider, install it explicitly: ```ruby Langfuse.configure do |config| @@ -287,6 +319,62 @@ Public helper predicates: The exact signatures live in [API_REFERENCE.md](API_REFERENCE.md). +## Validate Against Langfuse + +For changes to tracing behavior, validate both the local SDK behavior and the server-side Langfuse result. Use one small Ruby script to emit traces, then read the traces back with the Langfuse CLI. + +```ruby +# scratchpad/validate_multiple_clients.rb +require "bundler/setup" +require "langfuse" + +def build_client(public_key:, secret_key:, base_url:) + Langfuse::Client.new( + Langfuse::Config.new do |config| + config.public_key = public_key + config.secret_key = secret_key + config.base_url = base_url + config.tracing_async = false + end + ) +end + +client = build_client( + public_key: ENV.fetch("LANGFUSE_PUBLIC_KEY"), + secret_key: ENV.fetch("LANGFUSE_SECRET_KEY"), + base_url: ENV.fetch("LANGFUSE_BASE_URL", "https://cloud.langfuse.com") +) + +trace_id = nil +client.observe("ruby-sdk-validation", input: { source: "langfuse-rb" }) do |span| + trace_id = span.trace_id + span.start_observation("child-generation", { model: "test-model" }, as_type: :generation) do |generation| + generation.update(output: "ok") + end + span.score_trace(name: "validation-score", value: 1.0) +end + +client.force_flush(timeout: 10) +client.flush_scores +puts trace_id +``` + +Run it locally: + +```bash +trace_id=$(bundle exec ruby scratchpad/validate_multiple_clients.rb) +``` + +Then read the same trace back from Langfuse: + +```bash +npx --yes langfuse-cli api traces get "$trace_id" --fields core,observations,scores --json +npx --yes langfuse-cli api observations list --trace-id "$trace_id" --fields core,basic,metadata --json +npx --yes langfuse-cli api scores list --trace-id "$trace_id" --fields score,trace --json +``` + +For multiple clients, run the script once per project credential set and confirm each trace appears only in the expected project. The CLI readback is the proof; local object identity alone only proves the Ruby process behavior. + ## Best Practices - Put workflow-level output on the root observation and model-level output on the generation. diff --git a/lib/langfuse.rb b/lib/langfuse.rb index 5e5e28f..ab2ce90 100644 --- a/lib/langfuse.rb +++ b/lib/langfuse.rb @@ -50,14 +50,18 @@ class UnauthorizedError < ApiError; end require_relative "langfuse/api_client" require_relative "langfuse/span_filter" require_relative "langfuse/sampling" -require_relative "langfuse/otel_setup" +require_relative "langfuse/tracer_provider_factory" require_relative "langfuse/masking" require_relative "langfuse/otel_attributes" require_relative "langfuse/propagation" require_relative "langfuse/span_processor" +require_relative "langfuse/client_context" +require_relative "langfuse/active_scoring" require_relative "langfuse/observations" require_relative "langfuse/trace_id" require_relative "langfuse/score_client" +require_relative "langfuse/observation_methods" +require_relative "langfuse/noop_observation_client" require_relative "langfuse/prompt_renderer" require_relative "langfuse/text_prompt_client" require_relative "langfuse/chat_prompt_client" @@ -71,10 +75,10 @@ class UnauthorizedError < ApiError; end require_relative "langfuse/dataset_item_client" require_relative "langfuse/experiment_runner" require_relative "langfuse/client" +require_relative "langfuse/otel_setup" # rubocop:disable Metrics/ModuleLength module Langfuse - # rubocop:disable Metrics/ClassLength class << self # @param configuration [Config] the global configuration object attr_writer :configuration @@ -126,8 +130,7 @@ def tracer_provider "Langfuse tracing is disabled until public_key, secret_key, and base_url are configured." end - OtelSetup.setup(configuration) unless OtelSetup.initialized? - OtelSetup.tracer_provider + client.tracer_provider end # Shutdown Langfuse and flush any pending traces and scores @@ -142,8 +145,8 @@ def tracer_provider # at_exit { Langfuse.shutdown } # def shutdown(timeout: 30) - client.shutdown if @client - OtelSetup.shutdown(timeout: timeout) + @client&.shutdown(timeout: timeout) + @noop_observation_client&.shutdown(timeout: timeout) end # Force flush all pending traces @@ -151,7 +154,7 @@ def shutdown(timeout: 30) # @param timeout [Integer] Timeout in seconds # @return [void] def force_flush(timeout: 30) - OtelSetup.force_flush(timeout: timeout) + @client&.force_flush(timeout: timeout) end # Propagate trace-level attributes to all spans created within this context. @@ -275,7 +278,7 @@ def create_score(name:, value:, id: nil, trace_id: nil, session_id: nil, observa # Langfuse.score_active_observation(name: "accuracy", value: 0.92) # end def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) - client.score_active_observation( + ActiveScoring.client.score_active_observation( name: name, value: value, comment: comment, @@ -301,7 +304,7 @@ def score_active_observation(name:, value:, comment: nil, metadata: nil, data_ty # Langfuse.score_active_trace(name: "overall_quality", value: 5) # end def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) - client.score_active_trace( + ActiveScoring.client.score_active_trace( name: name, value: value, comment: comment, @@ -344,18 +347,12 @@ def create_trace_id(seed: nil) # # @return [void] def reset! - client.shutdown if @client - OtelSetup.shutdown(timeout: 5) if OtelSetup.initialized? - @configuration = nil - @client = nil - @noop_tracer = nil - @tracing_disabled_warning_emitted = false + @client&.shutdown rescue StandardError # Ignore shutdown errors during reset (e.g., in tests) - @configuration = nil - @client = nil - @noop_tracer = nil - @tracing_disabled_warning_emitted = false + nil + ensure + reset_runtime_state! end # Creates a new observation (root or child) @@ -390,23 +387,16 @@ def reset! # rubocop:disable Metrics/ParameterLists def start_observation(name, attrs = {}, as_type: :span, trace_id: nil, parent_span_context: nil, start_time: nil, skip_validation: false) - parent_span_context = resolve_trace_context(trace_id, parent_span_context) - type_str = as_type.to_s - validate_observation_type!(as_type, type_str) unless skip_validation - - otel_tracer = otel_tracer() - otel_span = create_otel_span( - name: name, - start_time: start_time, + warn_tracing_disabled_once unless tracing_config_ready? + observation_client.start_observation( + name, + attrs, + as_type: as_type, + trace_id: trace_id, parent_span_context: parent_span_context, - otel_tracer: otel_tracer + start_time: start_time, + skip_validation: skip_validation ) - apply_observation_attributes(otel_span, type_str, attrs) - - observation = wrap_otel_span(otel_span, type_str, otel_tracer) - # Events auto-end immediately when created - observation.end if type_str == OBSERVATION_TYPES[:event] - observation end # rubocop:enable Metrics/ParameterLists @@ -433,129 +423,17 @@ def start_observation(name, attrs = {}, as_type: :span, trace_id: nil, parent_sp # obs = Langfuse.observe("operation", input: { data: "test" }) # obs.update(output: { result: "success" }) # obs.end - def observe(name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block) - merged_attrs = attrs.to_h.merge(kwargs) - observation = start_observation(name, merged_attrs, as_type: as_type, trace_id: trace_id) - return observation unless block - - observation.send(:run_in_context, &block) - end - - # Registry mapping observation type strings to their wrapper classes - OBSERVATION_TYPE_REGISTRY = { - OBSERVATION_TYPES[:generation] => Generation, - OBSERVATION_TYPES[:embedding] => Embedding, - OBSERVATION_TYPES[:event] => Event, - OBSERVATION_TYPES[:agent] => Agent, - OBSERVATION_TYPES[:tool] => Tool, - OBSERVATION_TYPES[:chain] => Chain, - OBSERVATION_TYPES[:retriever] => Retriever, - OBSERVATION_TYPES[:evaluator] => Evaluator, - OBSERVATION_TYPES[:guardrail] => Guardrail, - OBSERVATION_TYPES[:span] => Span - }.freeze - - private - - # @api private - def resolve_trace_context(trace_id, parent_span_context) - return parent_span_context unless trace_id - raise ArgumentError, "Cannot specify both trace_id and parent_span_context" if parent_span_context - - TraceId.send(:to_span_context, trace_id) - end - - # @api private - def validate_observation_type!(as_type, type_str) - return if valid_observation_type?(as_type) - - valid_types = OBSERVATION_TYPES.values.sort.join(", ") - raise ArgumentError, "Invalid observation type: #{type_str}. Valid types: #{valid_types}" + def observe(name, attrs = {}, as_type: :span, trace_id: nil, **, &) + warn_tracing_disabled_once unless tracing_config_ready? + observation_client.observe(name, attrs, as_type: as_type, trace_id: trace_id, **, &) end # @api private - def apply_observation_attributes(otel_span, type_str, attrs) - # Guard against ended spans — should always be recording here, but safe. - return unless otel_span.recording? - - otel_attrs = OtelAttributes.create_observation_attributes(type_str, attrs.to_h, mask: configuration.mask) - otel_attrs.each { |key, value| otel_span.set_attribute(key, value) } - end - - # Validates that an observation type is valid - # - # Checks if the provided type (symbol or string) matches a valid observation type - # in the OBSERVATION_TYPES constant. - # - # @param type [Symbol, String, Object] The observation type to validate - # @return [Boolean] true if valid, false otherwise - # - # @example - # valid_observation_type?(:span) # => true - # valid_observation_type?("span") # => true - # valid_observation_type?(:invalid) # => false - # valid_observation_type?(nil) # => false - def valid_observation_type?(type) - return false unless type.respond_to?(:to_sym) - - OBSERVATION_TYPES.key?(type.to_sym) - rescue TypeError - false + def observation_client + tracing_config_ready? ? client : noop_observation_client end - # Gets the OpenTelemetry tracer for Langfuse - # - # @return [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer - def otel_tracer - return tracer_provider.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION) if setup_tracing_if_ready - - warn_tracing_disabled_once - noop_tracer - end - - # Creates an OpenTelemetry span (root or child) - # - # @param name [String] Span name - # @param start_time [Time, Integer, nil] Optional start time - # @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context - # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer - # @return [OpenTelemetry::SDK::Trace::Span] The created span - def create_otel_span(name:, otel_tracer:, start_time: nil, parent_span_context: nil) - if parent_span_context - # Create child span with parent context - # Create a non-recording span from the parent context to set in context - parent_span = OpenTelemetry::Trace.non_recording_span(parent_span_context) - parent_context = OpenTelemetry::Trace.context_with_span(parent_span) - OpenTelemetry::Context.with_current(parent_context) do - otel_tracer.start_span(name, start_timestamp: start_time) - end - else - # Create root span - otel_tracer.start_span(name, start_timestamp: start_time) - end - end - - # Wraps an OpenTelemetry span in the appropriate observation class - # - # @param otel_span [OpenTelemetry::SDK::Trace::Span] The OTel span - # @param type_str [String] Observation type string - # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer - # @param attributes [Hash, nil] Optional attributes - # @return [BaseObservation] Appropriate observation wrapper instance - def wrap_otel_span(otel_span, type_str, otel_tracer, attributes: nil) - observation_class = OBSERVATION_TYPE_REGISTRY[type_str] || Span - observation_class.new(otel_span, otel_tracer, attributes: attributes) - end - - # rubocop:disable Naming/PredicateMethod - def setup_tracing_if_ready - return true if OtelSetup.initialized? - return false unless tracing_config_ready? - - OtelSetup.setup(configuration) - true - end - # rubocop:enable Naming/PredicateMethod + private def tracing_config_ready? configured?(configuration.public_key) && @@ -584,10 +462,20 @@ def tracing_warning_mutex @tracing_warning_mutex ||= Mutex.new end - def noop_tracer - @noop_tracer ||= OpenTelemetry::Trace::TracerProvider.new.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION) + def noop_observation_client + @noop_observation_client ||= NoopObservationClient.new(configuration) + end + + def reset_runtime_state! + BaseObservation.reset_deprecation_warnings! if defined?(BaseObservation) + ActiveScoring.reset! if defined?(ActiveScoring) + OtelSetup.reset_deprecation_warning! if defined?(OtelSetup) + @configuration = nil + @client = nil + @noop_tracer = nil + @noop_observation_client = nil + @tracing_disabled_warning_emitted = false end end - # rubocop:enable Metrics/ClassLength end # rubocop:enable Metrics/ModuleLength diff --git a/lib/langfuse/active_scoring.rb b/lib/langfuse/active_scoring.rb new file mode 100644 index 0000000..c56b584 --- /dev/null +++ b/lib/langfuse/active_scoring.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module Langfuse + # Resolves the client that should own module-level active score calls. + # + # @api private + module ActiveScoring + RAW_ACTIVE_SCORING_DEPRECATION = + "Langfuse.score_active_trace and Langfuse.score_active_observation outside a Langfuse-owned " \ + "observation currently score through the singleton client. This raw OpenTelemetry fallback is " \ + "deprecated and will be removed in the next major release; use Langfuse.client.score_active_* " \ + "or an explicit client instead." + private_constant :RAW_ACTIVE_SCORING_DEPRECATION + + class << self + # @return [Client, NoopObservationClient] + def client + active_client = ClientContext.current_client + return active_client if active_client + + warn_raw_active_scoring_once + Langfuse.client + end + + # @return [void] + def reset! + raw_active_scoring_warning_mutex.synchronize do + @raw_active_scoring_warning_emitted = false + end + end + + private + + def warn_raw_active_scoring_once + return if @raw_active_scoring_warning_emitted + + raw_active_scoring_warning_mutex.synchronize do + return if @raw_active_scoring_warning_emitted + + Langfuse.configuration.logger.warn(RAW_ACTIVE_SCORING_DEPRECATION) + @raw_active_scoring_warning_emitted = true + end + end + + def raw_active_scoring_warning_mutex + @raw_active_scoring_warning_mutex ||= Mutex.new + end + end + end +end diff --git a/lib/langfuse/client.rb b/lib/langfuse/client.rb index 1a82d17..17030a5 100644 --- a/lib/langfuse/client.rb +++ b/lib/langfuse/client.rb @@ -22,6 +22,7 @@ module Langfuse # rubocop:disable Metrics/ClassLength class Client extend Forwardable + include ObservationMethods # @return [Integer] Default page size when fetching all dataset items DATASET_ITEMS_PAGE_SIZE = 50 @@ -66,21 +67,21 @@ class Client # @param config [Config] Configuration object # @return [Client] def initialize(config) - @config = config - @config.validate! + config.validate! + @config = config.snapshot # Create cache if enabled cache = create_cache if cache_enabled? # Create API client with cache @api_client = ApiClient.new( - public_key: config.public_key, - secret_key: config.secret_key, - base_url: config.base_url, - timeout: config.timeout, - logger: config.logger, + public_key: @config.public_key, + secret_key: @config.secret_key, + base_url: @config.base_url, + timeout: @config.timeout, + logger: @config.logger, cache: cache, - cache_observer: config.prompt_cache_observer + cache_observer: @config.prompt_cache_observer ) @project_id = nil @@ -89,7 +90,29 @@ def initialize(config) @project_id_fetched = false # Initialize score client for batching score events - @score_client = ScoreClient.new(api_client: @api_client, config: config) + @score_client = ScoreClient.new(api_client: @api_client, config: @config) + @tracer_provider = nil + @tracer_provider_mutex = Mutex.new + end + + # Return this client's isolated tracer provider. + # + # @return [OpenTelemetry::SDK::Trace::TracerProvider] + # @raise [ConfigurationError] if tracing configuration is incomplete + def tracer_provider + return @tracer_provider if @tracer_provider + + @tracer_provider_mutex.synchronize do + @tracer_provider ||= build_tracer_provider + end + end + + # Force flush pending trace spans for this client. + # + # @param timeout [Integer] timeout in seconds + # @return [void] + def force_flush(timeout: 30) + @tracer_provider&.force_flush(timeout: timeout) end # Fetch a prompt and return the appropriate client @@ -405,6 +428,7 @@ def create_score(name:, value:, id: nil, trace_id: nil, session_id: nil, observa # client.score_active_observation(name: "accuracy", value: 0.92) # end def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) + ensure_active_observation_owner! @score_client.score_active_observation( name: name, value: value, @@ -431,6 +455,7 @@ def score_active_observation(name:, value:, comment: nil, metadata: nil, data_ty # client.score_active_trace(name: "overall_quality", value: 5) # end def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) + ensure_active_observation_owner! @score_client.score_active_trace( name: name, value: value, @@ -452,12 +477,16 @@ def flush_scores @score_client.flush end - # Shutdown the client and flush any pending scores + # Shutdown the client and flush any pending traces and scores # # Also shuts down the cache if it supports shutdown (e.g., SWR thread pool). # + # @param timeout [Integer] timeout in seconds for trace shutdown # @return [void] - def shutdown + def shutdown(timeout: 30) + provider = release_tracer_provider + provider&.shutdown(timeout: timeout) + ensure @score_client.shutdown @api_client.shutdown end @@ -662,6 +691,44 @@ def run_experiment(name:, task:, data: nil, dataset_name: nil, description: nil, attr_reader :score_client + def build_tracer_provider + provider = TracerProviderFactory.build(config) + log_tracing_initialized + provider + end + + def log_tracing_initialized + mode = config.tracing_async ? "async" : "sync" + config.logger.info("Langfuse tracing initialized with OpenTelemetry (#{mode} mode)") + end + + def release_tracer_provider + @tracer_provider_mutex.synchronize do + provider = @tracer_provider + @tracer_provider = nil + provider + end + end + + def observation_tracer + tracer_provider.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION) + end + + def observation_mask + config.mask + end + + def observation_owner + self + end + + def ensure_active_observation_owner! + active_client = ClientContext.current_client + return if active_client.nil? || active_client.equal?(self) + + raise ArgumentError, "Active Langfuse observation belongs to a different client" + end + # Build a project-scoped URL, returning nil if project ID is unavailable def project_url(path) pid = project_id diff --git a/lib/langfuse/client_context.rb b/lib/langfuse/client_context.rb new file mode 100644 index 0000000..e4b90b2 --- /dev/null +++ b/lib/langfuse/client_context.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +require "opentelemetry/context" + +module Langfuse + # Tracks the Langfuse client that owns the currently active observation. + # + # @api private + module ClientContext + KEY = OpenTelemetry::Context.create_key("langfuse.client") + private_constant :KEY + + class << self + # @return [Client, NoopObservationClient, nil] + def current_client + OpenTelemetry::Context.current.value(KEY) + end + + # @param client [Client, NoopObservationClient] + # @param context [OpenTelemetry::Context] + # @return [OpenTelemetry::Context] + def context_with_client(client, context: OpenTelemetry::Context.current) + context.set_value(KEY, client) + end + end + end +end diff --git a/lib/langfuse/config.rb b/lib/langfuse/config.rb index 3d98c15..17ceace 100644 --- a/lib/langfuse/config.rb +++ b/lib/langfuse/config.rb @@ -20,6 +20,17 @@ module Langfuse # # rubocop:disable Metrics/ClassLength class Config + SNAPSHOT_DUP_FIELDS = %i[ + public_key + secret_key + base_url + cache_stale_ttl + job_queue + environment + release + ].freeze + private_constant :SNAPSHOT_DUP_FIELDS + # @return [String, nil] Langfuse public API key attr_accessor :public_key @@ -217,6 +228,16 @@ def normalized_stale_ttl cache_stale_ttl == :indefinite ? INDEFINITE_SECONDS : cache_stale_ttl end + # @return [Config] frozen copy safe for long-lived clients + # @api private + def snapshot + duplicate = dup + SNAPSHOT_DUP_FIELDS.each do |field| + duplicate.instance_variable_set(:"@#{field}", snapshot_value(public_send(field))) + end + duplicate.freeze + end + # Set trace sampling rate. # # @param value [Numeric, String] Sampling rate from 0.0 to 1.0 @@ -228,6 +249,14 @@ def sample_rate=(value) private + def snapshot_value(value) + return value.dup.freeze if value.respond_to?(:dup) && value.respond_to?(:freeze) + + value + rescue TypeError + value + end + def default_logger if defined?(Rails) && Rails.respond_to?(:logger) Rails.logger diff --git a/lib/langfuse/dataset_item_client.rb b/lib/langfuse/dataset_item_client.rb index c8df97a..b059f91 100644 --- a/lib/langfuse/dataset_item_client.rb +++ b/lib/langfuse/dataset_item_client.rb @@ -117,7 +117,7 @@ def run(run_name:, run_description: nil, run_metadata: nil, &block) raise ArgumentError, "block is required" unless block output, trace_id, observation_id, task_error = execute_in_trace(run_name, run_metadata, &block) - Langfuse.force_flush(timeout: FLUSH_TIMEOUT) + @client.force_flush(timeout: FLUSH_TIMEOUT) link(trace_id: trace_id, observation_id: observation_id, run_name: run_name, run_description: run_description, metadata: run_metadata) @@ -131,6 +131,7 @@ def run(run_name:, run_description: nil, run_metadata: nil, &block) def execute_in_trace(run_name, run_metadata, &block) TracedExecution.call( trace_name: "dataset-run-#{run_name}", + client: @client, input: @input, metadata: run_metadata || {}, task: block diff --git a/lib/langfuse/experiment_runner.rb b/lib/langfuse/experiment_runner.rb index c931d97..fb51900 100644 --- a/lib/langfuse/experiment_runner.rb +++ b/lib/langfuse/experiment_runner.rb @@ -32,7 +32,7 @@ def initialize(client:, name:, items:, task:, evaluators: [], run_evaluators: [] @metadata = metadata || {} @description = description @run_name = run_name || "#{name} - #{Time.now.utc.iso8601}" - @logger = Langfuse.configuration.logger + @logger = client.config.logger @dataset_run_id = nil @dataset_id = nil end @@ -76,6 +76,7 @@ def process_item(item, index) def run_task_in_trace(item) TracedExecution.call( trace_name: "experiment-#{@name}", + client: @client, input: item.input, metadata: @metadata, task: ->(_span) { @task.call(item) } @@ -156,7 +157,7 @@ def link_to_dataset_run(item, trace_id, observation_id) def flush_all @client.flush_scores - Langfuse.force_flush(timeout: FLUSH_TIMEOUT) + @client.force_flush(timeout: FLUSH_TIMEOUT) end # Wraps raw hashes into ExperimentItem; passes DatasetItemClient through unchanged. diff --git a/lib/langfuse/noop_observation_client.rb b/lib/langfuse/noop_observation_client.rb new file mode 100644 index 0000000..cc3925e --- /dev/null +++ b/lib/langfuse/noop_observation_client.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +module Langfuse + # No-op owner for module-level observations when tracing config is incomplete. + # + # @api private + class NoopObservationClient + include ObservationMethods + + # @return [Config] configuration used for masking and logging + attr_reader :config + + # @param config [Config] global configuration snapshot + # @return [NoopObservationClient] + def initialize(config) + @config = config + @tracer_provider = OpenTelemetry::Trace::TracerProvider.new + end + + # @param trace_id [String] ignored trace ID + # @return [nil] + def trace_url(_trace_id) + nil + end + + # @return [nil] + def create_score(**) + nil + end + + # @return [nil] + def score_active_observation(**) + nil + end + + # @return [nil] + def score_active_trace(**) + nil + end + + # @return [nil] + def flush_scores + nil + end + + # @return [nil] + def force_flush(**_kwargs) + nil + end + + # @return [nil] + def shutdown(**_kwargs) + nil + end + + private + + def observation_tracer + @tracer_provider.tracer(LANGFUSE_TRACER_NAME, Langfuse::VERSION) + end + + def observation_mask + config.mask + end + + def observation_owner + self + end + end +end diff --git a/lib/langfuse/observation_methods.rb b/lib/langfuse/observation_methods.rb new file mode 100644 index 0000000..b555638 --- /dev/null +++ b/lib/langfuse/observation_methods.rb @@ -0,0 +1,117 @@ +# frozen_string_literal: true + +module Langfuse + # Shared observation factory methods for the singleton facade and explicit clients. + # + # @api private + module ObservationMethods + # Registry mapping observation type strings to their wrapper classes. + OBSERVATION_TYPE_REGISTRY = { + OBSERVATION_TYPES[:generation] => Generation, + OBSERVATION_TYPES[:embedding] => Embedding, + OBSERVATION_TYPES[:event] => Event, + OBSERVATION_TYPES[:agent] => Agent, + OBSERVATION_TYPES[:tool] => Tool, + OBSERVATION_TYPES[:chain] => Chain, + OBSERVATION_TYPES[:retriever] => Retriever, + OBSERVATION_TYPES[:evaluator] => Evaluator, + OBSERVATION_TYPES[:guardrail] => Guardrail, + OBSERVATION_TYPES[:span] => Span + }.freeze + + # @param name [String] Descriptive name for the observation + # @param attrs [Hash, Types::SpanAttributes, Types::GenerationAttributes, nil] Observation attributes + # @param as_type [Symbol, String] Observation type (:span, :generation, :event, etc.) + # @param trace_id [String, nil] Optional 32-char lowercase hex trace ID. + # @param parent_span_context [OpenTelemetry::Trace::SpanContext, nil] Parent span context + # @param start_time [Time, Integer, nil] Optional start time + # @param skip_validation [Boolean] Skip validation for internal child factories + # @return [BaseObservation] The observation wrapper + # @raise [ArgumentError] if observation type or trace context arguments are invalid + # rubocop:disable Metrics/ParameterLists + def start_observation(name, attrs = {}, as_type: :span, trace_id: nil, parent_span_context: nil, + start_time: nil, skip_validation: false) + parent_span_context = resolve_trace_context(trace_id, parent_span_context) + type_str = as_type.to_s + validate_observation_type!(as_type, type_str) unless skip_validation + + otel_tracer = observation_tracer + otel_span = create_otel_span( + name: name, + start_time: start_time, + parent_span_context: parent_span_context, + otel_tracer: otel_tracer + ) + apply_observation_attributes(otel_span, type_str, attrs) + finish_event_if_needed(wrap_otel_span(otel_span, type_str, otel_tracer), type_str) + end + # rubocop:enable Metrics/ParameterLists + + # @param name [String] Descriptive name for the observation + # @param attrs [Hash] Observation attributes + # @param as_type [Symbol, String] Observation type + # @param trace_id [String, nil] Optional 32-char lowercase hex trace ID. + # @param kwargs [Hash] Additional observation attributes + # @yield [observation] Optional block that receives the observation + # @return [BaseObservation, Object] The observation or block return value + # @raise [ArgumentError] if an invalid `trace_id` is provided + def observe(name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block) + merged_attrs = attrs.to_h.merge(kwargs) + observation = start_observation(name, merged_attrs, as_type: as_type, trace_id: trace_id) + return observation unless block + + observation.send(:run_in_context, &block) + end + + private + + def resolve_trace_context(trace_id, parent_span_context) + return parent_span_context unless trace_id + raise ArgumentError, "Cannot specify both trace_id and parent_span_context" if parent_span_context + + TraceId.send(:to_span_context, trace_id) + end + + def validate_observation_type!(as_type, type_str) + return if valid_observation_type?(as_type) + + valid_types = OBSERVATION_TYPES.values.sort.join(", ") + raise ArgumentError, "Invalid observation type: #{type_str}. Valid types: #{valid_types}" + end + + def apply_observation_attributes(otel_span, type_str, attrs) + return unless otel_span.recording? + + otel_attrs = OtelAttributes.create_observation_attributes(type_str, attrs.to_h, mask: observation_mask) + otel_attrs.each { |key, value| otel_span.set_attribute(key, value) } + end + + def valid_observation_type?(type) + return false unless type.respond_to?(:to_sym) + + OBSERVATION_TYPES.key?(type.to_sym) + rescue TypeError + false + end + + def create_otel_span(name:, otel_tracer:, start_time: nil, parent_span_context: nil) + return otel_tracer.start_span(name, start_timestamp: start_time) unless parent_span_context + + parent_span = OpenTelemetry::Trace.non_recording_span(parent_span_context) + parent_context = OpenTelemetry::Trace.context_with_span(parent_span) + OpenTelemetry::Context.with_current(parent_context) do + otel_tracer.start_span(name, start_timestamp: start_time) + end + end + + def wrap_otel_span(otel_span, type_str, otel_tracer) + observation_class = OBSERVATION_TYPE_REGISTRY[type_str] || Span + observation_class.new(otel_span, otel_tracer, client: observation_owner) + end + + def finish_event_if_needed(observation, type_str) + observation.end if type_str == OBSERVATION_TYPES[:event] + observation + end + end +end diff --git a/lib/langfuse/observations.rb b/lib/langfuse/observations.rb index e5109e0..8c18c46 100644 --- a/lib/langfuse/observations.rb +++ b/lib/langfuse/observations.rb @@ -58,6 +58,46 @@ module Langfuse # # @abstract Subclass and pass type: to super to create concrete observation types class BaseObservation + CLIENT_FALLBACK_DEPRECATION = + "Langfuse observation constructors without client: are deprecated and will require client: " \ + "in the next major release. SDK factories already pass the owning client." + private_constant :CLIENT_FALLBACK_DEPRECATION + + class << self + # @param client [Client, NoopObservationClient, nil] + # @return [Client, NoopObservationClient] + # @api private + def resolve_client(client) + return client if client + + warn_client_fallback_once + Langfuse.observation_client + end + + # @return [void] + # @api private + def reset_deprecation_warnings! + client_fallback_warning_mutex.synchronize do + @client_fallback_warning_emitted = false + end + end + + private + + def warn_client_fallback_once + client_fallback_warning_mutex.synchronize do + return if @client_fallback_warning_emitted + + Langfuse.configuration.logger.warn(CLIENT_FALLBACK_DEPRECATION) + @client_fallback_warning_emitted = true + end + end + + def client_fallback_warning_mutex + @client_fallback_warning_mutex ||= Mutex.new + end + end + # @return [OpenTelemetry::SDK::Trace::Span] The underlying OTel span attr_reader :otel_span @@ -67,13 +107,18 @@ class BaseObservation # @return [String] Observation type (e.g., "span", "generation", "event") attr_reader :type + # @return [Client, NoopObservationClient] Owner used for child observations, URLs, masking, and scores + attr_reader :client + # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, Types::GenerationAttributes, nil] Optional initial attributes + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated # @param type [String] Observation type (e.g., "span", "generation", "event") - def initialize(otel_span, otel_tracer, attributes: nil, type: nil) + def initialize(otel_span, otel_tracer, attributes: nil, client: nil, type: nil) @otel_span = otel_span @otel_tracer = otel_tracer + @client = BaseObservation.resolve_client(client) @type = type || raise(ArgumentError, "type must be provided") # Set initial attributes if provided @@ -99,7 +144,7 @@ def trace_id # puts "View trace: #{obs.trace_url}" # end def trace_url - Langfuse.client.trace_url(trace_id) + @client.trace_url(trace_id) end # Ends the observation span. @@ -118,7 +163,7 @@ def end(end_time: nil) def update_trace(attrs) return self unless @otel_span.recording? - otel_attrs = OtelAttributes.create_trace_attributes(attrs.to_h, mask: Langfuse.configuration.mask) + otel_attrs = OtelAttributes.create_trace_attributes(attrs.to_h, mask: @client.config.mask) otel_attrs.each { |key, value| @otel_span.set_attribute(key, value) } self end @@ -134,7 +179,7 @@ def update_trace(attrs) # @return [BaseObservation, Object] The child observation (or block return value if block given) def start_observation(name, attrs = {}, as_type: :span, &block) # Skip validation so unknown types fall back to Span in the factory. - child = Langfuse.start_observation( + child = @client.start_observation( name, attrs, as_type: as_type, @@ -181,7 +226,7 @@ def level=(value) # @param level [String] Log level (debug, default, warning, error) # @return [void] def event(name:, input: nil, level: "default") - masked_input = Masking.apply(input, mask: Langfuse.configuration.mask) + masked_input = Masking.apply(input, mask: @client.config.mask) attributes = { OtelAttributes::OBSERVATION_INPUT => masked_input&.to_json, OtelAttributes::OBSERVATION_LEVEL => level @@ -204,7 +249,7 @@ def current_span # @param data_type [Symbol] one of :numeric, :boolean, :categorical # @return [Hash] created score data from the API def score_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) - Langfuse.create_score( + @client.create_score( name: name, value: value, trace_id: trace_id, @@ -228,7 +273,7 @@ def update_observation_attributes(attrs = {}, **kwargs) attrs_hash = attrs.to_h.merge(kwargs) # Use @type instance variable set during initialization - otel_attrs = OtelAttributes.create_observation_attributes(type, attrs_hash, mask: Langfuse.configuration.mask) + otel_attrs = OtelAttributes.create_observation_attributes(type, attrs_hash, mask: @client.config.mask) otel_attrs.each { |key, value| @otel_span.set_attribute(key, value) } end @@ -254,7 +299,8 @@ def normalize_prompt(prompt) def run_in_context parent_ctx = OpenTelemetry::Context.current span_ctx = OpenTelemetry::Trace.context_with_span(@otel_span, parent_context: parent_ctx) - OpenTelemetry::Context.with_current(span_ctx) { yield self } + client_ctx = ClientContext.context_with_client(@client, context: span_ctx) + OpenTelemetry::Context.with_current(client_ctx) { yield self } ensure safe_end end @@ -286,8 +332,9 @@ class Span < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:span]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:span]) end # @param attrs [Hash, Types::SpanAttributes] Span attributes to set @@ -356,8 +403,9 @@ class Generation < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::GenerationAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:generation]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:generation]) end # @param attrs [Hash, Types::GenerationAttributes] Generation attributes to set @@ -392,8 +440,9 @@ class Event < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:event]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:event]) end # @param attrs [Hash, Types::SpanAttributes] Event attributes to set @@ -429,8 +478,9 @@ class Agent < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:agent]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:agent]) end # @param attrs [Hash, Types::AgentAttributes] Agent attributes to set @@ -462,8 +512,9 @@ class Tool < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:tool]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:tool]) end # @param attrs [Hash, Types::ToolAttributes] Tool attributes to set @@ -504,8 +555,9 @@ class Chain < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:chain]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:chain]) end # @param attrs [Hash, Types::ChainAttributes] Chain attributes to set @@ -540,8 +592,9 @@ class Retriever < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:retriever]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:retriever]) end # @param attrs [Hash, Types::RetrieverAttributes] Retriever attributes to set @@ -576,8 +629,9 @@ class Evaluator < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:evaluator]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:evaluator]) end # @param attrs [Hash, Types::EvaluatorAttributes] Evaluator attributes to set @@ -612,8 +666,9 @@ class Guardrail < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::SpanAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:guardrail]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:guardrail]) end # @param attrs [Hash, Types::GuardrailAttributes] Guardrail attributes to set @@ -655,8 +710,9 @@ class Embedding < BaseObservation # @param otel_span [OpenTelemetry::SDK::Trace::Span] The underlying OTel span # @param otel_tracer [OpenTelemetry::SDK::Trace::Tracer] The OTel tracer # @param attributes [Hash, Types::EmbeddingAttributes, nil] Optional initial attributes - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: OBSERVATION_TYPES[:embedding]) + # @param client [Client, NoopObservationClient, nil] Owner for follow-up operations; omitting is deprecated + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: OBSERVATION_TYPES[:embedding]) end # @param attrs [Hash, Types::EmbeddingAttributes] Embedding attributes to set diff --git a/lib/langfuse/otel_setup.rb b/lib/langfuse/otel_setup.rb index e4bfaa1..8e52e6a 100644 --- a/lib/langfuse/otel_setup.rb +++ b/lib/langfuse/otel_setup.rb @@ -1,185 +1,99 @@ # frozen_string_literal: true -require "opentelemetry/sdk" -require "opentelemetry/exporter/otlp" -require "base64" - module Langfuse - # OpenTelemetry initialization and setup for Langfuse tracing. - # rubocop:disable Metrics/ModuleLength + # Deprecated compatibility wrapper for the former OpenTelemetry setup module. + # + # New code should use the client-owned tracer provider directly: + # `Langfuse.tracer_provider` for the global client, or + # `Langfuse::Client.new(config).tracer_provider` for explicit clients. module OtelSetup - TRACING_CONFIG_FIELDS = %i[ - public_key - secret_key - base_url - environment - release - sample_rate - should_export_span - tracing_async - batch_size - flush_interval - ].freeze - private_constant(:TRACING_CONFIG_FIELDS) + DEPRECATION_WARNING = + "Langfuse::OtelSetup is deprecated and will be removed in the next major release. " \ + "Use Langfuse.tracer_provider, Langfuse.force_flush, Langfuse.shutdown, or an explicit " \ + "Langfuse::Client instead." + private_constant :DEPRECATION_WARNING class << self - # @return [OpenTelemetry::SDK::Trace::TracerProvider, nil] The configured internal tracer provider - attr_reader :tracer_provider - - # Initialize Langfuse's internal tracer provider without mutating global OpenTelemetry state. + # Initialize and return the global Langfuse tracer provider. # - # @param config [Langfuse::Config] The Langfuse configuration + # @param _config [Langfuse::Config] ignored compatibility argument # @return [OpenTelemetry::SDK::Trace::TracerProvider] - def setup(config) - validate_tracing_config!(config) - return existing_provider_for(config) if initialized? - - candidate_provider = nil - provider = nil - created = false - candidate_provider = build_tracer_provider(config) - provider, created = publish_provider(candidate_provider, tracing_config_snapshot(config)) - unless created - candidate_provider.shutdown(timeout: 30) - return existing_provider_for(config) - end + # @raise [ConfigurationError] if global tracing configuration is incomplete + def setup(_config = Langfuse.configuration) + warn_deprecated_once + Langfuse.tracer_provider + end - log_initialized(config) - provider - rescue StandardError - rollback_provider(provider) if created - raise + # Return the already-initialized global Langfuse tracer provider. + # + # @return [OpenTelemetry::SDK::Trace::TracerProvider, nil] + # @raise [void] + def tracer_provider + warn_deprecated_once + current_tracer_provider end - # Shutdown the internal tracer provider and flush any pending spans. + # Shutdown the global Langfuse client. # - # @param timeout [Integer] Timeout in seconds + # @param timeout [Integer] timeout in seconds # @return [void] + # @raise [void] def shutdown(timeout: 30) - provider = nil - setup_mutex.synchronize do - provider = @tracer_provider - @tracer_provider = nil - @config_snapshot = nil - end - provider&.shutdown(timeout: timeout) + warn_deprecated_once + Langfuse.shutdown(timeout: timeout) end - # Force flush all pending spans on the internal tracer provider. + # Force flush the global Langfuse client. # - # @param timeout [Integer] Timeout in seconds + # @param timeout [Integer] timeout in seconds # @return [void] + # @raise [void] def force_flush(timeout: 30) - @tracer_provider&.force_flush(timeout: timeout) + warn_deprecated_once + Langfuse.force_flush(timeout: timeout) end - # Check if Langfuse tracing has been initialized. + # Check whether the global Langfuse tracer provider is initialized. # # @return [Boolean] + # @raise [void] def initialized? - !@tracer_provider.nil? + warn_deprecated_once + !current_tracer_provider.nil? end - private - - def existing_provider_for(config) - snapshot = tracing_config_snapshot(config) - if @config_snapshot == snapshot - config.logger.debug("Langfuse tracing already initialized; reusing existing tracer provider") - else - config.logger.warn( - "Langfuse tracing is already initialized. Changes to #{TRACING_CONFIG_FIELDS.join(', ')} " \ - "require Langfuse.reset! before they take effect." - ) - end - @tracer_provider - end - - def publish_provider(provider, snapshot) - created = false - current = nil - - # This mutex only guards publication so setup never exposes a half-built provider. - setup_mutex.synchronize do - if @tracer_provider - current = @tracer_provider - else - @tracer_provider = provider - @config_snapshot = snapshot - current = provider - created = true - end - end - - [current, created] - end - - def rollback_provider(provider) - setup_mutex.synchronize do - return unless @tracer_provider.equal?(provider) - - @tracer_provider = nil - @config_snapshot = nil + # @return [void] + # @api private + def reset_deprecation_warning! + warning_mutex.synchronize do + @deprecation_warning_emitted = false end - provider.shutdown(timeout: 1) - rescue StandardError - nil - end - - def build_tracer_provider(config) - provider = OpenTelemetry::SDK::Trace::TracerProvider.new( - sampler: build_sampler(config.sample_rate) - ) - provider.add_span_processor( - SpanProcessor.new(config: config, exporter: build_exporter(config)) - ) - provider - end - - def build_exporter(config) - OpenTelemetry::Exporter::OTLP::Exporter.new( - endpoint: "#{config.base_url}/api/public/otel/v1/traces", - headers: build_headers(config.public_key, config.secret_key), - compression: "gzip" - ) - end - - def log_initialized(config) - mode = config.tracing_async ? "async" : "sync" - config.logger.info("Langfuse tracing initialized with OpenTelemetry (#{mode} mode)") end - def validate_tracing_config!(config) - raise ConfigurationError, "public_key is required" if blank?(config.public_key) - raise ConfigurationError, "secret_key is required" if blank?(config.secret_key) - raise ConfigurationError, "base_url cannot be empty" if blank?(config.base_url) - return if config.should_export_span.nil? || config.should_export_span.respond_to?(:call) + private - raise ConfigurationError, "should_export_span must respond to #call" + def current_tracer_provider + current_client&.instance_variable_get(:@tracer_provider) end - def tracing_config_snapshot(config) - TRACING_CONFIG_FIELDS.to_h { |field| [field, config.public_send(field)] }.freeze + def current_client + Langfuse.instance_variable_get(:@client) end - def setup_mutex - @setup_mutex ||= Mutex.new - end + def warn_deprecated_once + return if @deprecation_warning_emitted - def blank?(value) - value.nil? || value.empty? - end + warning_mutex.synchronize do + return if @deprecation_warning_emitted - def build_headers(public_key, secret_key) - credentials = "#{public_key}:#{secret_key}" - encoded = Base64.strict_encode64(credentials) - { "Authorization" => "Basic #{encoded}" } + Langfuse.configuration.logger.warn(DEPRECATION_WARNING) + @deprecation_warning_emitted = true + end end - def build_sampler(sample_rate) - Sampling.build_sampler(sample_rate) || OpenTelemetry::SDK::Trace::Samplers::ALWAYS_ON + def warning_mutex + @warning_mutex ||= Mutex.new end end end - # rubocop:enable Metrics/ModuleLength end diff --git a/lib/langfuse/traced_execution.rb b/lib/langfuse/traced_execution.rb index 9bc9182..a3cba1f 100644 --- a/lib/langfuse/traced_execution.rb +++ b/lib/langfuse/traced_execution.rb @@ -12,18 +12,19 @@ module TracedExecution # Execute a task proc within a traced observe block. # # @param trace_name [String] name for the observe span + # @param client [Client] Langfuse client that owns the trace # @param input [Object] input set on the trace # @param metadata [Hash] metadata set on the trace # @param task [Proc] the callable to execute — receives the span # @yield [span, trace_id] optional pre-task hook (e.g., dataset run linking) # @return [Array<(Object, String, String, StandardError | nil)>] output, trace_id, observation_id, error - def self.call(trace_name:, input:, task:, metadata: {}) + def self.call(trace_name:, input:, task:, client: Langfuse.observation_client, metadata: {}) output = nil trace_id = nil observation_id = nil task_error = nil - Langfuse.observe(trace_name) do |span| + client.observe(trace_name) do |span| trace_id = span.trace_id observation_id = span.id span.update_trace(input: input, metadata: metadata) diff --git a/lib/langfuse/tracer_provider_factory.rb b/lib/langfuse/tracer_provider_factory.rb new file mode 100644 index 0000000..d2acb50 --- /dev/null +++ b/lib/langfuse/tracer_provider_factory.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require "base64" +require "opentelemetry/sdk" +require "opentelemetry/exporter/otlp" + +module Langfuse + # Builds isolated OpenTelemetry tracer providers for Langfuse clients. + # + # @api private + module TracerProviderFactory + class << self + # @param config [Langfuse::Config] SDK configuration used for tracing + # @param exporter [#export, #force_flush, #shutdown, nil] optional test/export override + # @return [OpenTelemetry::SDK::Trace::TracerProvider] + # @raise [ConfigurationError] if tracing configuration is incomplete + def build(config, exporter: nil) + validate_tracing_config!(config) + + provider = OpenTelemetry::SDK::Trace::TracerProvider.new( + sampler: build_sampler(config.sample_rate) + ) + provider.add_span_processor( + SpanProcessor.new(config: config, exporter: exporter || build_exporter(config)) + ) + provider + end + + # @param config [Langfuse::Config] SDK configuration used for exporter auth + # @return [OpenTelemetry::Exporter::OTLP::Exporter] + def build_exporter(config) + OpenTelemetry::Exporter::OTLP::Exporter.new( + endpoint: "#{config.base_url}/api/public/otel/v1/traces", + headers: build_headers(config.public_key, config.secret_key), + compression: "gzip" + ) + end + + # @param config [Langfuse::Config] SDK configuration used for tracing + # @return [void] + # @raise [ConfigurationError] if tracing configuration is incomplete + def validate_tracing_config!(config) + raise ConfigurationError, "public_key is required" if blank?(config.public_key) + raise ConfigurationError, "secret_key is required" if blank?(config.secret_key) + raise ConfigurationError, "base_url cannot be empty" if blank?(config.base_url) + return if config.should_export_span.nil? || config.should_export_span.respond_to?(:call) + + raise ConfigurationError, "should_export_span must respond to #call" + end + + private + + def blank?(value) + value.nil? || value.empty? + end + + def build_headers(public_key, secret_key) + credentials = "#{public_key}:#{secret_key}" + encoded = Base64.strict_encode64(credentials) + { "Authorization" => "Basic #{encoded}" } + end + + def build_sampler(sample_rate) + Sampling.build_sampler(sample_rate) || OpenTelemetry::SDK::Trace::Samplers::ALWAYS_ON + end + end + end +end diff --git a/spec/langfuse/agent_spec.rb b/spec/langfuse/agent_spec.rb index 4e7e642..507aa41 100644 --- a/spec/langfuse/agent_spec.rb +++ b/spec/langfuse/agent_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-agent") } - let(:agent) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:agent) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'agent'" do @@ -48,7 +49,7 @@ describe "integration with Span via start_observation" do it "creates agent as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) agent_obj = parent_observation.start_observation("nested-agent", { input: { task: "test" } }, as_type: :agent) expect(agent_obj).to be_a(described_class) @@ -102,7 +103,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { task: "research" }, output: { completed: true }, level: "DEFAULT" } - agent_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + agent_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = agent_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "task" => "research" }) diff --git a/spec/langfuse/base_observation_spec.rb b/spec/langfuse/base_observation_spec.rb index 62c97fb..3034130 100644 --- a/spec/langfuse/base_observation_spec.rb +++ b/spec/langfuse/base_observation_spec.rb @@ -7,8 +7,8 @@ # Test subclass that passes type to super let(:test_subclass) do Class.new(Langfuse::BaseObservation) do - def initialize(otel_span, otel_tracer, attributes: nil) - super(otel_span, otel_tracer, attributes: attributes, type: "test_observation") + def initialize(otel_span, otel_tracer, attributes: nil, client: nil) + super(otel_span, otel_tracer, attributes: attributes, client: client, type: "test_observation") end end end @@ -16,7 +16,8 @@ def initialize(otel_span, otel_tracer, attributes: nil) let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-span") } - let(:observation) { test_subclass.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:observation) { test_subclass.new(otel_span, otel_tracer, client: client) } describe "#initialize" do it "stores otel_span and otel_tracer" do @@ -25,14 +26,33 @@ def initialize(otel_span, otel_tracer, attributes: nil) end it "initializes without attributes" do - obs = test_subclass.new(otel_span, otel_tracer) + obs = test_subclass.new(otel_span, otel_tracer, client: client) expect(obs.otel_span).to eq(otel_span) expect(obs.otel_tracer).to eq(otel_tracer) end + it "falls back to the module observation client with one deprecation warning" do + logger = instance_double(Logger) + allow(Langfuse.configuration).to receive(:logger).and_return(logger) + + expect(logger).to receive(:warn).once.with(/constructors without client: are deprecated/) + + first = test_subclass.new(otel_span, otel_tracer) + second = test_subclass.new(otel_tracer.start_span("second"), otel_tracer) + + expect(first.client).to equal(Langfuse.observation_client) + expect(second.client).to equal(Langfuse.observation_client) + end + + it "does not warn when an explicit client is provided" do + expect(Langfuse.configuration.logger).not_to receive(:warn) + + test_subclass.new(otel_span, otel_tracer, client: client) + end + it "sets initial attributes when provided" do attrs = { input: { query: "test" }, output: { result: "success" } } - obs = test_subclass.new(otel_span, otel_tracer, attributes: attrs) + obs = test_subclass.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = obs.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "query" => "test" }) @@ -44,7 +64,7 @@ def initialize(otel_span, otel_tracer, attributes: nil) input: { data: "test" }, level: "DEFAULT" ) - obs = test_subclass.new(otel_span, otel_tracer, attributes: attrs) + obs = test_subclass.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = obs.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "data" => "test" }) @@ -87,12 +107,14 @@ def initialize(otel_span, otel_tracer, attributes: nil) it "raises ArgumentError if type is not provided" do abstract_class = Class.new(described_class) - expect { abstract_class.new(otel_span, otel_tracer) }.to raise_error(ArgumentError, /type must be provided/) + expect do + abstract_class.new(otel_span, otel_tracer, client: client) + end.to raise_error(ArgumentError, /type must be provided/) end it "returns the type passed to initialize" do abstract_class = Class.new(described_class) - obs = abstract_class.new(otel_span, otel_tracer, type: "custom_type") + obs = abstract_class.new(otel_span, otel_tracer, client: client, type: "custom_type") expect(obs.type).to eq("custom_type") end @@ -321,7 +343,7 @@ def initialize(otel_span, otel_tracer, attributes: nil) it "handles different level values" do %w[DEBUG DEFAULT WARNING ERROR].each do |level| - obs = test_subclass.new(otel_tracer.start_span("test"), otel_tracer) + obs = test_subclass.new(otel_tracer.start_span("test"), otel_tracer, client: client) obs.level = level span_data = obs.otel_span.to_span_data expect(span_data.attributes["langfuse.observation.level"]).to eq(level) @@ -618,7 +640,7 @@ def version describe "hierarchical structure" do it "creates nested observations" do parent_span = otel_tracer.start_span("parent") - parent_obs = Langfuse::Span.new(parent_span, otel_tracer) + parent_obs = Langfuse::Span.new(parent_span, otel_tracer, client: client) parent_obs.start_observation("level-1") do |span1| span1.start_observation("level-2") do |span2| @@ -632,7 +654,7 @@ def version it "shares trace_id across nested observations" do parent_span = otel_tracer.start_span("parent") - parent_obs = Langfuse::Span.new(parent_span, otel_tracer) + parent_obs = Langfuse::Span.new(parent_span, otel_tracer, client: client) trace_id = parent_obs.trace_id child = parent_obs.start_observation("child") @@ -646,7 +668,7 @@ def version describe "integration with real OpenTelemetry spans" do it "works with actual span lifecycle" do parent_span = otel_tracer.start_span("parent") - parent_obs = Langfuse::Span.new(parent_span, otel_tracer) + parent_obs = Langfuse::Span.new(parent_span, otel_tracer, client: client) child = parent_obs.start_observation("child-operation", { input: { data: "test" } }) child.output = { result: "success" } @@ -711,37 +733,43 @@ def version end it "calls client.trace_url with correct trace_id" do - mock_client = instance_double(Langfuse::Client) - allow(Langfuse).to receive(:client).and_return(mock_client) - trace_id = observation.trace_id + mock_client = instance_double(Langfuse::Client, config: Langfuse.configuration) + obs = test_subclass.new(otel_span, otel_tracer, client: mock_client) + trace_id = obs.trace_id expect(mock_client).to receive(:trace_url).with(trace_id).and_return("https://example.com/traces/#{trace_id}") - observation.trace_url + obs.trace_url end end describe "#score_trace" do - it "delegates to Langfuse.create_score with trace_id" do - expect(Langfuse).to receive(:create_score).with( + let(:mock_client) { instance_double(Langfuse::Client, config: Langfuse.configuration) } + + it "delegates to the owning client with trace_id" do + owned_observation = test_subclass.new(otel_span, otel_tracer, client: mock_client) + + expect(mock_client).to receive(:create_score).with( name: "quality", value: 0.9, - trace_id: observation.trace_id, + trace_id: owned_observation.trace_id, comment: "good", metadata: { k: "v" }, data_type: :numeric ) - observation.score_trace( + owned_observation.score_trace( name: "quality", value: 0.9, comment: "good", metadata: { k: "v" } ) end it "defaults data_type to :numeric" do - expect(Langfuse).to receive(:create_score).with( + owned_observation = test_subclass.new(otel_span, otel_tracer, client: mock_client) + + expect(mock_client).to receive(:create_score).with( hash_including(data_type: :numeric) ) - observation.score_trace(name: "score", value: 1.0) + owned_observation.score_trace(name: "score", value: 1.0) end end diff --git a/spec/langfuse/chain_spec.rb b/spec/langfuse/chain_spec.rb index b233bab..791184a 100644 --- a/spec/langfuse/chain_spec.rb +++ b/spec/langfuse/chain_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-chain") } - let(:chain) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:chain) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'chain'" do @@ -48,7 +49,7 @@ describe "integration with Span via start_observation" do it "creates chain as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) chain_obj = parent_observation.start_observation("nested-chain", { input: { query: "test" } }, as_type: :chain) expect(chain_obj).to be_a(described_class) @@ -102,7 +103,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { query: "test" }, output: { steps_completed: 3 }, level: "DEFAULT" } - chain_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + chain_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = chain_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "query" => "test" }) diff --git a/spec/langfuse/client_context_spec.rb b/spec/langfuse/client_context_spec.rb new file mode 100644 index 0000000..7416a8e --- /dev/null +++ b/spec/langfuse/client_context_spec.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +require "spec_helper" +require "stringio" + +RSpec.describe "Langfuse active client context" do + def build_client(suffix) + config = Langfuse::Config.new do |c| + c.public_key = "pk_#{suffix}" + c.secret_key = "sk_#{suffix}" + c.base_url = "https://cloud.langfuse.com" + c.tracing_async = false + c.flush_interval = 0 + c.logger = Logger.new(StringIO.new) + end + client = Langfuse::Client.new(config) + constructed_clients << client + client + end + + def score_client(client) + client.instance_variable_get(:@score_client) + end + + def constructed_clients + @constructed_clients ||= [] + end + + let(:client_a) { build_client("a") } + let(:client_b) { build_client("b") } + + after do + constructed_clients.each(&:shutdown) + rescue StandardError + nil + end + + it "routes module-level active trace scores through the active observation owner" do + expect(score_client(client_a)).to receive(:score_active_trace).with( + name: "owner-a", value: 1, comment: nil, metadata: nil, data_type: :numeric + ) + expect(score_client(client_b)).to receive(:score_active_trace).with( + name: "owner-b", value: 2, comment: nil, metadata: nil, data_type: :numeric + ) + + client_a.observe("trace-a") { Langfuse.score_active_trace(name: "owner-a", value: 1) } + client_b.observe("trace-b") { Langfuse.score_active_trace(name: "owner-b", value: 2) } + end + + it "routes module-level active observation scores through the active observation owner" do + expect(score_client(client_a)).to receive(:score_active_observation).with( + name: "observation-owner", value: 0.9, comment: nil, metadata: nil, data_type: :numeric + ) + + client_a.observe("observation-a") do + Langfuse.score_active_observation(name: "observation-owner", value: 0.9) + end + end + + it "raises when an explicit client scores another client's active observation" do + expect do + client_b.observe("trace-b") do + client_a.score_active_trace(name: "wrong-owner", value: 1) + end + end.to raise_error(ArgumentError, /different client/) + end + + it "allows explicit clients to score raw OpenTelemetry active spans" do + tracer = OpenTelemetry.tracer_provider.tracer("raw") + span = tracer.start_span("raw-span") + + expect(score_client(client_a)).to receive(:score_active_trace).with( + name: "raw-owner", value: 1, comment: nil, metadata: nil, data_type: :numeric + ) + + OpenTelemetry::Context.with_current(OpenTelemetry::Trace.context_with_span(span)) do + client_a.score_active_trace(name: "raw-owner", value: 1) + end + ensure + span&.finish + end + + it "preserves singleton raw OpenTelemetry fallback with a deprecation warning" do + tracer = OpenTelemetry.tracer_provider.tracer("raw") + span = tracer.start_span("raw-span") + singleton_score_client = score_client(Langfuse.client) + + expect(Langfuse.configuration.logger).to receive(:warn).with(/raw OpenTelemetry fallback/) + expect(singleton_score_client).to receive(:score_active_trace).with( + name: "singleton-raw", value: 1, comment: nil, metadata: nil, data_type: :numeric + ) + + OpenTelemetry::Context.with_current(OpenTelemetry::Trace.context_with_span(span)) do + Langfuse.score_active_trace(name: "singleton-raw", value: 1) + end + ensure + span&.finish + end +end diff --git a/spec/langfuse/client_spec.rb b/spec/langfuse/client_spec.rb index 6903f0d..418bc09 100644 --- a/spec/langfuse/client_spec.rb +++ b/spec/langfuse/client_spec.rb @@ -15,9 +15,13 @@ expect(client).to be_a(described_class) end - it "sets the config" do + it "snapshots the config" do client = described_class.new(valid_config) - expect(client.config).to eq(valid_config) + expect(client.config).not_to equal(valid_config) + expect(client.config).to be_frozen + expect(client.config.public_key).to eq(valid_config.public_key) + expect(client.config.secret_key).to eq(valid_config.secret_key) + expect(client.config.base_url).to eq(valid_config.base_url) end it "creates an api_client" do @@ -2447,6 +2451,166 @@ def self.cache end end + describe "client-owned tracing" do + let(:client) { described_class.new(valid_config) } + + after do + client.shutdown + rescue StandardError + nil + end + + it "memoizes an isolated tracer provider per client" do + other_config = Langfuse::Config.new do |config| + config.public_key = "pk_other" + config.secret_key = "sk_other" + config.base_url = "https://cloud.langfuse.com" + end + other_client = described_class.new(other_config) + + expect(client.tracer_provider).to equal(client.tracer_provider) + expect(other_client.tracer_provider).not_to equal(client.tracer_provider) + ensure + other_client&.shutdown + end + + it "publishes only one tracer provider under concurrent first access" do + calls = Queue.new + allow(Langfuse::TracerProviderFactory).to receive(:build).and_wrap_original do |method, *args, **kwargs| + calls << true + sleep 0.01 + method.call(*args, **kwargs) + end + + providers = Queue.new + threads = 5.times.map { Thread.new { providers << client.tracer_provider } } + threads.each(&:join) + + resolved = 5.times.map { providers.pop } + expect(resolved.map(&:object_id).uniq.length).to eq(1) + expect(calls.size).to eq(1) + end + + it "keeps root and child observations on the explicit client" do + root = client.start_observation("root") + child = root.start_observation("child") + + expect(root.client).to equal(client) + expect(child.client).to equal(client) + expect(child.trace_id).to eq(root.trace_id) + ensure + child&.end + root&.end + end + + it "uses the explicit client's mask for observation attributes" do + Langfuse.configure { |config| config.mask = ->(data:) { "global-#{data}" } } + valid_config.mask = ->(data:) { "client-#{data}" } + + observation = client.start_observation("masked", { input: "secret" }) + span_data = observation.otel_span.to_span_data + + expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq("client-secret") + ensure + observation&.end + end + + it "snapshots API auth and base URL at construction" do + frozen_client = described_class.new(valid_config) + + valid_config.public_key = "pk_mutated" + valid_config.secret_key = "sk_mutated" + valid_config.base_url = "https://mutated.langfuse.test" + + expect(frozen_client.config).to be_frozen + expect(frozen_client.api_client.public_key).to eq("pk_test_123") + expect(frozen_client.api_client.secret_key).to eq("sk_test_456") + expect(frozen_client.api_client.base_url).to eq("https://cloud.langfuse.com") + ensure + frozen_client&.shutdown + end + + it "uses the construction-time trace exporter endpoint and auth" do + config = Langfuse::Config.new do |c| + c.public_key = "pk_original" + c.secret_key = "sk_original" + c.base_url = "https://original.langfuse.test" + c.tracing_async = false + c.flush_interval = 1 + end + frozen_client = described_class.new(config) + auth = "Basic #{Base64.strict_encode64('pk_original:sk_original')}" + + config.public_key = "pk_mutated" + config.secret_key = "sk_mutated" + config.base_url = "https://mutated.langfuse.test" + + stub_request(:post, "https://original.langfuse.test/api/public/otel/v1/traces") + .with(headers: { "Authorization" => auth }) + .to_return(status: 200, body: "", headers: {}) + + observation = frozen_client.start_observation("exported") + observation.end + frozen_client.force_flush(timeout: 1) + + expect(WebMock).to have_requested(:post, "https://original.langfuse.test/api/public/otel/v1/traces").once + expect(WebMock).not_to have_requested(:post, "https://mutated.langfuse.test/api/public/otel/v1/traces") + ensure + observation&.end if observation&.otel_span&.recording? + frozen_client&.shutdown + end + + it "uses construction-time masking and trace defaults" do + config = Langfuse::Config.new do |c| + c.public_key = "pk_original" + c.secret_key = "sk_original" + c.mask = ->(data:) { "original-#{data}" } + c.environment = "original-env" + c.release = "original-release" + c.tracing_async = false + end + frozen_client = described_class.new(config) + + config.mask = ->(data:) { "mutated-#{data}" } + config.environment = "mutated-env" + config.release = "mutated-release" + + observation = frozen_client.start_observation("snapshot", { input: "secret" }) + span_data = observation.otel_span.to_span_data + + expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq("original-secret") + expect(span_data.attributes["langfuse.environment"]).to eq("original-env") + expect(span_data.attributes["langfuse.release"]).to eq("original-release") + ensure + observation&.end + frozen_client&.shutdown + end + + it "uses the construction-time base URL for trace URLs" do + config = Langfuse::Config.new do |c| + c.public_key = "pk_original" + c.secret_key = "sk_original" + c.base_url = "https://original.langfuse.test" + end + frozen_client = described_class.new(config) + config.base_url = "https://mutated.langfuse.test" + + stub_request(:get, "https://original.langfuse.test/api/public/projects") + .to_return( + status: 200, + body: { "data" => [{ "id" => "project-original" }] }.to_json, + headers: { "Content-Type" => "application/json" } + ) + + expect(frozen_client.trace_url("abc123")).to eq( + "https://original.langfuse.test/project/project-original/traces/abc123" + ) + expect(WebMock).not_to have_requested(:get, "https://mutated.langfuse.test/api/public/projects") + ensure + frozen_client&.shutdown + end + end + describe "#run_experiment" do let(:client) { described_class.new(valid_config) } let(:base_url) { valid_config.base_url } diff --git a/spec/langfuse/dataset_item_client_spec.rb b/spec/langfuse/dataset_item_client_spec.rb index 5d093c7..6764b5c 100644 --- a/spec/langfuse/dataset_item_client_spec.rb +++ b/spec/langfuse/dataset_item_client_spec.rb @@ -199,8 +199,11 @@ let(:item_client) { described_class.new(item_data, client: mock_client) } before do + allow(mock_client).to receive(:observe) do |name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block| + Langfuse.observe(name, attrs, as_type: as_type, trace_id: trace_id, **kwargs, &block) + end allow(mock_client).to receive(:create_dataset_run_item) - allow(Langfuse).to receive(:force_flush) + allow(mock_client).to receive(:force_flush) end it "raises ArgumentError when no block given" do @@ -249,7 +252,7 @@ it "calls force_flush before linking" do call_order = [] - allow(Langfuse).to receive(:force_flush) { call_order << :flush } + allow(mock_client).to receive(:force_flush) { call_order << :flush } allow(mock_client).to receive(:create_dataset_run_item) { call_order << :link } item_client.run(run_name: "test") { "output" } expect(call_order).to eq(%i[flush link]) diff --git a/spec/langfuse/embedding_spec.rb b/spec/langfuse/embedding_spec.rb index 176a263..09d8e61 100644 --- a/spec/langfuse/embedding_spec.rb +++ b/spec/langfuse/embedding_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-embedding") } - let(:embedding) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:embedding) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'embedding'" do @@ -50,7 +51,7 @@ describe "integration with Span via start_observation" do it "creates embedding as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) embedding_obj = parent_observation.start_observation("nested-embedding", { input: { texts: ["test"] }, @@ -127,7 +128,7 @@ it "sets initial attributes when provided" do attrs = { input: { texts: ["test"] }, output: { vectors: [[0.1, 0.2]] }, model: "text-embedding-ada-002", usage_details: { prompt_tokens: 10 } } - embedding_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + embedding_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = embedding_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "texts" => ["test"] }) diff --git a/spec/langfuse/evaluator_spec.rb b/spec/langfuse/evaluator_spec.rb index 146ec32..d3b3b46 100644 --- a/spec/langfuse/evaluator_spec.rb +++ b/spec/langfuse/evaluator_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-evaluator") } - let(:evaluator) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:evaluator) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'evaluator'" do @@ -49,7 +50,7 @@ describe "integration with Span via start_observation" do it "creates evaluator as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) evaluator_obj = parent_observation.start_observation("nested-evaluator", { input: { response: "test" } }, as_type: :evaluator) @@ -104,7 +105,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { response: "test" }, output: { overall_score: 0.87 }, level: "DEFAULT" } - evaluator_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + evaluator_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = evaluator_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "response" => "test" }) diff --git a/spec/langfuse/event_spec.rb b/spec/langfuse/event_spec.rb index c73e335..b1ddf4c 100644 --- a/spec/langfuse/event_spec.rb +++ b/spec/langfuse/event_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-event") } - let(:event) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:event) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'event'" do @@ -37,7 +38,7 @@ describe "auto-ending behavior via start_observation" do let(:parent_span) { otel_tracer.start_span("parent") } - let(:parent_observation) { Langfuse::Span.new(parent_span, otel_tracer) } + let(:parent_observation) { Langfuse::Span.new(parent_span, otel_tracer, client: client) } context "when created without block (stateful API)" do it "automatically ends the event" do @@ -92,7 +93,7 @@ describe "integration with Span via start_observation" do it "creates event as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) event_obj = parent_observation.start_observation("nested-event", { input: { data: "test" } }, as_type: :event) expect(event_obj).to be_a(described_class) @@ -102,7 +103,7 @@ it "creates event as child of generation" do parent_span = otel_tracer.start_span("parent-generation") - parent_observation = Langfuse::Generation.new(parent_span, otel_tracer) + parent_observation = Langfuse::Generation.new(parent_span, otel_tracer, client: client) event_obj = parent_observation.start_observation("streaming-event", { input: { chunk: "data" } }, as_type: :event) expect(event_obj).to be_a(described_class) @@ -157,7 +158,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { action: "click" }, output: { success: true }, level: "DEFAULT" } - event_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + event_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = event_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "action" => "click" }) diff --git a/spec/langfuse/experiment_runner_spec.rb b/spec/langfuse/experiment_runner_spec.rb index 33001ab..adfc1d6 100644 --- a/spec/langfuse/experiment_runner_spec.rb +++ b/spec/langfuse/experiment_runner_spec.rb @@ -8,7 +8,12 @@ allow(mock_client).to receive(:create_dataset_run_item) allow(mock_client).to receive(:create_score) allow(mock_client).to receive(:flush_scores) + allow(mock_client).to receive(:force_flush) allow(mock_client).to receive(:dataset_run_url) + allow(mock_client).to receive(:config).and_return(Langfuse.configuration) + allow(mock_client).to receive(:observe) do |name, attrs = {}, as_type: :span, trace_id: nil, **kwargs, &block| + Langfuse.observe(name, attrs, as_type: as_type, trace_id: trace_id, **kwargs, &block) + end allow(Langfuse).to receive(:force_flush) allow(Langfuse.configuration).to receive(:logger).and_return(logger) end @@ -813,7 +818,7 @@ runner.execute # Only the final flush_all call, not once per item - expect(Langfuse).to have_received(:force_flush).once + expect(mock_client).to have_received(:force_flush).once end it "flushes again after run evaluators produce results" do @@ -827,7 +832,7 @@ # Once for post-items flush_all, once for post-run-evaluators flush_all expect(mock_client).to have_received(:flush_scores).twice - expect(Langfuse).to have_received(:force_flush).twice + expect(mock_client).to have_received(:force_flush).twice end end diff --git a/spec/langfuse/guardrail_spec.rb b/spec/langfuse/guardrail_spec.rb index bac04f6..19afea8 100644 --- a/spec/langfuse/guardrail_spec.rb +++ b/spec/langfuse/guardrail_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-guardrail") } - let(:guardrail) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:guardrail) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'guardrail'" do @@ -49,7 +50,7 @@ describe "integration with Span via start_observation" do it "creates guardrail as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) guardrail_obj = parent_observation.start_observation("nested-guardrail", { input: { content: "test" } }, as_type: :guardrail) @@ -104,7 +105,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { content: "test" }, output: { safe: true }, level: "DEFAULT" } - guardrail_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + guardrail_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = guardrail_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "content" => "test" }) diff --git a/spec/langfuse/otel_setup_spec.rb b/spec/langfuse/otel_setup_spec.rb index 7f76798..a86ab63 100644 --- a/spec/langfuse/otel_setup_spec.rb +++ b/spec/langfuse/otel_setup_spec.rb @@ -3,257 +3,71 @@ require "spec_helper" RSpec.describe Langfuse::OtelSetup do - let(:logger) { instance_double(Logger, info: nil, debug: nil, warn: nil) } - let(:exporter) { OpenTelemetry::SDK::Trace::Export::InMemorySpanExporter.new } - let(:config) do - Langfuse::Config.new do |c| - c.public_key = "pk_test_123" - c.secret_key = "sk_test_456" - c.base_url = "https://api.langfuse.test" - c.tracing_async = false - c.batch_size = 10 - c.flush_interval = 1 - c.logger = logger - end - end + let(:logger) { instance_double(Logger, info: nil, warn: nil) } before do - described_class.shutdown(timeout: 1) if described_class.initialized? - allow(described_class).to receive(:build_exporter).and_return(exporter) + Langfuse.reset! + Langfuse.configure do |config| + config.public_key = "pk_test" + config.secret_key = "sk_test" + config.base_url = "https://cloud.langfuse.com" + config.logger = logger + end end after do - described_class.shutdown(timeout: 1) if described_class.initialized? + Langfuse.reset! end describe ".setup" do - it "initializes the tracer provider" do - described_class.setup(config) - - expect(described_class.tracer_provider).to be_a(OpenTelemetry::SDK::Trace::TracerProvider) - expect(described_class.initialized?).to be true - end - - it "does not mutate the global tracer provider" do - original_global_provider = OpenTelemetry.tracer_provider - - described_class.setup(config) - - expect(OpenTelemetry.tracer_provider).to eq(original_global_provider) - end - - it "does not mutate the global propagator" do - original_global_propagation = OpenTelemetry.propagation - - described_class.setup(config) - - expect(OpenTelemetry.propagation).to eq(original_global_propagation) - end + it "warns and delegates to the global tracer provider" do + provider = instance_double(OpenTelemetry::SDK::Trace::TracerProvider) - it "reuses the existing provider for identical tracing config" do - provider = described_class.setup(config) + expect(logger).to receive(:warn).with(/Langfuse::OtelSetup is deprecated/) + expect(Langfuse).to receive(:tracer_provider).and_return(provider) - expect(logger).to receive(:debug).with(/reusing existing tracer provider/) - expect(described_class.setup(config)).to equal(provider) - end - - it "warns and keeps the existing provider when tracing config changes" do - provider = described_class.setup(config) - config.environment = "staging" - - expect(logger).to receive(:warn).with(/require Langfuse.reset!/) - expect(described_class.setup(config)).to equal(provider) - end - - it "shuts down unpublished providers lost in the setup race" do - candidate_provider = instance_double(OpenTelemetry::SDK::Trace::TracerProvider, shutdown: nil) - existing_provider = instance_double(OpenTelemetry::SDK::Trace::TracerProvider) - - allow(described_class).to receive_messages( - build_tracer_provider: candidate_provider, - publish_provider: [existing_provider, false], - existing_provider_for: existing_provider - ) - - expect(candidate_provider).to receive(:shutdown).with(timeout: 30) - expect(described_class.setup(config)).to equal(existing_provider) + expect(described_class.setup).to equal(provider) end + end - it "validates should_export_span in setup" do - config.should_export_span = "bad" + describe ".tracer_provider" do + it "returns the current global provider without initializing one" do + expect(logger).to receive(:warn).with(/Langfuse::OtelSetup is deprecated/) - expect { described_class.setup(config) }.to raise_error( - Langfuse::ConfigurationError, - "should_export_span must respond to #call" - ) + expect(described_class.tracer_provider).to be_nil + expect(Langfuse.client.instance_variable_get(:@tracer_provider)).to be_nil end - context "with sample_rate below 1.0" do - before do - config.sample_rate = 0.1 - end - - it "uses TraceIdRatioBased sampler" do - described_class.setup(config) - - expect(described_class.tracer_provider.sampler).to be_a(OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased) - end + it "returns the initialized global provider" do + provider = Langfuse.tracer_provider - it "makes deterministic decisions for the same trace id" do - described_class.setup(config) - - sampler = described_class.tracer_provider.sampler - trace_id = ["aabbccddeeff00112233445566778899"].pack("H*") - decision_one = sampler.should_sample?( - trace_id: trace_id, - parent_context: nil, - links: [], - name: "score", - kind: OpenTelemetry::Trace::SpanKind::INTERNAL, - attributes: {} - ).sampled? - decision_two = sampler.should_sample?( - trace_id: trace_id, - parent_context: nil, - links: [], - name: "score", - kind: OpenTelemetry::Trace::SpanKind::INTERNAL, - attributes: {} - ).sampled? - - expect(decision_one).to eq(decision_two) - end + expect(described_class.tracer_provider).to equal(provider) end + end - context "with sample_rate at 1.0" do - before do - config.sample_rate = 1.0 - end - - it "uses always-on sampling behavior" do - described_class.setup(config) + describe ".initialized?" do + it "reflects whether the global provider has been initialized" do + expect(described_class.initialized?).to be(false) - sampler = described_class.tracer_provider.sampler - decision = sampler.should_sample?( - trace_id: ["00112233445566778899aabbccddeeff"].pack("H*"), - parent_context: nil, - links: [], - name: "score", - kind: OpenTelemetry::Trace::SpanKind::INTERNAL, - attributes: {} - ) + Langfuse.tracer_provider - expect(decision.sampled?).to be(true) - end - end - end - - describe ".shutdown" do - it "is safe before initialization" do - expect { described_class.shutdown(timeout: 1) }.not_to raise_error + expect(described_class.initialized?).to be(true) end end describe ".force_flush" do - it "is safe before initialization" do - expect { described_class.force_flush(timeout: 1) }.not_to raise_error - end - end + it "delegates to Langfuse.force_flush" do + expect(Langfuse).to receive(:force_flush).with(timeout: 2) - describe "lazy module-level setup" do - it "does not initialize tracing during Langfuse.configure" do - Langfuse.configure do |c| - c.public_key = config.public_key - c.secret_key = config.secret_key - c.base_url = config.base_url - c.logger = logger - end - - expect(described_class.initialized?).to be false - end - - it "raises from Langfuse.tracer_provider when tracing is not ready" do - Langfuse.reset! - Langfuse.configure do |c| - c.public_key = nil - c.secret_key = nil - c.base_url = nil - c.logger = logger - end - - expect { Langfuse.tracer_provider }.to raise_error( - Langfuse::ConfigurationError, - /Langfuse tracing is disabled/ - ) - end - - it "initializes once when Langfuse.tracer_provider is called concurrently" do - Langfuse.reset! - Langfuse.configure do |c| - c.public_key = config.public_key - c.secret_key = config.secret_key - c.base_url = config.base_url - c.logger = logger - end - - providers = Queue.new - threads = 5.times.map do - Thread.new { providers << Langfuse.tracer_provider } - end - threads.each(&:join) - - resolved = 5.times.map { providers.pop } - expect(resolved.map(&:object_id).uniq.length).to eq(1) + described_class.force_flush(timeout: 2) end end - describe "export behavior" do - before do - Langfuse.reset! - Langfuse.configure do |c| - c.public_key = config.public_key - c.secret_key = config.secret_key - c.base_url = config.base_url - c.tracing_async = false - c.batch_size = 10 - c.flush_interval = 1 - c.logger = logger - end - end - - it "exports Langfuse-created spans without exporting ambient global spans" do - OpenTelemetry.tracer_provider.tracer("dalli").start_span("cache-span").finish - span = Langfuse.observe("langfuse-span") - span.end - Langfuse.force_flush(timeout: 1) - - expect(exporter.finished_spans.map(&:name)).to eq(["langfuse-span"]) - end - - it "exports known LLM scopes after explicit global installation" do - OpenTelemetry.tracer_provider = Langfuse.tracer_provider - OpenTelemetry.tracer_provider.tracer("langsmith.client").start_span("global-span").finish - Langfuse.force_flush(timeout: 1) - - expect(exporter.finished_spans.map(&:name)).to eq(["global-span"]) - end - - it "allows custom filters to drop globally installed spans again" do - Langfuse.reset! - Langfuse.configure do |c| - c.public_key = config.public_key - c.secret_key = config.secret_key - c.base_url = config.base_url - c.tracing_async = false - c.should_export_span = ->(_span) { false } - c.logger = logger - end - - OpenTelemetry.tracer_provider = Langfuse.tracer_provider - OpenTelemetry.tracer_provider.tracer("langsmith.client").start_span("global-span").finish - Langfuse.force_flush(timeout: 1) + describe ".shutdown" do + it "delegates to Langfuse.shutdown" do + expect(Langfuse).to receive(:shutdown).with(timeout: 2) - expect(exporter.finished_spans).to be_empty + described_class.shutdown(timeout: 2) end end end diff --git a/spec/langfuse/retriever_spec.rb b/spec/langfuse/retriever_spec.rb index 5f16044..b5f7dcd 100644 --- a/spec/langfuse/retriever_spec.rb +++ b/spec/langfuse/retriever_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-retriever") } - let(:retriever) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:retriever) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'retriever'" do @@ -48,7 +49,7 @@ describe "integration with Span via start_observation" do it "creates retriever as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) retriever_obj = parent_observation.start_observation("nested-retriever", { input: { query: "test" } }, as_type: :retriever) @@ -104,7 +105,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { query: "search" }, output: { documents: [], count: 10 }, level: "DEFAULT" } - retriever_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + retriever_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = retriever_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "query" => "search" }) diff --git a/spec/langfuse/score_client_spec.rb b/spec/langfuse/score_client_spec.rb index 32b38b5..2cdda33 100644 --- a/spec/langfuse/score_client_spec.rb +++ b/spec/langfuse/score_client_spec.rb @@ -412,9 +412,8 @@ pinned_client.flush end - # Guards against a prior regression where ScoreClient read the sampler from - # OtelSetup's singleton provider, so whichever client initialized tracing - # first dictated sampling for every other client in the process. + # Guards against a prior regression where ScoreClient read sampling from a + # process-wide provider instead of its owning client. it "uses its own config's sample_rate, not another client's" do permissive_config = Langfuse::Config.new do |c| c.public_key = "pk_test" diff --git a/spec/langfuse/tool_spec.rb b/spec/langfuse/tool_spec.rb index 06ed3f9..f5e1ab9 100644 --- a/spec/langfuse/tool_spec.rb +++ b/spec/langfuse/tool_spec.rb @@ -7,7 +7,8 @@ let(:tracer_provider) { OpenTelemetry::SDK::Trace::TracerProvider.new } let(:otel_tracer) { tracer_provider.tracer("test-tracer") } let(:otel_span) { otel_tracer.start_span("test-tool") } - let(:tool) { described_class.new(otel_span, otel_tracer) } + let(:client) { Langfuse.client } + let(:tool) { described_class.new(otel_span, otel_tracer, client: client) } describe "#type" do it "returns 'tool'" do @@ -48,7 +49,7 @@ describe "integration with Span via start_observation" do it "creates tool as child of span" do parent_span = otel_tracer.start_span("parent-span") - parent_observation = Langfuse::Span.new(parent_span, otel_tracer) + parent_observation = Langfuse::Span.new(parent_span, otel_tracer, client: client) tool_obj = parent_observation.start_observation("nested-tool", { input: { query: "test" } }, as_type: :tool) expect(tool_obj).to be_a(described_class) @@ -102,7 +103,7 @@ describe "initialization with attributes" do it "sets initial attributes when provided" do attrs = { input: { query: "search" }, output: { result: "success" }, level: "DEFAULT" } - tool_obj = described_class.new(otel_span, otel_tracer, attributes: attrs) + tool_obj = described_class.new(otel_span, otel_tracer, attributes: attrs, client: client) span_data = tool_obj.otel_span.to_span_data expect(JSON.parse(span_data.attributes["langfuse.observation.input"])).to eq({ "query" => "search" }) diff --git a/spec/langfuse_spec.rb b/spec/langfuse_spec.rb index 423de02..b3c203f 100644 --- a/spec/langfuse_spec.rb +++ b/spec/langfuse_spec.rb @@ -52,7 +52,7 @@ config.secret_key = "test_sk" end - expect(Langfuse::OtelSetup.initialized?).to be false + expect(described_class.instance_variable_get(:@client)).to be_nil end end @@ -77,7 +77,10 @@ it "uses the global configuration" do client = described_class.client - expect(client.config).to eq(described_class.configuration) + expect(client.config).not_to equal(described_class.configuration) + expect(client.config.public_key).to eq(described_class.configuration.public_key) + expect(client.config.secret_key).to eq(described_class.configuration.secret_key) + expect(client.config.base_url).to eq(described_class.configuration.base_url) end it "creates client with configured settings" do @@ -113,7 +116,30 @@ provider = described_class.tracer_provider expect(provider).to be_a(OpenTelemetry::SDK::Trace::TracerProvider) - expect(Langfuse::OtelSetup.tracer_provider).to equal(provider) + expect(described_class.client.tracer_provider).to equal(provider) + end + end + + describe ".observe ownership" do + it "uses the singleton client as the module-level observation owner" do + observation = described_class.observe("test-span") + + expect(observation.client).to equal(described_class.client) + ensure + observation&.end + end + + it "uses a no-op owner when tracing configuration is incomplete" do + described_class.reset! + described_class.configure do |config| + config.public_key = nil + config.secret_key = nil + config.base_url = nil + end + + observation = described_class.observe("disabled-span") + + expect(observation.client).to be_a(Langfuse::NoopObservationClient) end end @@ -152,6 +178,22 @@ expect(described_class.instance_variable_get(:@configuration)).to be_nil expect(described_class.instance_variable_get(:@client)).to be_nil end + + it "resets warning state even when client shutdown fails" do + client = instance_double(Langfuse::Client) + described_class.instance_variable_set(:@client, client) + allow(client).to receive(:shutdown).and_raise(StandardError) + allow(Langfuse::BaseObservation).to receive(:reset_deprecation_warnings!) + allow(Langfuse::ActiveScoring).to receive(:reset!) + allow(Langfuse::OtelSetup).to receive(:reset_deprecation_warning!) + + described_class.reset! + + expect(Langfuse::BaseObservation).to have_received(:reset_deprecation_warnings!) + expect(Langfuse::ActiveScoring).to have_received(:reset!) + expect(Langfuse::OtelSetup).to have_received(:reset_deprecation_warning!) + expect(described_class.instance_variable_get(:@client)).to be_nil + end end describe ".shutdown" do @@ -163,13 +205,15 @@ end end - it "calls OtelSetup.shutdown with timeout" do - expect(Langfuse::OtelSetup).to receive(:shutdown).with(timeout: 30) + it "calls client shutdown with timeout" do + client = described_class.client + expect(client).to receive(:shutdown).with(timeout: 30) described_class.shutdown end it "accepts custom timeout" do - expect(Langfuse::OtelSetup).to receive(:shutdown).with(timeout: 10) + client = described_class.client + expect(client).to receive(:shutdown).with(timeout: 10) described_class.shutdown(timeout: 10) end end