Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Streaming extraction with bounded memory: PostgreSQL/MySQL use server-side cursors
(`pg-query-stream` / mysql2 row streams) and Stripe uses lazy pagination. The pipeline now runs
extract → hash → upload one batch at a time via a `SyncSession`, so the full audience is never
materialized in memory.
- Google Customer Match now sends a plain-text `countryCode` in `addressInfo`, so country
participates in address matching (while Meta continues to receive the hashed country column).
- Accurate Google partial-failure accounting: `recordsAccepted` / `recordsRejected` are now derived
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ because there is no CSV.

- **🔒 In-memory, file-free by design.** No CSV, no temp file, no staging table. PII exists only as
transient values in RAM and leaves the process exclusively as **SHA-256 digests**.
- **🌊 Streaming, bounded memory.** Sources are read through server-side cursors (Postgres/MySQL) and
lazy pagination (Stripe), then processed extract → hash → upload one batch at a time. The full
audience is **never materialized at once**, so peak memory stays flat whether you sync 10 K or
10 M records.
- **🧼 Platform-perfect normalization.** Emails and phone numbers are cleaned to the exact spec Meta
and Google publish (trim → lowercase → E.164 country code) so your match rate is maximized.
- **📦 Smart batching + retries.** Records are chunked to each platform's limit and uploaded
Expand Down
22 changes: 22 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"mysql2": "^3.11.3",
"node-cron": "^3.0.3",
"pg": "^8.13.0",
"pg-query-stream": "^4.16.0",
"stripe": "^16.12.0"
},
"devDependencies": {
Expand Down
150 changes: 106 additions & 44 deletions src/extractor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import mysql from 'mysql2/promise';
import { Pool as PgPool } from 'pg';
import QueryStream from 'pg-query-stream';
import Stripe from 'stripe';

import type {
Expand Down Expand Up @@ -59,10 +60,10 @@ function rowToRawCustomer(row: Record<string, unknown>): RawCustomer {
* PostgreSQL extractor.
* ============================================================================================== */

async function extractFromPostgres(
async function* streamFromPostgres(
config: PostgresSourceConfig,
window: SyncWindow,
): Promise<RawCustomer[]> {
): AsyncGenerator<RawCustomer> {
const pool = new PgPool({
connectionString: config.connectionString,
ssl: config.ssl ? { rejectUnauthorized: true } : false,
Expand All @@ -73,14 +74,22 @@ async function extractFromPostgres(
statement_timeout: 60_000,
});

// A server-side cursor (pg-query-stream) fetches rows in pages of `batchSize` rather than
// buffering the whole result set, keeping peak memory bounded regardless of row count.
const client = await pool.connect();
try {
// `$1` = window.since (inclusive), `$2` = window.until (exclusive).
const result = await pool.query<Record<string, unknown>>(config.query, [
window.since.toISOString(),
window.until.toISOString(),
]);
return result.rows.map(rowToRawCustomer);
const query = new QueryStream(
config.query,
[window.since.toISOString(), window.until.toISOString()],
{ batchSize: 1_000 },
);
const stream = client.query(query);
for await (const row of stream as AsyncIterable<Record<string, unknown>>) {
yield rowToRawCustomer(row);
}
} finally {
client.release();
await pool.end();
}
}
Expand All @@ -89,10 +98,20 @@ async function extractFromPostgres(
* MySQL extractor.
* ============================================================================================== */

async function extractFromMysql(
/** Minimal shape of mysql2's callback query object, just enough to obtain a row stream. */
interface StreamableQuery {
stream(options?: Record<string, unknown>): AsyncIterable<Record<string, unknown>>;
}

/** Minimal shape of mysql2's core (callback) connection used for row streaming. */
interface StreamableConnection {
query(sql: string, values: unknown[]): StreamableQuery;
}

async function* streamFromMysql(
config: MysqlSourceConfig,
window: SyncWindow,
): Promise<RawCustomer[]> {
): AsyncGenerator<RawCustomer> {
const pool = mysql.createPool({
uri: config.connectionString,
connectionLimit: 4,
Expand All @@ -105,18 +124,23 @@ async function extractFromMysql(
...(config.ssl ? { ssl: { rejectUnauthorized: true } } : {}),
});

const conn = await pool.getConnection();
try {
// Two positional `?` placeholders: lower bound (inclusive), upper bound (exclusive).
const [rows] = await pool.query(config.query, [
const params = [
window.since.toISOString().slice(0, 19).replace('T', ' '),
window.until.toISOString().slice(0, 19).replace('T', ' '),
]);

if (!Array.isArray(rows)) {
return [];
];
// mysql2's row-streaming API lives on the underlying (callback) connection, which the promise
// typings don't surface cleanly. Cast to a minimal streamable shape: `.query(...).stream()`
// returns a Node Readable that emits one row at a time without buffering the whole result set.
const core = conn.connection as unknown as StreamableConnection;
const stream = core.query(config.query, params).stream();
for await (const row of stream) {
yield rowToRawCustomer(row);
}
return (rows as Record<string, unknown>[]).map(rowToRawCustomer);
} finally {
conn.release();
await pool.end();
}
}
Expand Down Expand Up @@ -186,10 +210,10 @@ function stripeCustomerToRaw(cust: Stripe.Customer, fallbackEmail: string | null
};
}

async function extractFromStripe(
async function* streamFromStripe(
config: StripeSourceConfig,
window: SyncWindow,
): Promise<RawCustomer[]> {
): AsyncGenerator<RawCustomer> {
const stripe = new Stripe(config.apiKey, {
apiVersion: '2024-06-20',
maxNetworkRetries: 3,
Expand All @@ -200,12 +224,24 @@ async function extractFromStripe(
const sinceUnix = Math.floor(window.since.getTime() / 1000);
const untilUnix = Math.floor(window.until.getTime() / 1000);

const out: RawCustomer[] = [];
// Dedup by stable customer id so repeat purchasers are only synced once per run.
// Dedup by stable customer id so repeat purchasers are only synced once per run. This Set is the
// single piece of unbounded state; it holds short id strings, not full records, so it stays small
// relative to the row data even for large windows.
const seen = new Set<string>();
const isNew = (raw: RawCustomer): boolean => {
const key = String(raw.id ?? raw.email ?? '');
if (key.length === 0) {
return true;
}
if (seen.has(key)) {
return false;
}
seen.add(key);
return true;
};

if (config.mode === 'customers') {
// Customers created within the window.
// Customers created within the window. The Stripe SDK auto-paginates lazily.
for await (const customer of stripe.customers.list({
created: { gte: sinceUnix, lt: untilUnix },
limit: 100,
Expand All @@ -214,16 +250,11 @@ async function extractFromStripe(
continue;
}
const raw = stripeCustomerToRaw(customer, null);
const key = String(raw.id ?? raw.email ?? '');
if (key.length > 0 && seen.has(key)) {
continue;
}
if (key.length > 0) {
seen.add(key);
if (isNew(raw)) {
yield raw;
}
out.push(raw);
}
return out;
return;
}

// mode === 'charges' (default): customers who were successfully charged within the window.
Expand All @@ -240,42 +271,73 @@ async function extractFromStripe(
if (raw === null) {
continue;
}
const key = String(raw.id ?? raw.email ?? '');
if (key.length > 0 && seen.has(key)) {
continue;
}
if (key.length > 0) {
seen.add(key);
if (isNew(raw)) {
yield raw;
}
out.push(raw);
}

return out;
}

/* ============================================================================================== *
* Public dispatcher.
* ============================================================================================== */

/**
* Extract raw customers for the given window from whichever source is configured. The result is an
* in-memory array; nothing is persisted to disk at any point.
* Stream raw customers for the given window from whichever source is configured, one record at a
* time. SQL sources use server-side cursors / row streaming and Stripe uses lazy pagination, so the
* caller can process arbitrarily large audiences with bounded memory. Nothing touches disk.
*/
export async function extractCustomers(
export function streamCustomers(
source: SourceConfig,
window: SyncWindow,
): Promise<RawCustomer[]> {
): AsyncGenerator<RawCustomer> {
switch (source.kind) {
case 'postgres':
return extractFromPostgres(source, window);
return streamFromPostgres(source, window);
case 'mysql':
return extractFromMysql(source, window);
return streamFromMysql(source, window);
case 'stripe':
return extractFromStripe(source, window);
return streamFromStripe(source, window);
default: {
// Exhaustiveness guard — a new SourceKind without a branch is a compile error.
const _never: never = source;
throw new Error(`Unsupported source kind: ${JSON.stringify(_never)}`);
}
}
}

/**
* Group an async iterable into arrays of at most `size` items. The final group may be smaller. This
* lets a streaming source be processed in bounded-memory batches without ever materializing the
* whole stream.
*/
export async function* batchAsync<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
if (size <= 0) {
throw new Error(`batch size must be > 0, received ${size}`);
}
let buffer: T[] = [];
for await (const item of source) {
buffer.push(item);
if (buffer.length >= size) {
yield buffer;
buffer = [];
}
}
if (buffer.length > 0) {
yield buffer;
}
}

/**
* Convenience wrapper that fully drains {@link streamCustomers} into an array. Prefer the streaming
* API for large audiences; this exists for callers and tests that want the whole set at once.
*/
export async function extractCustomers(
source: SourceConfig,
window: SyncWindow,
): Promise<RawCustomer[]> {
const out: RawCustomer[] = [];
for await (const raw of streamCustomers(source, window)) {
out.push(raw);
}
return out;
}
Loading
Loading