diff --git a/.changeset/otel-context-effects-move-in.md b/.changeset/otel-context-effects-move-in.md new file mode 100644 index 0000000000..7fd206938e --- /dev/null +++ b/.changeset/otel-context-effects-move-in.md @@ -0,0 +1,9 @@ +--- +'@core/sync-service': patch +--- + +Propagate OpenTelemetry context so that child spans in ShapeStatus and SnapshotQuery are linked to originating traces: + +- `Effects.query_move_in_async`: propagate context into spawned task +- `ShapeCache.handle_call({:create_or_wait_shape_handle, ...})`: set context before calling ShapeStatus functions +- `ShapeCache.handle_call({:start_consumer_for_handle, ...})`: accept and set context from ConsumerRegistry diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 8cb52a56cd..e7796fb6df 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -8,6 +8,7 @@ defmodule Electric.ShapeCache do alias Electric.Shapes alias Electric.ShapeCache.ShapeCleaner alias Electric.Shapes.Shape + alias Electric.Telemetry.OpenTelemetry import Electric, only: [is_stack_id: 1, is_shape_handle: 1] @@ -244,11 +245,15 @@ defmodule Electric.ShapeCache do end end - @spec start_consumer_for_handle(shape_handle(), stack_id()) :: + @spec start_consumer_for_handle(shape_handle(), stack_id(), opts :: Access.t()) :: {:ok, pid()} | {:error, :no_shape} - def start_consumer_for_handle(shape_handle, stack_id) + def start_consumer_for_handle(shape_handle, stack_id, opts \\ []) when is_shape_handle(shape_handle) and is_stack_id(stack_id) do - GenServer.call(name(stack_id), {:start_consumer_for_handle, shape_handle}, @call_timeout) + GenServer.call( + name(stack_id), + {:start_consumer_for_handle, shape_handle, opts[:otel_ctx]}, + @call_timeout + ) end @impl GenServer @@ -302,6 +307,8 @@ defmodule Electric.ShapeCache do @impl GenServer def handle_call({:create_or_wait_shape_handle, shape, otel_ctx}, _from, state) do + if not is_nil(otel_ctx), do: OpenTelemetry.set_current_context(otel_ctx) + case safe_maybe_create_shape(shape, %{ stack_id: state.stack_id, otel_ctx: otel_ctx, @@ -322,21 +329,22 @@ defmodule Electric.ShapeCache do {:reply, ShapeStatus.has_shape_handle?(state.stack_id, shape_handle), state} end - def handle_call({:start_consumer_for_handle, shape_handle}, _from, state) do + def handle_call({:start_consumer_for_handle, shape_handle, otel_ctx}, _from, state) do # This is racy: it's possible for a shape to have been deleted while the # ShapeLogCollector is processing a transaction that includes it # In this case fetch_shape_by_handle returns an error. ConsumerRegistry # basically ignores the {:error, :no_shape} result - excluding the shape handle # from the broadcast. + if not is_nil(otel_ctx), do: OpenTelemetry.set_current_context(otel_ctx) + case ShapeStatus.fetch_shape_by_handle(state.stack_id, shape_handle) do {:ok, shape} -> - # TODO: otel ctx from shape log collector? { :reply, restore_shape_and_dependencies(shape_handle, shape, %{ stack_id: state.stack_id, action: :restore, - otel_ctx: nil, + otel_ctx: otel_ctx, feature_flags: state.feature_flags }), state diff --git a/packages/sync-service/lib/electric/shapes/consumer/effects.ex b/packages/sync-service/lib/electric/shapes/consumer/effects.ex index ba362420c6..05d4f9a783 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/effects.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/effects.ex @@ -13,6 +13,7 @@ defmodule Electric.Shapes.Consumer.Effects do alias Electric.ShapeCache alias Electric.Shapes.Querying alias Electric.Shapes.Shape + alias Electric.Telemetry.OpenTelemetry require Logger @@ -263,7 +264,14 @@ defmodule Electric.Shapes.Consumer.Effects do stack_id: stack_id }) + # Propagate OTel context so spans created inside the task are linked to the + # caller's trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + trace_context = OpenTelemetry.get_current_context() + Task.Supervisor.start_child(supervisor, fn -> + OpenTelemetry.set_current_context(trace_context) + snapshot_name = Electric.Utils.uuid4() try do diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index 207c07f849..935420b2d0 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -264,7 +264,9 @@ defmodule Electric.Shapes.ConsumerRegistry do ["shape.handle": handle], state.stack_id, fn -> - case ShapeCache.start_consumer_for_handle(handle, stack_id) do + otel_ctx = OpenTelemetry.get_current_context() + + case ShapeCache.start_consumer_for_handle(handle, stack_id, otel_ctx: otel_ctx) do {:ok, pid} -> Logger.debug(fn -> ["Started consumer for existing handle ", handle] end) diff --git a/packages/sync-service/lib/electric/telemetry/sentry.ex b/packages/sync-service/lib/electric/telemetry/sentry.ex index 8578fcdcad..a82a5ba701 100644 --- a/packages/sync-service/lib/electric/telemetry/sentry.ex +++ b/packages/sync-service/lib/electric/telemetry/sentry.ex @@ -2,7 +2,6 @@ defmodule Electric.Telemetry.Sentry do use Electric.Telemetry @default_handler_id :electric_sentry_handler - @default_config %{metadata: :all, capture_log_messages: true, level: :error} @typedoc """ Extra entries for the `Sentry.LoggerHandler` config map (e.g. @@ -19,6 +18,8 @@ defmodule Electric.Telemetry.Sentry do def add_logger_handler(id \\ @default_handler_id, opts \\ []) with_telemetry Sentry.LoggerHandler do + @default_config %{metadata: :all, capture_log_messages: true, level: :error} + def add_logger_handler(id, opts) do config = Map.merge(@default_config, Map.new(opts)) :logger.add_handler(id, Sentry.LoggerHandler, %{config: config}) diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 60fb9d9710..275570fe9a 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -228,7 +228,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do Electric.ShapeCache, :start_consumer_for_handle, [mode: :shared], - fn shape_handle, ^stack_id -> + fn shape_handle, ^stack_id, _opts -> id = System.unique_integer([:positive, :monotonic]) with {:ok, pid} <- diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 262210a0eb..25a05def30 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -684,7 +684,7 @@ defmodule Electric.ShapeCacheTest do log = capture_log(fn -> assert {:error, {:exit, _reason}} = - ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: %{}) + ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: nil) end) assert log =~ "Failed to create shape" @@ -709,7 +709,7 @@ defmodule Electric.ShapeCacheTest do log = capture_log(fn -> assert {:error, _reason} = - ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: %{}) + ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: nil) end) assert log =~ "[error] Failed to create shape" diff --git a/packages/sync-service/test/electric/shapes/consumer_registry_test.exs b/packages/sync-service/test/electric/shapes/consumer_registry_test.exs index f2e18d4e8f..3e5ef80ea1 100644 --- a/packages/sync-service/test/electric/shapes/consumer_registry_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_registry_test.exs @@ -42,7 +42,7 @@ defmodule Electric.Shapes.ConsumerRegistryTest do Repatch.patch( Electric.ShapeCache, :start_consumer_for_handle, - fn handle, ^stack_id -> + fn handle, ^stack_id, _opts -> send(parent, {:start_consumer, handle}) {:ok, pid} = @@ -219,10 +219,10 @@ defmodule Electric.Shapes.ConsumerRegistryTest do :start_consumer_for_handle, [force: true], fn - "handle-removed", ^stack_id -> + "handle-removed", ^stack_id, _opts -> {:error, :no_shape} - handle, ^stack_id -> + handle, ^stack_id, _opts -> send(parent, {:start_consumer, handle}) {:ok, pid} = @@ -374,7 +374,7 @@ defmodule Electric.Shapes.ConsumerRegistryTest do Electric.ShapeCache, :start_consumer_for_handle, [force: true], - fn handle, stack_id -> + fn handle, stack_id, _opts -> {:ok, pid} = start_supervised( {TestSubscriber, {stack_id, handle, always_suspend.(handle)}},