diff --git a/.changeset/telemetry-poller-safe-invoke.md b/.changeset/telemetry-poller-safe-invoke.md new file mode 100644 index 0000000000..e7aa3bcb45 --- /dev/null +++ b/.changeset/telemetry-poller-safe-invoke.md @@ -0,0 +1,8 @@ +--- +'@core/electric-telemetry': patch +'@core/sync-service': patch +--- + +Wrap telemetry-poller MFAs in `ElectricTelemetry.Poller.safe_invoke/3` so that transient collector failures (`:noproc`, `:timeout`, `:shutdown`/`:normal` exits, `ArgumentError` from not-yet-created ETS tables) no longer cause `:telemetry_poller` to permanently remove the measurement from its polling list. Unexpected errors are now logged as warnings with the offending MFA and the collector keeps being polled on subsequent ticks. Strips now-redundant defensive `try/catch` / `with`-fallthrough code from individual collectors. + +Note: user-supplied periodic measurement functions no longer have exceptions propagated up to `:telemetry_poller`'s own error logger — they are caught and logged via `ElectricTelemetry.Poller` instead. diff --git a/packages/electric-telemetry/lib/electric/telemetry/poller.ex b/packages/electric-telemetry/lib/electric/telemetry/poller.ex index 12607ac9d1..93af93c60c 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/poller.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/poller.ex @@ -1,4 +1,6 @@ defmodule ElectricTelemetry.Poller do + require Logger + @callback builtin_periodic_measurements(map) :: list() def child_spec(telemetry_opts, poller_opts) do @@ -25,19 +27,72 @@ defmodule ElectricTelemetry.Poller do def periodic_measurements(%{periodic_measurements: measurements} = telemetry_opts, module) do Enum.flat_map(measurements, fn - :builtin -> module.builtin_periodic_measurements(telemetry_opts) + :builtin -> + module.builtin_periodic_measurements(telemetry_opts) + # These are implemented by telemetry_poller - f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> [f] + f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> + [f] + # Bare function names are assumed to be referring to functions defined in the caller module - f when is_atom(f) -> {module, f, [telemetry_opts]} - f when is_function(f, 1) -> {__MODULE__, :user_measurement, [f, telemetry_opts]} - {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [{m, f, a ++ [telemetry_opts]}] + f when is_atom(f) -> + [wrap(module, f, [telemetry_opts])] + + f when is_function(f, 1) -> + [wrap(__MODULE__, :user_measurement, [f, telemetry_opts])] + + {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> + [wrap(m, f, a ++ [telemetry_opts])] end) end def periodic_measurements(telemetry_opts, module), - do: module.builtin_periodic_measurements(telemetry_opts) + do: Enum.map(module.builtin_periodic_measurements(telemetry_opts), &wrap_mfa/1) + + defp wrap_mfa({m, f, a}), do: wrap(m, f, a) + defp wrap_mfa(other), do: other + + defp wrap(m, f, a), do: {__MODULE__, :safe_invoke, [m, f, a]} # Helper function to enable telemetry_poller to call a user-provided anonymous function def user_measurement(f, telemetry_opts), do: f.(telemetry_opts) + + @doc """ + Invoke a periodic measurement MFA, absorbing common failure modes. + + `:telemetry_poller` removes a measurement permanently from its polling list + after the first failure. Wrapping every MFA in `safe_invoke/3` prevents that: + transient errors (GenServer restart races, ETS tables not yet created, DB + unavailability) are logged as warnings and swallowed so the measurement keeps + being polled on subsequent ticks. + """ + def safe_invoke(m, f, a) do + apply(m, f, a) + :ok + rescue + ArgumentError -> + :ok + + e -> + Logger.warning( + "Telemetry collector #{inspect(m)}.#{f}/#{length(a)} crashed: " <> + Exception.message(e) + ) + + :ok + catch + :exit, {reason, _} when reason in [:noproc, :timeout, :shutdown, :normal] -> + :ok + + :exit, reason when reason in [:noproc, :shutdown, :normal] -> + :ok + + kind, reason -> + Logger.warning( + "Telemetry collector #{inspect(m)}.#{f}/#{length(a)} #{kind}: " <> + inspect(reason) + ) + + :ok + end end diff --git a/packages/electric-telemetry/test/electric/telemetry/poller_test.exs b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs new file mode 100644 index 0000000000..db8363a7fb --- /dev/null +++ b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs @@ -0,0 +1,115 @@ +defmodule ElectricTelemetry.PollerTest do + use ExUnit.Case, async: true + + import ExUnit.CaptureLog + + alias ElectricTelemetry.Poller + + defmodule Fixture do + def ok(), do: :done + def raise_argument(), do: raise(ArgumentError, "boom") + def raise_runtime(), do: raise(RuntimeError, "kaboom") + def exit_noproc(), do: exit({:noproc, {GenServer, :call, [:nowhere, :hi]}}) + def exit_timeout(), do: exit({:timeout, {GenServer, :call, [:slow, :hi]}}) + def exit_shutdown(), do: exit({:shutdown, :foo}) + def exit_normal_atom(), do: exit(:normal) + def exit_shutdown_atom(), do: exit(:shutdown) + def exit_weird(), do: exit(:weird) + def throw_it(), do: throw(:nope) + end + + describe "safe_invoke/3" do + test "returns :ok and runs the function on success" do + assert Poller.safe_invoke(Fixture, :ok, []) == :ok + end + + test "swallows ArgumentError (ETS missing, etc.) silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_argument, []) == :ok end) + refute log =~ "crashed" + end + + test "swallows generic exceptions with a warning" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_runtime, []) == :ok end) + assert log =~ "crashed" + assert log =~ "kaboom" + end + + test "swallows :noproc exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_noproc, []) == :ok end) + refute log =~ "exit" + end + + test "swallows :timeout exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_timeout, []) == :ok end) + refute log =~ "exit" + end + + test "swallows :shutdown exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown, []) == :ok end) + refute log =~ "exit" + end + + test "swallows bare :normal exit silently" do + log = + capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_normal_atom, []) == :ok end) + + refute log =~ "exit" + end + + test "swallows bare :shutdown exit silently" do + log = + capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown_atom, []) == :ok end) + + refute log =~ "exit" + end + + test "logs a warning for unexpected exits" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_weird, []) == :ok end) + assert log =~ "exit" + assert log =~ "weird" + end + + test "logs a warning for throws" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :throw_it, []) == :ok end) + assert log =~ "throw" + end + end + + describe "periodic_measurements/2 wrapping" do + defmodule CallbackMod do + @behaviour ElectricTelemetry.Poller + def builtin_periodic_measurements(_opts), do: [] + def some_measurement(_opts), do: :ok + end + + test "wraps {m, f, a} tuples in safe_invoke" do + opts = %{periodic_measurements: [{CallbackMod, :some_measurement, []}]} + + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "wraps bare function atoms in safe_invoke" do + opts = %{periodic_measurements: [:some_measurement]} + + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "wraps anonymous functions in safe_invoke around user_measurement" do + f = fn _ -> :ok end + opts = %{periodic_measurements: [f]} + + assert [ + {ElectricTelemetry.Poller, :safe_invoke, + [ElectricTelemetry.Poller, :user_measurement, [^f, _]]} + ] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "leaves telemetry_poller builtins unwrapped" do + opts = %{periodic_measurements: [:memory, :persistent_term]} + assert Poller.periodic_measurements(opts, CallbackMod) == [:memory, :persistent_term] + end + end +end diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index ed74295ed5..4b734672d1 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -1,6 +1,4 @@ defmodule Electric.StackSupervisor.Telemetry do - require Logger - def configure(config) do # Set shared OpenTelemetry span attributes for the given stack. They are stored in # persistent_term so it doesn't matter which process this function is called from. @@ -29,22 +27,22 @@ defmodule Electric.StackSupervisor.Telemetry do end def count_shapes(stack_id, _telemetry_opts) do - # Telemetry is started before everything else in the stack, so we need to handle - # the case where the shape cache is not started yet. - with %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} <- - Electric.ShapeCache.shape_counts(stack_id) do - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :shapes, :total_shapes], - %{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes}, - %{stack_id: stack_id} - ) - end - + # Emit active_shapes first so that a failure in shape_counts (e.g. shape cache not yet + # started during stack startup) doesn't drop this metric for the tick. Electric.Telemetry.OpenTelemetry.execute( [:electric, :shapes, :active_shapes], %{count: Electric.Shapes.ConsumerRegistry.active_consumer_count(stack_id)}, %{stack_id: stack_id} ) + + %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} = + Electric.ShapeCache.shape_counts(stack_id) + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :shapes, :total_shapes], + %{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes}, + %{stack_id: stack_id} + ) end def report_write_buffer_size(stack_id, _telemetry_opts) do @@ -74,47 +72,34 @@ defmodule Electric.StackSupervisor.Telemetry do @doc false @spec report_retained_wal_size(Electric.stack_id(), binary(), map()) :: :ok def report_retained_wal_size(stack_id, slot_name, _telemetry_opts) do - try do - %Postgrex.Result{rows: [[pg_wal_offset, retained_wal_size, confirmed_flush_lsn_lag]]} = - Postgrex.query!( - Electric.Connection.Manager.admin_pool(stack_id), - @retained_wal_size_query, - [slot_name], - timeout: 3_000, - deadline: 3_000 - ) - - # The query above can return `-1` for `confirmed_flush_lsn_lag` which means that Electric - # is caught up with Postgres' replication stream. - # This is a confusing stat if we're measuring in bytes, so use 0 as the bottom limit. - - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :postgres, :replication], - %{ - # The absolute value of pg_current_wal_lsn() doesn't convey any useful info but by - # plotting its rate of change we can see how fast the WAL is growing. - # - # We shift the absolute value of pg_current_wal_lsn() by -2**63 in the query above - # to make sure it fits inside the signed 64-bit integer type expected by the - # OpenTelemetry Protocol, - pg_wal_offset: pg_wal_offset, - slot_retained_wal_size: retained_wal_size, - slot_confirmed_flush_lsn_lag: max(0, confirmed_flush_lsn_lag) - }, - %{stack_id: stack_id} + %Postgrex.Result{rows: [[pg_wal_offset, retained_wal_size, confirmed_flush_lsn_lag]]} = + Postgrex.query!( + Electric.Connection.Manager.admin_pool(stack_id), + @retained_wal_size_query, + [slot_name], + timeout: 3_000, + deadline: 3_000 ) - catch - :exit, {:noproc, _} -> - :ok - # catch all errors to not log them as errors, those are reporing issues at best - type, reason -> - Logger.warning( - "Failed to query retained WAL size\nError: #{Exception.format(type, reason)}", - stack_id: stack_id, - slot_name: slot_name - ) - end + # The query above can return `-1` for `confirmed_flush_lsn_lag` which means that Electric + # is caught up with Postgres' replication stream. + # This is a confusing stat if we're measuring in bytes, so use 0 as the bottom limit. + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :postgres, :replication], + %{ + # The absolute value of pg_current_wal_lsn() doesn't convey any useful info but by + # plotting its rate of change we can see how fast the WAL is growing. + # + # We shift the absolute value of pg_current_wal_lsn() by -2**63 in the query above + # to make sure it fits inside the signed 64-bit integer type expected by the + # OpenTelemetry Protocol, + pg_wal_offset: pg_wal_offset, + slot_retained_wal_size: retained_wal_size, + slot_confirmed_flush_lsn_lag: max(0, confirmed_flush_lsn_lag) + }, + %{stack_id: stack_id} + ) end if Code.ensure_loaded?(ElectricTelemetry.DiskUsage) do diff --git a/packages/sync-service/test/electric/stack_supervisor_test.exs b/packages/sync-service/test/electric/stack_supervisor_test.exs index 624d2aba34..4e29b37208 100644 --- a/packages/sync-service/test/electric/stack_supervisor_test.exs +++ b/packages/sync-service/test/electric/stack_supervisor_test.exs @@ -9,16 +9,6 @@ defmodule Electric.StackSupervisorTest do describe "Telemetry" do setup [:with_stack_id_from_test] - test "default_periodic_measurements/1 do not raise if stack down", ctx do - for {m, f, a} <- - StackSupervisor.Telemetry.default_periodic_measurements(%{ - stack_id: ctx.stack_id, - replication_opts: [slot_name: "no_such_slot"] - }) do - apply(m, f, a ++ [%{}]) - end - end - test "count_shapes/2 emits split shape metrics", ctx do stack_id = ctx.stack_id