From 971441aa8c8de24704dd9f6c55acd1dda4e10481 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Thu, 30 Apr 2026 13:16:39 +0200 Subject: [PATCH] Add :group_by option to choose Registry sharding strategy Selects how the underlying Registry partitions subscriptions. Defaults to :pid (current behavior). Accepts :pid or :key. - :pid (default) groups entries by subscriber pid; best when topics have many subscribers each. Translates to keys: :duplicate. - :key groups entries by topic so key-based lookups touch a single partition; best when there are many topics with few subscribers each. Translates to keys: {:duplicate, :key}, which requires Elixir 1.19+ (see elixir-lang/elixir#14654). Invalid values raise ArgumentError at supervisor start. Refs: #198 --- lib/phoenix/pubsub.ex | 8 +++++ lib/phoenix/pubsub/supervisor.ex | 11 ++++++- test/phoenix/pubsub_test.exs | 53 ++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 9e94a612..13f7c285 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -174,6 +174,14 @@ defmodule Phoenix.PubSub do * `:dispatcher` - the default dispatcher module for broadcasts (defaults to `Phoenix.PubSub`). Can be overridden per-call by passing a dispatcher to `broadcast/4` and friends. + * `:group_by` - controls how the underlying `Registry` partitions + subscriptions, either `:pid` or `:key` (defaults to `:pid`). With + `:pid`, entries are grouped by subscriber pid — best when topics + have many subscribers each. With `:key`, entries are grouped by + topic so key-based lookups touch a single partition — best when + there are many topics with few subscribers each. `:key` requires + Elixir v1.19 or later. See `Registry.start_link/1` for the + underlying trade-offs. """ @spec child_spec(keyword) :: Supervisor.child_spec() diff --git a/lib/phoenix/pubsub/supervisor.ex b/lib/phoenix/pubsub/supervisor.ex index 973e2987..42d6d6c8 100644 --- a/lib/phoenix/pubsub/supervisor.ex +++ b/lib/phoenix/pubsub/supervisor.ex @@ -26,11 +26,12 @@ defmodule Phoenix.PubSub.Supervisor do System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc() dispatcher = Keyword.get(opts, :dispatcher, Phoenix.PubSub) + keys = registry_keys(Keyword.get(opts, :group_by, :pid)) registry = [ meta: [pubsub: {adapter, adapter_name, dispatcher}], partitions: partitions, - keys: :duplicate, + keys: keys, name: name ] @@ -41,4 +42,12 @@ defmodule Phoenix.PubSub.Supervisor do Supervisor.init(children, strategy: :rest_for_one) end + + defp registry_keys(:pid), do: :duplicate + defp registry_keys(:key), do: {:duplicate, :key} + + defp registry_keys(other) do + raise ArgumentError, + "invalid :group_by option (got #{inspect(other)}), must be :pid or :key" + end end diff --git a/test/phoenix/pubsub_test.exs b/test/phoenix/pubsub_test.exs index 0eff5df2..b7de76fe 100644 --- a/test/phoenix/pubsub_test.exs +++ b/test/phoenix/pubsub_test.exs @@ -102,4 +102,57 @@ defmodule Phoenix.PubSub.UnitTest do assert_receive {:custom_dispatched, :hello} end end + + describe "group_by" do + test "defaults to :pid, delivering one message per subscription" do + name = :"ps_default_group_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name}) + + assert :ok = PubSub.subscribe(name, "topic") + assert :ok = PubSub.subscribe(name, "topic") + + PubSub.broadcast(name, "topic", :hello) + + assert_receive :hello + assert_receive :hello + end + + test ":pid delivers one message per subscription" do + name = :"ps_pid_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, group_by: :pid}) + + assert :ok = PubSub.subscribe(name, "topic") + assert :ok = PubSub.subscribe(name, "topic") + + PubSub.broadcast(name, "topic", :hello) + + assert_receive :hello + assert_receive :hello + end + + test "raises ArgumentError on an invalid value" do + name = :"ps_bad_#{:erlang.unique_integer([:positive])}" + + {:error, {{%ArgumentError{} = exception, _stacktrace}, _child_info}} = + start_supervised({PubSub, name: name, group_by: :bogus}) + + assert Exception.message(exception) =~ "invalid :group_by option" + assert Exception.message(exception) =~ ":bogus" + end + + if Version.match?(System.version(), ">= 1.19.0") do + test ":key delivers one message per subscription (Elixir 1.19+)" do + name = :"ps_key_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, group_by: :key}) + + assert :ok = PubSub.subscribe(name, "topic") + assert :ok = PubSub.subscribe(name, "topic") + + PubSub.broadcast(name, "topic", :hello) + + assert_receive :hello + assert_receive :hello + end + end + end end