-
Notifications
You must be signed in to change notification settings - Fork 50
feat: Implement queue-based async DB export pipeline #188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9eb0c20
045117e
20d9e7d
282ac44
13bd9e5
4e9cbc2
30bf4c4
c2e37a4
5ef25f8
0c6f88a
5fd7eb1
d47dd37
c9c86c9
12cc3f0
99c722a
a6f3555
9cea150
dce298a
aa5547a
18ab734
d717649
9c14503
414fe0a
a18a81b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| FROM node:22-alpine | ||
|
|
||
| WORKDIR /app | ||
|
|
||
| # Copy root package files | ||
| COPY package.json package-lock.json ./ | ||
|
|
||
| # Copy workspace package.json files | ||
| COPY packages/common/package.json ./packages/common/ | ||
| COPY apps/consumer/package.json ./apps/consumer/ | ||
|
|
||
| RUN npm ci | ||
|
|
||
| # Copy actual source code | ||
| COPY packages/common ./packages/common | ||
| COPY apps/consumer ./apps/consumer | ||
|
|
||
| WORKDIR /app/apps/consumer | ||
|
|
||
| CMD ["npm", "run", "start"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| { | ||
| "name": "consumer", | ||
| "version": "1.0.0", | ||
| "description": "", | ||
| "main": "src/app.js", | ||
| "scripts": { | ||
| "dev": "node src/index.js", | ||
| "start": "node src/index.js", | ||
| "test": "jest --testPathPatterns=src/" | ||
| }, | ||
| "keywords": [], | ||
| "author": "", | ||
| "license": "ISC", | ||
| "type": "commonjs" | ||
| } | ||
|
Comment on lines
+1
to
+15
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. π§© Analysis chainπ Script executed: #!/bin/bash
set -euo pipefail
# Show consumer manifest
echo "---- apps/consumer/package.json ----"
cat -n apps/consumer/package.json
# Identify import/require usage in consumer src
echo "---- imports in apps/consumer/src ----"
rg -n --no-heading "(require\\(|from\\s+['\"]|import\\s+['\"])\\s*('dotenv'|\"dotenv\"|dotenv)|\\bbullmq\\b|`@urbackend/common`" apps/consumer/src || true
# Show consumer src key files referenced
echo "---- list apps/consumer/src ----"
find apps/consumer/src -maxdepth 2 -type f -print
# Inspect monorepo root package.json for workspace/hoisting rules
echo "---- root package.json ----"
ls -1
if [ -f package.json ]; then cat -n package.json; fi
# If a package manager config exists (pnpm/yarn/npm), show workspace relevant sections
for f in pnpm-workspace.yaml yarn.lock package-lock.json pnpm-lock.yaml .npmrc .yarnrc.yml; do
if [ -e "$f" ]; then
echo "---- $f ----"
head -n 80 "$f" || true
fi
doneRepository: geturbackend/urBackend Length of output: 4313 π Script executed: #!/bin/bash
set -euo pipefail
# Show consumer manifest
echo "---- apps/consumer/package.json ----"
cat -n apps/consumer/package.json
# Identify import/require usage in consumer src
echo "---- imports in apps/consumer/src ----"
rg -n --no-heading "(require\\(|from\\s+['\"]|import\\s+['\"])\\s*('dotenv'|\"dotenv\"|dotenv)|\\bbullmq\\b|`@urbackend/common`" apps/consumer/src || true
# Show consumer src key files referenced
echo "---- list apps/consumer/src ----"
find apps/consumer/src -maxdepth 2 -type f -print
# Inspect monorepo root package.json for workspace/hoisting rules
echo "---- root package.json ----"
ls -1
if [ -f package.json ]; then cat -n package.json; fi
# If a package manager config exists (pnpm/yarn/npm), show workspace relevant sections
for f in pnpm-workspace.yaml yarn.lock package-lock.json pnpm-lock.yaml .npmrc .yarnrc.yml; do
if [ -e "$f" ]; then
echo "---- $f ----"
head -n 80 "$f" || true
fi
doneRepository: geturbackend/urBackend Length of output: 4313 Declare
π€ Prompt for AI Agents |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| const dotenv = require('dotenv'); | ||
| dotenv.config({ path: require('path').join(__dirname, '../../../.env') }); | ||
|
|
||
| const { validateEnv } = require('@urbackend/common'); | ||
|
|
||
| if (process.env.NODE_ENV !== 'test') { | ||
| validateEnv(); | ||
| } | ||
|
|
||
| const { initExportWorker } = require('./workers/export.worker'); | ||
|
|
||
| const { connectDB } = require('@urbackend/common'); | ||
|
|
||
| (async () => { | ||
| try { | ||
| await connectDB(); | ||
|
|
||
| const worker = initExportWorker(); | ||
|
|
||
| console.log('[CONSUMER] Export worker started and listening for jobs...'); | ||
|
|
||
| const shutdown = async () => { | ||
| console.log('Shutting down worker...'); | ||
| await worker.close(); | ||
| process.exit(0); | ||
| }; | ||
|
|
||
| process.on('SIGINT', shutdown); | ||
| process.on('SIGTERM', shutdown); | ||
|
|
||
| } catch (err) { | ||
| console.error('Failed to start worker:', err); | ||
| process.exit(1); | ||
| } | ||
| })(); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| const { Worker } = require('bullmq'); | ||
| const { PassThrough } = require('stream'); | ||
| const { Upload } = require('@aws-sdk/lib-storage'); | ||
| const { GetObjectCommand } = require('@aws-sdk/client-s3'); | ||
| const { getSignedUrl } = require('@aws-sdk/s3-request-presigner'); | ||
|
|
||
| const { | ||
| redis, | ||
| exportQueue, | ||
| emailQueue, | ||
| Project, | ||
| getConnection, | ||
| getCompiledModel, | ||
| getS3CompatibleStorage, | ||
| getBucket | ||
| } = require('@urbackend/common'); | ||
|
|
||
| const initExportWorker = () => { | ||
| const worker = new Worker(exportQueue.name, async (job) => { | ||
| const { projectId, userId, email } = job.data; | ||
| console.log(`[ExportWorker] Starting export for project ${projectId} requested by ${email}`); | ||
|
|
||
| const project = await Project.findById(projectId); | ||
| if (!project) throw new Error('Project not found'); | ||
|
|
||
| const connection = await getConnection(projectId); | ||
|
|
||
| console.log(`[ExportWorker] Preparing streaming upload to storage...`); | ||
|
|
||
| const { s3Client } = await getS3CompatibleStorage(project); | ||
| const bucket = await getBucket(project); | ||
| const storagePath = `${projectId}/exports/db_export_${Date.now()}.json`; | ||
|
|
||
| const passThrough = new PassThrough(); | ||
|
|
||
| const upload = new Upload({ | ||
| client: s3Client, | ||
| params: { | ||
| Bucket: bucket, | ||
| Key: storagePath, | ||
| Body: passThrough, | ||
| ContentType: 'application/json' | ||
| } | ||
| }); | ||
|
|
||
| // Start the upload promise in parallel | ||
| const uploadPromise = upload.done(); | ||
|
|
||
| try { | ||
| passThrough.write('{\n'); | ||
|
|
||
| for (let i = 0; i < project.collections.length; i++) { | ||
| const col = project.collections[i]; | ||
| const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); | ||
|
|
||
| passThrough.write(` "${col.name}": [\n`); | ||
|
|
||
| // use a mongoose cursor to stream documents one by one | ||
| const cursor = Model.find().lean().cursor(); | ||
| let first = true; | ||
|
|
||
| for await (const doc of cursor) { | ||
| if (!first) passThrough.write(',\n'); | ||
| passThrough.write(` ${JSON.stringify(doc)}`); | ||
| first = false; | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| passThrough.write('\n ]'); | ||
| if (i < project.collections.length - 1) passThrough.write(',\n'); | ||
| } | ||
|
|
||
| passThrough.write('\n}\n'); | ||
| passThrough.end(); | ||
|
|
||
| console.log(`[ExportWorker] Database stream ended. Awaiting final storage upload chunks...`); | ||
| await uploadPromise; | ||
|
|
||
| // create a signed URL valid for 24 hrs (86400 seconds) | ||
| const command = new GetObjectCommand({ Bucket: bucket, Key: storagePath }); | ||
| const signedUrl = await getSignedUrl(s3Client, command, { expiresIn: 86400 }); | ||
|
|
||
| // queue the email to be sent to the user | ||
| await emailQueue.add('send-export-email', { email, downloadUrl: signedUrl, projectName: project.name }); | ||
| console.log(`[ExportWorker] Export completed! Email queued for ${email}`); | ||
|
|
||
| } catch (error) { | ||
| passThrough.destroy(error); | ||
| throw error; | ||
| } | ||
| }, { connection: redis, concurrency: 2 }); | ||
|
|
||
| worker.on('completed', (job) => { | ||
| console.log(`[ExportWorker] Job ${job.id} for project ${job.data.projectId} completed.`); | ||
| }); | ||
|
|
||
| worker.on('failed', (job, err) => { | ||
| console.error(`[ExportWorker] Job ${job?.id} for project ${job?.data?.projectId} failed:`, err.message); | ||
| }); | ||
|
|
||
| return worker; | ||
| }; | ||
|
|
||
| module.exports = { initExportWorker }; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| const { AppError } = require('@urbackend/common'); | ||
| const { Developer } = require('@urbackend/common'); | ||
| const { Project } = require('@urbackend/common'); | ||
| const { exportQueue } = require('@urbackend/common'); | ||
| const { redis } = require('@urbackend/common'); | ||
| const { getProjectById, setProjectById } = require('@urbackend/common'); | ||
|
|
||
| module.exports.dbExportHandler = async (req, res, next) => { | ||
| try { | ||
| const { projectId } = req.params; | ||
| const { _id: userId } = req.user; | ||
|
|
||
| let project = await getProjectById(projectId); | ||
| if (!project) { | ||
| project = await Project.findById(projectId).lean(); | ||
| if (!project) { | ||
| return next(new AppError(404, "Project not found.")); | ||
| } | ||
| await setProjectById(projectId, project); | ||
| } | ||
|
|
||
| if (project.owner.toString() !== userId.toString()) { | ||
| return next(new AppError(403, "Access denied. You are not the owner of this project.")); | ||
| } | ||
|
|
||
|
|
||
| const developer = await Developer.findById(userId).select('email plan').lean(); | ||
| if (!developer) { | ||
| return next(new AppError(404, "Authenticated developer not found.")); | ||
| } | ||
| const { email, plan = 'free' } = developer; | ||
|
|
||
| console.log(`[Dashboard API] Received export request for project ${projectId} from user ${userId} (${email})`); | ||
|
|
||
|
|
||
| const maxExports = plan === 'pro' ? 5 : 1; | ||
| const today = new Date().toISOString().split('T')[0]; | ||
| const key = `project:${projectId}:export_limit:${today}`; | ||
|
|
||
| const currentCount = await redis.get(key); | ||
| if (currentCount && Number(currentCount) >= maxExports) { | ||
| return next(new AppError(429, `Daily export limit reached (${maxExports}/${maxExports}). Please try again tomorrow.`)); | ||
| } | ||
|
|
||
| const newCount = await redis.incr(key); | ||
| if (newCount === 1) { | ||
| await redis.expire(key, 86400); // Set expiry to 24 hours | ||
| } | ||
|
|
||
| await exportQueue.add('export-database', { projectId, userId, email }); | ||
|
Comment on lines
+40
to
+50
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make rate-limit increment/check atomic.
Proposed change- const currentCount = await redis.get(key);
- if (currentCount && Number(currentCount) >= maxExports) {
- return next(new AppError(429, `Daily export limit reached (${maxExports}/${maxExports}). Please try again tomorrow.`));
- }
-
const newCount = await redis.incr(key);
if (newCount === 1) {
await redis.expire(key, 86400); // Set expiry to 24 hours
}
+ if (newCount > maxExports) {
+ return next(new AppError(429, `Daily export limit reached (${maxExports}/${maxExports}). Please try again tomorrow.`));
+ }π€ Prompt for AI Agents |
||
|
|
||
| return res.status(202).json({ | ||
| message: `Database export request received. You will receive an email with a download link shortly. Usage today: ${newCount}/${maxExports}.`, | ||
| }); | ||
|
Comment on lines
+52
to
+54
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Return the standardized API response shape. This controller should return Proposed change- return res.status(202).json({
- message: `Database export request received. You will receive an email with a download link shortly. Usage today: ${newCount}/${maxExports}.`,
- });
+ return res.status(202).json({
+ success: true,
+ data: {
+ usage: {
+ used: newCount,
+ limit: maxExports,
+ remaining: Math.max(0, maxExports - newCount),
+ },
+ },
+ message: `Database export request received. You will receive an email with a download link shortly.`,
+ });π€ Prompt for AI Agents |
||
|
|
||
| } catch (err) { | ||
| console.error("[Dashboard API] Error handling export request for project - ", req.params.projectId, ": ", err); | ||
| return next(new AppError(500, err.message || "Failed to initiate database export.")); | ||
| } | ||
| }; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Run the consumer container as a non-root user.
The image currently runs as root, which is an avoidable security risk.
Proposed change
π Committable suggestion
π§° Tools
πͺ Trivy (0.69.3)
[error] 1-1: Image user should not be 'root'
Specify at least 1 USER command in Dockerfile with non-root user as argument
Rule: DS-0002
Learn more
(IaC/Dockerfile)
π€ Prompt for AI Agents