Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/otel-context-effects-move-in.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@core/sync-service': patch
---

Propagate OpenTelemetry context so that child spans in ShapeStatus and SnapshotQuery are linked to originating traces:

- `Effects.query_move_in_async`: propagate context into spawned task
- `ShapeCache.handle_call({:create_or_wait_shape_handle, ...})`: set context before calling ShapeStatus functions
- `ShapeCache.handle_call({:start_consumer_for_handle, ...})`: accept and set context from ConsumerRegistry
20 changes: 14 additions & 6 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Electric.ShapeCache do
alias Electric.Shapes
alias Electric.ShapeCache.ShapeCleaner
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry

import Electric, only: [is_stack_id: 1, is_shape_handle: 1]

Expand Down Expand Up @@ -244,11 +245,15 @@ defmodule Electric.ShapeCache do
end
end

@spec start_consumer_for_handle(shape_handle(), stack_id()) ::
@spec start_consumer_for_handle(shape_handle(), stack_id(), opts :: Access.t()) ::
{:ok, pid()} | {:error, :no_shape}
def start_consumer_for_handle(shape_handle, stack_id)
def start_consumer_for_handle(shape_handle, stack_id, opts \\ [])
when is_shape_handle(shape_handle) and is_stack_id(stack_id) do
GenServer.call(name(stack_id), {:start_consumer_for_handle, shape_handle}, @call_timeout)
GenServer.call(
name(stack_id),
{:start_consumer_for_handle, shape_handle, opts[:otel_ctx]},
@call_timeout
)
end

@impl GenServer
Expand Down Expand Up @@ -302,6 +307,8 @@ defmodule Electric.ShapeCache do

@impl GenServer
def handle_call({:create_or_wait_shape_handle, shape, otel_ctx}, _from, state) do
if not is_nil(otel_ctx), do: OpenTelemetry.set_current_context(otel_ctx)

case safe_maybe_create_shape(shape, %{
stack_id: state.stack_id,
otel_ctx: otel_ctx,
Expand All @@ -322,21 +329,22 @@ defmodule Electric.ShapeCache do
{:reply, ShapeStatus.has_shape_handle?(state.stack_id, shape_handle), state}
end

def handle_call({:start_consumer_for_handle, shape_handle}, _from, state) do
def handle_call({:start_consumer_for_handle, shape_handle, otel_ctx}, _from, state) do
# This is racy: it's possible for a shape to have been deleted while the
# ShapeLogCollector is processing a transaction that includes it
# In this case fetch_shape_by_handle returns an error. ConsumerRegistry
# basically ignores the {:error, :no_shape} result - excluding the shape handle
# from the broadcast.
if not is_nil(otel_ctx), do: OpenTelemetry.set_current_context(otel_ctx)

case ShapeStatus.fetch_shape_by_handle(state.stack_id, shape_handle) do
{:ok, shape} ->
# TODO: otel ctx from shape log collector?
{
:reply,
restore_shape_and_dependencies(shape_handle, shape, %{
stack_id: state.stack_id,
action: :restore,
otel_ctx: nil,
otel_ctx: otel_ctx,
feature_flags: state.feature_flags
}),
state
Expand Down
8 changes: 8 additions & 0 deletions packages/sync-service/lib/electric/shapes/consumer/effects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Electric.Shapes.Consumer.Effects do
alias Electric.ShapeCache
alias Electric.Shapes.Querying
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry

require Logger

Expand Down Expand Up @@ -263,7 +264,14 @@ defmodule Electric.Shapes.Consumer.Effects do
stack_id: stack_id
})

# Propagate OTel context so spans created inside the task are linked to the
# caller's trace. OTel context is per-process, so without this any
# `with_child_span` calls in the task would be silently dropped.
trace_context = OpenTelemetry.get_current_context()

Task.Supervisor.start_child(supervisor, fn ->
OpenTelemetry.set_current_context(trace_context)

snapshot_name = Electric.Utils.uuid4()

try do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ defmodule Electric.Shapes.ConsumerRegistry do
["shape.handle": handle],
state.stack_id,
fn ->
case ShapeCache.start_consumer_for_handle(handle, stack_id) do
otel_ctx = OpenTelemetry.get_current_context()

case ShapeCache.start_consumer_for_handle(handle, stack_id, otel_ctx: otel_ctx) do
{:ok, pid} ->
Logger.debug(fn -> ["Started consumer for existing handle ", handle] end)

Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/telemetry/sentry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule Electric.Telemetry.Sentry do
use Electric.Telemetry

@default_handler_id :electric_sentry_handler
@default_config %{metadata: :all, capture_log_messages: true, level: :error}

@typedoc """
Extra entries for the `Sentry.LoggerHandler` config map (e.g.
Expand All @@ -19,6 +18,8 @@ defmodule Electric.Telemetry.Sentry do
def add_logger_handler(id \\ @default_handler_id, opts \\ [])

with_telemetry Sentry.LoggerHandler do
@default_config %{metadata: :all, capture_log_messages: true, level: :error}

def add_logger_handler(id, opts) do
config = Map.merge(@default_config, Map.new(opts))
:logger.add_handler(id, Sentry.LoggerHandler, %{config: config})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
Electric.ShapeCache,
:start_consumer_for_handle,
[mode: :shared],
fn shape_handle, ^stack_id ->
fn shape_handle, ^stack_id, _opts ->
id = System.unique_integer([:positive, :monotonic])

with {:ok, pid} <-
Expand Down
4 changes: 2 additions & 2 deletions packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ defmodule Electric.ShapeCacheTest do
log =
capture_log(fn ->
assert {:error, {:exit, _reason}} =
ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: %{})
ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: nil)
end)

assert log =~ "Failed to create shape"
Expand All @@ -709,7 +709,7 @@ defmodule Electric.ShapeCacheTest do
log =
capture_log(fn ->
assert {:error, _reason} =
ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: %{})
ShapeCache.get_or_create_shape_handle(@shape, ctx.stack_id, otel_ctx: nil)
end)

assert log =~ "[error] Failed to create shape"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Electric.Shapes.ConsumerRegistryTest do
Repatch.patch(
Electric.ShapeCache,
:start_consumer_for_handle,
fn handle, ^stack_id ->
fn handle, ^stack_id, _opts ->
send(parent, {:start_consumer, handle})

{:ok, pid} =
Expand Down Expand Up @@ -219,10 +219,10 @@ defmodule Electric.Shapes.ConsumerRegistryTest do
:start_consumer_for_handle,
[force: true],
fn
"handle-removed", ^stack_id ->
"handle-removed", ^stack_id, _opts ->
{:error, :no_shape}

handle, ^stack_id ->
handle, ^stack_id, _opts ->
send(parent, {:start_consumer, handle})

{:ok, pid} =
Expand Down Expand Up @@ -374,7 +374,7 @@ defmodule Electric.Shapes.ConsumerRegistryTest do
Electric.ShapeCache,
:start_consumer_for_handle,
[force: true],
fn handle, stack_id ->
fn handle, stack_id, _opts ->
{:ok, pid} =
start_supervised(
{TestSubscriber, {stack_id, handle, always_suspend.(handle)}},
Expand Down
Loading