From 31d26ce880f2c6254bdd3e8dd76f5527d8f8ca91 Mon Sep 17 00:00:00 2001 From: Martin Pedersen Date: Thu, 12 Feb 2026 23:15:26 +0100 Subject: [PATCH 1/6] wip --- .../postgres_cdc_rls/subscriptions.ex | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index db6cd027b..a9cf7d5da 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -220,17 +220,22 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do case params do %{"schema" => schema, "table" => table, "filter" => filter} when is_binary(schema) and is_binary(table) and is_binary(filter) -> - with [col, rest] <- String.split(filter, "=", parts: 2), - [filter_type, value] when filter_type in @filter_types <- - String.split(rest, ".", parts: 2), - {:ok, formatted_value} <- format_filter_value(filter_type, value) do - {:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}} - else - {:error, msg} -> - {:error, "Error parsing `filter` params: #{msg}"} - - e -> - {:error, "Error parsing `filter` params: #{inspect(e)}"} + try do + {:ok, + {action_filter, schema, table, + Enum.map(String.split(filter, ","), fn part -> + with [col, rest] <- String.split(part, "=", parts: 2), + [filter_type, value] when filter_type in @filter_types <- + String.split(rest, ".", parts: 2), + {:ok, formatted_value} <- format_filter_value(filter_type, value) do + {col, filter_type, formatted_value} + else + {:error, msg} -> throw("Error parsing `filter` params: #{msg}") + e -> throw("Error parsing `filter` params: #{inspect(e)}") + end + end)}} + catch + msg -> {:error, msg} end %{"schema" => schema, "table" => table} From de22c9db74b4c2ce3be87388c4ff4e57ef4c99db Mon Sep 17 00:00:00 2001 From: Martin Pedersen Date: Fri, 13 Feb 2026 20:32:28 +0100 Subject: [PATCH 2/6] parsing --- .../realtime_filter_parser.ex | 203 ++++++++++++++++++ .../postgres_cdc_rls/subscriptions.ex | 35 +-- test/realtime/filter_parser_test.exs | 37 ++++ 3 files changed, 245 insertions(+), 30 deletions(-) create mode 100644 lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex create mode 100644 test/realtime/filter_parser_test.exs diff --git a/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex new file mode 100644 index 000000000..104da904d --- /dev/null +++ b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex @@ -0,0 +1,203 @@ +defmodule RealtimeFilterParser do + @moduledoc """ + Parse Supabase realtime filter strings like: + + "date=eq.2026-02-03,published_at=not.is.null,area=eq.\"Oslo, Norway\",id=in.(1,2,3)" + + Returns `{:ok, filters}` where filters is a list of `{column, operator, value}` tuples. + + Special-cases: + - "is.null" -> {"", "null", nil} + - "not.is.null" -> {"", "nnull", nil} + - "in.(a,b)" -> {"", "in", "{a,b}"} + """ + + @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"] + + @spec parse_filter(String.t() | nil) :: + {:ok, list({String.t(), String.t(), any()})} | {:error, String.t()} + def parse_filter(nil), do: {:ok, []} + def parse_filter(""), do: {:ok, []} + + def parse_filter(filter) when is_binary(filter) do + with parts when is_list(parts) <- split_on_unquoted_commas(filter), + {:ok, filters} <- parse_parts(parts) do + {:ok, filters} + else + {:error, _} = err -> err + other -> {:error, "unexpected parse error: #{inspect(other)}"} + end + end + + # ------------------------------------------------------------ + # Splitting logic (comma, but not inside quoted strings or parentheses) + # ------------------------------------------------------------ + @spec split_on_unquoted_commas(String.t()) :: [String.t()] + defp split_on_unquoted_commas(s) do + s + |> String.graphemes() + |> do_split([], "", nil, 0, false) + |> Enum.map(&String.trim/1) + |> Enum.reject(&(&1 == "")) + end + + # acc - completed parts (reversed) + # buf - current buffer + # quote - current quote char (" or ') or nil + # paren_depth- nesting level of parentheses (0 = outside) + # escape - whether previous char was backslash inside quotes + defp do_split([], acc, buf, _quote, _paren_depth, _escape), + do: Enum.reverse([buf | acc]) + + # Split only when we see a comma that is outside quotes and with no open parentheses. + defp do_split(["," | rest], acc, buf, nil, 0, false) do + do_split(rest, [buf | acc], "", nil, 0, false) + end + + defp do_split([c | rest], acc, buf, quote, paren_depth, escape) do + cond do + # inside quotes and previous char was escape -> append char literally + quote in ["\"", "'"] and escape -> + do_split(rest, acc, buf <> c, quote, paren_depth, false) + + # inside quotes and see backslash -> mark escape + quote in ["\"", "'"] and c == "\\" -> + do_split(rest, acc, buf <> c, quote, paren_depth, true) + + # closing quote (matches current) -> append and leave quote context + quote in ["\"", "'"] and c == quote -> + do_split(rest, acc, buf <> c, nil, paren_depth, false) + + # inside quotes, normal char + quote in ["\"", "'"] -> + do_split(rest, acc, buf <> c, quote, paren_depth, false) + + # not in quotes, see an opening quote -> enter quote context + c in ["\"", "'"] -> + do_split(rest, acc, buf <> c, c, paren_depth, false) + + # not in quotes, opening parenthesis increments depth + c == "(" -> + do_split(rest, acc, buf <> c, quote, paren_depth + 1, false) + + # not in quotes, closing parenthesis decrements depth (but never below 0) + c == ")" -> + new_depth = max(paren_depth - 1, 0) + do_split(rest, acc, buf <> c, quote, new_depth, false) + + # any other char outside quotes/parentheses -> append + true -> + do_split(rest, acc, buf <> c, quote, paren_depth, false) + end + end + + # ------------------------------------------------------------ + # Parsing logic + # ------------------------------------------------------------ + @spec parse_parts([String.t()]) :: + {:ok, list({String.t(), String.t(), any()})} | {:error, String.t()} + defp parse_parts(parts) do + parts + |> Enum.reduce_while({:ok, []}, fn part, {:ok, acc} -> + case String.split(part, "=", parts: 2) do + [col, rest] -> + col = String.trim(col) + + cond do + rest == "is.null" -> + {:cont, {:ok, [{col, "null", nil} | acc]}} + + rest == "not.is.null" -> + {:cont, {:ok, [{col, "nnull", nil} | acc]}} + + true -> + case String.split(rest, ".", parts: 2) do + [filter_type, raw_value] -> + if filter_type in @filter_types do + with {:ok, extracted} <- extract_value(raw_value), + {:ok, formatted} <- format_filter_value(filter_type, extracted) do + {:cont, {:ok, [{col, filter_type, formatted} | acc]}} + else + {:error, reason} -> + {:halt, {:error, "failed to parse filter '#{part}': #{reason}"}} + end + else + {:halt, + {:error, + "unsupported filter type '#{filter_type}' for part: '#{part}'. supported: #{inspect(@filter_types)}"}} + end + + _ -> + {:halt, {:error, "invalid filter format for '#{part}', expected '.'"}} + end + end + + _ -> + {:halt, {:error, "missing '=' in filter part: '#{part}'"}} + end + end) + |> case do + {:ok, list} -> {:ok, Enum.reverse(list)} + other -> other + end + end + + # ------------------------------------------------------------ + # Value extraction + formatting + # ------------------------------------------------------------ + @spec extract_value(String.t()) :: {:ok, String.t()} | {:error, String.t()} + defp extract_value(value) when is_binary(value) do + value = String.trim(value) + + cond do + quoted?(value, "\"") -> + inside = String.slice(value, 1..-2) + {:ok, unescape_inside(inside)} + + quoted?(value, "'") -> + inside = String.slice(value, 1..-2) + {:ok, unescape_inside(inside)} + + true -> + {:ok, value} + end + end + + defp quoted?(value, q) do + String.starts_with?(value, q) and String.ends_with?(value, q) + end + + defp unescape_inside(s) do + s + |> String.replace(~S(\\\"), "\"") + |> String.replace(~S(\\'), "'") + |> String.replace(~S(\\\\), "\\") + end + + # ------------------------------------------------------------ + # format_filter_value/2 (incorporated) + # ------------------------------------------------------------ + @spec format_filter_value(String.t(), any()) :: + {:ok, any()} | {:error, String.t()} + defp format_filter_value(filter, value) do + case filter do + "in" -> + case Regex.run(~r/^\((.*)\)$/, value) do + nil -> + {:error, "`in` filter value must be wrapped by parentheses"} + + [_, new_value] -> + {:ok, "{#{new_value}}"} + end + + "null" -> + {:ok, nil} + + "nnull" -> + {:ok, nil} + + _ -> + {:ok, value} + end + end +end diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index a9cf7d5da..89d07f9d6 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -11,8 +11,6 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do @type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]} @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}] - @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"] - @spec create(conn(), String.t(), subscription_list, pid(), pid()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}} @@ -223,19 +221,12 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do try do {:ok, {action_filter, schema, table, - Enum.map(String.split(filter, ","), fn part -> - with [col, rest] <- String.split(part, "=", parts: 2), - [filter_type, value] when filter_type in @filter_types <- - String.split(rest, ".", parts: 2), - {:ok, formatted_value} <- format_filter_value(filter_type, value) do - {col, filter_type, formatted_value} - else - {:error, msg} -> throw("Error parsing `filter` params: #{msg}") - e -> throw("Error parsing `filter` params: #{inspect(e)}") - end - end)}} + case RealtimeFilterParser.parse_filter(filter) do + {:ok, filters} -> filters + {:error, error} -> throw(error) + end}} catch - msg -> {:error, msg} + error -> {:error, error} end %{"schema" => schema, "table" => table} @@ -272,20 +263,4 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do end defp action_filter(_), do: "*" - - defp format_filter_value(filter, value) do - case filter do - "in" -> - case Regex.run(~r/^\((.*)\)$/, value) do - nil -> - {:error, "`in` filter value must be wrapped by parentheses"} - - [_, new_value] -> - {:ok, "{#{new_value}}"} - end - - _ -> - {:ok, value} - end - end end diff --git a/test/realtime/filter_parser_test.exs b/test/realtime/filter_parser_test.exs new file mode 100644 index 000000000..98efb79b3 --- /dev/null +++ b/test/realtime/filter_parser_test.exs @@ -0,0 +1,37 @@ +defmodule RealtimeFilterParserTest do + use ExUnit.Case, async: true + + alias RealtimeFilterParser + + test "parses complex filter string" do + input = + ~s/date=eq.2026-02-03,published_at=not.is.null,area=eq."Oslo, Norway",id=in.(1,2,3)/ + + assert {:ok, + [ + {"date", "eq", "2026-02-03"}, + {"published_at", "nnull", nil}, + {"area", "eq", "Oslo, Norway"}, + {"id", "in", "{1,2,3}"} + ]} = RealtimeFilterParser.parse_filter(input) + end + + test "parses in(...) into { ... }" do + assert {:ok, [{"id", "in", "{1,2}"}]} = RealtimeFilterParser.parse_filter("id=in.(1,2)") + end + + test "returns error for in without parentheses" do + # malformed: value not wrapped in parentheses -> should error + assert {:error, _} = RealtimeFilterParser.parse_filter("id=in.1,2,3") + end + + test "handles quoted values containing commas" do + input = ~s/area=eq."Bergen, NO"/ + assert {:ok, [{"area", "eq", "Bergen, NO"}]} = RealtimeFilterParser.parse_filter(input) + end + + test "empty or nil filter returns ok with empty list" do + assert {:ok, []} = RealtimeFilterParser.parse_filter("") + assert {:ok, []} = RealtimeFilterParser.parse_filter(nil) + end +end From 8d108547451cbe93a127c185ccb27fee375f71c1 Mon Sep 17 00:00:00 2001 From: Martin Pedersen Date: Fri, 13 Feb 2026 21:12:10 +0100 Subject: [PATCH 3/6] add migration, use isnull/notnull over null/nnull --- .../realtime_filter_parser.ex | 12 ++--- .../20260213194320_support_is_null_filter.exs | 53 +++++++++++++++++++ test/realtime/filter_parser_test.exs | 2 +- 3 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs diff --git a/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex index 104da904d..42becce36 100644 --- a/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex +++ b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex @@ -7,8 +7,8 @@ defmodule RealtimeFilterParser do Returns `{:ok, filters}` where filters is a list of `{column, operator, value}` tuples. Special-cases: - - "is.null" -> {"", "null", nil} - - "not.is.null" -> {"", "nnull", nil} + - "is.null" -> {"", "isnull", nil} + - "not.is.null" -> {"", "notnull", nil} - "in.(a,b)" -> {"", "in", "{a,b}"} """ @@ -105,10 +105,10 @@ defmodule RealtimeFilterParser do cond do rest == "is.null" -> - {:cont, {:ok, [{col, "null", nil} | acc]}} + {:cont, {:ok, [{col, "isnull", nil} | acc]}} rest == "not.is.null" -> - {:cont, {:ok, [{col, "nnull", nil} | acc]}} + {:cont, {:ok, [{col, "notnull", nil} | acc]}} true -> case String.split(rest, ".", parts: 2) do @@ -190,10 +190,10 @@ defmodule RealtimeFilterParser do {:ok, "{#{new_value}}"} end - "null" -> + "isnull" -> {:ok, nil} - "nnull" -> + "notnull" -> {:ok, nil} _ -> diff --git a/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs b/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs new file mode 100644 index 000000000..cbb60bcb1 --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs @@ -0,0 +1,53 @@ +defmodule Realtime.Tenants.Migrations.SupportIsNullFilter do + @moduledoc false + + use Ecto.Migration + + def change do + execute("alter type realtime.equality_op add value 'isnull';") + execute("alter type realtime.equality_op add value 'notnull';") + + execute(" +CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) + RETURNS boolean + LANGUAGE plpgsql + IMMUTABLE +AS $function$ + /* + Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness + */ + declare + op_symbol text = ( + case + when op = 'eq' then '=' + when op = 'neq' then '!=' + when op = 'lt' then '<' + when op = 'lte' then '<=' + when op = 'gt' then '>' + when op = 'gte' then '>=' + when op = 'in' then '= any' + when op in ('isnull', 'notnull') then op + else 'UNKNOWN OP' + end + ); + res boolean; + begin + if op in ('isnull', 'notnull') then + execute format('select %L::'|| type_::text || ' ' || op_symbol, val_1) into res; + else + execute format( + 'select %L::'|| type_::text || ' ' || op_symbol + || ' ( %L::' + || ( + case + when op = 'in' then type_::text || '[]' + else type_::text end + ) + || ')', val_1, val_2) into res; + end if; + return res; + end; + $function$ + ") + end +end diff --git a/test/realtime/filter_parser_test.exs b/test/realtime/filter_parser_test.exs index 98efb79b3..58552e129 100644 --- a/test/realtime/filter_parser_test.exs +++ b/test/realtime/filter_parser_test.exs @@ -10,7 +10,7 @@ defmodule RealtimeFilterParserTest do assert {:ok, [ {"date", "eq", "2026-02-03"}, - {"published_at", "nnull", nil}, + {"published_at", "notnull", nil}, {"area", "eq", "Oslo, Norway"}, {"id", "in", "{1,2,3}"} ]} = RealtimeFilterParser.parse_filter(input) From 44c9fd9375f1c193ca08c873a3241472b77b32c4 Mon Sep 17 00:00:00 2001 From: Martin Pedersen Date: Fri, 13 Feb 2026 21:29:07 +0100 Subject: [PATCH 4/6] whoops --- .../repo/migrations/20260213194320_support_is_null_filter.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs b/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs index cbb60bcb1..e4c5df74b 100644 --- a/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs +++ b/lib/realtime/tenants/repo/migrations/20260213194320_support_is_null_filter.exs @@ -26,7 +26,8 @@ AS $function$ when op = 'gt' then '>' when op = 'gte' then '>=' when op = 'in' then '= any' - when op in ('isnull', 'notnull') then op + when op = 'isnull' then 'isnull' + when op = 'notnull' then 'notnull' else 'UNKNOWN OP' end ); From b94ca7e0fbfefd268d9a0ea33a7c6b18b5a1223a Mon Sep 17 00:00:00 2001 From: Martin Pedersen Date: Sun, 15 Feb 2026 17:07:56 +0100 Subject: [PATCH 5/6] remove quote handling, instead split by unescaped commas --- .../realtime_filter_parser.ex | 205 ++++++++---------- test/realtime/filter_parser_test.exs | 7 +- 2 files changed, 96 insertions(+), 116 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex index 42becce36..0a537fa72 100644 --- a/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex +++ b/lib/extensions/postgres_cdc_rls/realtime_filter_parser.ex @@ -1,18 +1,38 @@ defmodule RealtimeFilterParser do @moduledoc """ - Parse Supabase realtime filter strings like: + Parses Supabase realtime filter strings such as: - "date=eq.2026-02-03,published_at=not.is.null,area=eq.\"Oslo, Norway\",id=in.(1,2,3)" + "date=eq.2026-02-03,published_at=not.is.null,area=eq.Oslo\\, Norway,id=in.(1,2,3)" - Returns `{:ok, filters}` where filters is a list of `{column, operator, value}` tuples. + Splitting rules: - Special-cases: - - "is.null" -> {"", "isnull", nil} - - "not.is.null" -> {"", "notnull", nil} - - "in.(a,b)" -> {"", "in", "{a,b}"} + • The filter string is split on commas. + • A comma can be escaped using '\\,' and will then be treated as part of the value. + Example: + area=eq.Oslo\\, Norway + becomes: + {"area", "eq", "Oslo, Norway"} + + • Commas inside parentheses are NOT treated as separators. + Example: + id=in.(1,2,3) + + Supported operators: + + eq, neq, lt, lte, gt, gte, in, isnull, notnull + + Special cases: + + is.null → {"column", "isnull", nil} + not.is.null → {"column", "notnull", nil} + + Returns: + + {:ok, [{column, operator, value}, ...]} + {:error, reason} """ - @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"] + @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in", "isnull", "notnull"] @spec parse_filter(String.t() | nil) :: {:ok, list({String.t(), String.t(), any()})} | {:error, String.t()} @@ -20,7 +40,7 @@ defmodule RealtimeFilterParser do def parse_filter(""), do: {:ok, []} def parse_filter(filter) when is_binary(filter) do - with parts when is_list(parts) <- split_on_unquoted_commas(filter), + with parts when is_list(parts) <- split_filter(filter), {:ok, filters} <- parse_parts(parts) do {:ok, filters} else @@ -30,70 +50,55 @@ defmodule RealtimeFilterParser do end # ------------------------------------------------------------ - # Splitting logic (comma, but not inside quoted strings or parentheses) + # Splitting logic: + # - split on commas unless escaped: '\,' + # - do not split on commas inside parentheses (for in.(...)) + # - unescape '\,' -> ',' in each resulting part # ------------------------------------------------------------ - @spec split_on_unquoted_commas(String.t()) :: [String.t()] - defp split_on_unquoted_commas(s) do - s + + @spec split_filter(String.t()) :: [String.t()] + defp split_filter(filter) do + filter |> String.graphemes() - |> do_split([], "", nil, 0, false) + |> do_split([], "", 0, false) |> Enum.map(&String.trim/1) |> Enum.reject(&(&1 == "")) + |> Enum.map(&String.replace(&1, "\\,", ",")) end - # acc - completed parts (reversed) - # buf - current buffer - # quote - current quote char (" or ') or nil - # paren_depth- nesting level of parentheses (0 = outside) - # escape - whether previous char was backslash inside quotes - defp do_split([], acc, buf, _quote, _paren_depth, _escape), + # acc: list of completed parts (reversed) + # buf: current part buffer + # paren_depth: nesting depth of parentheses + # escaped: whether previous char was '\' + defp do_split([], acc, buf, _paren_depth, _escaped), do: Enum.reverse([buf | acc]) - # Split only when we see a comma that is outside quotes and with no open parentheses. - defp do_split(["," | rest], acc, buf, nil, 0, false) do - do_split(rest, [buf | acc], "", nil, 0, false) + # split only when comma is outside parentheses and not escaped + defp do_split(["," | rest], acc, buf, 0, false) do + do_split(rest, [buf | acc], "", 0, false) end - defp do_split([c | rest], acc, buf, quote, paren_depth, escape) do - cond do - # inside quotes and previous char was escape -> append char literally - quote in ["\"", "'"] and escape -> - do_split(rest, acc, buf <> c, quote, paren_depth, false) - - # inside quotes and see backslash -> mark escape - quote in ["\"", "'"] and c == "\\" -> - do_split(rest, acc, buf <> c, quote, paren_depth, true) - - # closing quote (matches current) -> append and leave quote context - quote in ["\"", "'"] and c == quote -> - do_split(rest, acc, buf <> c, nil, paren_depth, false) - - # inside quotes, normal char - quote in ["\"", "'"] -> - do_split(rest, acc, buf <> c, quote, paren_depth, false) - - # not in quotes, see an opening quote -> enter quote context - c in ["\"", "'"] -> - do_split(rest, acc, buf <> c, c, paren_depth, false) + # backslash means next char is "escaped" for splitting purposes (we keep the backslash) + defp do_split(["\\" | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> "\\", paren_depth, true) + end - # not in quotes, opening parenthesis increments depth - c == "(" -> - do_split(rest, acc, buf <> c, quote, paren_depth + 1, false) + defp do_split(["(" | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> "(", paren_depth + 1, false) + end - # not in quotes, closing parenthesis decrements depth (but never below 0) - c == ")" -> - new_depth = max(paren_depth - 1, 0) - do_split(rest, acc, buf <> c, quote, new_depth, false) + defp do_split([")" | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> ")", max(paren_depth - 1, 0), false) + end - # any other char outside quotes/parentheses -> append - true -> - do_split(rest, acc, buf <> c, quote, paren_depth, false) - end + defp do_split([c | rest], acc, buf, paren_depth, _escaped) do + do_split(rest, acc, buf <> c, paren_depth, false) end # ------------------------------------------------------------ # Parsing logic # ------------------------------------------------------------ + @spec parse_parts([String.t()]) :: {:ok, list({String.t(), String.t(), any()})} | {:error, String.t()} defp parse_parts(parts) do @@ -103,33 +108,23 @@ defmodule RealtimeFilterParser do [col, rest] -> col = String.trim(col) - cond do - rest == "is.null" -> - {:cont, {:ok, [{col, "isnull", nil} | acc]}} - - rest == "not.is.null" -> - {:cont, {:ok, [{col, "notnull", nil} | acc]}} - - true -> - case String.split(rest, ".", parts: 2) do - [filter_type, raw_value] -> - if filter_type in @filter_types do - with {:ok, extracted} <- extract_value(raw_value), - {:ok, formatted} <- format_filter_value(filter_type, extracted) do - {:cont, {:ok, [{col, filter_type, formatted} | acc]}} - else - {:error, reason} -> - {:halt, {:error, "failed to parse filter '#{part}': #{reason}"}} - end - else - {:halt, - {:error, - "unsupported filter type '#{filter_type}' for part: '#{part}'. supported: #{inspect(@filter_types)}"}} - end - - _ -> - {:halt, {:error, "invalid filter format for '#{part}', expected '.'"}} + case parse_op_and_value(rest) do + {:ok, filter_type, raw_value} -> + if filter_type in @filter_types do + with {:ok, formatted} <- format_filter_value(filter_type, raw_value) do + {:cont, {:ok, [{col, filter_type, formatted} | acc]}} + else + {:error, reason} -> + {:halt, {:error, "failed to parse filter '#{part}': #{reason}"}} + end + else + {:halt, + {:error, + "unsupported filter type '#{filter_type}' for part: '#{part}'. supported: #{inspect(@filter_types)}"}} end + + {:error, reason} -> + {:halt, {:error, "failed to parse filter '#{part}': #{reason}"}} end _ -> @@ -142,43 +137,33 @@ defmodule RealtimeFilterParser do end end - # ------------------------------------------------------------ - # Value extraction + formatting - # ------------------------------------------------------------ - @spec extract_value(String.t()) :: {:ok, String.t()} | {:error, String.t()} - defp extract_value(value) when is_binary(value) do - value = String.trim(value) - - cond do - quoted?(value, "\"") -> - inside = String.slice(value, 1..-2) - {:ok, unescape_inside(inside)} + # "is.null" => {"isnull", nil} + # "not.is.null" => {"notnull", nil} + # "." => {op, value} (value left untouched; quotes untouched) + @spec parse_op_and_value(String.t()) :: {:ok, String.t(), any()} | {:error, String.t()} + defp parse_op_and_value(rest) when is_binary(rest) do + rest = String.trim(rest) - quoted?(value, "'") -> - inside = String.slice(value, 1..-2) - {:ok, unescape_inside(inside)} + case String.split(rest, ".", parts: 3) do + ["is", "null"] -> + {:ok, "isnull", nil} - true -> - {:ok, value} - end - end + ["not", "is", "null"] -> + {:ok, "notnull", nil} - defp quoted?(value, q) do - String.starts_with?(value, q) and String.ends_with?(value, q) - end + [filter_type, raw_value] -> + {:ok, filter_type, raw_value} - defp unescape_inside(s) do - s - |> String.replace(~S(\\\"), "\"") - |> String.replace(~S(\\'), "'") - |> String.replace(~S(\\\\), "\\") + _ -> + {:error, "invalid filter format, expected 'is.null', 'not.is.null', or '.'"} + end end # ------------------------------------------------------------ - # format_filter_value/2 (incorporated) + # Value formatting # ------------------------------------------------------------ - @spec format_filter_value(String.t(), any()) :: - {:ok, any()} | {:error, String.t()} + + @spec format_filter_value(String.t(), any()) :: {:ok, any()} | {:error, String.t()} defp format_filter_value(filter, value) do case filter do "in" -> diff --git a/test/realtime/filter_parser_test.exs b/test/realtime/filter_parser_test.exs index 58552e129..3fd60c33a 100644 --- a/test/realtime/filter_parser_test.exs +++ b/test/realtime/filter_parser_test.exs @@ -5,7 +5,7 @@ defmodule RealtimeFilterParserTest do test "parses complex filter string" do input = - ~s/date=eq.2026-02-03,published_at=not.is.null,area=eq."Oslo, Norway",id=in.(1,2,3)/ + ~s/date=eq.2026-02-03,published_at=not.is.null,area=eq.Oslo\\, Norway,id=in.(1,2,3)/ assert {:ok, [ @@ -25,11 +25,6 @@ defmodule RealtimeFilterParserTest do assert {:error, _} = RealtimeFilterParser.parse_filter("id=in.1,2,3") end - test "handles quoted values containing commas" do - input = ~s/area=eq."Bergen, NO"/ - assert {:ok, [{"area", "eq", "Bergen, NO"}]} = RealtimeFilterParser.parse_filter(input) - end - test "empty or nil filter returns ok with empty list" do assert {:ok, []} = RealtimeFilterParser.parse_filter("") assert {:ok, []} = RealtimeFilterParser.parse_filter(nil) From 2957afee7808b9554551674d56e7b2f71ec60e8e Mon Sep 17 00:00:00 2001 From: Martin Pedersen Date: Tue, 17 Feb 2026 17:59:03 +0100 Subject: [PATCH 6/6] remove try/catch, fix tests --- lib/extensions/postgres_cdc_rls/subscriptions.ex | 16 +++++----------- .../channels/realtime_channel_test.exs | 2 +- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index 89d07f9d6..b8f1f5365 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -197,12 +197,12 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do An unsupported filter will respond with an error tuple: iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"}) - {:error, ~s(Error parsing `filter` params: ["like", "hey"])} + {:error, ~s(Error parsing `filter` params: unsupported filter type 'like' for part: 'subject=like.hey'. supported: ["eq", "neq", "lt", "lte", "gt", "gte", "in", "isnull", "notnull"])} Catch `undefined` filters: iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"}) - {:error, ~s(Error parsing `filter` params: ["undefined"])} + {:error, ~s(Error parsing `filter` params: missing '=' in filter part: 'undefined')} Catch `missing params`: @@ -218,15 +218,9 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do case params do %{"schema" => schema, "table" => table, "filter" => filter} when is_binary(schema) and is_binary(table) and is_binary(filter) -> - try do - {:ok, - {action_filter, schema, table, - case RealtimeFilterParser.parse_filter(filter) do - {:ok, filters} -> filters - {:error, error} -> throw(error) - end}} - catch - error -> {:error, error} + case RealtimeFilterParser.parse_filter(filter) do + {:ok, filters} -> {:ok, {action_filter, schema, table, filters}} + {:error, msg} -> {:error, "Error parsing `filter` params: #{msg}"} end %{"schema" => schema, "table" => table} diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 3663f2acb..9fb45802d 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -168,7 +168,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert_push "system", %{ - message: "Error parsing `filter` params: [\"wrong\"]", + message: "Error parsing `filter` params: missing '=' in filter part: 'wrong'", status: "error", extension: "postgres_changes", channel: "test"