From 191ad163fbe51923cf8d1273816e38b964c3efaa Mon Sep 17 00:00:00 2001 From: Tony Winn Date: Fri, 10 Apr 2026 13:16:58 -0400 Subject: [PATCH] Rewrite documentation for professional tone Align moduledocs, @doc strings, and README with the declarative third-person style used by Elixir's hexdocs. Update stale references to pg groups and consistent hashing to reflect the current PgRegistry and rendezvous hashing implementation. --- README.md | 41 ++++++++++++++-------- lib/stagehand.ex | 23 ++++++------ lib/stagehand/config.ex | 4 +-- lib/stagehand/queue/consumer_supervisor.ex | 4 +-- lib/stagehand/queue/executor.ex | 2 +- lib/stagehand/queue/pipeline.ex | 11 +++--- lib/stagehand/queue/producer.ex | 25 +++++++------ lib/stagehand/router.ex | 9 ++--- lib/stagehand/supervisor.ex | 6 ++-- lib/stagehand/testing.ex | 10 +++--- lib/stagehand/unique.ex | 29 +++++++-------- lib/stagehand/worker.ex | 2 +- 12 files changed, 93 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 047cda2..21e67f5 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,31 @@ # Stagehand -GenStage-based background job processing for Elixir. In-memory, no database required. +An in-memory, GenStage-based background job processing library for Elixir. -## Why - -Most job processing libraries require a database. Stagehand doesn't. It's built on GenStage and runs entirely in-memory, making it a good fit for applications that need background job processing without the overhead of external dependencies. +Stagehand runs entirely in-memory with no database dependency. It is +built on GenStage and uses `PgRegistry` for cluster-wide producer +discovery and `Highlander` for singleton scheduling. ## Guarantees -- **Graceful shutdown** — executing jobs complete before the node stops. The producer drains in-flight work within a configurable grace period. -- **No new work during shutdown** — the producer snapshots the current cluster membership, then leaves the pg group so no new jobs are routed to it. Any messages already in the mailbox are drained before redistribution. -- **Job redistribution** — on shutdown, scheduled, queued, and in-flight jobs are redistributed to surviving producers. If there are no surviving producers (e.g. the last node shuts down without a replacement joining first), these jobs are lost. -- **At-most-once delivery** — each job runs at most once. Jobs are in-memory with no persistence, so a VM crash loses queued, scheduled, and executing jobs. -- **Unique jobs (best effort)** — deduplication is backed by a local ETS table. A consistent hash ring routes the same job fingerprint to the same producer. When a node joins or leaves, the ring only remaps keys that belong to the changed node — all other fingerprints stay on their current owner, keeping their dedup state intact. On graceful shutdown, the producer snapshots cluster membership, leaves the group, then transfers dedup entries to their new owners using the snapshot. When a new node joins, unique checks are blocked until all existing producers have synced their entries, preventing duplicates during the transition. On crashes, entries on the lost node are gone and duplicates are possible until the uniqueness period expires. +- **Graceful shutdown** — executing jobs complete before the node stops. + The producer drains in-flight work within a configurable grace period. +- **No new work during shutdown** — the producer leaves the cluster + registry so no new jobs are routed to it. Any messages already in the + mailbox are drained before redistribution. +- **Job redistribution** — on shutdown, scheduled, queued, and in-flight + jobs are redistributed to surviving producers. If no surviving + producers exist, these jobs are lost. +- **At-most-once delivery** — each job runs at most once. Jobs are + in-memory with no persistence; a VM crash loses queued, scheduled, + and executing jobs. +- **Unique jobs (best effort)** — deduplication is backed by a local ETS + table per node. Rendezvous hashing routes the same job fingerprint to + the same producer, keeping dedup checks local. On graceful shutdown, + dedup entries are transferred to their new owners. On node join, + unique checks are blocked until all existing producers have synced + their entries. On crashes, entries on the lost node are gone and + duplicates are possible until the uniqueness period expires. ## Installation @@ -41,7 +54,7 @@ config :my_app, Stagehand, config :my_app, Stagehand, testing: :manual ``` -Add Stagehand to your supervision tree: +Add Stagehand to the application supervision tree: ```elixir children = [ @@ -63,7 +76,7 @@ defmodule MyApp.EmailWorker do end ``` -Insert jobs: +Inserting jobs: ```elixir %{"to" => "user@example.com", "body" => "hello"} @@ -74,7 +87,7 @@ Insert jobs: ### Return values - `:ok` or `{:ok, value}` — job succeeded -- `{:error, reason}` — job failed, will retry if attempts remain +- `{:error, reason}` — job failed, retries if attempts remain - `{:snooze, seconds}` — re-enqueue after delay - `{:cancel, reason}` — stop, no more retries @@ -82,8 +95,8 @@ Insert jobs: - `:queue` — queue name (default `:default`) - `:max_attempts` — retry limit (default `20`) -- `:priority` — 0-9, lower is higher (default `0`) -- `:unique` — uniqueness config or `false` +- `:priority` — 0-9, lower is higher priority (default `0`) +- `:unique` — uniqueness configuration or `false` - `:schedule_in` — delay in seconds or `{amount, :seconds | :minutes | :hours | :days}` - `:scheduled_at` — specific `DateTime` diff --git a/lib/stagehand.ex b/lib/stagehand.ex index e56c392..9d09843 100644 --- a/lib/stagehand.ex +++ b/lib/stagehand.ex @@ -1,7 +1,6 @@ defmodule Stagehand do @moduledoc """ - Stagehand is a GenStage-based background job processing library, - In-memory, no database required. + An in-memory, GenStage-based background job processing library. ## Configuration @@ -37,7 +36,7 @@ defmodule Stagehand do alias Stagehand.Router @doc """ - Start a Stagehand supervision tree. + Starts a Stagehand supervision tree. """ @spec start_link(keyword()) :: Supervisor.on_start() def start_link(opts) when is_list(opts) do @@ -56,7 +55,7 @@ defmodule Stagehand do end @doc """ - Insert a job for execution. + Inserts a job for execution. """ @spec insert(atom(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()} | {:error, term()} def insert(name \\ __MODULE__, %Stagehand.Job{} = job) do @@ -76,7 +75,7 @@ defmodule Stagehand do end @doc """ - Insert multiple jobs at once. + Inserts multiple jobs at once. """ @spec insert_all(atom(), [Stagehand.Job.t()]) :: {:ok, [Stagehand.Job.t()]} | {:error, [term()]} def insert_all(name \\ __MODULE__, jobs) when is_list(jobs) do @@ -93,7 +92,7 @@ defmodule Stagehand do end @doc """ - Cancel a job by its ref. For available jobs, removes from the producer. + Cancels a job by its ref. For available jobs, removes from the producer. Returns `:ok` on success or `{:error, :not_found}`. """ @spec cancel_job(atom(), Stagehand.Job.t()) :: :ok | {:error, :not_found} | :not_found @@ -114,7 +113,7 @@ defmodule Stagehand do end @doc """ - Retry a job by re-inserting it. + Retries a job by re-inserting it. """ @spec retry_job(atom(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()} | {:error, term()} def retry_job(name \\ __MODULE__, %Stagehand.Job{} = job) do @@ -122,7 +121,7 @@ defmodule Stagehand do end @doc """ - Pause a queue. Broadcasts to all producers for this queue. + Pauses a queue across all producers in the cluster. """ @spec pause_queue(atom(), keyword()) :: :ok def pause_queue(name \\ __MODULE__, opts) do @@ -136,7 +135,7 @@ defmodule Stagehand do end @doc """ - Resume a paused queue. Broadcasts to all producers for this queue. + Resumes a paused queue across all producers in the cluster. """ @spec resume_queue(atom(), keyword()) :: :ok def resume_queue(name \\ __MODULE__, opts) do @@ -150,7 +149,7 @@ defmodule Stagehand do end @doc """ - Drain a queue, returning all pending jobs from all producers. + Drains a queue, returning all pending jobs from all producers. """ @spec drain_queue(atom(), keyword()) :: [Stagehand.Job.t()] def drain_queue(name \\ __MODULE__, opts) do @@ -162,7 +161,7 @@ defmodule Stagehand do end @doc """ - Check the status of a queue across all producers. + Returns the aggregated status of a queue across all producers. """ @spec check_queue(atom(), keyword()) :: map() def check_queue(name \\ __MODULE__, opts) do @@ -182,7 +181,7 @@ defmodule Stagehand do end @doc """ - Get the configuration for a Stagehand instance. + Returns the configuration for a Stagehand instance. """ @spec config(atom()) :: Config.t() | {:error, {:not_running, atom()}} def config(name \\ __MODULE__) do diff --git a/lib/stagehand/config.ex b/lib/stagehand/config.ex index 4971416..760e7a5 100644 --- a/lib/stagehand/config.ex +++ b/lib/stagehand/config.ex @@ -35,9 +35,9 @@ defmodule Stagehand.Config do ] @doc """ - Build a config struct from the given options. + Builds a config struct from the given options. - Options can be passed directly or read from Application config: + Options can be passed directly or read from application config: Config.new(otp_app: :my_app, name: Stagehand) """ diff --git a/lib/stagehand/queue/consumer_supervisor.ex b/lib/stagehand/queue/consumer_supervisor.ex index 43ab6b5..c425cc2 100644 --- a/lib/stagehand/queue/consumer_supervisor.ex +++ b/lib/stagehand/queue/consumer_supervisor.ex @@ -1,8 +1,8 @@ defmodule Stagehand.Queue.ConsumerSupervisor do @moduledoc """ ConsumerSupervisor that spawns an Executor task for each job received - from the Producer. Starts without a subscription — the Producer calls - `GenStage.async_subscribe/2` after joining the pg group. + from the Producer. Starts without a subscription; the Producer calls + `GenStage.async_subscribe/2` after registering with the cluster. """ use ConsumerSupervisor diff --git a/lib/stagehand/queue/executor.ex b/lib/stagehand/queue/executor.ex index b3e73ab..7758dd5 100644 --- a/lib/stagehand/queue/executor.ex +++ b/lib/stagehand/queue/executor.ex @@ -14,7 +14,7 @@ defmodule Stagehand.Queue.Executor do alias Stagehand.Telemetry @doc """ - Execute a job. This is the entry point called by the ConsumerSupervisor. + Executes a job. Called by the `ConsumerSupervisor` for each dispatched job. """ @spec run(Job.t()) :: term() def run(%Job{worker: worker} = job) do diff --git a/lib/stagehand/queue/pipeline.ex b/lib/stagehand/queue/pipeline.ex index 019770b..1113772 100644 --- a/lib/stagehand/queue/pipeline.ex +++ b/lib/stagehand/queue/pipeline.ex @@ -1,10 +1,11 @@ defmodule Stagehand.Queue.Pipeline do @moduledoc """ - Supervisor for a single queue's GenStage pipeline: ConsumerSupervisor + Producer. + Supervisor for a single queue's GenStage pipeline. - Children are ordered so that on shutdown (reverse order) the Producer - stops first — leaving the pg group and draining executing jobs — before - the ConsumerSupervisor is stopped. + Manages a `ConsumerSupervisor` and a `Producer`. On shutdown, the + producer stops first (reverse child order), leaves the cluster + registry, and drains executing jobs before the consumer supervisor + is stopped. """ use Supervisor @@ -75,7 +76,7 @@ defmodule Stagehand.Queue.Pipeline do end @doc """ - Get the consumer supervisor process name. + Returns the consumer supervisor process name. """ @spec consumer_name(atom(), atom() | binary()) :: {:via, module(), term()} def consumer_name(stagehand_name, queue) do diff --git a/lib/stagehand/queue/producer.ex b/lib/stagehand/queue/producer.ex index cb7aa9c..081e8b6 100644 --- a/lib/stagehand/queue/producer.ex +++ b/lib/stagehand/queue/producer.ex @@ -1,7 +1,10 @@ defmodule Stagehand.Queue.Producer do @moduledoc """ - GenStage producer for a single queue. Thin demand buffer — receives jobs, - queues them, dispatches when consumers have demand. + GenStage producer for a single queue. + + Receives jobs, buffers them in a priority queue, and dispatches to + consumers as demand arrives. Registers with `PgRegistry` for + cluster-wide discovery. """ use GenStage @@ -24,7 +27,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Enqueue a job into this producer. + Enqueues a job into this producer. """ @spec enqueue(GenServer.server(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()} def enqueue(producer, %Stagehand.Job{} = job) do @@ -32,7 +35,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Schedule a job to be enqueued after `delay_ms` milliseconds. + Schedules a job to be enqueued after `delay_ms` milliseconds. The timer is tracked so the job can be redistributed on shutdown. """ @spec schedule(GenServer.server(), Stagehand.Job.t(), non_neg_integer()) :: {:ok, Stagehand.Job.t()} @@ -41,7 +44,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Cancel a job by ref. Removes from available queue if present. + Cancels a job by ref. Removes from the available queue if present. """ @spec cancel(GenServer.server(), reference()) :: :ok | :not_found def cancel(producer, ref) when is_reference(ref) do @@ -49,7 +52,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Pause this queue — stops dispatching jobs. + Pauses this queue. Jobs continue to be accepted but are not dispatched. """ @spec pause(GenServer.server()) :: :ok def pause(producer) do @@ -57,7 +60,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Resume a paused queue. + Resumes a paused queue and dispatches any buffered jobs. """ @spec resume(GenServer.server()) :: :ok def resume(producer) do @@ -65,8 +68,8 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Update the concurrency limit (handled by the consumer supervisor, not here). - Returns current queue info. + Returns current queue info including pause state, available count, + and executing count. """ @spec check(GenServer.server()) :: map() def check(producer) do @@ -74,7 +77,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Drain all available jobs synchronously. Returns the list of jobs. + Drains all available jobs synchronously. Returns the list of jobs. """ @spec drain(GenServer.server()) :: [Stagehand.Job.t()] def drain(producer) do @@ -82,7 +85,7 @@ defmodule Stagehand.Queue.Producer do end @doc """ - Initiate graceful shutdown. Stops accepting new jobs and waits for + Initiates graceful shutdown. Stops accepting new jobs and waits for executing jobs to finish before terminating. """ @spec shutdown(GenServer.server()) :: :ok diff --git a/lib/stagehand/router.ex b/lib/stagehand/router.ex index 40a66ef..4ea101f 100644 --- a/lib/stagehand/router.ex +++ b/lib/stagehand/router.ex @@ -1,9 +1,10 @@ defmodule Stagehand.Router do @moduledoc """ - Routes jobs to the correct producer. + Routes jobs to producers. - - Normal jobs: random producer from the :pg group - - Unique jobs: consistent hash → deterministic producer + Normal jobs are sent to a random producer for the queue. Unique jobs + are routed deterministically via rendezvous hashing so that the same + fingerprint always reaches the same producer. """ alias Stagehand.Job @@ -12,7 +13,7 @@ defmodule Stagehand.Router do alias Stagehand.Unique @doc """ - Route and insert a job. + Routes a job to a producer and inserts it. """ @spec route(Job.t(), Stagehand.Config.t()) :: {:ok, Job.t()} | {:error, :no_producers} def route(%Job{} = job, conf) do diff --git a/lib/stagehand/supervisor.ex b/lib/stagehand/supervisor.ex index 494dfba..4b28a52 100644 --- a/lib/stagehand/supervisor.ex +++ b/lib/stagehand/supervisor.ex @@ -1,7 +1,9 @@ defmodule Stagehand.Supervisor do @moduledoc """ - Top-level supervisor for a Stagehand instance. Manages the registry, - hash ring, unique server, queue manager, and plugins. + Top-level supervisor for a Stagehand instance. + + Manages the local process registry, unique job deduplication server, + queue pipelines, and plugins. """ use Supervisor diff --git a/lib/stagehand/testing.ex b/lib/stagehand/testing.ex index 0c17247..9bec493 100644 --- a/lib/stagehand/testing.ex +++ b/lib/stagehand/testing.ex @@ -22,7 +22,7 @@ defmodule Stagehand.Testing do """ @doc """ - Record a job in manual testing mode (called internally by `Stagehand.insert/2`). + Records a job in manual testing mode. Called internally by `Stagehand.insert/2`. """ @spec record_job(atom(), Stagehand.Job.t()) :: Stagehand.Job.t() def record_job(name, job) do @@ -56,7 +56,7 @@ defmodule Stagehand.Testing do end @doc """ - Refute that any job matching the given opts was enqueued. + Asserts that no job matching the given opts was enqueued. """ @spec refute_enqueued(atom(), keyword()) :: :ok def refute_enqueued(name \\ Stagehand, opts) do @@ -71,7 +71,7 @@ defmodule Stagehand.Testing do end @doc """ - Get all enqueued jobs matching the given opts. + Returns all enqueued jobs matching the given opts. """ @spec all_enqueued(atom(), keyword()) :: [Stagehand.Job.t()] def all_enqueued(name \\ Stagehand, opts \\ []) do @@ -84,7 +84,7 @@ defmodule Stagehand.Testing do end @doc """ - Build and execute a job directly for testing. Returns the result of `perform/1`. + Builds and executes a job directly for testing. Returns the result of `perform/1`. """ @spec perform_job(module(), map(), keyword()) :: term() def perform_job(worker, args, opts \\ []) do @@ -94,7 +94,7 @@ defmodule Stagehand.Testing do end @doc """ - Clear all recorded jobs for the given instance. + Clears all recorded jobs for the given instance. """ @spec drain_jobs(atom()) :: :ok def drain_jobs(name \\ Stagehand) do diff --git a/lib/stagehand/unique.ex b/lib/stagehand/unique.ex index 17b1ebf..9d7d5cf 100644 --- a/lib/stagehand/unique.ex +++ b/lib/stagehand/unique.ex @@ -2,8 +2,9 @@ defmodule Stagehand.Unique do @moduledoc """ Unique job deduplication using a local ETS table. - Each node owns its own ETS table. Consistent hashing ensures the same - unique job always routes to the same node, so the dedup check is local. + Each node owns its own deduplication table. Rendezvous hashing ensures + the same job fingerprint always routes to the same node, keeping the + dedup check local. """ use GenServer @@ -20,7 +21,7 @@ defmodule Stagehand.Unique do end @doc """ - Generate a fingerprint for a job based on its unique configuration. + Generates a fingerprint for a job based on its unique configuration. """ @spec fingerprint(Stagehand.Job.t()) :: non_neg_integer() | nil def fingerprint(%Stagehand.Job{unique: nil}), do: nil @@ -41,8 +42,8 @@ defmodule Stagehand.Unique do end @doc """ - Check for an existing job with the same fingerprint. If none found, - insert the fingerprint. Returns `{:ok, job}` or `{:conflict, existing}`. + Checks for an existing job with the same fingerprint. If none is found, + inserts the fingerprint. Returns `{:ok, job}` or `{:conflict, existing}`. """ @spec check_and_insert(GenServer.server(), non_neg_integer(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()} | {:conflict, Stagehand.Job.t()} @@ -51,8 +52,8 @@ defmodule Stagehand.Unique do end @doc """ - Remove a fingerprint entry (called when a unique job completes and - its period has expired). + Removes a fingerprint entry. Called when a unique job completes and + its period has expired. """ @spec remove(GenServer.server(), non_neg_integer()) :: :ok def remove(server, fingerprint) do @@ -60,7 +61,7 @@ defmodule Stagehand.Unique do end @doc """ - Prune expired entries older than `max_age` seconds. + Prunes expired entries older than `max_age` seconds. """ @spec prune(GenServer.server(), pos_integer()) :: :ok def prune(server, max_age) do @@ -68,8 +69,8 @@ defmodule Stagehand.Unique do end @doc """ - Tell this server to expect `count` sync completions before processing - unique checks. Calls to `check_and_insert` will block until all syncs arrive. + Blocks unique checks until `count` sync completions have been received. + Calls to `check_and_insert/3` block until all syncs arrive. """ @spec await_sync(GenServer.server(), non_neg_integer()) :: :ok def await_sync(server, count) do @@ -77,8 +78,8 @@ defmodule Stagehand.Unique do end @doc """ - Signal that one sync has completed. When all expected syncs are done, - blocked `check_and_insert` calls are released. + Signals that one sync has completed. When all expected syncs have + arrived, blocked `check_and_insert/3` calls are released. """ @spec sync_complete(GenServer.server()) :: :ok def sync_complete(server) do @@ -86,7 +87,7 @@ defmodule Stagehand.Unique do end @doc """ - Export all entries from this server. Returns a list of + Exports all entries from this server as a list of `{fingerprint, job, inserted_at}` tuples. """ @spec export(GenServer.server()) :: [{non_neg_integer(), Stagehand.Job.t(), integer()}] @@ -95,7 +96,7 @@ defmodule Stagehand.Unique do end @doc """ - Import entries into this server. Existing entries are not overwritten. + Imports entries into this server. Existing entries are not overwritten. """ @spec import(GenServer.server(), [{non_neg_integer(), Stagehand.Job.t(), integer()}]) :: :ok def import(server, entries) do diff --git a/lib/stagehand/worker.ex b/lib/stagehand/worker.ex index e4bc151..dab18e4 100644 --- a/lib/stagehand/worker.ex +++ b/lib/stagehand/worker.ex @@ -50,7 +50,7 @@ defmodule Stagehand.Worker do @stagehand_opts opts @doc """ - Build a new job struct for this worker. + Builds a new job struct for this worker. """ def new(args, runtime_opts \\ []) do Stagehand.Worker.build_job(__MODULE__, @stagehand_opts, args, runtime_opts)