26 changes: 26 additions & 0 deletions README.md
@@ -100,9 +100,35 @@ See the [Open Plugins specification](https://open-plugins.com/plugin-builders/sp
- **Framework**: [Next.js](https://nextjs.org) (App Router, Turbopack)
- **Runtime**: [Bun](https://bun.sh)
- **Database**: [Supabase](https://supabase.com) (PostgreSQL)
- **Job Queue**: [Supabase Queues](https://supabase.com/docs/guides/queues) (`pgmq`) drained by a 1-min Vercel cron
- **Styling**: [Tailwind CSS](https://tailwindcss.com)
- **UI**: [Radix UI](https://radix-ui.com) + [shadcn/ui](https://ui.shadcn.com)
- **Search**: [Fuse.js](https://fusejs.io) (client-side fuzzy search)
- **URL State**: [nuqs](https://nuqs.47ng.com)
- **Linting**: [Biome](https://biomejs.dev)

## Plugin security scan

Submitted plugins are auto-reviewed by a Cursor SDK agent (`composer-2`) running
in `local` mode against a fresh clone of the plugin's repo plus its inline
component content. The verdict (`safe` / `suspicious` / `malicious`) is written
back to `plugins.scan_status` and surfaces in the admin queue.

The scan is asynchronous and runs outside the request lifecycle:

1. **Enqueue** — server actions and the recover-stuck-scans cron call
`enqueuePluginScan(pluginId)` which sends a message to the `plugin_scans`
pgmq queue.
2. **Kick** — user-facing actions also fire `kickDrainAfterResponse()` so the
drain route is called via `next/server` `after()` immediately after the
response is flushed. Scans typically start within a few hundred ms.
3. **Drain** — `/api/queue/plugin-scans/drain` reads one message
(`vt=900s`, `n=1`), runs `runPluginScan(pluginId)`, archives the message on
success, leaves it for VT-expiry on retryable error, or buries it after
`MAX_ATTEMPTS=5` deliveries.
4. **Cron safety net** — Vercel cron hits the same drain route every minute so
any messages that missed their kick still get processed.
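The archive/retry/bury decision in step 3 can be sketched as a pure function. This is a hypothetical helper for illustration only — in the actual drain route the logic lives inline — but it captures the same ordering: the attempt cap is checked first, success and fatal errors both archive, and only retryable errors are left for the visibility timeout to expire.

```typescript
type DrainOutcome = "bury" | "archive" | "retry";

// Decide what happens to a pgmq message after a drain attempt.
// readCt is pgmq's delivery counter; maxAttempts mirrors MAX_ATTEMPTS=5.
function decideOutcome(
  readCt: number,
  scanResult: "ok" | "fatal" | "retryable",
  maxAttempts = 5,
): DrainOutcome {
  // Checked before the scan runs: a poisonous message gets buried
  // (markScanFailed + archive) once it has been delivered too often.
  if (readCt > maxAttempts) return "bury";
  // Success archives; fatal errors also archive because the scan's
  // compensation path already wrote scan_status='error'.
  if (scanResult === "ok" || scanResult === "fatal") return "archive";
  // Retryable: leave the message; it reappears after VT with read_ct + 1.
  return "retry";
}
```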

The drain route uses `maxDuration = 800` (Vercel Pro+ Fluid Compute ceiling) and
relies on `CURSOR_API_KEY` + `CRON_SECRET` from the env file.
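The 1-minute cron safety net from step 4 would correspond to a `vercel.json` entry along these lines (a sketch — the schedule and path are assumed from the description above, not taken from the diff):

```json
{
  "crons": [
    {
      "path": "/api/queue/plugin-scans/drain",
      "schedule": "* * * * *"
    }
  ]
}
```

Vercel sends the `CRON_SECRET` as a bearer token on cron invocations, which is what `requireCronAuth` checks.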

5 changes: 3 additions & 2 deletions apps/cursor/.env.example
@@ -21,8 +21,9 @@ NEXT_PUBLIC_ADMIN_USER_IDS=
# local dev; otherwise checkRateLimit is a no-op outside production.
NEXT_PUBLIC_VERCEL_FIREWALL_HOST_FOR_DEVELOPMENT=

# Cursor SDK — required for the plugin security scan workflow
# (src/workflows/scan-plugin.ts). Mint a key at
# Cursor SDK — required for the plugin security scan worker
# (src/lib/plugins/scan.ts, drained from a Supabase Queue by
# /api/queue/plugin-scans/drain). Mint a key at
# https://cursor.com/dashboard/cloud-agents or use a team service-account key.
CURSOR_API_KEY=

3 changes: 1 addition & 2 deletions apps/cursor/next.config.mjs
@@ -1,6 +1,5 @@
import { dirname, resolve } from "node:path";
import { fileURLToPath } from "node:url";
import { withWorkflow } from "workflow/next";

const __dirname = dirname(fileURLToPath(import.meta.url));

@@ -115,4 +114,4 @@ const nextConfig = {
},
};

export default withWorkflow(nextConfig);
export default nextConfig;
1 change: 0 additions & 1 deletion apps/cursor/package.json
@@ -48,7 +48,6 @@
"slugify": "^1.6.9",
"sonner": "^2.0.7",
"tailwind-merge": "^3.5.0",
"workflow": "^4.2.4",
"zod": "^4.4.3"
},
"devDependencies": {
6 changes: 3 additions & 3 deletions apps/cursor/src/actions/review-flagged-plugin.ts
@@ -1,10 +1,9 @@
"use server";

import { revalidatePath } from "next/cache";
import { start } from "workflow/api";
import { z } from "zod";
import { enqueuePluginScan, kickDrainAfterResponse } from "@/lib/plugins/queue";
import { createClient } from "@/utils/supabase/admin-client";
import { scanPluginWorkflow } from "@/workflows/scan-plugin";
import { ActionError, adminActionClient } from "./safe-action";

const pluginIdSchema = z.object({ pluginId: z.string().uuid() });
@@ -89,7 +88,8 @@ export const rescanPluginAction = adminActionClient
}

try {
await start(scanPluginWorkflow, [pluginId]);
await enqueuePluginScan(pluginId);
kickDrainAfterResponse();
} catch (err) {
throw new ActionError(
`Failed to enqueue scan: ${err instanceof Error ? err.message : String(err)}`,
10 changes: 5 additions & 5 deletions apps/cursor/src/actions/update-plugin.ts
@@ -1,11 +1,10 @@
"use server";

import { revalidatePath } from "next/cache";
import { start } from "workflow/api";
import { z } from "zod";
import { enqueuePluginScan, kickDrainAfterResponse } from "@/lib/plugins/queue";
import { pluginScanLimit } from "@/lib/rate-limit";
import { createClient } from "@/utils/supabase/admin-client";
import { scanPluginWorkflow } from "@/workflows/scan-plugin";
import { ActionError, authActionClient } from "./safe-action";

const componentSchema = z.object({
@@ -148,9 +147,10 @@ export const updatePluginAction = authActionClient
}

try {
await start(scanPluginWorkflow, [id]);
} catch (workflowError) {
console.error("Failed to enqueue scan workflow", workflowError);
await enqueuePluginScan(id);
kickDrainAfterResponse();
} catch (queueError) {
console.error("Failed to enqueue plugin scan", queueError);
}

revalidatePath("/");
18 changes: 11 additions & 7 deletions apps/cursor/src/app/api/cron/recover-stuck-scans/route.ts
@@ -1,8 +1,7 @@
import { type NextRequest, NextResponse } from "next/server";
import { start } from "workflow/api";
import { requireCronAuth } from "@/lib/cron-auth";
import { enqueuePluginScan } from "@/lib/plugins/queue";
import { createClient } from "@/utils/supabase/admin-client";
import { scanPluginWorkflow } from "@/workflows/scan-plugin";

export const dynamic = "force-dynamic";
export const maxDuration = 60;
@@ -12,8 +11,8 @@ export const maxDuration = 60;
// and this cron see the same set of rows.
const STALE_AFTER_MS = 15 * 60 * 1000;

// Hard cap per cron tick. Each start() is cheap (it just enqueues the workflow)
// but we don't want to flood the workflow service if a backlog builds up.
// Hard cap per cron tick. Each enqueuePluginScan() is cheap (just a pgmq.send)
// but we don't want to flood the queue if a backlog builds up.
const MAX_RETRIES_PER_RUN = 25;

export async function GET(request: NextRequest) {
@@ -36,10 +35,15 @@ export async function GET(request: NextRequest) {
throw new Error(`Failed to query stuck plugins: ${error.message}`);
}

const results: Array<{ id: string; slug: string; ok: boolean; error?: string }> = [];
const results: Array<{
id: string;
slug: string;
ok: boolean;
error?: string;
}> = [];

for (const plugin of stuck ?? []) {
// Reset back to pending so the workflow's loadPlugin → markScanning
// Reset back to pending so the worker's loadPlugin → markScanning
// transition is idempotent (the prevActive snapshot is recomputed).
if (plugin.scan_status === "scanning") {
await supabase
@@ -49,7 +53,7 @@
}

try {
await start(scanPluginWorkflow, [plugin.id]);
await enqueuePluginScan(plugin.id);
results.push({ id: plugin.id, slug: plugin.slug, ok: true });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
147 changes: 147 additions & 0 deletions apps/cursor/src/app/api/queue/plugin-scans/drain/route.ts
@@ -0,0 +1,147 @@
import { type NextRequest, NextResponse } from "next/server";
import { requireCronAuth } from "@/lib/cron-auth";
import {
archivePluginScan,
PLUGIN_SCAN_QUEUE,
readNextPluginScan,
} from "@/lib/plugins/queue";
import {
FatalScanError,
markScanFailed,
runPluginScan,
} from "@/lib/plugins/scan";

// Vercel max for Pro / Enterprise + Fluid Compute (default since 2025) is 800s.
// Source: https://vercel.com/docs/functions/configuring-functions/duration
//
// The `Agent.prompt` step can take 1–3 minutes for a typical plugin; the git
// clone is bounded by CLONE_TIMEOUT_MS (60s) inside scan.ts. 800s gives us
// generous headroom for the worst-case agent run.
export const dynamic = "force-dynamic";
export const maxDuration = 800;

// Visibility timeout: how long the message is invisible to other consumers
// after a successful `read`. Set comfortably longer than `maxDuration` so we
// can never hand the same message to a second drain invocation while the
// first one is still running.
const VT_SECONDS = 900;

// Bury after this many delivery attempts. With per-cron `n=1` and a 1-min
// schedule, this means a poisonous message stays in the queue for ~5 min
// after `read_ct=1` (we only see read_ct on the next read after the VT
// expires) before we mark the plugin errored and stop retrying.
const MAX_ATTEMPTS = 5;

function logInfo(msg: string, meta?: Record<string, unknown>) {
console.log(`[scan-drain] ${msg}${meta ? ` ${JSON.stringify(meta)}` : ""}`);
}

function logError(msg: string, err: unknown) {
const detail =
err instanceof Error
? { name: err.name, message: err.message, stack: err.stack }
: { value: String(err) };
console.error(`[scan-drain] ${msg}`, detail);
}

export async function GET(request: NextRequest) {
const unauthorized = requireCronAuth(request);
if (unauthorized) return unauthorized;

let msg: Awaited<ReturnType<typeof readNextPluginScan>>;
try {
msg = await readNextPluginScan(VT_SECONDS);
} catch (err) {
logError("readNextPluginScan failed", err);
return NextResponse.json(
{ ok: false, error: "queue_read_failed" },
{ status: 500 },
);
}

if (!msg) {
return NextResponse.json({
ok: true,
queue: PLUGIN_SCAN_QUEUE,
drained: 0,
});
}

const { msg_id, read_ct, message } = msg;
const pluginId = message.plugin_id;

if (!pluginId || typeof pluginId !== "string") {
// Malformed payload — archive it so it doesn't keep getting retried.
logError(
"malformed message; archiving",
new Error(JSON.stringify(message)),
);
await archivePluginScan(msg_id).catch((err) =>
logError("archive (malformed) failed", err),
);
return NextResponse.json(
{ ok: false, archived: msg_id, reason: "malformed_message" },
{ status: 200 },
);
}

if (read_ct > MAX_ATTEMPTS) {
logInfo("exceeded MAX_ATTEMPTS; burying", {
pluginId,
msg_id,
read_ct,
max: MAX_ATTEMPTS,
});
await markScanFailed(pluginId, `Exceeded ${MAX_ATTEMPTS} scan attempts`);
await archivePluginScan(msg_id);
return NextResponse.json({
ok: true,
buried: pluginId,
msg_id,
read_ct,
});
}

logInfo("processing", { pluginId, msg_id, read_ct });

try {
await runPluginScan(pluginId);
await archivePluginScan(msg_id);
logInfo("scanned ok", { pluginId, msg_id });
return NextResponse.json({ ok: true, scanned: pluginId, msg_id });
} catch (err) {
if (err instanceof FatalScanError) {
// runPluginScan already wrote `scan_status='error'` via its compensation
// path. Archive so the message doesn't get retried.
logError("fatal; archiving", err);
await archivePluginScan(msg_id).catch((archiveErr) =>
logError("archive (fatal) failed", archiveErr),
);
return NextResponse.json(
{
ok: false,
fatal: true,
pluginId,
msg_id,
error: err.message,
},
{ status: 200 },
);
}

// Retryable: do NOT archive. The pgmq visibility timeout (VT_SECONDS)
// expires and the next cron tick re-reads the message with read_ct + 1.
logError("retryable; leaving message for VT to expire", err);
return NextResponse.json(
{
ok: false,
retryable: true,
pluginId,
msg_id,
read_ct,
error: err instanceof Error ? err.message : String(err),
},
{ status: 500 },
);
}
}
2 changes: 1 addition & 1 deletion apps/cursor/src/lib/github-plugin/parse.ts
@@ -2,7 +2,7 @@
* Parse a public GitHub repo into a Cursor plugin shape.
*
* Pure module — no `"use server"`, no auth context, no DB calls. Safe to call
* from server actions, workflows, or one-shot scripts.
* from server actions, queue workers, or one-shot scripts.
*
* Optional `GITHUB_TOKEN` env var bumps the rate limit on the Repos / git tree
* endpoints from 60 req/h (unauth) to 5,000 req/h (auth).
17 changes: 10 additions & 7 deletions apps/cursor/src/lib/plugins/insert.ts
@@ -4,12 +4,11 @@
* Wrapped by [`createPluginAction`](src/actions/create-plugin.ts) for the auth'd
* user-submission path and by the seed scripts for bulk import. Server actions
* stay responsible for auth, rate-limiting, and `revalidatePath` — this lib
* only touches the database and (optionally) the scan workflow.
* only touches the database and (optionally) enqueues the security scan.
*/

import { start } from "workflow/api";
import { enqueuePluginScan, kickDrainAfterResponse } from "@/lib/plugins/queue";
import { createClient } from "@/utils/supabase/admin-client";
import { scanPluginWorkflow } from "@/workflows/scan-plugin";

type ComponentInput = {
type: string;
@@ -45,7 +44,7 @@ export type InsertPluginOptions = {
* and skip the security scan. Used for the curated bulk seed.
*
* When false (default for user submissions): insert as `active=false,
* scan_status='pending'` and enqueue `scanPluginWorkflow`.
* scan_status='pending'` and enqueue the scan via `enqueuePluginScan`.
*/
skipScan?: boolean;
};
@@ -156,9 +155,13 @@ export async function insertPlugin(

if (!skipScan) {
try {
await start(scanPluginWorkflow, [plugin.id]);
} catch (workflowError) {
console.error("Failed to enqueue scan workflow", workflowError);
await enqueuePluginScan(plugin.id);
kickDrainAfterResponse();
} catch (queueError) {
// Don't fail the insert if the queue is unreachable — the
// recover-stuck-scans cron will re-enqueue any rows left at
// scan_status='pending' after 15 min.
console.error("Failed to enqueue plugin scan", queueError);
}
}
