Add celery_id DB tracking for privacy requests and request tasks #7531
Conversation
Store pre-generated Celery task IDs directly on PrivacyRequest and RequestTask models instead of in Redis cache. This makes task tracking durable across Redis restarts and enables reliable task cancellation.

Changes:
- Add celery_id column to privacyrequest and requesttask tables
- Pre-generate and persist celery_id before dispatching Celery tasks
- Add cancel_celery_tasks method for batch revocation
- Add retry count tracking with configurable limits
- Clean up unnecessary inline imports across source and test files

Made-with: Cursor
Made-with: Cursor
Combines DB-based celery_id tracking with main's upstream-awaiting logic for pending tasks. Cache-based task ID lookups removed since celery_id is now read directly from the RequestTask DB column. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

The pull request migrates Celery task ID tracking from ephemeral Redis cache to durable database columns (celery_id) on PrivacyRequest and RequestTask models, with corresponding updates across the service layer and test suite.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client/API
    participant Service as Service Layer
    participant DB as Database
    participant Queue as Celery Queue
    participant Cache as Redis Cache

    Note over Client,Cache: Previous (Cache-based) Flow
    Client->>Service: queue_request_task()
    Service->>Queue: dispatch_task()
    Queue-->>Service: task_id
    Service->>Cache: store task_id (cache_task_tracking_key)
    Cache-->>Service: ✓

    Note over Client,DB: New (Database-backed) Flow
    Client->>Service: queue_request_task()
    Service->>Service: celery_id = uuid()
    Service->>DB: privacy_request.celery_id = celery_id<br/>(commit)
    DB-->>Service: ✓
    Service->>Queue: dispatch_task(task_id=celery_id)
    Queue-->>Service: task_id
    Service-->>Client: celery_id (persisted)
```

```mermaid
sequenceDiagram
    participant Client as Client/API
    participant Service as Service Layer
    participant DB as Database
    participant Queue as Celery Queue

    Note over Client,Queue: Task Status Retrieval (New Flow)
    Client->>Service: requeue_interrupted_tasks()
    Service->>DB: _get_request_tasks_in_progress()<br/>(fetch celery_id + status)
    DB-->>Service: [(task_id, celery_id, status, awaiting_upstream)]
    Service->>Queue: check queue state(celery_id)
    Queue-->>Service: task running?
    alt celery_id present AND in queue
        Service->>Service: skip (task running)
    else celery_id absent OR not in queue
        Service->>DB: mark requires_input / requeue
        DB-->>Service: ✓
    end
    Service-->>Client: tasks updated
```
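The database-backed flow in the diagram can be sketched with in-memory stand-ins for the DB row and the broker. This is an illustrative sketch only; `FakeDB`, `FakeQueue`, and `queue_privacy_request` are hypothetical names, not the fides implementation:

```python
import uuid

class FakeDB:
    """Stands in for the privacyrequest row's celery_id column."""
    def __init__(self) -> None:
        self.celery_id = None
        self.committed = False

    def commit(self) -> None:
        self.committed = True

class FakeQueue:
    """Stands in for the Celery broker."""
    def __init__(self) -> None:
        self.dispatched = []

    def apply_async(self, task_id: str) -> None:
        self.dispatched.append(task_id)

def queue_privacy_request(db: FakeDB, queue: FakeQueue) -> str:
    # 1. Pre-generate the task id so it is known before dispatch.
    celery_id = str(uuid.uuid4())
    # 2. Persist it durably before the task reaches the broker.
    db.celery_id = celery_id
    db.commit()
    # 3. Dispatch with the same id, so the DB and the broker always agree.
    queue.apply_async(task_id=celery_id)
    return celery_id
```

The key property is the ordering: the id is committed before dispatch, so a crash between the two steps leaves a DB record that can be reconciled later rather than a running task the DB knows nothing about.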
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 5
🧹 Nitpick comments (2)
tests/ops/service/privacy_request/test_request_service.py (1)
Line 26: Import `PrivacyRequestError` from its defining module. Using `fides.service.privacy_request.privacy_request_service` as an exception re-export path is brittle. Prefer importing `PrivacyRequestError` directly from `fides.api.common_exceptions`.

Suggested import change:

```diff
-from fides.service.privacy_request.privacy_request_service import PrivacyRequestError
+from fides.api.common_exceptions import PrivacyRequestError
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/ops/service/privacy_request/test_request_service.py` at line 26: the test imports PrivacyRequestError from fides.service.privacy_request.privacy_request_service, which is a brittle re-export; update the import to pull PrivacyRequestError directly from its defining module, fides.api.common_exceptions, so tests reference the canonical symbol (PrivacyRequestError) instead of a service-level re-export (privacy_request_service).

tests/task/test_requeue_interrupted_tasks.py (1)
Lines 16-22: Finish removing cache-key based setup from this suite. This module still carries `cache_task_tracking_key`, while requeue logic now keys off the model `celery_id`. Converting remaining cache-key seeded cases to DB `celery_id` setup will keep these tests aligned with runtime behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/task/test_requeue_interrupted_tasks.py` around lines 16 - 22, Tests still import and/or use the cache key symbol cache_task_tracking_key and related cache seeding functions (get_cache, get_privacy_request_retry_count, increment_privacy_request_retry_count, reset_privacy_request_retry_count); update the suite to stop using cache-key based setup and instead seed the DB celery_id on the relevant model instances that the requeue logic uses at runtime. Replace any calls that set or read cache_task_tracking_key with code that creates/updates the task model (or PrivacyRequest/Task model used by requeue logic) to have the expected celery_id value, and remove imports of cache_task_tracking_key and the cache helper functions from tests/task/test_requeue_interrupted_tasks.py. Ensure assertions now check DB state (by querying the model using the celery_id) rather than cache-based counters.
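Converting such a test from cache seeding to DB seeding can be sketched like this (hypothetical stand-in model; the real tests would set `celery_id` on the actual `RequestTask` ORM instance and commit the session):

```python
import uuid

class RequestTaskStub:
    """Minimal stand-in for the RequestTask model's celery_id column."""
    def __init__(self) -> None:
        self.celery_id = None

def seed_celery_id(task: RequestTaskStub) -> str:
    # Instead of writing cache_task_tracking_key to Redis, seed the
    # durable column directly and assert against DB state afterwards.
    task.celery_id = str(uuid.uuid4())
    return task.celery_id
```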
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3a808316-c6a8-4113-b557-36f94ca72e93
📒 Files selected for processing (15)
- changelog/7531-celery-id-db-tracking.yaml
- src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py
- src/fides/api/models/privacy_request/privacy_request.py
- src/fides/api/models/privacy_request/request_task.py
- src/fides/api/service/privacy_request/request_service.py
- src/fides/api/task/execute_request_tasks.py
- src/fides/service/privacy_request/privacy_request_service.py
- tests/ops/api/v1/endpoints/privacy_request/test_privacy_request_endpoints.py
- tests/ops/api/v1/endpoints/test_drp_endpoints.py
- tests/ops/models/privacy_request/test_privacy_request.py
- tests/ops/models/privacy_request/test_request_task.py
- tests/ops/models/test_request_task.py
- tests/ops/service/privacy_request/test_request_service.py
- tests/ops/util/test_cache.py
- tests/task/test_requeue_interrupted_tasks.py
```python
def upgrade():
    op.add_column(
        "privacyrequest",
        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
    )
    op.add_column(
        "requesttask",
        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
    )
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Check how migrations handle indexes - search for add_column with index=True
echo "=== Migrations using index=True in add_column ==="
rg -n "add_column.*index\s*=\s*True" --type py src/fides/api/alembic/migrations/versions/ | head -30
echo ""
echo "=== Migrations using explicit create_index ==="
rg -n "create_index" --type py src/fides/api/alembic/migrations/versions/ | head -30
```

Repository: ethyca/fides
Length of output: 3579

🏁 Script executed:

```shell
#!/bin/bash
# Examine the actual migration file
echo "=== Full migration file content ==="
cat -n src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py
```

Repository: ethyca/fides
Length of output: 1168

🏁 Script executed:

```shell
#!/bin/bash
# Check SQLAlchemy/Alembic version constraints
echo "=== Checking requirements or setup files for Alembic version ==="
find . -maxdepth 3 -type f \( -name "setup.py" -o -name "requirements*.txt" -o -name "pyproject.toml" -o -name "poetry.lock" \) | head -10
```

Repository: ethyca/fides
Length of output: 189

🏁 Script executed:

```shell
#!/bin/bash
# Check pyproject.toml for Alembic version
echo "=== Alembic version in pyproject.toml ==="
grep -A 5 -B 5 -i "alembic" pyproject.toml | head -30
echo ""
echo "=== SQLAlchemy version in pyproject.toml ==="
grep -A 1 -B 1 "sqlalchemy" pyproject.toml | head -20
```

Repository: ethyca/fides
Length of output: 1250

🏁 Script executed:

```shell
#!/bin/bash
# Double-check: search for ANY use of index=True in add_column across entire codebase
echo "=== Any add_column with index parameter anywhere ==="
rg "add_column.*index" --type py src/fides/api/alembic/migrations/versions/ -A 1 -B 1
```

Repository: ethyca/fides
Length of output: 108

🏁 Script executed:

```shell
#!/bin/bash
# Check if there's documentation or discussion about this in recent migrations
echo "=== Recent migration patterns - check a few recent files ==="
ls -t src/fides/api/alembic/migrations/versions/ | head -5 | while read file; do
  echo "=== File: $file ==="
  rg "add_column|create_index" "src/fides/api/alembic/migrations/versions/$file" -A 1 | head -20
done
```

Repository: ethyca/fides
Length of output: 914
🌐 Web query:
Alembic 1.8.1 add_column index=True behavior SQLAlchemy 1.4
💡 Result:
In Alembic 1.8.1 + SQLAlchemy 1.4, passing index=True on the Column you give to op.add_column() does not create an index in the database. Alembic’s add_column operation only emits ALTER TABLE ... ADD COLUMN ...; index creation is a separate DDL step (CREATE INDEX). (alembic.sqlalchemy.org)
Column(index=True) is primarily an inline convenience for table creation (it causes an Index object to be associated with the Table, and the CREATE INDEX DDL is emitted right after CREATE TABLE), but it is not something the database automatically applies when you later add a column via ALTER TABLE. (docs.sqlalchemy.org)
What to do in a migration

Create the index explicitly:

```python
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column("user", sa.Column("email", sa.String(length=255), nullable=True))
    op.create_index(op.f("ix_user_email"), "user", ["email"], unique=False)

def downgrade():
    op.drop_index(op.f("ix_user_email"), table_name="user")
    op.drop_column("user", "email")
```

(`op.create_index(...)` is the standard Alembic way to add the index.) (stackoverflow.com)
Citations:
- 1: https://alembic.sqlalchemy.org/en/latest/api/operations.html?utm_source=openai
- 2: https://docs.sqlalchemy.org/21/core/constraints.html?utm_source=openai
- 3: https://stackoverflow.com/questions/72527745/how-to-alter-index-of-a-column-in-alembic?utm_source=openai
🏁 Script executed:

```shell
#!/bin/bash
# Search for any documentation or comments in the codebase about index handling
echo "=== Looking for index-related comments in migrations ==="
rg "index.*create|create.*index" --type py src/fides/api/alembic/migrations/versions/ -B 2 | head -40
```

Repository: ethyca/fides
Length of output: 3304

🏁 Script executed:

```shell
#!/bin/bash
# Check one of the existing migrations that uses create_index to see the pattern
echo "=== Example migration with create_index pattern ==="
head -100 src/fides/api/alembic/migrations/versions/179f2bb623ae_update_table_for_twilio.py | tail -50
```

Repository: ethyca/fides
Length of output: 1689
Use explicit op.create_index() calls instead of index=True in add_column().
In Alembic 1.8.1 with SQLAlchemy 1.4, the index=True parameter in op.add_column() does not create indexes during migrations. The index=True attribute only applies to CREATE TABLE operations; for schema modifications via ALTER TABLE, indexes must be created explicitly with op.create_index(). This is the established pattern throughout the codebase.
🔧 Required fix for proper index creation

```diff
 def upgrade():
     op.add_column(
         "privacyrequest",
-        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
+        sa.Column("celery_id", sa.String(length=255), nullable=True),
     )
+    op.create_index("ix_privacyrequest_celery_id", "privacyrequest", ["celery_id"])
     op.add_column(
         "requesttask",
-        sa.Column("celery_id", sa.String(length=255), nullable=True, index=True),
+        sa.Column("celery_id", sa.String(length=255), nullable=True),
     )
+    op.create_index("ix_requesttask_celery_id", "requesttask", ["celery_id"])
```

And update downgrade to remove the indexes:

```diff
 def downgrade():
+    op.drop_index("ix_requesttask_celery_id", table_name="requesttask")
     op.drop_column("requesttask", "celery_id")
+    op.drop_index("ix_privacyrequest_celery_id", table_name="privacyrequest")
     op.drop_column("privacyrequest", "celery_id")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py`
around lines 21 - 29, The migration's upgrade() currently relies on
sa.Column(..., index=True) which won't create indexes for ALTER TABLE
operations; after adding the celery_id columns to "privacyrequest" and
"requesttask" (the op.add_column calls that add sa.Column("celery_id", ...)),
add explicit op.create_index() calls to create indexes (e.g., names like
ix_privacyrequest_celery_id and ix_requesttask_celery_id) on those tables and
columns; update the downgrade() to explicitly drop those indexes with
op.drop_index() (using the same names) before removing the columns so index
cleanup is handled properly.
```diff
 if not task_id:
-    _cancel_interrupted_tasks_and_error_privacy_request(
-        db,
-        privacy_request,
-        f"No task ID found for privacy request {privacy_request.id}, "
-        f"request is stuck without a running task - canceling",
-    )
-    continue
+    # NULL celery_id means the request was never dispatched or
+    # the column hasn't been populated yet (e.g. upgrade path).
+    # Requeue so completed work is skipped and incomplete work
+    # is retried.
+    logger.warning(
+        f"No celery_id on privacy request {privacy_request.id}, "
+        f"requeueing to recover"
+    )
+    should_requeue = True
 # Check if the main privacy request task is active
-if task_id not in queued_tasks_ids and not celery_tasks_in_flight(
+elif task_id not in queued_tasks_ids and not celery_tasks_in_flight(
```
Do not auto-requeue requires_input requests when main celery_id is NULL.
This branch currently requeues all NULL-main-id requests, including ones intentionally paused for user input. That can restart workflows unexpectedly.
Suggested guard:

```diff
 if not task_id:
+    if privacy_request.status == PrivacyRequestStatus.requires_input:
+        logger.warning(
+            f"No celery_id on privacy request {privacy_request.id} in requires_input status - "
+            f"keeping request in current status"
+        )
+        continue
     # NULL celery_id means the request was never dispatched or
     # the column hasn't been populated yet (e.g. upgrade path).
     # Requeue so completed work is skipped and incomplete work
     # is retried.
     logger.warning(
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/fides/api/service/privacy_request/request_service.py` around lines 583 -
593, The current branch sets should_requeue=True whenever
privacy_request.celery_id is NULL, which also requeues requests paused for user
input; change the logic in request_service.py so that when task_id
(privacy_request.celery_id) is falsy you first check
privacy_request.requires_input (or the equivalent flag/property) and skip
setting should_requeue for those requests (i.e., do not requeue requires_input
requests), leaving existing behavior for other NULL celery_id cases; update the
if-not-task_id block that sets should_requeue to early-return/continue or only
set should_requeue when not privacy_request.requires_input.
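The guarded decision above can be modeled as a small pure function (a sketch with illustrative names, not the actual fides code):

```python
def should_requeue(celery_id, status, queued_ids, in_flight):
    """Decide whether an in-progress privacy request should be requeued."""
    if not celery_id:
        # Never auto-requeue a request intentionally paused for user input.
        return status != "requires_input"
    # A known id that is neither queued nor in flight was interrupted.
    return celery_id not in queued_ids and not in_flight
```

Keeping the decision pure makes the requires_input edge case trivially unit-testable.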
```python
db: Session = Session.object_session(request_task)
if db:
    request_task.celery_id = celery_id
    db.commit()
```
Require DB persistence before dispatching.
If db is missing, the code still queues the Celery task, but never persists request_task.celery_id. That breaks the durable-tracking contract introduced by this PR.
Suggested fix:

```diff
 def queue_request_task(
     request_task: RequestTask, privacy_request_proceed: bool = True
 ) -> None:
 @@
     celery_id = str(uuid.uuid4())
     db: Session = Session.object_session(request_task)
-    if db:
-        request_task.celery_id = celery_id
-        db.commit()
+    if not db:
+        raise RuntimeError(
+            f"RequestTask {request_task.id} is not attached to a DB session; "
+            "cannot persist celery_id for durable tracking."
+        )
+    request_task.celery_id = celery_id
+    db.commit()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/fides/api/task/execute_request_tasks.py` around lines 613 - 617, The code
may queue a Celery task without persisting request_task.celery_id because
Session.object_session(request_task) can be None; ensure a DB session exists and
commit the change before dispatch: if db is None, either obtain a new session
(attach/merge request_task into that session), set request_task.celery_id, and
call db.commit(), or raise an explicit error to block dispatch; update the block
around Session.object_session(request_task), request_task.celery_id, and
db.commit() so persistence always occurs before the task is sent.
```python
celery_task_fn.apply_async(
    queue=DSR_QUEUE_NAME,
    task_id=celery_id,
    kwargs={
        "privacy_request_id": request_task.privacy_request_id,
        "privacy_request_task_id": request_task.id,
        "privacy_request_proceed": privacy_request_proceed,
    },
)
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, let's look at the actual code around lines 619-627
cat -n src/fides/api/task/execute_request_tasks.py | sed -n '600,650p'
```

Repository: ethyca/fides
Length of output: 1541

🏁 Script executed:

```shell
# Look for where celery_id is set/persisted before the apply_async call
rg -n 'celery_id' src/fides/api/task/execute_request_tasks.py -B 5 -A 5
```

Repository: ethyca/fides
Length of output: 922

🏁 Script executed:

```shell
# Check if there's any existing exception handling around apply_async calls
rg -n -B 10 -A 10 'apply_async' src/fides/api/task/execute_request_tasks.py
```

Repository: ethyca/fides
Length of output: 817

🏁 Script executed:

```shell
# Look at the broader queue_request_task function context
rg -n 'def queue_request_task' src/fides/api/task/execute_request_tasks.py -A 100 | head -120
```

Repository: ethyca/fides
Length of output: 1257

🏁 Script executed:

```shell
# Find all call sites of queue_request_task
rg -n 'queue_request_task\(' src/fides/api/task/execute_request_tasks.py -B 3 -A 3
```

Repository: ethyca/fides
Length of output: 746

🏁 Script executed:

```shell
# Check for exception handling patterns around queue_request_task calls in other files
rg -n 'queue_request_task' src/fides --type=py -B 5 -A 5 | head -100
```

Repository: ethyca/fides
Length of output: 8189

🏁 Script executed:

```shell
# Look at celery apply_async documentation/usage patterns in the codebase
rg -n 'apply_async' src/fides --type=py -B 2 -A 2 | head -80
```

Repository: ethyca/fides
Length of output: 3611
Add error handling for broker publish failures after persisting celery_id.
The celery_id is persisted to the database at line 616 and committed before apply_async() is called at line 619. If apply_async() fails (e.g., due to broker connection issues), the persisted celery_id will remain in the database despite the task never being published to the broker, creating stale tracking metadata that contradicts the function's durability intent.
Wrap apply_async() in a try/except block and clear celery_id (or mark the task state appropriately) on failure before re-raising the exception.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/fides/api/task/execute_request_tasks.py` around lines 619 - 627, The
persisted celery_id is written before calling celery_task_fn.apply_async, so
wrap the apply_async call in a try/except inside the same scope where
request_task (the object with the celery_id) is available; on exception clear
request_task.celery_id (or set a task-state field indicating publish failure),
persist that change to the DB (flush/commit using the same session) to avoid
stale tracking metadata, then re-raise the exception so upstream error handling
can run; reference celery_task_fn.apply_async, request_task.celery_id, and the
surrounding function in execute_request_tasks.py to locate where to add the
try/except and DB update.
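The rollback idea can be sketched as follows (an illustrative sketch; `dispatch` stands in for `apply_async`, and `FakeRequestTask` for the ORM model):

```python
from typing import Callable, Optional

class FakeRequestTask:
    """Stand-in for the RequestTask row being tracked."""
    def __init__(self) -> None:
        self.celery_id: Optional[str] = None

def dispatch_with_rollback(task: FakeRequestTask, celery_id: str,
                           dispatch: Callable[[str], None]) -> None:
    # Persist the id first for durability, but undo it if publish fails,
    # so the DB never claims a task that never reached the broker.
    task.celery_id = celery_id
    try:
        dispatch(celery_id)
    except Exception:
        task.celery_id = None  # clear stale tracking metadata
        raise
```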
```python
with get_db() as db:
    privacy_request = PrivacyRequest.get(db=db, object_id=privacy_request_id)
    if privacy_request:
        privacy_request.celery_id = celery_id
        db.commit()

run_privacy_request.apply_async(
    queue=DSR_QUEUE_NAME,
    task_id=celery_id,
    kwargs={
        "privacy_request_id": privacy_request_id,
        "from_webhook_id": from_webhook_id,
        "from_step": from_step,
    },
)
```
Avoid dispatching when the privacy request row is missing.
The task is still queued even when PrivacyRequest.get(...) returns None. That can enqueue orphaned work and return a misleading celery_id.
Suggested fix
```diff
 try:
     with get_db() as db:
         privacy_request = PrivacyRequest.get(db=db, object_id=privacy_request_id)
-        if privacy_request:
-            privacy_request.celery_id = celery_id
-            db.commit()
+        if not privacy_request:
+            raise PrivacyRequestError(
+                f"Privacy request {privacy_request_id} not found during queueing"
+            )
+        privacy_request.celery_id = celery_id
+        db.commit()
     run_privacy_request.apply_async(
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/fides/service/privacy_request/privacy_request_service.py` around lines
1007 - 1021, The code currently enqueues run_privacy_request.apply_async even
when PrivacyRequest.get(db=db, object_id=privacy_request_id) returns None,
producing orphaned tasks; change the logic so after opening get_db and calling
PrivacyRequest.get (the privacy_request variable) you only set celery_id,
commit, and call run_privacy_request.apply_async when privacy_request is not
None—if the record is missing, do not call apply_async (instead return/raise or
handle the missing-request case as the surrounding flow expects). Ensure you
reference the existing symbols privacy_request, get_db, PrivacyRequest.get,
celery_id, and run_privacy_request.apply_async when implementing the guard.
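The missing-row guard can be sketched with a dict standing in for the database (hypothetical names; the real code would use `PrivacyRequest.get` and raise or return as the surrounding flow expects):

```python
from typing import Callable, Dict, Optional

def queue_if_exists(store: Dict[str, dict], privacy_request_id: str,
                    celery_id: str, dispatch: Callable[[str], None]) -> Optional[str]:
    """Persist the id and dispatch only when the row actually exists."""
    row = store.get(privacy_request_id)
    if row is None:
        # No row: dispatching here would enqueue orphaned work.
        return None
    row["celery_id"] = celery_id
    dispatch(celery_id)
    return celery_id
```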
Greptile Summary

This PR moves Celery task ID tracking from ephemeral Redis cache to durable `celery_id` database columns. Key findings:
Confidence Score: 3/5
Important Files Changed
Last reviewed commit: 1209b4d
```python
celery_id = str(uuid.uuid4())

db: Session = Session.object_session(request_task)
if db:
```
Silent DB skip defeats durability goal
If Session.object_session(request_task) returns None, celery_id is never written to the database, but apply_async still proceeds with the pre-generated UUID as the Celery task ID. This creates an invisible split:
- The Celery task runs with a known UUID
- The DB record has `celery_id = NULL`
When requeue_interrupted_tasks runs later, it will see celery_id IS NULL on this in-progress request task and treat it as interrupted — triggering a requeue even though the original task is still executing. This could result in duplicate task execution.
If Session.object_session(request_task) can legitimately return None here, the function should raise rather than silently skip the persistence step, so the caller can handle the failure explicitly:
```python
db: Session = Session.object_session(request_task)
if not db:
    raise RuntimeError(
        f"RequestTask {request_task.id} has no attached DB session; cannot persist celery_id"
    )
request_task.celery_id = celery_id
db.commit()
```

```python
    get_autoclose_db_session as get_db,
)
from fides.api.service.privacy_request.request_runner_service import (
```
New inline import should be at module top
This PR introduces a new inline import of get_autoclose_db_session inside queue_privacy_request. The # pylint: disable=cyclic-import comment indicates a circular dependency, but the project convention is to resolve that at the module level (e.g., by restructuring) rather than using mid-function imports.
The existing run_privacy_request inline import (which was already in this function before this PR) follows the same pattern, but adding a second one compounds the issue. Both should ideally be moved to the top of the file, or the circular dependency should be broken via refactoring.
Context Used: Rule from dashboard - Python imports should always be placed at the top of the file, not near the code that uses them. (source)
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
"""Add celery_id column to privacyrequest and requesttask tables

Moves Celery task ID tracking from ephemeral Redis cache to durable DB
columns, following the pattern already established by MonitorTask.
```
Migration filename uses non-standard xx_ prefix
All other migration files in versions/ use a short alphanumeric hash as the filename prefix (e.g., 0210948a8147_initial.py, 074796d61d8a_...py). This file uses xx_2026_02_28_1200_ which is inconsistent with the project convention and could confuse tooling or developers searching for the migration by its revision ID.
The filename should be renamed to match the convention, e.g.:
ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py
Note: Alembic uses the revision/down_revision variables inside the file for chain ordering — the filename itself is cosmetic — but consistency matters for readability and tooling.
```diff
@@ -13,7 +13,14 @@
     REQUEUE_INTERRUPTED_TASKS_LOCK,
     requeue_interrupted_tasks,
```
Unused import after migration to DB-backed celery_id
cache_task_tracking_key is still imported here, but all its call sites in this file have been replaced with direct request.celery_id = ... / db.commit() assignments as part of this PR. This import should be removed.
```python
requeue_interrupted_tasks,
from fides.api.util.cache import (
    get_cache,
```
```python
    "fides.api.service.privacy_request.request_service.celery_tasks_in_flight",
    return_value=False,
)
def test_pending_task_awaiting_upstream_is_not_canceled(
```
I think this one might need to be updated
- Writes PR task ID to cache; production reads celery_id=NULL from DB → takes requeue path. asserts mock_requeue.assert_not_called() — should fail
- Writes upstream task's celery ID to cache; production reads celery_id=NULL from DB → upstream task treated as stuck
```python
    "fides.api.service.privacy_request.request_service.celery_tasks_in_flight",
    return_value=False,
)
def test_pending_task_with_no_cache_key_is_canceled(
```
Same pattern — cache write is dead, production takes NULL-celery_id requeue path instead of the subtask-evaluation path this test exercises
```diff
@@ -606,17 +606,25 @@ def _build_upstream_access_data(
 def queue_request_task(
```
fidesplus ManualTaskService._requeue_request_tasks calls queue_request_task. Since queue_request_task's signature is unchanged, the fidesplus caller is compatible. But worth confirming fidesplus tests pass with this change since the function now does a db.commit() internally — callers that previously controlled their own commit boundary may see different transactional behavior.
The old code canceled privacy requests with no tracked task ID. The new code requeues them, as "completed work is idempotent and will be skipped on re-execution." That's true for access/erasure graph traversal (the RequestTask status tracks completion). But are there side effects in the request lifecycle that aren't idempotent? For example:
Description Of Changes
Stores pre-generated Celery task IDs directly on `PrivacyRequest` and `RequestTask` models instead of in Redis cache. This makes task tracking durable across Redis restarts and enables reliable task cancellation via batch `revoke()`.

Key changes:

- Add `celery_id` column to `privacyrequest` and `requesttask` tables via Alembic migration
- Add `cancel_celery_tasks()` method on `PrivacyRequest` for batch revocation
- Add `get_request_task_celery_task_ids()` helper method
- Replace `cache_task_tracking_key` with durable DB persistence
- Add `privacy_request_requeue_retry_count` config
- Add `_cancel_interrupted_tasks_and_error_privacy_request()` for interrupted task cleanup
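Batch revocation over the new columns can be sketched like this (illustrative only; `revoke_fn` stands in for Celery's `app.control.revoke`, and `FakeTask` for the ORM rows):

```python
from typing import Callable, Iterable, List, Optional

class FakeTask:
    """Stand-in for a row carrying a nullable celery_id column."""
    def __init__(self, celery_id: Optional[str]) -> None:
        self.celery_id = celery_id

def cancel_celery_tasks(tasks: Iterable[FakeTask],
                        revoke_fn: Callable[[List[str]], None]) -> List[str]:
    # Collect every non-null celery_id and revoke them in one batch call.
    ids = [t.celery_id for t in tasks if t.celery_id]
    if ids:
        revoke_fn(ids)
    return ids
```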
celery_idcolumn toprivacyrequestandrequesttasktables via Alembic migrationcancel_celery_tasks()method onPrivacyRequestfor batch revocationget_request_task_celery_task_ids()helper methodcache_task_tracking_keywith durable DB persistenceprivacy_request_requeue_retry_countconfig_cancel_interrupted_tasks_and_error_privacy_request()for interrupted task cleanupCode Changes
- `src/fides/api/alembic/migrations/versions/xx_2026_02_28_1200_ce488ce97a31_add_celery_id_to_privacy_request_and_request_task.py` - New migration adding `celery_id` column to both tables
- `src/fides/api/models/privacy_request/privacy_request.py` - Add `celery_id` column, `cancel_celery_tasks()`, `get_request_task_celery_task_ids()`, celery task cancellation in `cancel_processing()`
- `src/fides/api/models/privacy_request/request_task.py` - Add `celery_id` column
- `src/fides/api/task/execute_request_tasks.py` - Pre-generate and persist `celery_id` before dispatching; move `uuid` to top-level import
- `src/fides/service/privacy_request/privacy_request_service.py` - Pre-generate and persist `celery_id` for privacy request tasks; move `uuid` to top-level import
- `src/fides/api/service/privacy_request/request_service.py` - Add `_cancel_interrupted_tasks_and_error_privacy_request()`, retry count tracking in requeue flow
- `tests/ops/models/privacy_request/test_privacy_request.py` - Tests for `cancel_celery_tasks`, `get_request_task_celery_task_ids`, integration with `cancel_processing`; clean up inline imports
- `tests/ops/models/privacy_request/test_request_task.py` - Tests for `celery_id` column on `RequestTask`
- `tests/ops/service/privacy_request/test_request_service.py` - Tests for `_cancel_interrupted_tasks_and_error_privacy_request`, retry count requeue handling; clean up inline imports
- `tests/ops/util/test_cache.py` - Tests for retry count cache functions; consolidate inline imports to top-level
- `tests/task/test_requeue_interrupted_tasks.py` - Tests for enhanced requeue with retry limits; clean up inline imports

Steps to Confirm
- `celery_id` column exists on both `privacyrequest` and `requesttask` tables
- `celery_id` is populated on the `PrivacyRequest` record before the Celery task starts
- Request tasks have `celery_id` populated
- `cancel_celery_tasks()` issues batch revoke
- `pytest --no-cov tests/ops/models/privacy_request/test_privacy_request.py::TestCancelCeleryTasks tests/ops/service/privacy_request/test_request_service.py::TestCancelInterruptedTasksAndErrorPrivacyRequest tests/task/test_requeue_interrupted_tasks.py::TestEnhancedRequeueInterruptedTasks tests/ops/util/test_cache.py::TestPrivacyRequestRetryCache -v`

Pre-Merge Checklist
- `CHANGELOG.md` updated
- Up to date with `main`
- `downgrade()` migration is correct and works

Made with Cursor