diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ee5718..5e68251 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Streaming extraction with bounded memory: PostgreSQL/MySQL use server-side cursors + (`pg-query-stream` / mysql2 row streams) and Stripe uses lazy pagination. The pipeline now runs + extract → hash → upload one batch at a time via a `SyncSession`, so the full audience is never + materialized in memory. - Google Customer Match now sends a plain-text `countryCode` in `addressInfo`, so country participates in address matching (while Meta continues to receive the hashed country column). - Accurate Google partial-failure accounting: `recordsAccepted` / `recordsRejected` are now derived diff --git a/README.md b/README.md index 71e1fff..ed870da 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,10 @@ because there is no CSV. - **🔒 In-memory, file-free by design.** No CSV, no temp file, no staging table. PII exists only as transient values in RAM and leaves the process exclusively as **SHA-256 digests**. +- **🌊 Streaming, bounded memory.** Sources are read through server-side cursors (Postgres/MySQL) and + lazy pagination (Stripe), then processed extract → hash → upload one batch at a time. The full + audience is **never materialized at once**, so peak memory stays flat whether you sync 10 K or + 10 M records. - **🧼 Platform-perfect normalization.** Emails and phone numbers are cleaned to the exact spec Meta and Google publish (trim → lowercase → E.164 country code) so your match rate is maximized. 
- **📦 Smart batching + retries.** Records are chunked to each platform's limit and uploaded diff --git a/package-lock.json b/package-lock.json index db9bfe8..6dfe58c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "mysql2": "^3.11.3", "node-cron": "^3.0.3", "pg": "^8.13.0", + "pg-query-stream": "^4.16.0", "stripe": "^16.12.0" }, "bin": { @@ -3252,6 +3253,15 @@ "integrity": "sha512-XwWDGcLRGCXAR8F/AM5bG7Q+A3Wm2s6QeEjlOKZLlH3UYcguiqCWKyWXVag5TLTIjR7oOJUY8kcADaZgWPyLeg==", "license": "MIT" }, + "node_modules/pg-cursor": { + "version": "2.21.0", + "resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.21.0.tgz", + "integrity": "sha512-IYvk/j+Suhtbo/C3uOf4JLsLK/gWxOTUOmYbDsbKnLaVJDq+KwhwK6ngpRfiCk8eDMS3AmGQABZCv0cREEzHQw==", + "license": "MIT", + "peerDependencies": { + "pg": "^8" + } + }, "node_modules/pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", @@ -3276,6 +3286,18 @@ "integrity": "sha512-cq9sECI5s0+uPUXjbz8ioyPJni6RzsRib0US67i5IoTZKw8fNeYlVE7u8F4dG7vEJJtc5wdD1K189lCCUwqWTQ==", "license": "MIT" }, + "node_modules/pg-query-stream": { + "version": "4.16.0", + "resolved": "https://registry.npmjs.org/pg-query-stream/-/pg-query-stream-4.16.0.tgz", + "integrity": "sha512-vyqxAG4YVax43BCSfqcKxHSbh8YQshhVR0CjLNdo7MIM2UOqI+XsIYk0bVZJ0aKjQfjJQcGiGo9xK3hz0JcFyg==", + "license": "MIT", + "dependencies": { + "pg-cursor": "^2.21.0" + }, + "peerDependencies": { + "pg": "^8" + } + }, "node_modules/pg-types": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", diff --git a/package.json b/package.json index ae1f080..7a7e0e8 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "mysql2": "^3.11.3", "node-cron": "^3.0.3", "pg": "^8.13.0", + "pg-query-stream": "^4.16.0", "stripe": "^16.12.0" }, "devDependencies": { diff --git a/src/extractor.ts b/src/extractor.ts index b11e540..d6b3658 100644 --- a/src/extractor.ts +++ b/src/extractor.ts @@ -14,6 
+14,7 @@ import mysql from 'mysql2/promise'; import { Pool as PgPool } from 'pg'; +import QueryStream from 'pg-query-stream'; import Stripe from 'stripe'; import type { @@ -59,10 +60,10 @@ function rowToRawCustomer(row: Record): RawCustomer { * PostgreSQL extractor. * ============================================================================================== */ -async function extractFromPostgres( +async function* streamFromPostgres( config: PostgresSourceConfig, window: SyncWindow, -): Promise { +): AsyncGenerator { const pool = new PgPool({ connectionString: config.connectionString, ssl: config.ssl ? { rejectUnauthorized: true } : false, @@ -73,14 +74,22 @@ async function extractFromPostgres( statement_timeout: 60_000, }); + // A server-side cursor (pg-query-stream) fetches rows in pages of `batchSize` rather than + // buffering the whole result set, keeping peak memory bounded regardless of row count. + const client = await pool.connect(); try { // `$1` = window.since (inclusive), `$2` = window.until (exclusive). - const result = await pool.query>(config.query, [ - window.since.toISOString(), - window.until.toISOString(), - ]); - return result.rows.map(rowToRawCustomer); + const query = new QueryStream( + config.query, + [window.since.toISOString(), window.until.toISOString()], + { batchSize: 1_000 }, + ); + const stream = client.query(query); + for await (const row of stream as AsyncIterable>) { + yield rowToRawCustomer(row); + } } finally { + client.release(); await pool.end(); } } @@ -89,10 +98,20 @@ async function extractFromPostgres( * MySQL extractor. * ============================================================================================== */ -async function extractFromMysql( +/** Minimal shape of mysql2's callback query object, just enough to obtain a row stream. */ +interface StreamableQuery { + stream(options?: Record): AsyncIterable>; +} + +/** Minimal shape of mysql2's core (callback) connection used for row streaming. 
*/ +interface StreamableConnection { + query(sql: string, values: unknown[]): StreamableQuery; +} + +async function* streamFromMysql( config: MysqlSourceConfig, window: SyncWindow, -): Promise { +): AsyncGenerator { const pool = mysql.createPool({ uri: config.connectionString, connectionLimit: 4, @@ -105,18 +124,23 @@ async function extractFromMysql( ...(config.ssl ? { ssl: { rejectUnauthorized: true } } : {}), }); + const conn = await pool.getConnection(); try { // Two positional `?` placeholders: lower bound (inclusive), upper bound (exclusive). - const [rows] = await pool.query(config.query, [ + const params = [ window.since.toISOString().slice(0, 19).replace('T', ' '), window.until.toISOString().slice(0, 19).replace('T', ' '), - ]); - - if (!Array.isArray(rows)) { - return []; + ]; + // mysql2's row-streaming API lives on the underlying (callback) connection, which the promise + // typings don't surface cleanly. Cast to a minimal streamable shape: `.query(...).stream()` + // returns a Node Readable that emits one row at a time without buffering the whole result set. 
+ const core = conn.connection as unknown as StreamableConnection; + const stream = core.query(config.query, params).stream(); + for await (const row of stream) { + yield rowToRawCustomer(row); } - return (rows as Record[]).map(rowToRawCustomer); } finally { + conn.release(); await pool.end(); } } @@ -186,10 +210,10 @@ function stripeCustomerToRaw(cust: Stripe.Customer, fallbackEmail: string | null }; } -async function extractFromStripe( +async function* streamFromStripe( config: StripeSourceConfig, window: SyncWindow, -): Promise { +): AsyncGenerator { const stripe = new Stripe(config.apiKey, { apiVersion: '2024-06-20', maxNetworkRetries: 3, @@ -200,12 +224,24 @@ async function extractFromStripe( const sinceUnix = Math.floor(window.since.getTime() / 1000); const untilUnix = Math.floor(window.until.getTime() / 1000); - const out: RawCustomer[] = []; - // Dedup by stable customer id so repeat purchasers are only synced once per run. + // Dedup by stable customer id so repeat purchasers are only synced once per run. This Set is the + // single piece of unbounded state; it holds short id strings, not full records, so it stays small + // relative to the row data even for large windows. const seen = new Set(); + const isNew = (raw: RawCustomer): boolean => { + const key = String(raw.id ?? raw.email ?? ''); + if (key.length === 0) { + return true; + } + if (seen.has(key)) { + return false; + } + seen.add(key); + return true; + }; if (config.mode === 'customers') { - // Customers created within the window. + // Customers created within the window. The Stripe SDK auto-paginates lazily. for await (const customer of stripe.customers.list({ created: { gte: sinceUnix, lt: untilUnix }, limit: 100, @@ -214,16 +250,11 @@ async function extractFromStripe( continue; } const raw = stripeCustomerToRaw(customer, null); - const key = String(raw.id ?? raw.email ?? 
''); - if (key.length > 0 && seen.has(key)) { - continue; - } - if (key.length > 0) { - seen.add(key); + if (isNew(raw)) { + yield raw; } - out.push(raw); } - return out; + return; } // mode === 'charges' (default): customers who were successfully charged within the window. @@ -240,17 +271,10 @@ async function extractFromStripe( if (raw === null) { continue; } - const key = String(raw.id ?? raw.email ?? ''); - if (key.length > 0 && seen.has(key)) { - continue; - } - if (key.length > 0) { - seen.add(key); + if (isNew(raw)) { + yield raw; } - out.push(raw); } - - return out; } /* ============================================================================================== * @@ -258,20 +282,21 @@ async function extractFromStripe( * ============================================================================================== */ /** - * Extract raw customers for the given window from whichever source is configured. The result is an - * in-memory array; nothing is persisted to disk at any point. + * Stream raw customers for the given window from whichever source is configured, one record at a + * time. SQL sources use server-side cursors / row streaming and Stripe uses lazy pagination, so the + * caller can process arbitrarily large audiences with bounded memory. Nothing touches disk. */ -export async function extractCustomers( +export function streamCustomers( source: SourceConfig, window: SyncWindow, -): Promise { +): AsyncGenerator { switch (source.kind) { case 'postgres': - return extractFromPostgres(source, window); + return streamFromPostgres(source, window); case 'mysql': - return extractFromMysql(source, window); + return streamFromMysql(source, window); case 'stripe': - return extractFromStripe(source, window); + return streamFromStripe(source, window); default: { // Exhaustiveness guard โ€” a new SourceKind without a branch is a compile error. 
const _never: never = source; @@ -279,3 +304,40 @@ export async function extractCustomers( } } } + +/** + * Group an async iterable into arrays of at most `size` items. The final group may be smaller. This + * lets a streaming source be processed in bounded-memory batches without ever materializing the + * whole stream. + */ +export async function* batchAsync(source: AsyncIterable, size: number): AsyncGenerator { + if (size <= 0) { + throw new Error(`batch size must be > 0, received ${size}`); + } + let buffer: T[] = []; + for await (const item of source) { + buffer.push(item); + if (buffer.length >= size) { + yield buffer; + buffer = []; + } + } + if (buffer.length > 0) { + yield buffer; + } +} + +/** + * Convenience wrapper that fully drains {@link streamCustomers} into an array. Prefer the streaming + * API for large audiences; this exists for callers and tests that want the whole set at once. + */ +export async function extractCustomers( + source: SourceConfig, + window: SyncWindow, +): Promise { + const out: RawCustomer[] = []; + for await (const raw of streamCustomers(source, window)) { + out.push(raw); + } + return out; +} diff --git a/src/index.ts b/src/index.ts index ed38c34..607f2ef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,9 +19,9 @@ import { Command, Option } from 'commander'; import 'dotenv/config'; import cron from 'node-cron'; -import { extractCustomers } from './extractor.js'; +import { batchAsync, streamCustomers } from './extractor.js'; import { hashCustomers } from './normalizer.js'; -import { syncToDestinations } from './sync.js'; +import { createSyncSession } from './sync.js'; import type { AppConfig, SourceConfig, SyncRunResult, SyncWindow } from './types.js'; /* ============================================================================================== * @@ -242,47 +242,45 @@ async function runSync(config: AppConfig, window: SyncWindow): Promise 0) { + await session.send(hashedBatch); + } } + info( + `Extracted ${extractedCount} 
record(s); hashed ${hashedCount} ` + + `(${extractedCount - hashedCount} dropped as unmatchable)`, + ); + if (hashedCount === 0) { + warn('No hashable records โ€” nothing was uploaded.'); + } + + const results = await session.finalize(); + const finishedAt = new Date(); const result: SyncRunResult = { startedAt, finishedAt, window, - extractedCount: raws.length, - hashedCount: hashed.length, + extractedCount, + hashedCount, results, dryRun: config.dryRun, }; diff --git a/src/sync.ts b/src/sync.ts index a59bfa3..c55f8cb 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -179,80 +179,52 @@ function toMetaRow(customer: HashedCustomer): string[] { ]; } -async function uploadToMeta( +/** Outcome of uploading a single batch: how many records the platform accepted vs. rejected. */ +interface BatchOutcome { + readonly accepted: number; + readonly rejected: number; +} + +/** + * Upload one Meta batch (already sized to <= config.batchSize) to the Custom Audience. Retries + * transient failures and reports the accepted/rejected split from Meta's response. 
+ */ +async function sendMetaBatch( http: AxiosInstance, config: MetaDestinationConfig, - customers: readonly HashedCustomer[], + batch: readonly HashedCustomer[], + label: string, app: AppConfig, log: (msg: string) => void, -): Promise { - if (!config.enabled) { - return { - platform: 'meta', - enabled: false, - batchesSent: 0, - recordsAccepted: 0, - recordsRejected: 0, - skipped: true, - }; - } - +): Promise { const url = `https://graph.facebook.com/${config.apiVersion}/${config.audienceId}/users`; - const batches = chunk(customers, config.batchSize); - - let batchesSent = 0; - let accepted = 0; - let rejected = 0; - - for (let i = 0; i < batches.length; i += 1) { - const batch = batches[i]!; - const body: MetaUsersRequest = { - payload: { - schema: META_SCHEMA, - data: batch.map(toMetaRow), - }, - }; - - if (app.dryRun) { - log(`[meta] (dry-run) would POST batch ${i + 1}/${batches.length} (${batch.length} users)`); - batchesSent += 1; - accepted += batch.length; - continue; - } - - const response = await withRetry( - async () => { - const res = await http.post(url, body, { - params: { access_token: config.accessToken }, - }); - return res.data; - }, - app.maxRetries, - app.retryBaseDelayMs, - (attempt, delay, error) => { - log( - `[meta] batch ${i + 1}/${batches.length} attempt ${attempt} failed, ` + - `retrying in ${delay}ms โ€” ${describeError(error)}`, - ); - }, - ); + const body: MetaUsersRequest = { + payload: { + schema: META_SCHEMA, + data: batch.map(toMetaRow), + }, + }; - const received = response.num_received ?? batch.length; - const invalid = response.num_invalid_entries ?? 
0; - accepted += Math.max(0, received - invalid); - rejected += invalid; - batchesSent += 1; - log(`[meta] batch ${i + 1}/${batches.length} ok โ€” received=${received} invalid=${invalid}`); - } + const response = await withRetry( + async () => { + const res = await http.post(url, body, { + params: { access_token: config.accessToken }, + }); + return res.data; + }, + app.maxRetries, + app.retryBaseDelayMs, + (attempt, delay, error) => { + log( + `[meta] ${label} attempt ${attempt} failed, retrying in ${delay}ms โ€” ${describeError(error)}`, + ); + }, + ); - return { - platform: 'meta', - enabled: true, - batchesSent, - recordsAccepted: accepted, - recordsRejected: rejected, - skipped: false, - }; + const received = response.num_received ?? batch.length; + const invalid = response.num_invalid_entries ?? 0; + log(`[meta] ${label} ok โ€” received=${received} invalid=${invalid}`); + return { accepted: Math.max(0, received - invalid), rejected: invalid }; } /* ============================================================================================== * @@ -432,101 +404,66 @@ async function resolveGoogleAccessToken( return token; } -async function uploadToGoogle( +/** + * Add one Google batch (already sized to <= config.batchSize) of operations to an existing offline + * user data job. Retries transient failures and reports the accepted/rejected split parsed from any + * partial-failure error. 
+ */ +async function addGoogleOperations( http: AxiosInstance, config: GoogleDestinationConfig, - customers: readonly HashedCustomer[], + accessToken: string, + jobResourceName: string, + batch: readonly HashedCustomer[], + label: string, app: AppConfig, log: (msg: string) => void, -): Promise { - if (!config.enabled) { - return { - platform: 'google', - enabled: false, - batchesSent: 0, - recordsAccepted: 0, - recordsRejected: 0, - skipped: true, - }; - } - - const batches = chunk(customers, config.batchSize); - - let batchesSent = 0; - let accepted = 0; - // Incremented per batch from parsed partial-failure details (see countFailedOperations). - let rejected = 0; - - // Dry-run: validate batching + payload shape without creating a job or hitting the network. - if (app.dryRun) { - for (let i = 0; i < batches.length; i += 1) { - const batch = batches[i]!; - log(`[google] (dry-run) would add batch ${i + 1}/${batches.length} (${batch.length} users)`); - batchesSent += 1; - accepted += batch.length; - } - return { - platform: 'google', - enabled: true, - batchesSent, - recordsAccepted: accepted, - recordsRejected: rejected, - skipped: false, - }; - } - - // 0) Resolve a fresh access token (refresh_token exchange when configured). - const accessToken = await resolveGoogleAccessToken(http, config, app, log); - - // 1) Create the job. - const jobResourceName = await createGoogleJob(http, config, accessToken, app, log); - - // 2) Add operations in batches. 
+): Promise { const addUrl = `https://googleads.googleapis.com/${config.apiVersion}/${jobResourceName}:addOperations`; + const body: GoogleAddOperationsRequest = { + enablePartialFailure: true, + operations: batch.map((customer) => ({ + create: { userIdentifiers: toGoogleIdentifiers(customer) }, + })), + }; - for (let i = 0; i < batches.length; i += 1) { - const batch = batches[i]!; - const body: GoogleAddOperationsRequest = { - enablePartialFailure: true, - operations: batch.map((customer) => ({ - create: { userIdentifiers: toGoogleIdentifiers(customer) }, - })), - }; - - const data = await withRetry( - async () => { - const res = await http.post(addUrl, body, { - headers: googleHeaders(config, accessToken), - }); - return res.data; - }, - app.maxRetries, - app.retryBaseDelayMs, - (attempt, delay, error) => { - log( - `[google] add-ops batch ${i + 1}/${batches.length} attempt ${attempt} failed, ` + - `retrying in ${delay}ms โ€” ${describeError(error)}`, - ); - }, - ); - - if (data.partialFailureError !== undefined) { - // Partial failure: parse the detailed error to count exactly which operations were rejected. - const failed = countFailedOperations(data.partialFailureError, batch.length); - rejected += failed; - accepted += batch.length - failed; + const data = await withRetry( + async () => { + const res = await http.post(addUrl, body, { + headers: googleHeaders(config, accessToken), + }); + return res.data; + }, + app.maxRetries, + app.retryBaseDelayMs, + (attempt, delay, error) => { log( - `[google] add-ops batch ${i + 1}/${batches.length} partial failure โ€” ` + - `${failed}/${batch.length} rejected: ${data.partialFailureError.message ?? 
'see Google Ads logs'}`, + `[google] ${label} attempt ${attempt} failed, retrying in ${delay}ms โ€” ${describeError(error)}`, ); - } else { - accepted += batch.length; - log(`[google] add-ops batch ${i + 1}/${batches.length} ok (${batch.length} users)`); - } - batchesSent += 1; + }, + ); + + if (data.partialFailureError !== undefined) { + const failed = countFailedOperations(data.partialFailureError, batch.length); + log( + `[google] ${label} partial failure โ€” ${failed}/${batch.length} rejected: ` + + `${data.partialFailureError.message ?? 'see Google Ads logs'}`, + ); + return { accepted: batch.length - failed, rejected: failed }; } + log(`[google] ${label} ok (${batch.length} users)`); + return { accepted: batch.length, rejected: 0 }; +} - // 3) Run the job to begin asynchronous server-side processing. +/** Run an offline user data job to begin asynchronous server-side processing. */ +async function runGoogleJob( + http: AxiosInstance, + config: GoogleDestinationConfig, + accessToken: string, + jobResourceName: string, + app: AppConfig, + log: (msg: string) => void, +): Promise { const runUrl = `https://googleads.googleapis.com/${config.apiVersion}/${jobResourceName}:run`; await withRetry( async () => { @@ -546,30 +483,41 @@ async function uploadToGoogle( }, ); log(`[google] job ${jobResourceName} submitted for processing`); - - return { - platform: 'google', - enabled: true, - batchesSent, - recordsAccepted: accepted, - recordsRejected: rejected, - skipped: false, - }; } /* ============================================================================================== * - * Public sync orchestrator. + * Public streaming sync session. * ============================================================================================== */ +/** Mutable per-platform accumulator used while streaming batches through a session. 
*/ +interface PlatformState { + batchesSent: number; + accepted: number; + rejected: number; + failed: boolean; + error: string | undefined; +} + +function newPlatformState(): PlatformState { + return { batchesSent: 0, accepted: 0, rejected: 0, failed: false, error: undefined }; +} + /** - * Push hashed customers to every enabled destination. Meta and Google run concurrently; a failure - * in one is captured per-platform and does not abort the other. Returns one result per platform. + * A streaming upload session. Feed it hashed batches as they are produced from the source stream; + * each batch is fanned out to every enabled destination (further sub-chunked to each platform's own + * limit). Google's offline job is created lazily on the first batch and run on {@link finalize}. + * + * A failure in one platform is captured and stops further sends to that platform only โ€” the other + * keeps going, and the source stream is never aborted. */ -export async function syncToDestinations( - customers: readonly HashedCustomer[], - app: AppConfig, - log: (msg: string) => void, -): Promise { +export interface SyncSession { + /** Upload one hashed batch to all enabled destinations. */ + send(batch: readonly HashedCustomer[]): Promise; + /** Finish the run (run the Google job) and return one result per platform. */ + finalize(): Promise; +} + +export function createSyncSession(app: AppConfig, log: (msg: string) => void): SyncSession { const http = axios.create({ timeout: 30_000, // Validate ourselves so non-2xx responses surface as errors for the retry layer. 
@@ -577,30 +525,143 @@ export async function syncToDestinations( headers: { 'User-Agent': 'AudienceSync/1.0' }, }); - const tasks: Array> = [ - uploadToMeta(http, app.destinations.meta, customers, app, log).catch( - (error: unknown): PlatformSyncResult => ({ - platform: 'meta', - enabled: app.destinations.meta.enabled, - batchesSent: 0, - recordsAccepted: 0, - recordsRejected: 0, - skipped: false, - error: describeError(error), - }), - ), - uploadToGoogle(http, app.destinations.google, customers, app, log).catch( - (error: unknown): PlatformSyncResult => ({ - platform: 'google', - enabled: app.destinations.google.enabled, + const meta = app.destinations.meta; + const google = app.destinations.google; + const metaState = newPlatformState(); + const googleState = newPlatformState(); + + // Lazily-established Google job context (created on the first non-dry-run batch). + let googleAccessToken: string | undefined; + let googleJobResourceName: string | undefined; + + let metaBatchCounter = 0; + let googleBatchCounter = 0; + + async function ensureGoogleJob(): Promise { + if (googleJobResourceName !== undefined) { + return; + } + googleAccessToken = await resolveGoogleAccessToken(http, google, app, log); + googleJobResourceName = await createGoogleJob(http, google, googleAccessToken, app, log); + } + + async function sendToMeta(batch: readonly HashedCustomer[]): Promise { + if (!meta.enabled || metaState.failed) { + return; + } + try { + for (const sub of chunk(batch, meta.batchSize)) { + metaBatchCounter += 1; + const label = `batch #${metaBatchCounter}`; + if (app.dryRun) { + log(`[meta] (dry-run) would POST ${label} (${sub.length} users)`); + metaState.accepted += sub.length; + } else { + const outcome = await sendMetaBatch(http, meta, sub, label, app, log); + metaState.accepted += outcome.accepted; + metaState.rejected += outcome.rejected; + } + metaState.batchesSent += 1; + } + } catch (error: unknown) { + metaState.failed = true; + metaState.error = 
describeError(error); + log(`[meta] aborting after error โ€” ${metaState.error}`); + } + } + + async function sendToGoogle(batch: readonly HashedCustomer[]): Promise { + if (!google.enabled || googleState.failed) { + return; + } + try { + if (!app.dryRun) { + await ensureGoogleJob(); + } + for (const sub of chunk(batch, google.batchSize)) { + googleBatchCounter += 1; + const label = `batch #${googleBatchCounter}`; + if (app.dryRun) { + log(`[google] (dry-run) would add ${label} (${sub.length} users)`); + googleState.accepted += sub.length; + } else { + const outcome = await addGoogleOperations( + http, + google, + googleAccessToken!, + googleJobResourceName!, + sub, + label, + app, + log, + ); + googleState.accepted += outcome.accepted; + googleState.rejected += outcome.rejected; + } + googleState.batchesSent += 1; + } + } catch (error: unknown) { + googleState.failed = true; + googleState.error = describeError(error); + log(`[google] aborting after error โ€” ${googleState.error}`); + } + } + + function toResult( + platform: 'meta' | 'google', + enabled: boolean, + state: PlatformState, + ): PlatformSyncResult { + if (!enabled) { + return { + platform, + enabled: false, batchesSent: 0, recordsAccepted: 0, recordsRejected: 0, - skipped: false, - error: describeError(error), - }), - ), - ]; + skipped: true, + }; + } + const base: PlatformSyncResult = { + platform, + enabled: true, + batchesSent: state.batchesSent, + recordsAccepted: state.accepted, + recordsRejected: state.rejected, + skipped: false, + }; + return state.error !== undefined ? { ...base, error: state.error } : base; + } - return Promise.all(tasks); + return { + async send(batch: readonly HashedCustomer[]): Promise { + if (batch.length === 0) { + return; + } + // Fan out to both platforms concurrently; each captures its own errors. + await Promise.all([sendToMeta(batch), sendToGoogle(batch)]); + }, + + async finalize(): Promise { + // Run the Google job iff it was created (i.e. 
at least one real batch was added) and healthy. + if ( + google.enabled && + !googleState.failed && + !app.dryRun && + googleJobResourceName !== undefined && + googleAccessToken !== undefined + ) { + try { + await runGoogleJob(http, google, googleAccessToken, googleJobResourceName, app, log); + } catch (error: unknown) { + googleState.failed = true; + googleState.error = describeError(error); + } + } + return [ + toResult('meta', meta.enabled, metaState), + toResult('google', google.enabled, googleState), + ]; + }, + }; } diff --git a/test/extractor.test.ts b/test/extractor.test.ts new file mode 100644 index 0000000..e7609e5 --- /dev/null +++ b/test/extractor.test.ts @@ -0,0 +1,63 @@ +import { describe, expect, it } from 'vitest'; + +import { batchAsync } from '../src/extractor.js'; + +/** Build an async iterable yielding 0..n-1, recording how many items have been pulled so far. */ +function counting(n: number): { iterable: AsyncIterable; pulled: () => number } { + let pulled = 0; + async function* gen(): AsyncGenerator { + for (let i = 0; i < n; i += 1) { + pulled += 1; + yield i; + } + } + return { iterable: gen(), pulled: () => pulled }; +} + +async function collect(source: AsyncIterable): Promise { + const out: T[] = []; + for await (const item of source) { + out.push(item); + } + return out; +} + +describe('batchAsync', () => { + it('groups an async stream into fixed-size batches', async () => { + const { iterable } = counting(10); + const batches = await collect(batchAsync(iterable, 3)); + expect(batches).toEqual([[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]); + }); + + it('emits a single full batch when size matches the stream length', async () => { + const { iterable } = counting(4); + const batches = await collect(batchAsync(iterable, 4)); + expect(batches).toEqual([[0, 1, 2, 3]]); + }); + + it('yields nothing for an empty stream', async () => { + async function* empty(): AsyncGenerator { + // intentionally yields nothing + } + const batches = await 
collect(batchAsync(empty(), 5)); + expect(batches).toEqual([]); + }); + + it('rejects a non-positive batch size', async () => { + async function* one(): AsyncGenerator { + yield 1; + } + await expect(collect(batchAsync(one(), 0))).rejects.toThrow(/batch size must be > 0/); + }); + + it('is lazy: never buffers more than one batch worth of items ahead', async () => { + // Pull exactly one batch and stop. The source should not have been drained beyond what was + // needed to fill (and detect the end of) that first batch โ€” i.e. peak buffering is bounded. + const { iterable, pulled } = counting(100); + const gen = batchAsync(iterable, 10); + const first = await gen.next(); + expect(first.value).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + // Only the 10 items of the first batch have been pulled; the remaining 90 stay un-evaluated. + expect(pulled()).toBe(10); + }); +});