Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ gleam_otp = ">= 1.0.0 and < 2.0.0"
gleam_stdlib = ">= 0.51.0 and < 2.0.0"
gleam_time = ">= 1.0.0 and < 2.0.0"
exception = ">= 2.1.0 and < 3.0.0"
pgo = ">= 0.14.0 and < 1.0.0"
pgo = ">= 0.16.0 and < 1.0.0"

[dev-dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0"
Expand Down
8 changes: 4 additions & 4 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ packages = [
{ name = "gleam_stdlib", version = "0.61.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "3DC407D6EDA98FCE089150C11F3AD892B6F4C3CA77C87A97BAE8D5AB5E41F331" },
{ name = "gleam_time", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "D71F1AFF7FEB534FF55E5DC58E534E9201BA75A444619788A2E4DEA4EBD87D16" },
{ name = "gleeunit", version = "1.6.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "63022D81C12C17B7F1A60E029964E830A4CBD846BBC6740004FC1F1031AE0326" },
{ name = "opentelemetry_api", version = "1.4.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58" },
{ name = "pg_types", version = "0.4.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "B02EFA785CAECECF9702C681C80A9CA12A39F9161A846CE17B01FB20AEEED7EB" },
{ name = "pgo", version = "0.14.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "71016C22599936E042DC0012EE4589D24C71427D266292F775EBF201D97DF9C9" },
{ name = "opentelemetry_api", version = "1.5.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "F53EC8A1337AE4A487D43AC89DA4BD3A3C99DDF576655D071DEED8B56A2D5DDA" },
{ name = "pg_types", version = "0.6.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "9949A4849DD13408FA249AB7B745E0D2DFDB9532AEE2B9722326E33CD082A778" },
{ name = "pgo", version = "0.20.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "2F11E6649CEB38E569EF56B16BE1D04874AE5B11A02867080A2817CE423C683B" },
]

[requirements]
Expand All @@ -21,4 +21,4 @@ gleam_otp = { version = ">= 1.0.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" }
gleam_time = { version = ">= 1.0.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
pgo = { version = ">= 0.14.0 and < 1.0.0" }
pgo = { version = ">= 0.16.0 and < 1.0.0" }
112 changes: 112 additions & 0 deletions src/pog.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,28 @@ pub opaque type Connection {
SingleConnection(SingleConnection)
}

/// A database connection used specifically for `LISTEN`/`NOTIFY` functionality.
/// This is a long-lived connection in its own pool, and cannot be used for general
/// database queries.
///
pub opaque type NotificationsConnection {
NotificationsConnection(Name(Message))
}
Comment thread
foxfriends marked this conversation as resolved.

type SingleConnection

pub type Message

/// A notification received from the database in response to a `NOTIFY`.
///
/// Create a NotificationsConnection and use `listen` to issue PostgreSQL
/// `LISTEN` commands to the database and receive these notifications
/// using `select_notifications`.
///
pub type Notification {
Notify(pid: Pid, reference: Reference, channel: String, payload: String)
}
Comment thread
foxfriends marked this conversation as resolved.

/// Create a reference to a pool using the pool's name.
///
/// If no pool has been started using this name then queries using this
Expand All @@ -42,6 +60,17 @@ pub fn named_connection(name: Name(Message)) -> Connection {
Pool(name)
}

/// Create a reference to a pool using the pool's name.
///
/// If no notifications process has been started using this name then
/// listeners using this connection will fail.
///
Comment thread
foxfriends marked this conversation as resolved.
pub fn named_notifications_connection(
name: Name(Message),
) -> NotificationsConnection {
NotificationsConnection(name)
}

/// The configuration for a pool of connections.
pub type Config {
Config(
Expand Down Expand Up @@ -343,6 +372,33 @@ pub fn start(config: Config) -> actor.StartResult(Connection) {
@external(erlang, "pog_ffi", "start")
fn start_tree(config: Config) -> Result(Pid, dynamic.Dynamic)

/// Start a process holding a connection to the database that can be used to
/// `LISTEN` to channels for notifications. Most the time you want to use
/// `notifications_supervised` and add the process to your supervision tree
/// instead of using this function directly.
///
/// The process will asynchronously connect to the PostgreSQL instance specified
/// in the config. If the configuration is invalid or it cannot connect for
/// another reason it will continue to attempt to connect, and any queries made
/// using the connection pool will fail.
Comment thread
foxfriends marked this conversation as resolved.
///
/// The `NotificationsConnection` is different from a regular connection, it is
/// part of a new distinct connection pool, and cannot be used to make regular
/// queries to the database. Processes which require both to receive notifications
/// and to query the database must have both a `NotificationsConnection` and a
/// regular `Connection`.
pub fn start_notifications(
config: Config,
) -> actor.StartResult(NotificationsConnection) {
case start_tree_notifications(config) {
Ok(pid) -> Ok(actor.Started(pid, NotificationsConnection(config.pool_name)))
Error(reason) -> Error(actor.InitExited(process.Abnormal(reason)))
}
}

@external(erlang, "pog_ffi", "start_notifications")
fn start_tree_notifications(config: Config) -> Result(Pid, dynamic.Dynamic)

/// Start a database connection pool by adding it to your supervision tree.
///
/// Use the `named_connection` function to create a connection to query this
Expand All @@ -358,6 +414,23 @@ pub fn supervised(config: Config) -> supervision.ChildSpecification(Connection)
supervision.supervisor(fn() { start(config) })
}

/// Start a database connection pool by adding it to your supervision tree.
///
/// Use the `named_connection` function to create a connection to query this
/// pool with if your supervisor does not pass back the return value of
/// creating the pool.
///
/// The pool is started in a new process and will asynchronously connect to the
/// PostgreSQL instance specified in the config. If the configuration is invalid
/// or it cannot connect for another reason it will continue to attempt to
/// connect, and any queries made using the connection pool will fail.
///
pub fn notifications_supervised(
config: Config,
) -> supervision.ChildSpecification(NotificationsConnection) {
supervision.supervisor(fn() { start_notifications(config) })
}

/// A value that can be sent to PostgreSQL as one of the arguments to a
/// parameterised SQL query.
pub type Value
Expand Down Expand Up @@ -512,6 +585,45 @@ fn run_query_extended(
query: String,
) -> Result(#(Int, List(Dynamic)), QueryError)

/// Subscribes the current process to a PostgreSQL `LISTEN`/`NOTIFY` channel by name.
///
/// Use `select_notifications` to start receiving the resulting notifications.
///
/// The returned reference can be used to later `unlisten` on this channel.
///
@external(erlang, "pog_ffi", "listen")
pub fn listen(
conn: NotificationsConnection,
channel: String,
) -> Result(Reference, Nil)

/// Remove a previously created listener by sending a corresponding `UNLISTEN`
/// to the database.
///
@external(erlang, "pog_ffi", "unlisten")
pub fn unlisten(conn: NotificationsConnection, listener: Reference) -> Nil

@external(erlang, "pog_ffi", "decode_notification")
fn decode_notification(dyn: dynamic.Dynamic) -> Notification

type Tag {
Notification
Comment thread
foxfriends marked this conversation as resolved.
}

/// Add a handler to a selector for `NOTIFY` events received from the database
/// after calling `listen` for the corresponding channel.
///
pub fn select_notifications(
selector: process.Selector(payload),
transform: fn(Notification) -> payload,
) -> process.Selector(payload) {
selector
|> process.select_record(tag: Notification, fields: 4, mapping: fn(tuple) {
let notification = decode_notification(tuple)
transform(notification)
})
}

pub type QueryError {
/// The query failed as a database constraint would have been violated by the
/// change.
Expand Down
78 changes: 70 additions & 8 deletions src/pog_ffi.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
-module(pog_ffi).

-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]).
-export([
query/4,
query_extended/2,
start/1,
coerce/1,
null/0,
checkout/1,
start_notifications/1,
listen/2,
unlisten/2,
decode_notification/1
]).

-include_lib("pog/include/pog_Config.hrl").
-include_lib("pog/include/pog_Notify.hrl").
-include_lib("pg_types/include/pg_types.hrl").

null() ->
Expand All @@ -11,6 +23,14 @@ null() ->
coerce(Value) ->
Value.

decode_notification({notification, Pid, Ref, Channel, Payload}) ->
#notify{
pid=Pid,
reference=Ref,
channel=Channel,
payload=Payload
}.

%% Use correct defaults for SSL connections when SSL is enabled.
%% Peers have to be verified & cacerts are fetched directly from the system.
%%
Expand All @@ -36,11 +56,9 @@ default_ssl_options(Host, Ssl) ->
]}
end.

start(Config) ->
% Unfortunately this has to be supplied via global mutable state currently.
application:set_env(pg_types, timestamp_config, integer_system_time_microseconds),
compute_options(Config) ->
#config{
pool_name = PoolName,
pool_name = _,
host = Host,
port = Port,
database = Database,
Expand Down Expand Up @@ -76,11 +94,40 @@ start(Config) ->
ipv6 -> [inet6]
end
},
Options2 = case Password of
case Password of
{some, Pw} -> maps:put(password, Pw, Options1);
none -> Options1
end,
pgo_pool:start_link(PoolName, Options2).
end.

start(Config) ->
% Unfortunately this has to be supplied via global mutable state currently.
application:set_env(pg_types, timestamp_config, integer_system_time_microseconds),
#config{pool_name = PoolName, _ = _} = Config,
Options = compute_options(Config),
pgo_pool:start_link(PoolName, Options).

start_notifications(Config) ->
Comment thread
foxfriends marked this conversation as resolved.
% Unfortunately this has to be supplied via global mutable state currently.
application:set_env(pg_types, timestamp_config, integer_system_time_microseconds),
#config{pool_name = PoolName, _ = _} = Config,
Options1 = compute_options(Config),
Options2 = normalize_pool_config(Options1),
pgo_notifications:start_link({local, PoolName}, Options2).

% NOTE:
% Copied from https://github.com/erleans/pgo/blob/main/src/pgo_pool.erl
% This may be better to occur within pgo_notifications, but they have chosen
% to omit this (despite doing it in pgo_pool which we use in the other case),
% so we must do it ourselves here.
normalize_pool_config(PoolConfig) when is_list(PoolConfig) ->
normalize_pool_config(maps:from_list(PoolConfig));
normalize_pool_config(PoolConfig) ->
maps:map(fun normalize_pool_config_value/2, PoolConfig).

normalize_pool_config_value(_, V) when is_binary(V) ->
binary_to_list(V);
normalize_pool_config_value(_, V) ->
V.

query(Pool, Sql, Arguments, Timeout) ->
Res = case Pool of
Expand Down Expand Up @@ -116,6 +163,21 @@ checkout(Name) when is_atom(Name) ->
{error, Error} -> {error, convert_error(Error)}
end.

listen(Conn, Channel) ->
case Conn of
{notifications_connection, Name} ->
case pgo_notifications:listen(Name, Channel) of
{ok, Ref} -> {ok, Ref};
error -> {error, nil}
end
end.

unlisten(Conn, Ref) ->
case Conn of
{notifications_connection, Name} ->
pgo_notifications:unlisten(Name, Ref)
end.

convert_error(none_available) ->
connection_unavailable;
convert_error({pgo_protocol, {parameters, Expected, Got}}) ->
Expand Down
42 changes: 42 additions & 0 deletions test/pog_test.gleam
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import exception
import gleam/dynamic/decode.{type Decoder}
import gleam/erlang/process
import gleam/function
import gleam/option.{None, Some}
import gleam/otp/actor
import gleam/time/calendar
Expand Down Expand Up @@ -615,3 +616,44 @@ pub fn transaction_commit_test() {

disconnect(db)
}

pub fn notifications_test() {
let db = start_default()
let assert Ok(notifications) =
process.new_name("pog_test_notifications")
|> default_config
|> pog.start_notifications

let assert Ok(listener) = pog.listen(notifications.data, "the_channel")

let assert Ok(_) =
pog.query("NOTIFY the_channel, 'first payload'")
|> pog.execute(db.data)

let assert Ok(_) =
process.new_selector()
|> pog.select_notifications(function.identity)
|> process.selector_receive(100)

pog.unlisten(notifications.data, listener)

let assert Ok(_) =
pog.query("NOTIFY the_channel, 'second payload'")
|> pog.execute(db.data)

let assert Error(Nil) =
process.new_selector()
|> pog.select_notifications(function.identity)
|> process.selector_receive(10)

disconnect(db)
disconnect(notifications)
}

pub fn notifications_no_process_fail() {
let notifications =
process.new_name("pog_test_notifications")
|> pog.named_notifications_connection()

let assert Error(Nil) = pog.listen(notifications, "the_channel")
}