Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {

publish_state(WS_RUNNING);

// Initial state: we go straight into the outer loop and wait for a wake.
pgstat_report_activity(STATE_IDLE, NULL);

do {

uint32 expected = 1;
Expand All @@ -274,6 +277,8 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {
continue;
}

pgstat_report_activity(STATE_RUNNING, NULL);

uint64 requests_consumed = 0;
uint64 expired_responses = 0;

Expand Down Expand Up @@ -397,6 +402,9 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {

} while (!worker_should_restart && (requests_consumed > 0 || expired_responses > 0));

// Inner loop drained; back to waiting for the next wake.
pgstat_report_activity(STATE_IDLE, NULL);

} while (!worker_should_restart);

publish_state(WS_EXITED);
Expand Down
51 changes: 51 additions & 0 deletions test/test_worker_behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,54 @@ def test_worker_writes_trigger_autoanalyze_on_http_response(sess, autocommit_ses
autocommit_sess.execute(text("alter system reset autovacuum_naptime;"))
autocommit_sess.execute(text("select pg_reload_conf();"))


def test_worker_reports_activity_in_pg_stat_activity(sess, autocommit_sess):
"""the pg_net worker must call pgstat_report_activity() so its row in
pg_stat_activity has a valid state column.
"""

autocommit_sess.execute(text("select net.wait_until_running();"))

# Wait for the worker to drain any leftover work from previous tests
# and settle into idle. Polling makes this robust regardless of what
# ran before.
deadline = time.time() + 5.0
state = None
while time.time() < deadline:
(state,) = autocommit_sess.execute(text(
"select state from pg_stat_activity where backend_type ilike '%pg_net%';"
)).fetchone()
if state == 'idle':
break
time.sleep(0.1)
assert state == 'idle', (
f"pg_net worker state expected 'idle' at rest, got {state!r}. "
"Without pgstat_report_activity(STATE_IDLE, ...) the state column "
"stays NULL."
)

# Fire a slow request so the worker stays active long enough to observe.
sess.execute(text("""
select net.http_get('http://localhost:8080/pathological?status=200&delay=2');
"""))
sess.commit()

# Poll for 'active' for up to 5s. The slow request keeps the worker
# busy for ~2s, so we have a wide observation window.
deadline = time.time() + 5.0
saw_active = False
while time.time() < deadline:
(state,) = autocommit_sess.execute(text(
"select state from pg_stat_activity where backend_type ilike '%pg_net%';"
)).fetchone()
if state == 'active':
saw_active = True
break
time.sleep(0.1)

assert saw_active, (
"pg_net worker state was never observed as 'active' during a slow "
"request. Without pgstat_report_activity(STATE_RUNNING, ...) the "
"state column stays NULL."
)

Loading