Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 62 additions & 8 deletions lib/phoenix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ defmodule Phoenix.PubSub do
* `:broadcast_pool_size` - number of pubsub partitions used for broadcasting messages
(defaults to `:pool_size`). This option is used during pool size migrations to ensure
no messages are lost. See the "Safe Pool Size Migration" section in the module documentation.
* `:dispatcher` - the default dispatcher module for broadcasts
(defaults to `Phoenix.PubSub`). Can be overridden per-call by
passing a dispatcher to `broadcast/4` and friends.

"""
@spec child_spec(keyword) :: Supervisor.child_spec()
Expand Down Expand Up @@ -249,7 +252,13 @@ defmodule Phoenix.PubSub do
See the "Custom dispatching" section in the module documentation.
"""
@spec broadcast(t, topic, message, dispatcher) :: :ok | {:error, term}
def broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)
def broadcast(pubsub, topic, message)
when is_atom(pubsub) and is_binary(topic) do
{:ok, dispatcher} = Registry.meta(pubsub, :dispatcher)
broadcast(pubsub, topic, message, dispatcher)
end

def broadcast(pubsub, topic, message, dispatcher)
when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)

Expand All @@ -273,7 +282,13 @@ defmodule Phoenix.PubSub do
See the "Custom dispatching" section in the module documentation.
"""
@spec broadcast_from(t, pid, topic, message, dispatcher) :: :ok | {:error, term}
def broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__)
def broadcast_from(pubsub, from, topic, message)
when is_atom(pubsub) and is_pid(from) and is_binary(topic) do
{:ok, dispatcher} = Registry.meta(pubsub, :dispatcher)
broadcast_from(pubsub, from, topic, message, dispatcher)
end

def broadcast_from(pubsub, from, topic, message, dispatcher)
when is_atom(pubsub) and is_pid(from) and is_binary(topic) and is_atom(dispatcher) do
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)

Expand All @@ -293,7 +308,13 @@ defmodule Phoenix.PubSub do
See the "Custom dispatching" section in the module documentation.
"""
@spec local_broadcast(t, topic, message, dispatcher) :: :ok
def local_broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)
def local_broadcast(pubsub, topic, message)
when is_atom(pubsub) and is_binary(topic) do
{:ok, dispatcher} = Registry.meta(pubsub, :dispatcher)
dispatch(pubsub, :none, topic, message, dispatcher)
end

def local_broadcast(pubsub, topic, message, dispatcher)
when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
dispatch(pubsub, :none, topic, message, dispatcher)
end
Expand All @@ -313,7 +334,13 @@ defmodule Phoenix.PubSub do
See the "Custom dispatching" section in the module documentation.
"""
@spec local_broadcast_from(t, pid, topic, message, dispatcher) :: :ok
def local_broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__)
def local_broadcast_from(pubsub, from, topic, message)
when is_atom(pubsub) and is_pid(from) and is_binary(topic) do
{:ok, dispatcher} = Registry.meta(pubsub, :dispatcher)
dispatch(pubsub, from, topic, message, dispatcher)
end

def local_broadcast_from(pubsub, from, topic, message, dispatcher)
when is_atom(pubsub) and is_pid(from) and is_binary(topic) and is_atom(dispatcher) do
dispatch(pubsub, from, topic, message, dispatcher)
end
Expand All @@ -333,7 +360,13 @@ defmodule Phoenix.PubSub do
See the "Custom dispatching" section in the module documentation.
"""
@spec direct_broadcast(node_name, t, topic, message, dispatcher) :: :ok | {:error, term}
def direct_broadcast(node_name, pubsub, topic, message, dispatcher \\ __MODULE__)
def direct_broadcast(node_name, pubsub, topic, message)
when is_atom(pubsub) and is_binary(topic) do
{:ok, dispatcher} = Registry.meta(pubsub, :dispatcher)
direct_broadcast(node_name, pubsub, topic, message, dispatcher)
end

def direct_broadcast(node_name, pubsub, topic, message, dispatcher)
when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)
adapter.direct_broadcast(name, node_name, topic, message, dispatcher)
Expand All @@ -343,7 +376,14 @@ defmodule Phoenix.PubSub do
Raising version of `broadcast/4`.
"""
@spec broadcast!(t, topic, message, dispatcher) :: :ok
def broadcast!(pubsub, topic, message, dispatcher \\ __MODULE__) do
def broadcast!(pubsub, topic, message) do
case broadcast(pubsub, topic, message) do
:ok -> :ok
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
end
end

def broadcast!(pubsub, topic, message, dispatcher) do
case broadcast(pubsub, topic, message, dispatcher) do
:ok -> :ok
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
Expand All @@ -354,7 +394,14 @@ defmodule Phoenix.PubSub do
Raising version of `broadcast_from/5`.
"""
@spec broadcast_from!(t, pid, topic, message, dispatcher) :: :ok
def broadcast_from!(pubsub, from, topic, message, dispatcher \\ __MODULE__) do
def broadcast_from!(pubsub, from, topic, message) do
case broadcast_from(pubsub, from, topic, message) do
:ok -> :ok
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
end
end

