Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/phoenix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ defmodule Phoenix.PubSub do
* `:dispatcher` - the default dispatcher module for broadcasts
(defaults to `Phoenix.PubSub`). Can be overridden per-call by
passing a dispatcher to `broadcast/4` and friends.
* `:group_by` - controls how the underlying `Registry` partitions
subscriptions, either `:pid` or `:key` (defaults to `:pid`). With
`:pid`, entries are grouped by subscriber pid — best when topics
have many subscribers each. With `:key`, entries are grouped by
topic so key-based lookups touch a single partition — best when
there are many topics with few subscribers each. `:key` requires
Elixir v1.19 or later. See `Registry.start_link/1` for the
underlying trade-offs.

"""
@spec child_spec(keyword) :: Supervisor.child_spec()
Expand Down
11 changes: 10 additions & 1 deletion lib/phoenix/pubsub/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ defmodule Phoenix.PubSub.Supervisor do
System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc()

dispatcher = Keyword.get(opts, :dispatcher, Phoenix.PubSub)
keys = registry_keys(Keyword.get(opts, :group_by, :pid))

registry = [
meta: [pubsub: {adapter, adapter_name, dispatcher}],
partitions: partitions,
keys: :duplicate,
keys: keys,
name: name
]

Expand All @@ -41,4 +42,12 @@ defmodule Phoenix.PubSub.Supervisor do

Supervisor.init(children, strategy: :rest_for_one)
end

defp registry_keys(:pid), do: :duplicate
defp registry_keys(:key), do: {:duplicate, :key}

defp registry_keys(other) do
raise ArgumentError,
"invalid :group_by option (got #{inspect(other)}), must be :pid or :key"
end
end
53 changes: 53 additions & 0 deletions test/phoenix/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,57 @@ defmodule Phoenix.PubSub.UnitTest do
assert_receive {:custom_dispatched, :hello}
end
end

describe "group_by" do
test "defaults to :pid, delivering one message per subscription" do
name = :"ps_default_group_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name})

assert :ok = PubSub.subscribe(name, "topic")
assert :ok = PubSub.subscribe(name, "topic")

PubSub.broadcast(name, "topic", :hello)

assert_receive :hello
assert_receive :hello
end

test ":pid delivers one message per subscription" do
name = :"ps_pid_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, group_by: :pid})

assert :ok = PubSub.subscribe(name, "topic")
assert :ok = PubSub.subscribe(name, "topic")

PubSub.broadcast(name, "topic", :hello)

assert_receive :hello
assert_receive :hello
end

test "raises ArgumentError on an invalid value" do
name = :"ps_bad_#{:erlang.unique_integer([:positive])}"

{:error, {{%ArgumentError{} = exception, _stacktrace}, _child_info}} =
start_supervised({PubSub, name: name, group_by: :bogus})

assert Exception.message(exception) =~ "invalid :group_by option"
assert Exception.message(exception) =~ ":bogus"
end

if Version.match?(System.version(), ">= 1.19.0") do
test ":key delivers one message per subscription (Elixir 1.19+)" do
name = :"ps_key_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, group_by: :key})

assert :ok = PubSub.subscribe(name, "topic")
assert :ok = PubSub.subscribe(name, "topic")

PubSub.broadcast(name, "topic", :hello)

assert_receive :hello
assert_receive :hello
end
end
end
end
Loading