From def5a4747e27e2803ce0a0f812bf0ec5729ab272 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 5 May 2026 14:59:22 +0200 Subject: [PATCH 1/5] Propagate OTel context into Effects.query_move_in_async spawned task This is a follow-up to #4149 which fixed OTel context propagation in PartialModes and Snapshotter. The Effects.query_move_in_async function has the same pattern: it spawns a task that calls SnapshotQuery.execute_for_shape, but without OTel context propagation the child spans inside would be silently dropped. Co-Authored-By: Claude Opus 4.5 --- .../sync-service/lib/electric/shapes/consumer/effects.ex | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/sync-service/lib/electric/shapes/consumer/effects.ex b/packages/sync-service/lib/electric/shapes/consumer/effects.ex index ba362420c6..05d4f9a783 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/effects.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/effects.ex @@ -13,6 +13,7 @@ defmodule Electric.Shapes.Consumer.Effects do alias Electric.ShapeCache alias Electric.Shapes.Querying alias Electric.Shapes.Shape + alias Electric.Telemetry.OpenTelemetry require Logger @@ -263,7 +264,14 @@ defmodule Electric.Shapes.Consumer.Effects do stack_id: stack_id }) + # Propagate OTel context so spans created inside the task are linked to the + # caller's trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + trace_context = OpenTelemetry.get_current_context() + Task.Supervisor.start_child(supervisor, fn -> + OpenTelemetry.set_current_context(trace_context) + snapshot_name = Electric.Utils.uuid4() try do From ce498cf18dfc2584108a64c1d57437ac8ac3d9cc Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 5 May 2026 15:43:01 +0200 Subject: [PATCH 2/5] Propagate OTel context through ShapeCache GenServer handlers Two handlers receive calls that should have OTel context but weren't setting it before calling ShapeStatus functions that use with_child_span: 1. handle_call({:create_or_wait_shape_handle, shape, otel_ctx}) - the otel_ctx was passed to opts but not set in the process context before calling ShapeStatus.fetch_handle_by_shape_critical and add_shape. 2. handle_call({:start_consumer_for_handle, ...}) - previously had a TODO comment acknowledging the missing context. Now accepts an opts keyword list (matching get_or_create_shape_handle/3) and ConsumerRegistry passes the context from within its span. Co-Authored-By: Claude Opus 4.5 --- .../sync-service/lib/electric/shape_cache.ex | 20 +++++++++++++------ .../lib/electric/shapes/consumer_registry.ex | 4 +++- .../test/electric/shape_cache_test.exs | 4 ++-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 8cb52a56cd..e7796fb6df 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -8,6 +8,7 @@ defmodule Electric.ShapeCache do alias Electric.Shapes alias Electric.ShapeCache.ShapeCleaner alias Electric.Shapes.Shape + alias Electric.Telemetry.OpenTelemetry import Electric, only: [is_stack_id: 1, is_shape_handle: 1] @@ -244,11 +245,15 @@ defmodule Electric.ShapeCache do end end - @spec start_consumer_for_handle(shape_handle(), stack_id()) :: + @spec start_consumer_for_handle(shape_handle(), stack_id(), opts :: Access.t()) :: {:ok, pid()} | {:error, :no_shape} - def start_consumer_for_handle(shape_handle, stack_id) + def start_consumer_for_handle(shape_handle, stack_id, opts \\ []) when is_shape_handle(shape_handle) and is_stack_id(stack_id) do - GenServer.call(name(stack_id), {:start_consumer_for_handle, shape_handle}, @call_timeout) + GenServer.call( + name(stack_id), + {:start_consumer_for_handle, shape_handle, opts[:otel_ctx]}, + @call_timeout + ) end @impl GenServer @@ -302,6 +307,8 @@ defmodule Electric.ShapeCache do @impl GenServer def handle_call({:create_or_wait_shape_handle, shape, otel_ctx}, _from, state) do + if not is_nil(otel_ctx), do: OpenTelemetry.set_current_context(otel_ctx) + case safe_maybe_create_shape(shape, %{ stack_id: state.stack_id, otel_ctx: otel_ctx, @@ -322,21 +329,22 @@ defmodule Electric.ShapeCache do {:reply, ShapeStatus.has_shape_handle?(state.stack_id, shape_handle), state} end - def handle_call({:start_consumer_for_handle, shape_handle}, _from, state) do + def handle_call({:start_consumer_for_handle, shape_handle, otel_ctx}, _from, state) do # This is racy: it's possible for a shape to have been deleted while the # ShapeLogCollector is processing a transaction that includes it # In this case fetch_shape_by_handle returns an error. ConsumerRegistry # basically ignores the {:error, :no_shape} result - excluding the shape handle # from the broadcast. + if not is_nil(otel_ctx), do: OpenTelemetry.set_current_context(otel_ctx) + case ShapeStatus.fetch_shape_by_handle(state.stack_id, shape_handle) do {:ok, shape} -> - # TODO: otel ctx from shape log collector? { :reply, restore_shape_and_dependencies(shape_handle, shape, %{ stack_id: state.stack_id, action: :restore, - otel_ctx: nil, + otel_ctx: otel_ctx, feature_flags: state.feature_flags }), state diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index 207c07f849..935420b2d0 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -264,7 +264,9 @@ defmodule Electric.Shapes.ConsumerRegistry do ["shape.handle": handle], state.stack_id, fn -> - case ShapeCache.start_consumer_for_handle(handle, stack_id) do + otel_ctx = OpenTelemetry.get_current_context() + + case ShapeCache.start_consumer_for_handle(handle, stack_id, otel_ctx: otel_ctx) do {:ok, pid} -> Logger.debug(fn -> ["Started consumer for existing handle ", handle] end) diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 262210a0eb..25a05def30 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -684,7 +684,7 @@ defmodule Electric.ShapeCacheTest do log = capture_log(fn -> assert {:error, {:exit, _reason}} = - ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: %{}) + ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: nil) end) assert log =~ "Failed to create shape" @@ -709,7 +709,7 @@ defmodule Electric.ShapeCacheTest do log = capture_log(fn -> assert {:error, _reason} = - ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: %{}) + ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: nil) end) assert log =~ "[error] Failed to create shape" From 410f53a8afbd297112a88925647f3f123359d61c Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 5 May 2026 15:43:05 +0200 Subject: [PATCH 3/5] Add changeset for OTel context propagation fixes Co-Authored-By: Claude Opus 4.5 --- .changeset/otel-context-effects-move-in.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/otel-context-effects-move-in.md diff --git a/.changeset/otel-context-effects-move-in.md b/.changeset/otel-context-effects-move-in.md new file mode 100644 index 0000000000..7fd206938e --- /dev/null +++ b/.changeset/otel-context-effects-move-in.md @@ -0,0 +1,9 @@ +--- +'@core/sync-service': patch +--- + +Propagate OpenTelemetry context so that child spans in ShapeStatus and SnapshotQuery are linked to originating traces: + +- `Effects.query_move_in_async`: propagate context into spawned task +- `ShapeCache.handle_call({:create_or_wait_shape_handle, ...})`: set context before calling ShapeStatus functions +- `ShapeCache.handle_call({:start_consumer_for_handle, ...})`: accept and set context from ConsumerRegistry From ee37781f9b197ca83a33fee663b2e9c279eeb002 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 13:04:10 +0200 Subject: [PATCH 4/5] Update test patches for new start_consumer_for_handle/3 arity The OTel context propagation change added an opts argument to ShapeCache.start_consumer_for_handle. Update Repatch stubs in the ConsumerRegistry and ShapeLogCollector tests to accept the new arity. --- .../electric/replication/shape_log_collector_test.exs | 2 +- .../test/electric/shapes/consumer_registry_test.exs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 60fb9d9710..275570fe9a 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -228,7 +228,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do Electric.ShapeCache, :start_consumer_for_handle, [mode: :shared], - fn shape_handle, ^stack_id -> + fn shape_handle, ^stack_id, _opts -> id = System.unique_integer([:positive, :monotonic]) with {:ok, pid} <- diff --git a/packages/sync-service/test/electric/shapes/consumer_registry_test.exs b/packages/sync-service/test/electric/shapes/consumer_registry_test.exs index f2e18d4e8f..3e5ef80ea1 100644 --- a/packages/sync-service/test/electric/shapes/consumer_registry_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_registry_test.exs @@ -42,7 +42,7 @@ defmodule Electric.Shapes.ConsumerRegistryTest do Repatch.patch( Electric.ShapeCache, :start_consumer_for_handle, - fn handle, ^stack_id -> + fn handle, ^stack_id, _opts -> send(parent, {:start_consumer, handle}) {:ok, pid} = @@ -219,10 +219,10 @@ defmodule Electric.Shapes.ConsumerRegistryTest do :start_consumer_for_handle, [force: true], fn - "handle-removed", ^stack_id -> + "handle-removed", ^stack_id, _opts -> {:error, :no_shape} - handle, ^stack_id -> + handle, ^stack_id, _opts -> send(parent, {:start_consumer, handle}) {:ok, pid} = @@ -374,7 +374,7 @@ defmodule Electric.Shapes.ConsumerRegistryTest do Electric.ShapeCache, :start_consumer_for_handle, [force: true], - fn handle, stack_id -> + fn handle, stack_id, _opts -> {:ok, pid} = start_supervised( {TestSubscriber, {stack_id, handle, always_suspend.(handle)}}, From 2d990f2560cc581bc549023ea7843a96de360516 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 14:11:22 +0200 Subject: [PATCH 5/5] Fix unused-attribute warning in Electric.Telemetry.Sentry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move @default_config inside the with_telemetry Sentry.LoggerHandler do branch so it is only defined when actually referenced. Fixes the "module attribute @default_config was set but never used" warning introduced by #4146 (4176bd243), where the attribute lives at the module top level but is only used inside the do branch — the else branch (compiled when Sentry.LoggerHandler is unavailable) never references it. --- packages/sync-service/lib/electric/telemetry/sentry.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/telemetry/sentry.ex b/packages/sync-service/lib/electric/telemetry/sentry.ex index 8578fcdcad..a82a5ba701 100644 --- a/packages/sync-service/lib/electric/telemetry/sentry.ex +++ b/packages/sync-service/lib/electric/telemetry/sentry.ex @@ -2,7 +2,6 @@ defmodule Electric.Telemetry.Sentry do use Electric.Telemetry @default_handler_id :electric_sentry_handler - @default_config %{metadata: :all, capture_log_messages: true, level: :error} @typedoc """ Extra entries for the `Sentry.LoggerHandler` config map (e.g. @@ -19,6 +18,8 @@ defmodule Electric.Telemetry.Sentry do def add_logger_handler(id \\ @default_handler_id, opts \\ []) with_telemetry Sentry.LoggerHandler do + @default_config %{metadata: :all, capture_log_messages: true, level: :error} + def add_logger_handler(id, opts) do config = Map.merge(@default_config, Map.new(opts)) :logger.add_handler(id, Sentry.LoggerHandler, %{config: config})