def broadcast_from!(pubsub, from, topic, message, dispatcher) do
case broadcast_from(pubsub, from, topic, message, dispatcher) do
:ok -> :ok
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
Expand All @@ -365,7 +412,14 @@ defmodule Phoenix.PubSub do
Raising version of `direct_broadcast/5`.
"""
@spec direct_broadcast!(node_name, t, topic, message, dispatcher) :: :ok
def direct_broadcast!(node_name, pubsub, topic, message, dispatcher \\ __MODULE__) do
def direct_broadcast!(node_name, pubsub, topic, message) do
case direct_broadcast(node_name, pubsub, topic, message) do
:ok -> :ok
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
end
end

def direct_broadcast!(node_name, pubsub, topic, message, dispatcher) do
case direct_broadcast(node_name, pubsub, topic, message, dispatcher) do
:ok -> :ok
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
Expand Down
4 changes: 3 additions & 1 deletion lib/phoenix/pubsub/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ defmodule Phoenix.PubSub.Supervisor do
opts[:registry_size] || opts[:pool_size] ||
System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc()

dispatcher = Keyword.get(opts, :dispatcher, Phoenix.PubSub)

registry = [
meta: [pubsub: {adapter, adapter_name}],
meta: [pubsub: {adapter, adapter_name}, dispatcher: dispatcher],
partitions: partitions,
keys: :duplicate,
name: name
Expand Down
82 changes: 81 additions & 1 deletion test/phoenix/pubsub_test.exs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
defmodule Phoenix.PubSub.UnitTest do
use ExUnit.Case, async: true

alias Phoenix.PubSub

describe "child_spec/1" do
test "expects a name" do
{:error, {{:EXIT, {exception, _}}, _}} = start_supervised({Phoenix.PubSub, []})
{:error, {{:EXIT, {exception, _}}, _}} = start_supervised({PubSub, []})

assert Exception.message(exception) ==
"the :name option is required when starting Phoenix.PubSub"
Expand All @@ -22,4 +24,82 @@ defmodule Phoenix.PubSub.UnitTest do
:"#{__MODULE__}_#{:crypto.strong_rand_bytes(8) |> Base.encode16()}"
end
end

describe "default dispatcher" do
defmodule TestDispatcher do
def dispatch(entries, :none, message) do
for {pid, _} <- entries do
send(pid, {:custom_dispatched, message})
end

:ok
end

def dispatch(entries, from, message) do
for {pid, _} <- entries, pid != from do
send(pid, {:custom_dispatched, message})
end

:ok
end
end

test "defaults to Phoenix.PubSub when no dispatcher configured" do
name = :"ps_default_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name})

PubSub.subscribe(name, "topic")
PubSub.broadcast(name, "topic", :hello)
assert_receive :hello
end

test "uses configured dispatcher for broadcast/3" do
name = :"ps_custom_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})

PubSub.subscribe(name, "topic")
PubSub.broadcast(name, "topic", :hello)
assert_receive {:custom_dispatched, :hello}
refute_received :hello
end

test "uses configured dispatcher for local_broadcast/3" do
name = :"ps_local_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})

PubSub.subscribe(name, "topic")
PubSub.local_broadcast(name, "topic", :hello)
assert_receive {:custom_dispatched, :hello}
end

test "uses configured dispatcher for broadcast_from/4" do
name = :"ps_from_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})

PubSub.subscribe(name, "topic")
other = spawn(fn -> Process.sleep(:infinity) end)
PubSub.broadcast_from(name, other, "topic", :hello)
assert_receive {:custom_dispatched, :hello}
end

test "explicit dispatcher overrides the configured default" do
name = :"ps_override_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})

PubSub.subscribe(name, "topic")
# Pass Phoenix.PubSub explicitly to override the configured TestDispatcher
PubSub.broadcast(name, "topic", :hello, PubSub)
assert_receive :hello
refute_received {:custom_dispatched, :hello}
end

test "bang variants use configured dispatcher" do
name = :"ps_bang_#{:erlang.unique_integer([:positive])}"
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})

PubSub.subscribe(name, "topic")
PubSub.broadcast!(name, "topic", :hello)
assert_receive {:custom_dispatched, :hello}
end
end
end