From 541436a4895761d63fb707d5fabe211cf89cfe64 Mon Sep 17 00:00:00 2001 From: Tony Winn Date: Wed, 15 Apr 2026 09:43:47 -0400 Subject: [PATCH] Add configurable default dispatcher Adds a :dispatcher option to child_spec/start_link that sets the default dispatcher module for all broadcast functions. Defaults to Phoenix.PubSub (preserving existing behavior). Can be overridden per-call by passing a dispatcher to broadcast/4 and friends. {Phoenix.PubSub, name: :my_pubsub, dispatcher: MyApp.Dispatcher} --- lib/phoenix/pubsub.ex | 70 +++++++++++++++++++++++---- lib/phoenix/pubsub/supervisor.ex | 4 +- test/phoenix/pubsub_test.exs | 82 +++++++++++++++++++++++++++++++- 3 files changed, 146 insertions(+), 10 deletions(-) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index d8717dd7..eb5cf5c0 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -171,6 +171,9 @@ defmodule Phoenix.PubSub do * `:broadcast_pool_size` - number of pubsub partitions used for broadcasting messages (defaults to `:pool_size`). This option is used during pool size migrations to ensure no messages are lost. See the "Safe Pool Size Migration" section in the module documentation. + * `:dispatcher` - the default dispatcher module for broadcasts + (defaults to `Phoenix.PubSub`). Can be overridden per-call by + passing a dispatcher to `broadcast/4` and friends. """ @spec child_spec(keyword) :: Supervisor.child_spec() @@ -249,7 +252,13 @@ defmodule Phoenix.PubSub do See the "Custom dispatching" section in the module documentation. """ @spec broadcast(t, topic, message, dispatcher) :: :ok | {:error, term} - def broadcast(pubsub, topic, message, dispatcher \\ __MODULE__) + def broadcast(pubsub, topic, message) + when is_atom(pubsub) and is_binary(topic) do + {:ok, dispatcher} = Registry.meta(pubsub, :dispatcher) + broadcast(pubsub, topic, message, dispatcher) + end + + def broadcast(pubsub, topic, message, dispatcher) when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do {:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub) @@ -273,7 +282,13 @@ defmodule Phoenix.PubSub do See the "Custom dispatching" section in the module documentation. """ @spec broadcast_from(t, pid, topic, message, dispatcher) :: :ok | {:error, term} - def broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__) + def broadcast_from(pubsub, from, topic, message) + when is_atom(pubsub) and is_pid(from) and is_binary(topic) do + {:ok, dispatcher} = Registry.meta(pubsub, :dispatcher) + broadcast_from(pubsub, from, topic, message, dispatcher) + end + + def broadcast_from(pubsub, from, topic, message, dispatcher) when is_atom(pubsub) and is_pid(from) and is_binary(topic) and is_atom(dispatcher) do {:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub) @@ -293,7 +308,13 @@ defmodule Phoenix.PubSub do See the "Custom dispatching" section in the module documentation. """ @spec local_broadcast(t, topic, message, dispatcher) :: :ok - def local_broadcast(pubsub, topic, message, dispatcher \\ __MODULE__) + def local_broadcast(pubsub, topic, message) + when is_atom(pubsub) and is_binary(topic) do + {:ok, dispatcher} = Registry.meta(pubsub, :dispatcher) + dispatch(pubsub, :none, topic, message, dispatcher) + end + + def local_broadcast(pubsub, topic, message, dispatcher) when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do dispatch(pubsub, :none, topic, message, dispatcher) end @@ -313,7 +334,13 @@ defmodule Phoenix.PubSub do See the "Custom dispatching" section in the module documentation. """ @spec local_broadcast_from(t, pid, topic, message, dispatcher) :: :ok - def local_broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__) + def local_broadcast_from(pubsub, from, topic, message) + when is_atom(pubsub) and is_pid(from) and is_binary(topic) do + {:ok, dispatcher} = Registry.meta(pubsub, :dispatcher) + dispatch(pubsub, from, topic, message, dispatcher) + end + + def local_broadcast_from(pubsub, from, topic, message, dispatcher) when is_atom(pubsub) and is_pid(from) and is_binary(topic) and is_atom(dispatcher) do dispatch(pubsub, from, topic, message, dispatcher) end @@ -333,7 +360,13 @@ defmodule Phoenix.PubSub do See the "Custom dispatching" section in the module documentation. """ @spec direct_broadcast(node_name, t, topic, message, dispatcher) :: :ok | {:error, term} - def direct_broadcast(node_name, pubsub, topic, message, dispatcher \\ __MODULE__) + def direct_broadcast(node_name, pubsub, topic, message) + when is_atom(pubsub) and is_binary(topic) do + {:ok, dispatcher} = Registry.meta(pubsub, :dispatcher) + direct_broadcast(node_name, pubsub, topic, message, dispatcher) + end + + def direct_broadcast(node_name, pubsub, topic, message, dispatcher) when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do {:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub) adapter.direct_broadcast(name, node_name, topic, message, dispatcher) @@ -343,7 +376,14 @@ defmodule Phoenix.PubSub do Raising version of `broadcast/4`. """ @spec broadcast!(t, topic, message, dispatcher) :: :ok - def broadcast!(pubsub, topic, message, dispatcher \\ __MODULE__) do + def broadcast!(pubsub, topic, message) do + case broadcast(pubsub, topic, message) do + :ok -> :ok + {:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}" + end + end + + def broadcast!(pubsub, topic, message, dispatcher) do case broadcast(pubsub, topic, message, dispatcher) do :ok -> :ok {:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}" @@ -354,7 +394,14 @@ defmodule Phoenix.PubSub do Raising version of `broadcast_from/5`. """ @spec broadcast_from!(t, pid, topic, message, dispatcher) :: :ok - def broadcast_from!(pubsub, from, topic, message, dispatcher \\ __MODULE__) do + def broadcast_from!(pubsub, from, topic, message) do + case broadcast_from(pubsub, from, topic, message) do + :ok -> :ok + {:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}" + end + end + + def broadcast_from!(pubsub, from, topic, message, dispatcher) do case broadcast_from(pubsub, from, topic, message, dispatcher) do :ok -> :ok {:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}" @@ -365,7 +412,14 @@ defmodule Phoenix.PubSub do Raising version of `direct_broadcast/5`. """ @spec direct_broadcast!(node_name, t, topic, message, dispatcher) :: :ok - def direct_broadcast!(node_name, pubsub, topic, message, dispatcher \\ __MODULE__) do + def direct_broadcast!(node_name, pubsub, topic, message) do + case direct_broadcast(node_name, pubsub, topic, message) do + :ok -> :ok + {:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}" + end + end + + def direct_broadcast!(node_name, pubsub, topic, message, dispatcher) do case direct_broadcast(node_name, pubsub, topic, message, dispatcher) do :ok -> :ok {:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}" diff --git a/lib/phoenix/pubsub/supervisor.ex b/lib/phoenix/pubsub/supervisor.ex index a1b93eaa..08cbc174 100644 --- a/lib/phoenix/pubsub/supervisor.ex +++ b/lib/phoenix/pubsub/supervisor.ex @@ -25,8 +25,10 @@ defmodule Phoenix.PubSub.Supervisor do opts[:registry_size] || opts[:pool_size] || System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc() + dispatcher = Keyword.get(opts, :dispatcher, Phoenix.PubSub) + registry = [ - meta: [pubsub: {adapter, adapter_name}], + meta: [pubsub: {adapter, adapter_name}, dispatcher: dispatcher], partitions: partitions, keys: :duplicate, name: name diff --git a/test/phoenix/pubsub_test.exs b/test/phoenix/pubsub_test.exs index ff2e1323..0eff5df2 100644 --- a/test/phoenix/pubsub_test.exs +++ b/test/phoenix/pubsub_test.exs @@ -1,9 +1,11 @@ defmodule Phoenix.PubSub.UnitTest do use ExUnit.Case, async: true + alias Phoenix.PubSub + describe "child_spec/1" do test "expects a name" do - {:error, {{:EXIT, {exception, _}}, _}} = start_supervised({Phoenix.PubSub, []}) + {:error, {{:EXIT, {exception, _}}, _}} = start_supervised({PubSub, []}) assert Exception.message(exception) == "the :name option is required when starting Phoenix.PubSub" @@ -22,4 +24,82 @@ defmodule Phoenix.PubSub.UnitTest do :"#{__MODULE__}_#{:crypto.strong_rand_bytes(8) |> Base.encode16()}" end end + + describe "default dispatcher" do + defmodule TestDispatcher do + def dispatch(entries, :none, message) do + for {pid, _} <- entries do + send(pid, {:custom_dispatched, message}) + end + + :ok + end + + def dispatch(entries, from, message) do + for {pid, _} <- entries, pid != from do + send(pid, {:custom_dispatched, message}) + end + + :ok + end + end + + test "defaults to Phoenix.PubSub when no dispatcher configured" do + name = :"ps_default_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name}) + + PubSub.subscribe(name, "topic") + PubSub.broadcast(name, "topic", :hello) + assert_receive :hello + end + + test "uses configured dispatcher for broadcast/3" do + name = :"ps_custom_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, dispatcher: TestDispatcher}) + + PubSub.subscribe(name, "topic") + PubSub.broadcast(name, "topic", :hello) + assert_receive {:custom_dispatched, :hello} + refute_received :hello + end + + test "uses configured dispatcher for local_broadcast/3" do + name = :"ps_local_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, dispatcher: TestDispatcher}) + + PubSub.subscribe(name, "topic") + PubSub.local_broadcast(name, "topic", :hello) + assert_receive {:custom_dispatched, :hello} + end + + test "uses configured dispatcher for broadcast_from/4" do + name = :"ps_from_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, dispatcher: TestDispatcher}) + + PubSub.subscribe(name, "topic") + other = spawn(fn -> Process.sleep(:infinity) end) + PubSub.broadcast_from(name, other, "topic", :hello) + assert_receive {:custom_dispatched, :hello} + end + + test "explicit dispatcher overrides the configured default" do + name = :"ps_override_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, dispatcher: TestDispatcher}) + + PubSub.subscribe(name, "topic") + # Pass Phoenix.PubSub explicitly to override the configured TestDispatcher + PubSub.broadcast(name, "topic", :hello, PubSub) + assert_receive :hello + refute_received {:custom_dispatched, :hello} + end + + test "bang variants use configured dispatcher" do + name = :"ps_bang_#{:erlang.unique_integer([:positive])}" + start_supervised!({PubSub, name: name, dispatcher: TestDispatcher}) + + PubSub.subscribe(name, "topic") + PubSub.broadcast!(name, "topic", :hello) + assert_receive {:custom_dispatched, :hello} + end + end end