feat(import): add async CSV import queue with job status endpoint#1708
feat(import): add async CSV import queue with job status endpoint#1708ClemRz wants to merge 6 commits into
Conversation
Front-end Integration GuideThe CSV import endpoint has changed from synchronous (200 with results) to asynchronous (202 with batch ID + polling). Here's what needs to change in the front-end: API ChangesBefore (synchronous): After (asynchronous): Implementation Steps
UX Recommendations
Access Control
New Notification Type:
|
Dev Environment: Local File StorageThis PR also includes a local file storage mechanism for development (commit What it doesWhen How it works
No changes needed to test locallyJust run |
Paul-AUB
left a comment
There was a problem hiding this comment.
This is a well-structured PR with a clear design rationale. The separation between chunk processing, completion checking, and aggregation is clean, and the affinity-based chunking to preserve duplicate detection is a solid approach. A few issues need addressing before merge.
Scope note: 2760 additions; analysis focused on
CSVImportQueueService.js, the controller, model, migration, and config. The test files and Swagger doc were reviewed at a higher level.
Issues (Must Fix)
-
[api/services/CSVImportQueueService.js:377–403]
checkBatchCompletionis not idempotent, creating a duplicate-notification window. When the last two chunks complete at nearly the same time, both enqueue a completion-check job before either runs. The completion queue processes them sequentially, but both seeallDone = true(all chunks are already committed topgboss.jobascompleted). Both then callaggregateBatch, which uploads reports, creates aTNotification, and sends an email — resulting in the user receiving two emails and two in-app notifications. Fix: add an atomic test-and-set guard before callingaggregateBatch:const guard = await TJobBatch.updateOne({ id: batchId, status: 'active' }).set({ status: 'aggregating' }); if (!guard) return; // another worker already took this await module.exports.aggregateBatch(batchId, jobs);
This requires adding
'aggregating'to theisInvalidator and the SQLCHECKconstraint (or using a DB enum). -
[CODE_REVIEW.md] This self-review file should not be merged into the repository. It lists some items as open issues that the final implementation actually addresses (e.g., the race condition is mitigated via the completion queue), making the document misleading. The relevant discussion belongs in PR comments or the linked issue, not in version-controlled source. Remove this file before merging.
Suggestions (Should Consider)
-
[api/services/CSVImportQueueService.js:411–452]
aggregateBatchhas no error handling around the Azure upload step. IfgenerateAndUploadReportsthrows (e.g., transient Azure failure), the exception propagates up throughcheckBatchCompletion→processCompletionCheck, the completion-check job fails, andTJobBatch.statusstays stuck at'active'permanently — no retry, no error state, no notification. Consider wrapping the upload in try/catch and settingstatus: 'failed'on error so the user is informed:try { const reportUrls = await module.exports.generateAndUploadReports(...); // ... rest of aggregation } catch (err) { sails.log.error('CSVImportQueueService: aggregation failed:', err); await TJobBatch.updateOne({ id: batchId }).set({ status: 'failed', completedAt: new Date() }); }
-
[api/models/TJobBatch.js:1–77] Add
autoUpdatedAt: falseto the model. The model explicitly customizescreatedAt(good practice for a table that uses snake_case column names), but doesn't opt out of Waterline's automaticupdatedAtmanagement. The SQL migration has noupdated_atcolumn. Whileschema: truein the global config may prevent Waterline from inserting the column, this relies on implicit behaviour that could break silently if the global config changes. Being explicit is safer and consistent with the pattern already used forcreatedAt. -
[api/services/CSVImportQueueService.js:597–599] The email subject
'CSV Import Complete'is hardcoded English. Thelocaleparameter is correctly passed tosendEmail, so the EJS template body is fully translated, but the subject line will be English for all 15 locales. Add a locale-aware i18n key for the subject (e.g.,__('csv_import_complete_subject')).
Nitpicks (Optional)
-
[api/services/CSVImportQueueService.js:346–353] The outer
catchinprocessOneChunkswallows unexpected chunk-level errors and returns an emptyresult({ successes: [], duplicates: [], failures: [] }). From the user's perspective, rows in that chunk silently disappear — the completed batch will show fewer total rows thantotalRowswith no failure entries to explain the discrepancy. Consider pushing all remaining unprocessed rows as failures when the outer catch fires, or re-throwing so pg-boss can retry the job (givenretryLimit: 3is already configured). -
[scripts/test-async-import.sh:38]
TOKEN_SALT || 'aR4nd0mT0kenSalt'provides a hardcoded fallback secret. Acceptable for a local dev script, but the fallback risks accidentally generating valid tokens against a non-dev environment ifTOKEN_SALTis unset. Consider removing the fallback and requiring the variable explicitly.
|
Thanks for the thorough review @Paul-AUB! Pushed fixes in 14fb03f. Here's what was addressed: Fixed:
Not changed (with rationale):
|
14fb03f to
3c92d5e
Compare
- Refactor POST /api/v1/entrances/import-rows to return 202 with batch ID - Add CSVImportQueueService with pg-boss backed chunk processing - Add affinity-based chunking to keep duplicate-key rows together - Add TJobBatch model and t_job_batch migration for batch tracking - Add GET /api/v1/jobs/:batchId endpoint for progress polling - Generate CSV reports (success/duplicates/failures) uploaded to Azure Blob - Send email notification on import completion - Add IMPORT_COMPLETE notification type and t_notification FK - Update Swagger spec for new endpoints and schemas - Add unit, property-based, and DB integration tests # Conflicts: # assets/swaggerV1.yaml
- Store uploads in .local-uploads/ when AZURE_KEY is not set - Serve local files via /local-uploads/* middleware - Add .local-uploads/ to .gitignore - Update FileService tests for new local storage behavior
pg-boss v12 only stores the handler's return value as job output when teamSize is 1. With teamSize > 1, multiple jobs are passed to a single handler invocation and the output is discarded. Switch to teamSize:1, teamConcurrency:4 to keep parallelism while ensuring each job's result (successes/duplicates/failures) is persisted. This fixes the progress endpoint always showing 0 processed rows.
Generates 2000 rows (85% fresh, 15% duplicates), submits via the API, polls for completion, downloads reports, and prints a summary. Useful for end-to-end validation of the async import pipeline.
Add proper translations for the csv-import-complete email template keys in all 15 supported languages (ar, bg, ca, de, el, en, es, fr, he, id, it, ja, nl, pt, ro).
- Add atomic test-and-set guard in checkBatchCompletion to prevent duplicate notifications when concurrent completion-check jobs fire - Add error handling around generateAndUploadReports in aggregateBatch so batch transitions to 'failed' instead of staying stuck at 'active' - Push remaining unprocessed rows as failures when outer catch fires in processOneChunk to avoid silent row loss - Remove TOKEN_SALT hardcoded fallback in test-async-import.sh - Remove CODE_REVIEW.md (does not belong in version control) - Add 'aggregating' status to TJobBatch model and Swagger spec
3c92d5e to
d1c2a3e
Compare
🤔 What
Converts the synchronous
POST /api/v1/entrances/import-rowsendpoint into an async queue-based pipeline and adds a newGET /api/v1/jobs/:batchIdendpoint for progress polling.CSVImportQueueServicewith pg-boss backed chunk processingTJobBatchmodel andt_job_batchSQL migrationCloses #1574
🤷♂️ Why
The synchronous endpoint times out on large imports (thousands of rows). Moving to async processing:
🔍 How
Architecture:
CSVImportQueueService.createBatch()splits rows into chunks using affinity-based grouping (rows sharing the same dedup key always land in the same chunk)teamSize: 1, teamConcurrency: 4)Key design decisions:
teamSize: 1so pg-boss stores the handler return value as job output (with teamSize > 1, output is discarded by pg-boss v12)ENTRANCE_MANDATORY_COLUMNSextracted tocsvHelper.jsas single source of truthsails.services.coordinatessnapshotserviceused instead ofrequire()to get the Sails-loaded instance🧪 Testing
Tested locally with 2000 rows (1700 fresh + 300 duplicates) — completes in ~170s with correct classification (1700 successes, 300 duplicates, 0 failures).
📸 Previews
Polling response (in progress):
{ "batchId": "6e5f37de-...", "status": "active", "progress": { "totalChunks": 41, "completedChunks": 20, "processedRows": 980 } }Completed response:
{ "status": "completed", "progress": { "totalRows": 2000, "processedRows": 2000, "successes": 1700, "duplicates": 300, "failures": 0 }, "result": { "reportUrls": { "success": "https://...", "duplicates": "https://..." }, "summary": { "successes": 1700, "duplicates": 300, "failures": 0 } } }