From 501ca324d736d8d07c742df15fe9a0358750eb92 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Wed, 20 May 2026 21:24:24 +1200 Subject: [PATCH 1/3] fix: update gen_rpc to original fork --- config/config.exs | 1 + mise.lock | 3 +++ mix.exs | 2 +- mix.lock | 2 +- 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/config/config.exs b/config/config.exs index 8550e0d72..76215644e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -79,6 +79,7 @@ config :opentelemetry, span_processor: :batch config :gen_rpc, + extra_process_flags: [fullsweep_after: 20], # Inactivity period in milliseconds after which a pending process holding an async_call return value will exit. # This is used for process sanitation purposes so please make sure to set it in a sufficiently high number async_call_inactivity_timeout: 300_000 diff --git a/mise.lock b/mise.lock index d77e9f833..5865a1109 100644 --- a/mise.lock +++ b/mise.lock @@ -8,6 +8,9 @@ backend = "core:elixir" version = "27.3.4.10" backend = "core:erlang" +[tools.erlang."platforms.macos-arm64"] +checksum = "blake3:0d27e4815676201f374d135c1a9a8ee7d41535c3477509be9836232c57afddab" + [[tools.node]] version = "24.14.1" backend = "core:node" diff --git a/mix.exs b/mix.exs index 27a4cc711..f1a840c6e 100644 --- a/mix.exs +++ b/mix.exs @@ -96,7 +96,7 @@ defmodule Realtime.MixProject do {:opentelemetry_phoenix, "~> 2.0"}, {:opentelemetry_cowboy, "~> 1.0"}, {:opentelemetry_ecto, "~> 1.2"}, - {:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "5382a0f2689a4cb8838873a2173928281dbe5002"}, + {:gen_rpc, git: "https://github.com/emqx/gen_rpc.git", tag: "3.6.1"}, {:req, "~> 0.5"}, {:mimic, "~> 1.0", only: :test}, {:floki, ">= 0.30.0", only: :test}, diff --git a/mix.lock b/mix.lock index 01a389c3d..42f41b7c7 100644 --- a/mix.lock +++ b/mix.lock @@ -29,7 +29,7 @@ "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, "floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"}, - "gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "5382a0f2689a4cb8838873a2173928281dbe5002", [ref: "5382a0f2689a4cb8838873a2173928281dbe5002"]}, + "gen_rpc": {:git, "https://github.com/emqx/gen_rpc.git", "891f90d713e83e3fca049345fb641afd9a1def28", [tag: "3.6.1"]}, "gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, From 6be480360980af51ec9f88926789e001c84dca7d Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Thu, 21 May 2026 15:39:32 +1200 Subject: [PATCH 2/3] fix: special case for local abcast --- lib/realtime/gen_rpc.ex | 10 ++++++++-- test/realtime/gen_rpc_test.exs | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/realtime/gen_rpc.ex b/lib/realtime/gen_rpc.ex index 95b7b7ae0..16979f771 100644 --- a/lib/realtime/gen_rpc.ex +++ b/lib/realtime/gen_rpc.ex @@ -25,9 +25,15 @@ defmodule Realtime.GenRpc do @spec abcast([node], atom, any, keyword()) :: :ok def abcast(nodes, name, msg, opts) when is_list(nodes) and is_atom(name) and is_list(opts) do key = Keyword.get(opts, :key, nil) - nodes = cast_rpc_nodes(nodes, key) - :gen_rpc.abcast(nodes, name, msg) + {local, remote} = Enum.split_with(nodes, &(&1 == node())) + + if local != [], do: send({name, node()}, msg) + + remote + |> cast_rpc_nodes(key) + |> :gen_rpc.abcast(name, msg) + :ok end diff --git a/test/realtime/gen_rpc_test.exs b/test/realtime/gen_rpc_test.exs index d26f29649..8de3decdf 100644 --- a/test/realtime/gen_rpc_test.exs +++ b/test/realtime/gen_rpc_test.exs @@ -193,6 +193,20 @@ defmodule Realtime.GenRpcTest do refute_receive _any end + test "abcast to registered process on the local node" do + name = + System.unique_integer() + |> to_string() + |> String.to_atom() + + :erlang.register(name, self()) + + assert GenRpc.abcast([node()], name, "a message", []) == :ok + + assert_receive "a message" + refute_receive _any + end + @tag extra_config: [{:gen_rpc, :tcp_server_port, 9999}] test "tcp error" do Logger.put_process_level(self(), :debug) From 6b1fd4b51cde1fe621850ad5b63261e0c3c5646c Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Thu, 21 May 2026 16:53:39 +1200 Subject: [PATCH 3/3] fix: test --- test/realtime/gen_rpc_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/realtime/gen_rpc_test.exs b/test/realtime/gen_rpc_test.exs index 8de3decdf..511aafc14 100644 --- a/test/realtime/gen_rpc_test.exs +++ b/test/realtime/gen_rpc_test.exs @@ -218,7 +218,7 @@ defmodule Realtime.GenRpcTest do Process.sleep(100) end) - assert log =~ "[error] event=connect_to_remote_server" + assert log =~ "failed_to_connect_server" refute_receive _any end @@ -265,7 +265,7 @@ defmodule Realtime.GenRpcTest do Process.sleep(100) end) - assert log =~ "[error] event=connect_to_remote_server" + assert log =~ "failed_to_connect_server" refute_receive _any end @@ -294,7 +294,7 @@ defmodule Realtime.GenRpcTest do Process.sleep(100) end) - assert log =~ "[error] event=connect_to_remote_server" + assert log =~ "failed_to_connect_server" assert_receive :sent refute_receive _any