
Add celery_id DB tracking for privacy requests and request tasks#7531

Open
galvana wants to merge 4 commits into main from celery-id-db-tracking

Conversation


@galvana galvana commented Feb 28, 2026

Description Of Changes

Stores pre-generated Celery task IDs directly on PrivacyRequest and RequestTask models instead of in Redis cache. This makes task tracking durable across Redis restarts and enables reliable task cancellation via batch revoke().

Key changes:

  • Add celery_id column to privacyrequest and requesttask tables via Alembic migration
  • Pre-generate UUIDs and persist to DB before dispatching Celery tasks
  • Add cancel_celery_tasks() method on PrivacyRequest for batch revocation
  • Add get_request_task_celery_task_ids() helper method
  • Replace Redis-based cache_task_tracking_key with durable DB persistence
  • Add privacy request retry count tracking with configurable limits via privacy_request_requeue_retry_count config
  • Add _cancel_interrupted_tasks_and_error_privacy_request() for interrupted task cleanup
  • Clean up unnecessary inline imports across source and test files
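
In outline, the pre-generate-then-persist-then-dispatch flow looks like the sketch below. This is a minimal illustration, not the fides code: `queue_privacy_request`, `dispatch_to_celery`, and the dict standing in for the DB session are hypothetical stand-ins for `apply_async` and the real SQLAlchemy session.

```python
import uuid

class PrivacyRequest:
    """Stand-in for the real model; celery_id is the new durable DB column."""
    def __init__(self, request_id: str):
        self.id = request_id
        self.celery_id = None

def dispatch_to_celery(task_id: str, **kwargs) -> str:
    """Stand-in for celery_task_fn.apply_async(task_id=task_id, ...)."""
    return task_id

def queue_privacy_request(privacy_request: PrivacyRequest, db: dict) -> str:
    # 1. Pre-generate the Celery task ID before dispatch.
    celery_id = str(uuid.uuid4())

    # 2. Persist it first, so tracking survives a Redis restart
    #    (the key point of this PR).
    privacy_request.celery_id = celery_id
    db[privacy_request.id] = celery_id  # commit stand-in

    # 3. Dispatch with the pre-generated ID so Celery and the DB agree.
    return dispatch_to_celery(task_id=celery_id, privacy_request_id=privacy_request.id)

db: dict = {}
pr = PrivacyRequest("pri_123")
task_id = queue_privacy_request(pr, db)
assert task_id == pr.celery_id == db["pri_123"]
```

Because the ID is written before `apply_async`, cancellation and requeue logic can always find it in the DB, even if the broker or cache has been restarted in between.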

Code Changes

  • src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py - New migration adding celery_id column to both tables
  • src/fides/api/models/privacy_request/privacy_request.py - Add celery_id column, cancel_celery_tasks(), get_request_task_celery_task_ids(), celery task cancellation in cancel_processing()
  • src/fides/api/models/privacy_request/request_task.py - Add celery_id column
  • src/fides/api/task/execute_request_tasks.py - Pre-generate and persist celery_id before dispatching; move uuid to top-level import
  • src/fides/service/privacy_request/privacy_request_service.py - Pre-generate and persist celery_id for privacy request tasks; move uuid to top-level import
  • src/fides/api/service/privacy_request/request_service.py - Add _cancel_interrupted_tasks_and_error_privacy_request(), retry count tracking in requeue flow
  • tests/ops/models/privacy_request/test_privacy_request.py - Tests for cancel_celery_tasks, get_request_task_celery_task_ids, integration with cancel_processing; clean up inline imports
  • tests/ops/models/privacy_request/test_request_task.py - Tests for celery_id column on RequestTask
  • tests/ops/service/privacy_request/test_request_service.py - Tests for _cancel_interrupted_tasks_and_error_privacy_request, retry count requeue handling; clean up inline imports
  • tests/ops/util/test_cache.py - Tests for retry count cache functions; consolidate inline imports to top-level
  • tests/task/test_requeue_interrupted_tasks.py - Tests for enhanced requeue with retry limits; clean up inline imports

Steps to Confirm

  1. Run the migration: verify celery_id column exists on both privacyrequest and requesttask tables
  2. Submit a privacy request and verify celery_id is populated on the PrivacyRequest record before the Celery task starts
  3. Verify request tasks also have celery_id populated
  4. Cancel a running privacy request and verify cancel_celery_tasks() issues batch revoke
  5. Run test suite: pytest --no-cov tests/ops/models/privacy_request/test_privacy_request.py::TestCancelCeleryTasks tests/ops/service/privacy_request/test_request_service.py::TestCancelInterruptedTasksAndErrorPrivacyRequest tests/task/test_requeue_interrupted_tasks.py::TestEnhancedRequeueInterruptedTasks tests/ops/util/test_cache.py::TestPrivacyRequestRetryCache -v
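
Step 4's batch revocation can be sketched as follows. `cancel_celery_tasks` here is a hypothetical stand-in for the method added in this PR, and `FakeControl` stands in for `celery_app.control`; Celery's `control.revoke` does accept a list of task IDs, which is what makes a single batch call possible.

```python
class FakeControl:
    """Stand-in for celery_app.control; records what would be revoked."""
    def __init__(self):
        self.revoked = []

    def revoke(self, task_ids, terminate=False):
        # Celery's control.revoke accepts a single id or a list of ids,
        # so one call can revoke a whole batch.
        self.revoked.extend(task_ids if isinstance(task_ids, list) else [task_ids])

def cancel_celery_tasks(privacy_request_celery_id, request_task_celery_ids, control):
    """Illustrative shape of PrivacyRequest.cancel_celery_tasks(): gather
    every known celery_id (skipping NULLs) and revoke them in one batch."""
    task_ids = [cid for cid in [privacy_request_celery_id, *request_task_celery_ids] if cid]
    if task_ids:
        control.revoke(task_ids, terminate=True)
    return task_ids

control = FakeControl()
revoked = cancel_celery_tasks("main-id", ["t1", None, "t2"], control)
assert revoked == ["main-id", "t1", "t2"]
assert control.revoked == ["main-id", "t1", "t2"]
```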

Pre-Merge Checklist

  • Issue requirements met
  • All CI pipelines succeeded
  • CHANGELOG.md updated
    • Add a db-migration label to the entry if your change includes a DB migration
    • Add a high-risk label to the entry if your change includes a high-risk change (i.e. potential for performance impact or unexpected regression) that should be flagged
    • Updates unreleased work already in Changelog, no new entry necessary
  • UX feedback:
    • No UX review needed
  • Followup issues:
    • No followup issues
  • Database migrations:
    • Ensure that your downrev is up to date with the latest revision on main
    • Ensure that your downgrade() migration is correct and works
  • Documentation:
    • No documentation updates required

Made with Cursor

Summary by CodeRabbit

  • Reliability Improvements
    • Async task tracking for privacy requests is now more durable, storing task identifiers in the database instead of temporary storage for improved resilience across system restarts and cache failures.

Store pre-generated Celery task IDs directly on PrivacyRequest and
RequestTask models instead of in Redis cache. This makes task tracking
durable across Redis restarts and enables reliable task cancellation.

Changes:
- Add celery_id column to privacyrequest and requesttask tables
- Pre-generate and persist celery_id before dispatching Celery tasks
- Add cancel_celery_tasks method for batch revocation
- Add retry count tracking with configurable limits
- Clean up unnecessary inline imports across source and test files

Made-with: Cursor

vercel bot commented Feb 28, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

2 Skipped Deployments
Project Deployment Actions Updated (UTC)
fides-plus-nightly Ignored Ignored Preview Mar 6, 2026 6:39am
fides-privacy-center Ignored Ignored Mar 6, 2026 6:39am

@galvana galvana added the run unsafe ci checks label (runs fides-related CI checks that require sensitive credentials) Feb 28, 2026
Combines DB-based celery_id tracking with main's upstream-awaiting
logic for pending tasks. Cache-based task ID lookups removed since
celery_id is now read directly from the RequestTask DB column.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai bot commented Mar 6, 2026

📝 Walkthrough

The pull request migrates Celery task ID tracking from ephemeral Redis cache to durable database columns (celery_id) on PrivacyRequest and RequestTask models, with corresponding updates across service layer and test suite.

Changes

Cohort / File(s) Summary
Database Schema & Migration
changelog/7531-celery-id-db-tracking.yaml, src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py
Added changelog entry and Alembic migration to introduce nullable, indexed celery_id columns (String 255) on privacyrequest and requesttask tables for durable task tracking.
Data Models
src/fides/api/models/privacy_request/privacy_request.py, src/fides/api/models/privacy_request/request_task.py
Added public celery_id column to both models; updated get_cached_task_id and related methods to return celery_id from database instead of cache, and updated docstrings to reflect DB-backed retrieval.
Service Layer
src/fides/api/service/privacy_request/request_service.py, src/fides/api/task/execute_request_tasks.py, src/fides/service/privacy_request/privacy_request_service.py
Removed cache-based task ID tracking; now pre-generates UUID-based celery_id, persists to database before task dispatch, and uses it for task lifecycle management. Renamed _get_request_ids_in_progress to _get_request_tasks_in_progress to include celery_id in returned tuple. Removed get_cached_task_id function and cache utility imports.
Tests
tests/ops/api/v1/endpoints/privacy_request/test_privacy_request_endpoints.py, tests/ops/api/v1/endpoints/test_drp_endpoints.py, tests/ops/models/privacy_request/test_privacy_request.py, tests/ops/models/privacy_request/test_request_task.py, tests/ops/models/test_request_task.py, tests/ops/service/privacy_request/test_request_service.py, tests/ops/util/test_cache.py, tests/task/test_requeue_interrupted_tasks.py
Replaced cache_task_tracking_key usage with direct celery_id assignment on model instances; removed TestGetCachedTaskId test class; updated test fixtures to persist celery_id via DB commit instead of cache; renamed test functions and updated assertions to reference celery_id semantics.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client/API
    participant Service as Service Layer
    participant DB as Database
    participant Queue as Celery Queue
    participant Cache as Redis Cache

    Note over Client,Cache: Previous (Cache-based) Flow
    Client->>Service: queue_request_task()
    Service->>Queue: dispatch_task()
    Queue-->>Service: task_id
    Service->>Cache: store task_id (cache_task_tracking_key)
    Cache-->>Service: ✓
    
    Note over Client,DB: New (Database-backed) Flow
    Client->>Service: queue_request_task()
    Service->>Service: celery_id = uuid()
    Service->>DB: privacy_request.celery_id = celery_id<br/>(commit)
    DB-->>Service: ✓
    Service->>Queue: dispatch_task(task_id=celery_id)
    Queue-->>Service: task_id
    Service-->>Client: celery_id (persisted)
sequenceDiagram
    participant Client as Client/API
    participant Service as Service Layer
    participant DB as Database
    participant Queue as Celery Queue

    Note over Client,Queue: Task Status Retrieval (New Flow)
    Client->>Service: requeue_interrupted_tasks()
    Service->>DB: _get_request_tasks_in_progress()<br/>(fetch celery_id + status)
    DB-->>Service: [(task_id, celery_id, status, awaiting_upstream)]
    Service->>Queue: check queue state(celery_id)
    Queue-->>Service: task running? 
    alt celery_id present AND in queue
        Service->>Service: skip (task running)
    else celery_id absent OR not in queue
        Service->>DB: mark requires_input / requeue
        DB-->>Service: ✓
    end
    Service-->>Client: tasks updated
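
The decision in the second diagram can be condensed into a small predicate. This is an illustrative sketch, not the actual requeue code; the parameter names are assumptions based on the tuple described in the walkthrough.

```python
def should_requeue_task(celery_id, queued_task_ids, awaiting_upstream):
    """Mirror of the diagram's branch: skip tasks that are demonstrably
    running or waiting on upstream work; requeue everything else."""
    if awaiting_upstream:
        return False  # upstream tasks will trigger this one when they finish
    if celery_id and celery_id in queued_task_ids:
        return False  # task is visibly queued/running - leave it alone
    return True       # no celery_id, or not found in the queue: requeue

queued = {"abc", "def"}
assert should_requeue_task("abc", queued, False) is False  # running
assert should_requeue_task("zzz", queued, False) is True   # lost task
assert should_requeue_task(None, queued, False) is True    # never dispatched
assert should_requeue_task(None, queued, True) is False    # awaiting upstream
```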

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • adamsachs
  • JadeCara

Poem

🐰 Hop, skip, and a celery bound
IDs now in databases, safely sound
No Redis cache, just DB delight
Task tracking's durable, oh what a sight!
Migrations whisper, requests persist
Rabbit approves this data-backed gist! 🌾✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Add celery_id DB tracking for privacy requests and request tasks' clearly and concisely summarizes the main change of the PR: adding database-backed Celery ID tracking to two models.
Description check ✅ Passed The PR description includes all required template sections: Description of Changes, Code Changes, Steps to Confirm, and Pre-Merge Checklist with appropriate selections marked.
Docstring Coverage ✅ Passed Docstring coverage is 85.07% which is sufficient. The required threshold is 80.00%.


@galvana galvana requested a review from JadeCara March 6, 2026 06:41
@galvana galvana marked this pull request as ready for review March 6, 2026 06:41
@galvana galvana requested a review from a team as a code owner March 6, 2026 06:41

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (2)
tests/ops/service/privacy_request/test_request_service.py (1)

26-26: Import PrivacyRequestError from its defining module.

Using fides.service.privacy_request.privacy_request_service as an exception re-export path is brittle. Prefer importing PrivacyRequestError directly from fides.api.common_exceptions.

Suggested import change
-from fides.service.privacy_request.privacy_request_service import PrivacyRequestError
+from fides.api.common_exceptions import PrivacyRequestError
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/ops/service/privacy_request/test_request_service.py` at line 26, The
test imports PrivacyRequestError from
fides.service.privacy_request.privacy_request_service which is a brittle
re-export; update the import to pull PrivacyRequestError directly from its
defining module fides.api.common_exceptions so tests reference the canonical
symbol (PrivacyRequestError) instead of a service-level re-export
(privacy_request_service).
tests/task/test_requeue_interrupted_tasks.py (1)

16-22: Finish removing cache-key based setup from this suite.

This module still carries cache_task_tracking_key, while requeue logic now keys off model celery_id. Converting remaining cache-key seeded cases to DB celery_id setup will keep these tests aligned with runtime behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/task/test_requeue_interrupted_tasks.py` around lines 16 - 22, Tests
still import and/or use the cache key symbol cache_task_tracking_key and related
cache seeding functions (get_cache, get_privacy_request_retry_count,
increment_privacy_request_retry_count, reset_privacy_request_retry_count);
update the suite to stop using cache-key based setup and instead seed the DB
celery_id on the relevant model instances that the requeue logic uses at
runtime. Replace any calls that set or read cache_task_tracking_key with code
that creates/updates the task model (or PrivacyRequest/Task model used by
requeue logic) to have the expected celery_id value, and remove imports of
cache_task_tracking_key and the cache helper functions from
tests/task/test_requeue_interrupted_tasks.py. Ensure assertions now check DB
state (by querying the model using the celery_id) rather than cache-based
counters.


📥 Commits

Reviewing files that changed from the base of the PR and between 0e8f39e and 1209b4d.

📒 Files selected for processing (15)
  • changelog/7531-celery-id-db-tracking.yaml
  • src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py
  • src/fides/api/models/privacy_request/privacy_request.py
  • src/fides/api/models/privacy_request/request_task.py
  • src/fides/api/service/privacy_request/request_service.py
  • src/fides/api/task/execute_request_tasks.py
  • src/fides/service/privacy_request/privacy_request_service.py
  • tests/ops/api/v1/endpoints/privacy_request/test_privacy_request_endpoints.py
  • tests/ops/api/v1/endpoints/test_drp_endpoints.py
  • tests/ops/models/privacy_request/test_privacy_request.py
  • tests/ops/models/privacy_request/test_request_task.py
  • tests/ops/models/test_request_task.py
  • tests/ops/service/privacy_request/test_request_service.py
  • tests/ops/util/test_cache.py
  • tests/task/test_requeue_interrupted_tasks.py

Comment on lines +21 to +29
def upgrade():
    op.add_column(
        "privacyrequest",
        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
    )
    op.add_column(
        "requesttask",
        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
    )


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how migrations handle indexes - search for add_column with index=True
echo "=== Migrations using index=True in add_column ==="
rg -n "add_column.*index\s*=\s*True" --type py src/fides/api/alembic/migrations/versions/ | head -30

echo ""
echo "=== Migrations using explicit create_index ==="
rg -n "create_index" --type py src/fides/api/alembic/migrations/versions/ | head -30

Repository: ethyca/fides

Length of output: 3579


🏁 Script executed:

#!/bin/bash
# Examine the actual migration file
echo "=== Full migration file content ==="
cat -n src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py

Repository: ethyca/fides

Length of output: 1168


🏁 Script executed:

#!/bin/bash
# Check SQLAlchemy/Alembic version constraints
echo "=== Checking requirements or setup files for Alembic version ==="
find . -maxdepth 3 -type f \( -name "setup.py" -o -name "requirements*.txt" -o -name "pyproject.toml" -o -name "poetry.lock" \) | head -10

Repository: ethyca/fides

Length of output: 189


🏁 Script executed:

#!/bin/bash
# Check pyproject.toml for Alembic version
echo "=== Alembic version in pyproject.toml ==="
grep -A 5 -B 5 -i "alembic" pyproject.toml | head -30

echo ""
echo "=== SQLAlchemy version in pyproject.toml ==="
grep -A 1 -B 1 "sqlalchemy" pyproject.toml | head -20

Repository: ethyca/fides

Length of output: 1250


🏁 Script executed:

#!/bin/bash
# Double-check: search for ANY use of index=True in add_column across entire codebase
echo "=== Any add_column with index parameter anywhere ==="
rg "add_column.*index" --type py src/fides/api/alembic/migrations/versions/ -A 1 -B 1

Repository: ethyca/fides

Length of output: 108


🏁 Script executed:

#!/bin/bash
# Check if there's documentation or discussion about this in recent migrations
echo "=== Recent migration patterns - check a few recent files ==="
ls -t src/fides/api/alembic/migrations/versions/ | head -5 | while read file; do
  echo "=== File: $file ==="
  rg "add_column|create_index" "src/fides/api/alembic/migrations/versions/$file" -A 1 | head -20
done

Repository: ethyca/fides

Length of output: 914


🌐 Web query:

Alembic 1.8.1 add_column index=True behavior SQLAlchemy 1.4

💡 Result:

In Alembic 1.8.1 + SQLAlchemy 1.4, passing index=True on the Column you give to op.add_column() does not create an index in the database. Alembic’s add_column operation only emits ALTER TABLE ... ADD COLUMN ...; index creation is a separate DDL step (CREATE INDEX). (alembic.sqlalchemy.org)

Column(index=True) is primarily an inline convenience for table creation (it causes an Index object to be associated with the Table, and the CREATE INDEX DDL is emitted right after CREATE TABLE), but it is not something the database automatically applies when you later add a column via ALTER TABLE. (docs.sqlalchemy.org)

What to do in a migration

Create the index explicitly:

from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column("user", sa.Column("email", sa.String(length=255), nullable=True))
    op.create_index(op.f("ix_user_email"), "user", ["email"], unique=False)

def downgrade():
    op.drop_index(op.f("ix_user_email"), table_name="user")
    op.drop_column("user", "email")

(op.create_index(...) is the standard Alembic way to add the index.) (stackoverflow.com)



🏁 Script executed:

#!/bin/bash
# Search for any documentation or comments in the codebase about index handling
echo "=== Looking for index-related comments in migrations ==="
rg "index.*create|create.*index" --type py src/fides/api/alembic/migrations/versions/ -B 2 | head -40

Repository: ethyca/fides

Length of output: 3304


🏁 Script executed:

#!/bin/bash
# Check one of the existing migrations that uses create_index to see the pattern
echo "=== Example migration with create_index pattern ==="
head -100 src/fides/api/alembic/migrations/versions/179f2bb623ae_update_table_for_twilio.py | tail -50

Repository: ethyca/fides

Length of output: 1689


Use explicit op.create_index() calls instead of index=True in add_column().

In Alembic 1.8.1 with SQLAlchemy 1.4, the index=True parameter in op.add_column() does not create indexes during migrations. The index=True attribute only applies to CREATE TABLE operations; for schema modifications via ALTER TABLE, indexes must be created explicitly with op.create_index(). This is the established pattern throughout the codebase.

🔧 Required fix for proper index creation
 def upgrade():
     op.add_column(
         "privacyrequest",
-        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
+        sa.Column("celery_id", sa.String(length=255), nullable=True),
     )
+    op.create_index("ix_privacyrequest_celery_id", "privacyrequest", ["celery_id"])
     op.add_column(
         "requesttask",
-        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
+        sa.Column("celery_id", sa.String(length=255), nullable=True),
     )
+    op.create_index("ix_requesttask_celery_id", "requesttask", ["celery_id"])

And update downgrade to remove the indexes:

 def downgrade():
+    op.drop_index("ix_requesttask_celery_id", table_name="requesttask")
     op.drop_column("requesttask", "celery_id")
+    op.drop_index("ix_privacyrequest_celery_id", table_name="privacyrequest")
     op.drop_column("privacyrequest", "celery_id")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py`
around lines 21 - 29, The migration's upgrade() currently relies on
sa.Column(..., index=True) which won't create indexes for ALTER TABLE
operations; after adding the celery_id columns to "privacyrequest" and
"requesttask" (the op.add_column calls that add sa.Column("celery_id", ...)),
add explicit op.create_index() calls to create indexes (e.g., names like
ix_privacyrequest_celery_id and ix_requesttask_celery_id) on those tables and
columns; update the downgrade() to explicitly drop those indexes with
op.drop_index() (using the same names) before removing the columns so index
cleanup is handled properly.

Comment on lines 583 to +593

             if not task_id:
-                _cancel_interrupted_tasks_and_error_privacy_request(
-                    db,
-                    privacy_request,
-                    f"No task ID found for privacy request {privacy_request.id}, "
-                    f"request is stuck without a running task - canceling",
+                # NULL celery_id means the request was never dispatched or
+                # the column hasn't been populated yet (e.g. upgrade path).
+                # Requeue so completed work is skipped and incomplete work
+                # is retried.
+                logger.warning(
+                    f"No celery_id on privacy request {privacy_request.id}, "
+                    f"requeueing to recover"
+                )
-                continue
-
-            # Check if the main privacy request task is active
-            if task_id not in queued_tasks_ids and not celery_tasks_in_flight(
+                should_requeue = True
+            elif task_id not in queued_tasks_ids and not celery_tasks_in_flight(


⚠️ Potential issue | 🟠 Major

Do not auto-requeue requires_input requests when main celery_id is NULL.

This branch currently requeues all NULL-main-id requests, including ones intentionally paused for user input. That can restart workflows unexpectedly.

Suggested guard
                 if not task_id:
+                    if privacy_request.status == PrivacyRequestStatus.requires_input:
+                        logger.warning(
+                            f"No celery_id on privacy request {privacy_request.id} in requires_input status - "
+                            f"keeping request in current status"
+                        )
+                        continue
                     # NULL celery_id means the request was never dispatched or
                     # the column hasn't been populated yet (e.g. upgrade path).
                     # Requeue so completed work is skipped and incomplete work
                     # is retried.
                     logger.warning(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/fides/api/service/privacy_request/request_service.py` around lines 583 -
593, The current branch sets should_requeue=True whenever
privacy_request.celery_id is NULL, which also requeues requests paused for user
input; change the logic in request_service.py so that when task_id
(privacy_request.celery_id) is falsy you first check
privacy_request.requires_input (or the equivalent flag/property) and skip
setting should_requeue for those requests (i.e., do not requeue requires_input
requests), leaving existing behavior for other NULL celery_id cases; update the
if-not-task_id block that sets should_requeue to early-return/continue or only
set should_requeue when not privacy_request.requires_input.

Comment on lines +613 to +617
    db: Session = Session.object_session(request_task)
    if db:
        request_task.celery_id = celery_id
        db.commit()



⚠️ Potential issue | 🟠 Major

Require DB persistence before dispatching.

If db is missing, the code still queues the Celery task, but never persists request_task.celery_id. That breaks the durable-tracking contract introduced by this PR.

Suggested fix
 def queue_request_task(
     request_task: RequestTask, privacy_request_proceed: bool = True
 ) -> None:
@@
     celery_id = str(uuid.uuid4())

     db: Session = Session.object_session(request_task)
-    if db:
-        request_task.celery_id = celery_id
-        db.commit()
+    if not db:
+        raise RuntimeError(
+            f"RequestTask {request_task.id} is not attached to a DB session; "
+            "cannot persist celery_id for durable tracking."
+        )
+    request_task.celery_id = celery_id
+    db.commit()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/fides/api/task/execute_request_tasks.py` around lines 613 - 617, The code
may queue a Celery task without persisting request_task.celery_id because
Session.object_session(request_task) can be None; ensure a DB session exists and
commit the change before dispatch: if db is None, either obtain a new session
(attach/merge request_task into that session), set request_task.celery_id, and
call db.commit(), or raise an explicit error to block dispatch; update the block
around Session.object_session(request_task), request_task.celery_id, and
db.commit() so persistence always occurs before the task is sent.

Comment on lines +619 to 627
    celery_task_fn.apply_async(
        queue=DSR_QUEUE_NAME,
        task_id=celery_id,
        kwargs={
            "privacy_request_id": request_task.privacy_request_id,
            "privacy_request_task_id": request_task.id,
            "privacy_request_proceed": privacy_request_proceed,
        },
    )


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the actual code around lines 619-627
cat -n src/fides/api/task/execute_request_tasks.py | sed -n '600,650p'

Repository: ethyca/fides


🏁 Script executed:

# Look for where celery_id is set/persisted before the apply_async call
rg -n 'celery_id' src/fides/api/task/execute_request_tasks.py -B 5 -A 5



🏁 Script executed:

# Check if there's any existing exception handling around apply_async calls
rg -n -B 10 -A 10 'apply_async' src/fides/api/task/execute_request_tasks.py



🏁 Script executed:

# Look at the broader queue_request_task function context
rg -n 'def queue_request_task' src/fides/api/task/execute_request_tasks.py -A 100 | head -120



🏁 Script executed:

# Find all call sites of queue_request_task
rg -n 'queue_request_task\(' src/fides/api/task/execute_request_tasks.py -B 3 -A 3



🏁 Script executed:

# Check for exception handling patterns around queue_request_task calls in other files
rg -n 'queue_request_task' src/fides --type=py -B 5 -A 5 | head -100



🏁 Script executed:

# Look at celery apply_async documentation/usage patterns in the codebase
rg -n 'apply_async' src/fides --type=py -B 2 -A 2 | head -80



Add error handling for broker publish failures after persisting celery_id.

The celery_id is persisted to the database at line 616 and committed before apply_async() is called at line 619. If apply_async() fails (e.g., due to broker connection issues), the persisted celery_id will remain in the database despite the task never being published to the broker, creating stale tracking metadata that contradicts the function's durability intent.

Wrap apply_async() in a try/except block and clear celery_id (or mark the task state appropriately) on failure before re-raising the exception.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/fides/api/task/execute_request_tasks.py` around lines 619 - 627, The
persisted celery_id is written before calling celery_task_fn.apply_async, so
wrap the apply_async call in a try/except inside the same scope where
request_task (the object with the celery_id) is available; on exception clear
request_task.celery_id (or set a task-state field indicating publish failure),
persist that change to the DB (flush/commit using the same session) to avoid
stale tracking metadata, then re-raise the exception so upstream error handling
can run; reference celery_task_fn.apply_async, request_task.celery_id, and the
surrounding function in execute_request_tasks.py to locate where to add the
try/except and DB update.
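A minimal sketch of the compensating write the prompt above describes. The names (`request_task`, `celery_task_fn`, `db`) follow the snippets quoted in this review; the stub classes stand in for the real model, session, and Celery task and are illustrative only:

```python
import uuid


class FakeSession:
    """Stand-in for a SQLAlchemy Session; counts commits."""

    def __init__(self) -> None:
        self.commits = 0

    def commit(self) -> None:
        self.commits += 1


class FakeRequestTask:
    """Stand-in for the RequestTask model."""

    def __init__(self) -> None:
        self.celery_id = None


class FailingCeleryTask:
    """Stub standing in for celery_task_fn whose broker publish fails."""

    def apply_async(self, **kwargs) -> None:
        raise ConnectionError("broker unavailable")


def dispatch_with_rollback(request_task, celery_task_fn, db) -> None:
    """Persist celery_id first, then clear it if the broker publish fails."""
    celery_id = str(uuid.uuid4())
    request_task.celery_id = celery_id
    db.commit()
    try:
        celery_task_fn.apply_async(task_id=celery_id)
    except Exception:
        # Compensating write: avoid stale tracking metadata for a task
        # that was never actually published to the broker.
        request_task.celery_id = None
        db.commit()
        raise


task, db = FakeRequestTask(), FakeSession()
try:
    dispatch_with_rollback(task, FailingCeleryTask(), db)
except ConnectionError:
    pass

assert task.celery_id is None  # cleared after the failed publish
assert db.commits == 2         # initial persist + compensating write
```

The same pattern applies to the privacy-request dispatch path discussed below; only the model and task names differ.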

Comment on lines +1007 to 1021
with get_db() as db:
privacy_request = PrivacyRequest.get(db=db, object_id=privacy_request_id)
if privacy_request:
privacy_request.celery_id = celery_id
db.commit()

run_privacy_request.apply_async(
queue=DSR_QUEUE_NAME,
task_id=celery_id,
kwargs={
"privacy_request_id": privacy_request_id,
"from_webhook_id": from_webhook_id,
"from_step": from_step,
},
)

⚠️ Potential issue | 🟠 Major

Avoid dispatching when the privacy request row is missing.

The task is still queued even when PrivacyRequest.get(...) returns None. That can enqueue orphaned work and return a misleading celery_id.

Suggested fix
     try:
         with get_db() as db:
             privacy_request = PrivacyRequest.get(db=db, object_id=privacy_request_id)
-            if privacy_request:
-                privacy_request.celery_id = celery_id
-                db.commit()
+            if not privacy_request:
+                raise PrivacyRequestError(
+                    f"Privacy request {privacy_request_id} not found during queueing"
+                )
+            privacy_request.celery_id = celery_id
+            db.commit()
 
         run_privacy_request.apply_async(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/fides/service/privacy_request/privacy_request_service.py` around lines
1007 - 1021, The code currently enqueues run_privacy_request.apply_async even
when PrivacyRequest.get(db=db, object_id=privacy_request_id) returns None,
producing orphaned tasks; change the logic so after opening get_db and calling
PrivacyRequest.get (the privacy_request variable) you only set celery_id,
commit, and call run_privacy_request.apply_async when privacy_request is not
None—if the record is missing, do not call apply_async (instead return/raise or
handle the missing-request case as the surrounding flow expects). Ensure you
reference the existing symbols privacy_request, get_db, PrivacyRequest.get,
celery_id, and run_privacy_request.apply_async when implementing the guard.

@greptile-apps
Contributor

greptile-apps bot commented Mar 6, 2026

Greptile Summary

This PR moves Celery task ID tracking from ephemeral Redis cache to durable celery_id DB columns on PrivacyRequest and RequestTask. The overall approach is sound — pre-generating a UUID, persisting it before apply_async, and reading it back in requeue_interrupted_tasks — and the DB migration, model changes, and test refactors are clean and well-structured.

Key findings:

  • Logic concern in queue_request_task: If Session.object_session(request_task) returns None, the celery_id is silently not persisted while the Celery task still dispatches with that UUID. This could cause requeue_interrupted_tasks to see celery_id = NULL on a live task and incorrectly re-dispatch it, potentially leading to duplicate execution.
  • New inline import in queue_privacy_request (get_autoclose_db_session) is added mid-function with # pylint: disable=cyclic-import; this violates the project's convention of placing all imports at the top of the module.
  • Migration filename prefix xx_ is inconsistent with the rest of the project, which uses short alphanumeric revision hashes as the filename prefix.
  • Unused import: cache_task_tracking_key remains in the tests/task/test_requeue_interrupted_tasks.py import block after all its call sites were replaced with direct DB assignments.
  • The behavioral change in requeue_interrupted_tasks — treating NULL celery_id as "requeue" rather than "cancel" — is well-motivated for the upgrade path and is correctly bounded by the new retry-count limit.

Confidence Score: 3/5

  • Mostly safe to merge, but the silent DB skip in queue_request_task could cause duplicate task execution in edge cases and should be addressed before merging.
  • The migration and model changes are correct and well-tested. The main concern is in execute_request_tasks.py where a None DB session silently bypasses celery_id persistence while still dispatching the task — this directly undermines the durability guarantee that motivates the entire PR. The inline import violation and unused import are minor cleanup items. The behavioral change in requeue_interrupted_tasks (NULL → requeue vs. cancel) is intentional and bounded by retry count.
  • src/fides/api/task/execute_request_tasks.py — the silent DB skip in queue_request_task needs to be addressed to guarantee celery_id durability.

Important Files Changed

Filename Overview
src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py Adds nullable celery_id String(255) column to privacyrequest and requesttask tables. Downgrade correctly drops both columns. Minor: the xx_ filename prefix is inconsistent with the rest of the project's hash-based naming convention.
src/fides/api/task/execute_request_tasks.py Pre-generates UUID and persists celery_id before dispatching via apply_async. However, the DB write is silently skipped if Session.object_session(request_task) returns None, which would leave celery_id as NULL while a real task runs — the core goal of durable tracking is defeated in this path.
src/fides/service/privacy_request/privacy_request_service.py Correctly pre-generates and persists celery_id before dispatching. However, a new inline import for get_autoclose_db_session is introduced inside the function body, violating the project's import placement conventions.
src/fides/api/service/privacy_request/request_service.py Removes Redis-based get_cached_task_id, adds _cancel_interrupted_tasks_and_error_privacy_request, and updates requeue_interrupted_tasks to read celery_id from DB. Behavioral change: NULL celery_id now triggers requeue instead of cancel (intentional for upgrade path).
tests/task/test_requeue_interrupted_tasks.py Tests updated to use direct DB celery_id assignment. cache_task_tracking_key is still imported at the top of the file but appears to be unused after all usages were replaced with DB assignments.

Last reviewed commit: 1209b4d
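The batch-revocation flow implied by cancel_celery_tasks() and get_request_task_celery_task_ids() can be sketched as follows. The model stub is illustrative, not the real RequestTask; the key point is that Celery's control.revoke accepts a list of task ids, so the batch needs no per-task loop:

```python
class FakeRequestTask:
    """Stand-in for the RequestTask model (illustrative only)."""

    def __init__(self, celery_id=None) -> None:
        self.celery_id = celery_id


def get_request_task_celery_task_ids(request_tasks):
    """Collect non-NULL celery ids from a privacy request's tasks."""
    return [t.celery_id for t in request_tasks if t.celery_id]


# Tasks queued before the migration (or never dispatched) have NULL ids
# and are simply skipped.
tasks = [FakeRequestTask("id-1"), FakeRequestTask(None), FakeRequestTask("id-2")]
ids = get_request_task_celery_task_ids(tasks)
assert ids == ["id-1", "id-2"]

# In production the list would be handed to Celery in one call, e.g.:
#   celery_app.control.revoke(ids, terminate=False)
```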

Comment on lines +611 to +614
celery_id = str(uuid.uuid4())

db: Session = Session.object_session(request_task)
if db:

Silent DB skip defeats durability goal

If Session.object_session(request_task) returns None, celery_id is never written to the database, but apply_async still proceeds with the pre-generated UUID as the Celery task ID. This creates an invisible split:

  • The Celery task runs with a known UUID
  • The DB record has celery_id = NULL

When requeue_interrupted_tasks runs later, it will see celery_id IS NULL on this in-progress request task and treat it as interrupted — triggering a requeue even though the original task is still executing. This could result in duplicate task execution.

If Session.object_session(request_task) can legitimately return None here, the function should raise rather than silently skip the persistence step, so the caller can handle the failure explicitly:

db: Session = Session.object_session(request_task)
if not db:
    raise RuntimeError(
        f"RequestTask {request_task.id} has no attached DB session; cannot persist celery_id"
    )
request_task.celery_id = celery_id
db.commit()

Comment on lines +996 to 998
get_autoclose_db_session as get_db,
)
from fides.api.service.privacy_request.request_runner_service import (

New inline import should be at module top

This PR introduces a new inline import of get_autoclose_db_session inside queue_privacy_request. The # pylint: disable=cyclic-import comment indicates a circular dependency, but the project convention is to resolve that at the module level (e.g., by restructuring) rather than using mid-function imports.

The existing run_privacy_request inline import (which was already in this function before this PR) follows the same pattern, but adding a second one compounds the issue. Both should ideally be moved to the top of the file, or the circular dependency should be broken via refactoring.

Context Used: Rule from dashboard - Python imports should always be placed at the top of the file, not near the code that uses them. (source)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment on lines +1 to +5
"""Add celery_id column to privacyrequest and requesttask tables

Moves Celery task ID tracking from ephemeral Redis cache to durable DB
columns, following the pattern already established by MonitorTask.


Migration filename uses non-standard xx_ prefix

All other migration files in versions/ use a short alphanumeric hash as the filename prefix (e.g., 0210948a8147_initial.py, 074796d61d8a_...py). This file uses xx_2026_02_28_1200_ which is inconsistent with the project convention and could confuse tooling or developers searching for the migration by its revision ID.

The filename should be renamed to match the convention, e.g.:
ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py

Note: Alembic uses the revision/down_revision variables inside the file for chain ordering — the filename itself is cosmetic — but consistency matters for readability and tooling.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
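For reference, the migration body the summary table describes (a nullable String(255) column on both tables) amounts to roughly the following. This is a sketch, not the actual file; the revision id is taken from the filename above, and down_revision is a placeholder for whatever the real chain requires:

```python
# Sketch of the migration described in this PR (not the actual file).
import sqlalchemy as sa
from alembic import op

revision = "ce488ce97a31"
down_revision = None  # placeholder; the real value lives in the actual file


def upgrade():
    op.add_column("privacyrequest", sa.Column("celery_id", sa.String(255), nullable=True))
    op.add_column("requesttask", sa.Column("celery_id", sa.String(255), nullable=True))


def downgrade():
    op.drop_column("privacyrequest", "celery_id")
    op.drop_column("requesttask", "celery_id")
```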

@@ -13,7 +13,14 @@
REQUEUE_INTERRUPTED_TASKS_LOCK,
requeue_interrupted_tasks,

Unused import after migration to DB-backed celery_id

cache_task_tracking_key is still imported here, but all its call sites in this file have been replaced with direct request.celery_id = ... / db.commit() assignments as part of this PR. This import should be removed.

Suggested change
    requeue_interrupted_tasks,
)
from fides.api.util.cache import (
-    cache_task_tracking_key,
    get_cache,

"fides.api.service.privacy_request.request_service.celery_tasks_in_flight",
return_value=False,
)
def test_pending_task_awaiting_upstream_is_not_canceled(

I think this one might need to be updated

  • The test writes the privacy request's task ID to the cache, but production now reads celery_id = NULL from the DB and takes the requeue path, so the mock_requeue.assert_not_called() assertion should fail.
  • The test writes the upstream task's celery ID to the cache; production reads celery_id = NULL from the DB, so the upstream task is treated as stuck.

"fides.api.service.privacy_request.request_service.celery_tasks_in_flight",
return_value=False,
)
def test_pending_task_with_no_cache_key_is_canceled(

Same pattern — cache write is dead, production takes NULL-celery_id requeue path instead of the subtask-evaluation path this test exercises
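A minimal sketch of the rewrite both test comments call for, assuming the new DB-backed column; the model stub is illustrative, not the real RequestTask:

```python
import uuid


class FakeRequestTask:
    """Stand-in for the RequestTask model (illustrative only)."""

    def __init__(self) -> None:
        self.celery_id = None


# With DB-backed tracking, the test seeds celery_id directly on the row
# (followed by a db.commit() in the real test) instead of writing a
# tracking key to the Redis cache.
task = FakeRequestTask()
task.celery_id = str(uuid.uuid4())

assert task.celery_id is not None
assert len(task.celery_id) == 36  # canonical UUID4 string length
```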

@@ -606,17 +606,25 @@ def _build_upstream_access_data(
def queue_request_task(

fidesplus ManualTaskService._requeue_request_tasks calls queue_request_task. Since queue_request_task's signature is unchanged, the fidesplus caller is compatible. But worth confirming fidesplus tests pass with this change since the function now does a db.commit() internally — callers that previously controlled their own commit boundary may see different transactional behavior.

@JadeCara
Contributor

JadeCara commented Mar 6, 2026

from codegraph - probably want to verify

cache_task_tracking_key and get_async_task_tracking_cache_key in cache.py have zero production callers after this branch. graph_impact confirms they're only called from tests. Once the 3 remaining test call sites are fixed, these can be deleted.

@JadeCara
Contributor

JadeCara commented Mar 6, 2026

The old code canceled privacy requests with no tracked task ID. The new code requeues them, as "completed work is idempotent and will be skipped on re-execution." That's true for access/erasure graph traversal (the RequestTask status tracks completion). But are there side effects in the request lifecycle that aren't idempotent? For example:

  • Webhook notifications fired during processing
  • Email sends triggered by status transitions
  • External API calls (consent propagation to third-party systems)
