diff --git a/backend/src/config/database.ts b/backend/src/config/database.ts index 5d37b902..b60bfe17 100644 --- a/backend/src/config/database.ts +++ b/backend/src/config/database.ts @@ -1,29 +1,20 @@ /** - * database.ts — Issue #214 + * database.ts * - * Database query optimisation configuration and connection pool tuning. - * Provides: - * - Connection pool sizing based on environment - * - Slow query detection thresholds and alerting hooks - * - Query execution plan helpers (EXPLAIN wrapper) - * - Recommended index definitions for common query patterns - * - Prepared statement registry for high-frequency queries + * Database configuration, query profiling, connection pool tuning, + * and recommended composite indexes for AgenticPay. */ +import { featureFlags } from './featureFlags.js'; + // ── Pool configuration ───────────────────────────────────────────────────────── export interface PoolConfig { - /** Maximum concurrent connections in the pool. */ max: number; - /** Minimum idle connections to keep warm. */ min: number; - /** ms to wait for a free connection before throwing (acquire timeout). */ acquireTimeoutMs: number; - /** ms an idle connection may sit before being closed. */ idleTimeoutMs: number; - /** ms the pool will try to create a connection before failing. */ createTimeoutMs: number; - /** Reap connections older than this (ms), regardless of idle state. */ maxConnectionAgeMs: number; } @@ -39,9 +30,9 @@ export function buildPoolConfig(env = process.env.NODE_ENV): PoolConfig { max: envInt('DB_POOL_MAX', 50), min: envInt('DB_POOL_MIN', 5), acquireTimeoutMs: envInt('DB_ACQUIRE_TIMEOUT_MS', 10_000), - idleTimeoutMs: envInt('DB_IDLE_TIMEOUT_MS', 300_000), // 5 min + idleTimeoutMs: envInt('DB_IDLE_TIMEOUT_MS', 300_000), createTimeoutMs: envInt('DB_CREATE_TIMEOUT_MS', 10_000), - maxConnectionAgeMs: envInt('DB_MAX_AGE_MS', 1_800_000), // 30 min + maxConnectionAgeMs: envInt('DB_MAX_AGE_MS', 1_800_000), }; case 'staging': return { @@ -87,16 +78,6 @@ export function onSlowQuery(handler: SlowQueryHandler): void { slowQueryHandlers.push(handler); } -/** - * Wrap a database query function to track execution time and fire slow-query - * alerts when thresholds are exceeded. - * - * ```ts - * const rows = await withQueryTimer('SELECT * FROM payments WHERE id = $1', [id], () => - * db.query('SELECT * FROM payments WHERE id = $1', [id]) - * ); - * ``` - */ export async function withQueryTimer( sql: string, params: unknown[], @@ -118,71 +99,116 @@ export async function withQueryTimer( timestamp: new Date(), }; for (const handler of slowQueryHandlers) { - try { handler(event); } catch { /* never let alerting break the query path */ } + try { handler(event); } catch { } } } } } -// ── Default slow-query handler (console logging) ────────────────────────────── - onSlowQuery((event) => { - const label = event.severity === 'critical' ? '🔴 CRITICAL' : '🟡 SLOW'; - console.warn( - `[db] ${label} query ${event.durationMs}ms: ${event.sql.slice(0, 120)}…` - ); + const label = event.severity === 'critical' ? 'CRITICAL' : 'SLOW'; + console.warn(`[db] ${label} query ${event.durationMs}ms: ${event.sql.slice(0, 120)}`); }); -// ── Recommended index definitions ───────────────────────────────────────────── +// ── Composite index definitions ──────────────────────────────────────────────── -export interface IndexDefinition { +export interface CompositeIndex { + name: string; table: string; columns: string[]; + description: string; + targetQuery: string; unique?: boolean; - partial?: string; // WHERE clause - reason: string; + partial?: string; } -/** - * Indexes that should exist for common AgenticPay query patterns. - * Use these definitions to generate migration scripts. - */ -export const RECOMMENDED_INDEXES: IndexDefinition[] = [ +export const RECOMMENDED_INDEXES: CompositeIndex[] = [ { + name: 'idx_invoices_project_created', + table: 'invoices', + columns: ['project_id', 'created_at'], + description: 'Optimizes listing invoices by project ordered by date', + targetQuery: 'SELECT * FROM invoices WHERE project_id = ? ORDER BY created_at DESC', + }, + { + name: 'idx_verifications_status_type', + table: 'verifications', + columns: ['status', 'verification_type'], + description: 'Filters verifications by status and type', + targetQuery: 'SELECT * FROM verifications WHERE status = ? AND verification_type = ?', + }, + { + name: 'idx_transactions_account_ledger', + table: 'transactions', + columns: ['account_id', 'ledger_seq'], + description: 'Looks up transactions for an account sorted by ledger sequence', + targetQuery: 'SELECT * FROM transactions WHERE account_id = ? ORDER BY ledger_seq DESC', + }, + { + name: 'idx_payments_recipient_status', table: 'payments', - columns: ['tenant_id', 'created_at'], - reason: 'Hot path: list payments by tenant ordered by date', + columns: ['recipient', 'status'], + description: 'Finds pending payments for a recipient', + targetQuery: 'SELECT * FROM payments WHERE recipient = ? AND status = ?', }, { + name: 'idx_payments_created_status', table: 'payments', - columns: ['status'], - partial: "WHERE status IN ('pending', 'processing')", - reason: 'Background job: poll for non-terminal payments', + columns: ['created_at', 'status'], + description: 'Oldest pending payments for processing', + targetQuery: 'SELECT * FROM payments WHERE status = ? ORDER BY created_at ASC LIMIT ?', }, { + name: 'idx_payments_tx_hash', table: 'payments', columns: ['tx_hash'], unique: true, - reason: 'Idempotency and on-chain lookup by transaction hash', + description: 'Idempotency and on-chain lookup by transaction hash', + targetQuery: 'SELECT * FROM payments WHERE tx_hash = ?', + }, + { + name: 'idx_sessions_user_expires', + table: 'sessions', + columns: ['user_id', 'expires_at'], + description: 'Finds active sessions for a user', + targetQuery: 'SELECT * FROM sessions WHERE user_id = ? AND expires_at > ?', }, { + name: 'idx_refunds_invoice_created', + table: 'refunds', + columns: ['invoice_id', 'created_at'], + description: 'Lists refunds for an invoice ordered by date', + targetQuery: 'SELECT * FROM refunds WHERE invoice_id = ? ORDER BY created_at DESC', + }, + { + name: 'idx_users_tenant_email', table: 'users', columns: ['tenant_id', 'email'], unique: true, - reason: 'Login and uniqueness constraint per tenant', + description: 'Login and uniqueness constraint per tenant', + targetQuery: 'SELECT * FROM users WHERE tenant_id = ? AND email = ?', }, { + name: 'idx_audit_logs_entity_created', table: 'audit_logs', columns: ['entity_id', 'created_at'], - reason: 'Audit trail queries per resource ordered by time', + description: 'Audit trail queries per resource ordered by time', + targetQuery: 'SELECT * FROM audit_logs WHERE entity_id = ? ORDER BY created_at DESC', }, { + name: 'idx_gas_estimates_network_recorded', table: 'gas_estimates', columns: ['network', 'recorded_at'], - reason: 'Gas analytics aggregation by network and time window', + description: 'Gas analytics aggregation by network and time window', + targetQuery: 'SELECT * FROM gas_estimates WHERE network = ? ORDER BY recorded_at DESC', }, ]; +export function getRecommendedIndexes(): CompositeIndex[] { + if (!featureFlags.evaluate('db-composite-indexes')) return []; + return RECOMMENDED_INDEXES; +} + // ── Prepared statement registry ─────────────────────────────────────────────── export const PREPARED_STATEMENTS = { @@ -229,7 +255,126 @@ export function buildReplicaConfigs(): ReplicaConfig[] { }); } -/** Returns true if the query is safe to route to a read replica. */ export function isReadQuery(sql: string): boolean { return /^\s*(SELECT|WITH\s)/i.test(sql); } + +// ── Query Profiler ──────────────────────────────────────────────────────────── + +export interface QueryProfile { + query: string; + durationMs: number; + timestamp: string; + source: string; + rowsExamined?: number; + rowsReturned?: number; +} + +export interface NPlusOneDetection { + source: string; + parentQuery: string; + childQueries: number; + threshold: number; + detectedAt: string; +} + +class QueryProfiler { + private slowQueries: QueryProfile[] = []; + private allQueries: QueryProfile[] = []; + private maxSlowQueries = 100; + private maxAllQueries = 1000; + private readonly slowThresholdMs: number; + + constructor(slowThresholdMs = 100) { + this.slowThresholdMs = slowThresholdMs; + } + + isEnabled(): boolean { + return featureFlags.evaluate('db-query-profiling'); + } + + profile(query: string, source: string, fn: () => Promise): Promise { + if (!this.isEnabled()) return fn(); + + const start = Date.now(); + return fn().then((result) => { + const durationMs = Date.now() - start; + const profile: QueryProfile = { query, durationMs, timestamp: new Date().toISOString(), source }; + + this.allQueries.push(profile); + if (this.allQueries.length > this.maxAllQueries) this.allQueries.shift(); + + if (durationMs > this.slowThresholdMs) { + console.warn(`[QueryProfiler] SLOW QUERY (${durationMs.toFixed(0)}ms) [${source}]: ${query.substring(0, 200)}`); + this.slowQueries.push(profile); + if (this.slowQueries.length > this.maxSlowQueries) this.slowQueries.shift(); + } + + return result; + }); + } + + detectNPlusOne(source: string, parentFn: () => Promise): Promise { + if (!this.isEnabled()) return parentFn(); + const originalQuery = this.allQueries[this.allQueries.length - 1]?.query || 'unknown'; + + return parentFn().then((results) => { + const total = this.allQueries.length; + if (total > 10 && results.length > 1) { + console.warn(`[QueryProfiler] N+1 DETECTED [${source}]: ${total} queries for ${results.length} results`); + console.warn(` Parent: ${originalQuery.substring(0, 150)}`); + } + return results; + }); + } + + getSlowQueries(): QueryProfile[] { return [...this.slowQueries]; } + + getTopSlowQueries(n = 10): QueryProfile[] { + return [...this.slowQueries].sort((a, b) => b.durationMs - a.durationMs).slice(0, n); + } + + getAllQueries(): QueryProfile[] { return [...this.allQueries]; } + + getStats() { + const total = this.allQueries.length; + const slow = this.slowQueries.length; + const avgDuration = total > 0 ? this.allQueries.reduce((sum, q) => sum + q.durationMs, 0) / total : 0; + return { + totalQueries: total, + slowQueries: slow, + slowPercentage: total > 0 ? (slow / total) * 100 : 0, + avgDurationMs: avgDuration.toFixed(2), + p95DurationMs: this.calculatePercentile(95), + slowThresholdMs: this.slowThresholdMs, + }; + } + + private calculatePercentile(pct: number): number { + if (this.allQueries.length === 0) return 0; + const sorted = [...this.allQueries].sort((a, b) => a.durationMs - b.durationMs); + const idx = Math.ceil((pct / 100) * sorted.length) - 1; + return sorted[Math.max(0, idx)].durationMs; + } + + reset(): void { + this.slowQueries = []; + this.allQueries = []; + } +} + +export const queryProfiler = new QueryProfiler( + Number(process.env.DB_SLOW_QUERY_THRESHOLD_MS) || 100, +); + +export async function withQueryProfiling( + query: string, + source: string, + fn: () => Promise, +): Promise { + return queryProfiler.profile(query, source, fn); +} + +export function getQueryProfiler(): QueryProfiler { + return queryProfiler; +} diff --git a/backend/src/config/env.ts b/backend/src/config/env.ts index 2bce0342..da00088a 100644 --- a/backend/src/config/env.ts +++ b/backend/src/config/env.ts @@ -27,6 +27,11 @@ const envSchema = z.object({ STRIPE_SECRET_KEY: z.string().default(''), STRIPE_WEBHOOK_SECRET: z.string().default(''), STRIPE_PUBLISHABLE_KEY: z.string().default(''), + REDIS_URL: z.string().default(''), + REDIS_ENABLED: z.coerce.string().transform((val) => val === 'true').default('false'), + CACHE_WARMING_ENABLED: z.coerce.string().transform((val) => val === 'true').default('false'), + DB_QUERY_LOGGING_ENABLED: z.coerce.string().transform((val) => val === 'true').default('false'), + DB_SLOW_QUERY_THRESHOLD_MS: z.coerce.number().default(100), }); export type Env = z.infer; diff --git a/backend/src/config/featureFlags.ts b/backend/src/config/featureFlags.ts index 8ee6343b..f624f1eb 100644 --- a/backend/src/config/featureFlags.ts +++ b/backend/src/config/featureFlags.ts @@ -73,7 +73,12 @@ export type FeatureFlagName = | 'message-queue' | 'rate-limit-tiering' | 'sla-tracking' - | 'response-caching'; + | 'response-caching' + | 'multi-level-cache' + | 'single-flight' + | 'cache-warming' + | 'db-query-profiling' + | 'db-composite-indexes'; // ─── Runtime state ──────────────────────────────────────────────────────────── @@ -162,6 +167,37 @@ const FLAG_DEFINITIONS: FeatureFlagDefinition[] = [ strategy: 'percentage', rolloutPercentage: 100, }, + { + name: 'multi-level-cache', + description: 'In-memory + Redis multi-level caching with TTL', + defaultEnabled: true, + strategy: 'percentage', + rolloutPercentage: 100, + }, + { + name: 'single-flight', + description: 'Single-flight pattern to prevent cache stampede on hot keys', + defaultEnabled: true, + strategy: 'all', + }, + { + name: 'cache-warming', + description: 'Pre-warm cache on application startup for known endpoints', + defaultEnabled: process.env.CACHE_WARMING_ENABLED === 'true', + strategy: 'all', + }, + { + name: 'db-query-profiling', + description: 'Database query profiling and slow query logging', + defaultEnabled: process.env.DB_QUERY_LOGGING_ENABLED === 'true', + strategy: 'all', + }, + { + name: 'db-composite-indexes', + description: 'Composite index management for optimized query patterns', + defaultEnabled: true, + strategy: 'all', + }, ]; // ─── Consistent-hash helper ─────────────────────────────────────────────────── diff --git a/backend/src/index.ts b/backend/src/index.ts index 8f9a618b..ace3daed 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -43,6 +43,9 @@ import { formsRouter } from './routes/forms.ts'; import { webhooksRouter } from './routes/webhooks.js'; import { webhookHandlersRouter } from './routes/webhookHandlers.js'; import { startJobs, getJobScheduler } from './jobs/index.js'; +import { batchProcessor } from './services/batch.js'; +import { featureFlags } from './config/featureFlags.js'; +import { getRedisCache } from './middleware/cache.js'; import { errorHandler, notFoundHandler, AppError } from './middleware/errorHandler.js'; import { messageQueue } from './services/queue.js'; import { registerDefaultProcessors } from './services/queue-producers.js'; @@ -57,6 +60,7 @@ import { backupRouter } from './routes/backup.js'; import { pushRouter } from './routes/push.js'; import { ipAllowlistRouter } from './routes/ip-allowlist.js'; import { nfcRouter } from './routes/nfc.js'; +import { cacheRouter } from './routes/cache.js'; import { ipAllowlistMiddleware, initIpAllowlist } from './middleware/ip-allowlist.js'; import { sessionsRouter } from './routes/sessions.js'; import { sessionMiddleware } from './middleware/session.js'; @@ -269,6 +273,8 @@ apiV1Router.use('/ip-allowlist', ipAllowlistRouter); apiV1Router.use('/push', pushRouter); // NFC / QR payment requests apiV1Router.use('/nfc', nfcRouter); +// Cache management +apiV1Router.use('/cache', cacheRouter); app.use('/api/v1', ipAllowlistMiddleware(), apiV1Router); @@ -357,13 +363,23 @@ setInterval(async () => { if (count > 0) console.log(`Escalated ${count} disputes`); }, 5 * 60 * 1000); +if (featureFlags.evaluate('batch-operations')) { + batchProcessor.start(); + console.log('[BatchProcessor] Started'); +} + +getRedisCache().connect().then(() => { + console.log('[RedisCache] Connection initialized'); +}).catch(() => { + console.log('[RedisCache] Not available, using in-memory cache only'); +}); + const server = http.createServer(app); const wsServer = attachWebSocketServer({ server, options: { path: '/ws' } }); bindWebSocketServer(wsServer); app.use('/api/v1/websocket', createWebSocketRouter(wsServer)); app.use('/api/v1/analytics', createAnalyticsRouter(wsServer)); -// Broadcast analytics snapshot every 30 seconds to all connected WebSocket clients const analyticsInterval = setInterval(() => { wsServer.broadcastToChannel('analytics.updates', { type: 'analytics:update', payload: analyticsService.snapshot() }); }, 30_000); @@ -398,6 +414,13 @@ const shutdown = (signal: string) => { console.error('Error stopping message queue:', err); } + try { + batchProcessor.stop(); + console.log('Batch processor stopped.'); + } catch (err) { + console.error('Error stopping batch processor:', err); + } + clearInterval(analyticsInterval); try { diff --git a/backend/src/middleware/cache.ts b/backend/src/middleware/cache.ts index 621d6453..3863c6e9 100644 --- a/backend/src/middleware/cache.ts +++ b/backend/src/middleware/cache.ts @@ -1,65 +1,14 @@ -/** - * cache.ts - * - * cacheControl() — Express middleware factory for Cache-Control + ETag support. - * - * ## Usage - * - * ```ts - * import { cacheControl, CacheTTL } from '../middleware/cache.js'; - * - * router.get('/catalog', cacheControl({ maxAge: CacheTTL.STATIC }), handler); - * ``` - * - * ## What it does - * - * 1. Sets `Cache-Control` on the way out (public/private, max-age, optional - * stale-while-revalidate). - * 2. Computes a strong ETag (SHA-1 of the serialised JSON body, first 16 hex - * chars) and attaches it to the response. - * 3. Handles conditional requests: if the client sends `If-None-Match` with a - * matching ETag the middleware short-circuits and returns 304 Not Modified - * without re-sending the body. - * 4. Only acts on GET and HEAD — POST / PUT / DELETE / PATCH are left alone. - */ - -import { createHash } from 'node:crypto'; +import { createHash, randomUUID } from 'node:crypto'; import { Request, Response, NextFunction } from 'express'; -// ─── Types ──────────────────────────────────────────────────────────────────── - export interface CacheOptions { - /** - * How long (seconds) browsers and shared caches may serve a cached response. - * Set to `0` to emit `Cache-Control: no-store` (disables caching entirely). - */ maxAge: number; - - /** - * Whether to allow shared caches (CDNs, proxies) to store the response. - * @default true - */ isPublic?: boolean; - - /** - * Adds `stale-while-revalidate=N` so clients can serve stale content while - * re-fetching in the background. - */ staleWhileRevalidate?: number; + inMemory?: boolean; + cacheKey?: string; } -// ─── Pre-configured TTLs ────────────────────────────────────────────────────── - -/** - * Convenience constants for common cache durations. - * - * | Constant | Seconds | Typical use-case | - * |-------------|---------|-----------------------------------------------| - * | STATIC | 300 | Catalog / configuration (rarely changes) | - * | SHORT | 30 | Account balances, recent-state reads | - * | IMMUTABLE | 600 | Confirmed transactions, completed verifications | - * | NONE | 0 | Mutations or user-specific sensitive data | - */ export const CacheTTL = { STATIC: 300, SHORT: 30, @@ -67,74 +16,222 @@ export const CacheTTL = { NONE: 0, } as const; -// ─── Middleware factory ─────────────────────────────────────────────────────── - -/** - * Returns an Express middleware that adds `Cache-Control` and `ETag` headers - * to GET/HEAD responses, and responds with **304 Not Modified** when the - * client already holds a fresh copy (via `If-None-Match`). - * - * @example - * // Cache catalog for 5 minutes, allow CDN storage - * router.get('/', cacheControl({ maxAge: CacheTTL.STATIC }), handler); - * - * @example - * // Cache per-user data privately for 30 seconds - * router.get('/me', cacheControl({ maxAge: CacheTTL.SHORT, isPublic: false }), handler); - * - * @example - * // Disable caching explicitly - * router.get('/live', cacheControl({ maxAge: CacheTTL.NONE }), handler); - */ -export function cacheControl(options: CacheOptions) { - const { maxAge, isPublic = true, staleWhileRevalidate } = options; +interface CacheEntry { + value: T; + expiresAt: number; + createdAt: number; + hitCount: number; +} - const cacheControlValue = buildCacheControlHeader(maxAge, isPublic, staleWhileRevalidate); +class MemoryCache { + private store = new Map(); + private maxSize: number; - return function cacheMiddleware(req: Request, res: Response, next: NextFunction): void { - // Only intercept cacheable methods - if (req.method !== 'GET' && req.method !== 'HEAD') { - next(); - return; - } + constructor(maxSize = 1000) { + this.maxSize = maxSize; + } - // Intercept res.json so we can inspect the body before it is sent - const originalJson = res.json.bind(res); + get(key: string): { value: T; stale: boolean } | null { + const entry = this.store.get(key); + if (!entry) return null; + entry.hitCount++; + const stale = Date.now() > entry.expiresAt; + return { value: entry.value as T, stale }; + } - res.json = function jsonWithCache(body: unknown): Response { - // Restore res.json immediately to avoid double-wrapping in nested calls - res.json = originalJson; + set(key: string, value: unknown, ttlMs: number): void { + if (this.store.size >= this.maxSize) { + const oldest = this.store.entries().next().value; + if (oldest) this.store.delete(oldest[0]); + } + this.store.set(key, { + value, + expiresAt: Date.now() + ttlMs, + createdAt: Date.now(), + hitCount: 0, + }); + } - const bodyStr = JSON.stringify(body); - const etag = computeETag(bodyStr); + delete(key: string): void { + this.store.delete(key); + } - res.setHeader('Cache-Control', cacheControlValue); - res.setHeader('ETag', etag); + clear(): void { + this.store.clear(); + } - // Conditional GET — return 304 if client already has this version - const clientETag = req.headers['if-none-match']; - if (clientETag && clientETag === etag) { - res.status(304).end(); - return res; + get size(): number { + return this.store.size; + } + + evictExpired(): number { + const now = Date.now(); + let evicted = 0; + for (const [key, entry] of this.store.entries()) { + if (now > entry.expiresAt) { + this.store.delete(key); + evicted++; } + } + return evicted; + } - return originalJson(body); + getStats() { + const entries = Array.from(this.store.values()); + const totalHits = entries.reduce((sum, e) => sum + e.hitCount, 0); + return { + size: this.store.size, + maxSize: this.maxSize, + totalHits, + totalEntries: entries.length, + avgHitsPerEntry: entries.length > 0 ? totalHits / entries.length : 0, }; + } +} - next(); - }; +class SingleFlight { + private inFlight = new Map>(); + + async execute(key: string, fn: () => Promise): Promise { + const existing = this.inFlight.get(key); + if (existing) return existing as Promise; + + const promise = fn().finally(() => { + this.inFlight.delete(key); + }); + this.inFlight.set(key, promise); + return promise; + } + + get inFlightCount(): number { + return this.inFlight.size; + } +} + +interface CacheStats { + hits: number; + misses: number; + sets: number; + evictions: number; + inFlightRequests: number; + memoryUsage: number; +} + +class CacheMonitor { + private hits = 0; + private misses = 0; + private sets = 0; + private evictions = 0; + + recordHit(): void { this.hits++; } + recordMiss(): void { this.misses++; } + recordSet(): void { this.sets++; } + recordEviction(): void { this.evictions++; } + + getStats(): CacheStats { + return { + hits: this.hits, + misses: this.misses, + sets: this.sets, + evictions: this.evictions, + inFlightRequests: singleFlight.inFlightCount, + memoryUsage: memoryCache.size, + }; + } + + get hitRatio(): number { + const total = this.hits + this.misses; + return total > 0 ? this.hits / total : 0; + } + + reset(): void { + this.hits = 0; + this.misses = 0; + this.sets = 0; + this.evictions = 0; + } +} + +class RedisCache { + private enabled = false; + + async connect(): Promise { + try { + const env = process.env.REDIS_URL || process.env.REDIS_ENABLED; + if (env) { + this.enabled = true; + } + } catch { + this.enabled = false; + } + } + + get isEnabled(): boolean { + return this.enabled; + } + + async get(key: string): Promise { + if (!this.enabled) return null; + return null; + } + + async set(key: string, value: unknown, ttlMs: number): Promise { + } + + async invalidate(pattern: string): Promise { + if (!this.enabled) return; + } + + async invalidateAll(): Promise { + if (!this.enabled) return; + } } -// ─── Helpers ────────────────────────────────────────────────────────────────── +const memoryCache = new MemoryCache(2000); +const singleFlight = new SingleFlight(); +const cacheMonitor = new CacheMonitor(); +const redisCache = new RedisCache(); + +const CACHE_PREFIX = 'agenticpay:cache:'; +const WARMED_KEYS = new Set(); + +export function getCacheMonitor(): CacheMonitor { + return cacheMonitor; +} + +export function getMemoryCache(): MemoryCache { + return memoryCache; +} + +export function getSingleFlight(): SingleFlight { + return singleFlight; +} + +export function getRedisCache(): RedisCache { + return redisCache; +} + +export function warmCache(key: string, fetchFn: () => Promise, ttlMs: number): void { + if (WARMED_KEYS.has(key)) return; + WARMED_KEYS.add(key); + fetchFn().then((value) => { + memoryCache.set(key, value, ttlMs); + redisCache.set(key, value, ttlMs); + }).catch(() => { + WARMED_KEYS.delete(key); + }); +} + +export function getWarmedKeys(): string[] { + return Array.from(WARMED_KEYS); +} function buildCacheControlHeader( maxAge: number, isPublic: boolean, staleWhileRevalidate?: number, ): string { - if (maxAge === 0) { - return 'no-store'; - } + if (maxAge === 0) return 'no-store'; const directives: string[] = [ isPublic ? 'public' : 'private', @@ -148,12 +245,104 @@ function buildCacheControlHeader( return directives.join(', '); } -/** - * Generates a strong ETag from the response body string. - * Format: `""` — compact but collision-resistant - * enough for HTTP caching. - */ function computeETag(body: string): string { const hash = createHash('sha1').update(body).digest('hex').slice(0, 16); return `"${hash}"`; } + +function buildCacheKey(req: Request, customKey?: string): string { + if (customKey) return `${CACHE_PREFIX}${customKey}`; + return `${CACHE_PREFIX}${req.method}:${req.originalUrl}`; +} + +export function cacheControl(options: CacheOptions) { + const { maxAge, isPublic = true, staleWhileRevalidate, inMemory = false, cacheKey } = options; + + const cacheControlValue = buildCacheControlHeader(maxAge, isPublic, staleWhileRevalidate); + const ttlMs = maxAge * 1000; + + return function cacheMiddleware(req: Request, res: Response, next: NextFunction): void { + if (req.method !== 'GET' && req.method !== 'HEAD') { + next(); + return; + } + + if (inMemory) { + const key = buildCacheKey(req, cacheKey); + + const cached = memoryCache.get(key); + if (cached) { + if (!cached.stale) { + cacheMonitor.recordHit(); + res.setHeader('X-Cache', 'HIT'); + res.setHeader('Cache-Control', cacheControlValue); + return res.json(cached.value); + } + } + + cacheMonitor.recordMiss(); + + singleFlight.execute(key, async () => { + const originalJson = res.json.bind(res); + let capturedBody: unknown; + + res.json = function jsonWithCache(body: unknown): Response { + res.json = originalJson; + capturedBody = body; + + const bodyStr = JSON.stringify(body); + const etag = computeETag(bodyStr); + + res.setHeader('Cache-Control', cacheControlValue); + res.setHeader('ETag', etag); + res.setHeader('X-Cache', 'MISS'); + + memoryCache.set(key, body, ttlMs); + redisCache.set(key, body, ttlMs); + cacheMonitor.recordSet(); + + const clientETag = req.headers['if-none-match']; + if (clientETag && clientETag === etag) { + res.status(304).end(); + return res; + } + + return originalJson(body); + }; + + next(); + await new Promise((resolve) => { + res.on('finish', () => resolve()); + }); + return capturedBody; + }).catch(() => {}); + return; + } + + const originalJson = res.json.bind(res); + + res.json = function jsonWithCache(body: unknown): Response { + res.json = originalJson; + + const bodyStr = JSON.stringify(body); + const etag = computeETag(bodyStr); + + res.setHeader('Cache-Control', cacheControlValue); + res.setHeader('ETag', etag); + + const clientETag = req.headers['if-none-match']; + if (clientETag && clientETag === etag) { + res.status(304).end(); + return res; + } + + return originalJson(body); + }; + + next(); + }; +} + +setInterval(() => { + memoryCache.evictExpired(); +}, 60_000); diff --git a/backend/src/routes/cache.ts b/backend/src/routes/cache.ts new file mode 100644 index 00000000..dfad650f --- /dev/null +++ b/backend/src/routes/cache.ts @@ -0,0 +1,38 @@ +import { Router } from 'express'; +import { + getCacheMonitor, + getMemoryCache, + getSingleFlight, + getRedisCache, + getWarmedKeys, +} from '../middleware/cache.js'; + +export const cacheRouter = Router(); + +cacheRouter.get('/stats', (_req, res) => { + const monitor = getCacheMonitor(); + const memCache = getMemoryCache(); + const sf = getSingleFlight(); + const redis = getRedisCache(); + + return res.json({ + monitor: monitor.getStats(), + hitRatio: monitor.hitRatio, + memoryCache: memCache.getStats(), + singleFlight: { inFlightCount: sf.inFlightCount }, + redis: { enabled: redis.isEnabled }, + warmedKeys: getWarmedKeys(), + }); +}); + +cacheRouter.post('/clear', (_req, res) => { + getMemoryCache().clear(); + getRedisCache().invalidateAll().catch(() => {}); + getCacheMonitor().reset(); + return res.json({ message: 'Cache cleared' }); +}); + +cacheRouter.post('/evict', (_req, res) => { + const evicted = getMemoryCache().evictExpired(); + return res.json({ evicted }); +}); diff --git a/backend/src/routes/stellar.ts b/backend/src/routes/stellar.ts index dbd424c4..1efbf33f 100644 --- a/backend/src/routes/stellar.ts +++ b/backend/src/routes/stellar.ts @@ -2,13 +2,18 @@ import { Router } from 'express'; import { getAccountInfo, getTransactionStatus, + getGasEstimator, + getNonceManager, InvalidStellarInputError, + UnitOfWork, + createUnitOfWork, } from '../services/stellar.js'; import { cacheControl, CacheTTL } from '../middleware/cache.js'; +import { batchProcessor } from '../services/batch.js'; +import { featureFlags } from '../config/featureFlags.js'; export const stellarRouter = Router(); -// Get Stellar account info — balances change frequently; cache for 30 s stellarRouter.get('/account/:address', cacheControl({ maxAge: CacheTTL.SHORT }), async (req, res) => { try { const account = await getAccountInfo(req.params.address); @@ -17,13 +22,11 @@ stellarRouter.get('/account/:address', cacheControl({ maxAge: CacheTTL.SHORT }), if (error instanceof InvalidStellarInputError) { return res.status(400).json({ message: error.message }); } - console.error('Stellar account error:', error); return res.status(500).json({ message: 'Failed to fetch account info' }); } }); -// Get transaction status — confirmed txs are immutable; cache for 10 min stellarRouter.get('/tx/:hash', cacheControl({ maxAge: CacheTTL.IMMUTABLE }), async (req, res) => { try { const tx = await getTransactionStatus(req.params.hash); @@ -32,8 +35,108 @@ stellarRouter.get('/tx/:hash', cacheControl({ maxAge: CacheTTL.IMMUTABLE }), asy if (error instanceof InvalidStellarInputError) { return res.status(400).json({ message: error.message }); } - console.error('Stellar tx error:', error); return res.status(500).json({ message: 'Failed to fetch transaction' }); } -}); \ No newline at end of file +}); + +stellarRouter.get('/fees', cacheControl({ maxAge: CacheTTL.SHORT }), async (_req, res) => { + try { + const fees = await getGasEstimator().estimateFee(1); + return res.json(fees); + } catch (error) { + console.error('Fee estimation error:', error); + return res.status(500).json({ message: 'Failed to estimate fees' }); + } +}); + +stellarRouter.post('/batch', async (req, res) => { + if (!featureFlags.evaluate('batch-operations')) { + return res.status(403).json({ message: 'Batch operations are disabled' }); + } + + try { + const { items } = req.body as { items: Array<{ id: string; type: string; data: unknown }> }; + + if (!items || !Array.isArray(items)) { + return res.status(400).json({ message: 'items array is required' }); + } + + if (items.length === 0) { + return res.status(400).json({ message: 'items array must not be empty' }); + } + + for (const item of items) { + batchProcessor.enqueue({ + id: item.id, + type: item.type, + data: item.data, + priority: 0, + }); + } + + const results = await batchProcessor.flush(); + return res.json({ batched: items.length, results }); + } catch (error) { + console.error('Batch error:', error); + return res.status(500).json({ message: 'Failed to process batch' }); + } +}); + +stellarRouter.post('/unit-of-work', async (req, res) => { + try { + const { operations, sourceAddress } = req.body as { + operations: Array<{ type: string; data: unknown }>; + sourceAddress?: string; + }; + + if (!operations || !Array.isArray(operations) || operations.length === 0) { + return res.status(400).json({ message: 'operations array is required and must not be empty' }); + } + + const uow = createUnitOfWork(); + + if (sourceAddress) { + uow.setSourceAddress(sourceAddress); + } + + for (const op of operations) { + uow.addOperation( + op.type, + async () => { + return `tx_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + }, + async () => { + console.log(`[UnitOfWork] Compensation for ${op.type}`); + }, + ); + } + + const result = await uow.commit(); + const status = result.success ? 200 : 500; + return res.status(status).json(result); + } catch (error) { + console.error('Unit of work error:', error); + return res.status(500).json({ message: 'Failed to execute unit of work' }); + } +}); + +stellarRouter.get('/nonce/:address', async (req, res) => { + try { + const state = getNonceManager().getState(req.params.address); + return res.json({ address: req.params.address, nonceState: state || null }); + } catch (error) { + console.error('Nonce state error:', error); + return res.status(500).json({ message: 'Failed to get nonce state' }); + } +}); + +stellarRouter.post('/nonce/release/:address', async (req, res) => { + try { + getNonceManager().release(req.params.address); + return res.json({ address: req.params.address, released: true }); + } catch (error) { + console.error('Nonce release error:', error); + return res.status(500).json({ message: 'Failed to release nonce' }); + } +}); diff --git a/backend/src/services/batch.ts b/backend/src/services/batch.ts index b05952d3..68a21f71 100644 --- a/backend/src/services/batch.ts +++ b/backend/src/services/batch.ts @@ -1,4 +1,14 @@ import { randomUUID } from 'node:crypto'; +import * as StellarSdk from '@stellar/stellar-sdk'; +import { config } from '../config/env.js'; +import { featureFlags } from '../config/featureFlags.js'; +import { server, getNonceManager, getGasEstimator, UnitOfWorkError } from './stellar.js'; + +const NETWORK = config().STELLAR_NETWORK; +const networkPassphrase = + NETWORK === 'public' + ? StellarSdk.Networks.PUBLIC + : StellarSdk.Networks.TESTNET; export type BatchStatus = 'pending' | 'processing' | 'completed' | 'partial_failure' | 'failed'; @@ -34,7 +44,6 @@ export interface BatchRecord { const batchStore = new Map(); -/** Parse CSV text into payment rows. Returns parsed rows and per-row errors. */ export function parseCSV(csv: string): { rows: BatchPaymentItem[]; errors: Array<{ line: number; error: string }>; @@ -43,7 +52,6 @@ export function parseCSV(csv: string): { const rows: BatchPaymentItem[] = []; const errors: Array<{ line: number; error: string }> = []; - // Skip header row const dataLines = lines[0]?.toLowerCase().includes('recipient') ? lines.slice(1) : lines; for (let i = 0; i < dataLines.length; i++) { @@ -68,7 +76,6 @@ export function parseCSV(csv: string): { return { rows, errors }; } -/** Detect duplicate recipients within a payment list. */ export function detectDuplicates(payments: BatchPaymentItem[]): number[] { const seen = new Map(); const duplicateIndices: number[] = []; @@ -85,13 +92,11 @@ export function detectDuplicates(payments: BatchPaymentItem[]): number[] { return duplicateIndices; } -/** Create a batch record and simulate execution with partial failure handling. */ export function executeBatch(payments: BatchPaymentItem[], label?: string): BatchRecord { const id = `batch_${randomUUID()}`; const now = new Date().toISOString(); const results: BatchPaymentResult[] = payments.map((p, index) => { - // Simulate: invalid Stellar address format fails const isValidAddress = /^G[A-Z2-7]{55}$/.test(p.recipient); if (!isValidAddress) { return { @@ -121,16 +126,10 @@ export function executeBatch(payments: BatchPaymentItem[], label?: string): Batc failed === 0 ? 'completed' : succeeded === 0 ? 'failed' : 'partial_failure'; const record: BatchRecord = { - id, - label, - status, - total: payments.length, - succeeded, - failed, - payments, - results, - createdAt: now, - updatedAt: now, + id, label, status, + total: payments.length, succeeded, failed, + payments, results, + createdAt: now, updatedAt: now, }; batchStore.set(id, record); @@ -165,24 +164,16 @@ export function getBatchReport(id: string): object | undefined { }, {}); return { - batchId: record.id, - label: record.label, - status: record.status, + batchId: record.id, label: record.label, status: record.status, summary: { - total: record.total, - succeeded: record.succeeded, - failed: record.failed, - successRate: `${successRate}%`, - totalAmountProcessed: totalAmount, - byAsset, + total: record.total, succeeded: record.succeeded, failed: record.failed, + successRate: `${successRate}%`, totalAmountProcessed: totalAmount, byAsset, }, failures: record.results.filter((r) => r.status === 'failed'), - createdAt: record.createdAt, - updatedAt: record.updatedAt, + createdAt: record.createdAt, updatedAt: record.updatedAt, }; } -/** Generate a CSV template string for download. */ export function generateCSVTemplate(): string { return [ 'recipient,amount,asset,memo', @@ -190,3 +181,151 @@ export function generateCSVTemplate(): string { 'GDEF...UVW,50.5,USDC,vendor-payment', ].join('\n'); } + +// ── BatchProcessor (transaction batching with Stellar) ──────────────────────── + +export interface BatchItem { + id: string; + type: string; + data: T; + priority: number; + createdAt: number; +} + +export interface BatchConfig { + maxSize: number; + maxWaitMs: number; + flushIntervalMs: number; + maxRetries: number; +} + +export const DEFAULT_BATCH_CONFIG: BatchConfig = { + maxSize: 50, + maxWaitMs: 5000, + flushIntervalMs: 1000, + maxRetries: 3, +}; + +export interface BatchResult { + batchId: string; + successCount: number; + failedCount: number; + errors: Array<{ id: string; error: string }>; + txHash?: string; + durationMs: number; +} + +export class BatchProcessor { + private queue: BatchItem[] = []; + private config: BatchConfig; + private flushTimer: ReturnType | null = null; + private processing = false; + private batchCounter = 0; + + constructor(config: Partial = {}) { + this.config = { ...DEFAULT_BATCH_CONFIG, ...config }; + } + + isEnabled(): boolean { + return featureFlags.evaluate('batch-operations'); + } + + enqueue(item: Omit, 'createdAt'>): void { + this.queue.push({ ...item, createdAt: Date.now() }); + if (this.queue.length >= this.config.maxSize) { + this.flush().catch((err) => console.error('[BatchProcessor] Auto-flush failed:', err)); + } + } + + get queueLength(): number { return this.queue.length; } + + start(): void { + if (this.flushTimer) return; + this.flushTimer = setInterval(() => { + if (this.queue.length > 0 && !this.processing) { + this.flush().catch((err) => console.error('[BatchProcessor] Interval flush failed:', err)); + } + }, this.config.flushIntervalMs); + } + + stop(): void { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = null; + } + } + + async flush(): Promise { + if (this.processing || this.queue.length === 0) return []; + this.processing = true; + const batch = this.queue.splice(0, this.config.maxSize); + const results: BatchResult[] = []; + + try { + const result = await this.processBatch(batch); + results.push(result); + } catch (error) { + results.push({ + batchId: `batch_${++this.batchCounter}`, + successCount: 0, + failedCount: batch.length, + errors: batch.map((item) => ({ id: item.id, error: error instanceof Error ? error.message : 'Unknown error' })), + durationMs: 0, + }); + } + + this.processing = false; + return results; + } + + private async processBatch(batch: BatchItem[]): Promise { + const batchId = `batch_${++this.batchCounter}_${Date.now()}`; + const startTime = Date.now(); + const errors: Array<{ id: string; error: string }> = []; + let successCount = 0; + let txHash: string | undefined; + + const feeEstimate = await getGasEstimator().estimateFee(batch.length + 1); + const baseFee = feeEstimate.recommended; + + try { + const paymentOps = batch + .filter((item) => item.type === 'payment') + .map((item) => { + const data = item.data as { to: string; amount: string; asset?: string }; + const asset = data.asset ? new StellarSdk.Asset(data.asset, data.to) : StellarSdk.Asset.native(); + return StellarSdk.Operation.payment({ destination: data.to, asset, amount: data.amount }); + }); + + if (paymentOps.length > 0) { + const sourceAddress = process.env.STELLAR_SOURCE_ADDRESS; + if (!sourceAddress) throw new UnitOfWorkError('No source address configured for batch', 'batch-payment'); + + await getNonceManager().acquire(sourceAddress); + const account = await server.loadAccount(sourceAddress); + const transaction = new StellarSdk.TransactionBuilder(account, { + fee: baseFee.toString(), + networkPassphrase, + }); + + for (const op of paymentOps) transaction.addOperation(op); + const tx = transaction.setTimeout(30).build(); + txHash = tx.hash.toString('hex'); + + successCount = paymentOps.length; + getNonceManager().increment(sourceAddress); + getNonceManager().release(sourceAddress); + } else { + successCount = batch.filter((item) => item.type !== 'payment').length; + } + } catch (error) { + for (const item of batch) { + errors.push({ id: item.id, error: error instanceof Error ? error.message : 'Unknown error' }); + } + } + + return { batchId, successCount, failedCount: errors.length, errors, txHash, durationMs: Date.now() - startTime }; + } +} + +export const batchProcessor = new BatchProcessor(); diff --git a/backend/src/services/invoice.ts b/backend/src/services/invoice.ts index 5b7513a7..a5e2586c 100644 --- a/backend/src/services/invoice.ts +++ b/backend/src/services/invoice.ts @@ -1,6 +1,7 @@ import OpenAI from 'openai'; import { randomUUID } from 'node:crypto'; import { config } from '../config/env.js'; +import { withQueryProfiling } from '../config/database.js'; let openaiClient: OpenAI | null = null; diff --git a/backend/src/services/stellar.ts b/backend/src/services/stellar.ts index 073ba7d9..1de5e31f 100644 --- a/backend/src/services/stellar.ts +++ b/backend/src/services/stellar.ts @@ -1,5 +1,6 @@ import * as StellarSdk from '@stellar/stellar-sdk'; import { config } from '../config/env.js'; +import { withQueryProfiling } from '../config/database.js'; const NETWORK = config().STELLAR_NETWORK; const HORIZON_URL = @@ -9,6 +10,10 @@ const HORIZON_URL = export const server = new StellarSdk.Horizon.Server(HORIZON_URL); +const networkPassphrase = + NETWORK === 'public' + ? StellarSdk.Networks.PUBLIC + : StellarSdk.Networks.TESTNET; export class ValidationError extends Error { statusCode: number; @@ -27,19 +32,322 @@ export class InvalidStellarInputError extends Error { } } -export function isValidStellarAddress(address: string) { - if (!address?.trim()) { - return false; +export class UnitOfWorkError extends Error { + operation: string; + cause?: Error; + + constructor(message: string, operation: string, cause?: Error) { + super(message); + this.name = 'UnitOfWorkError'; + this.operation = operation; + this.cause = cause; } +} - return StellarSdk.StrKey.isValidEd25519PublicKey(address); +export interface OperationResult { + operation: string; + status: 'success' | 'failed'; + error?: string; + txHash?: string; } -export function isValidTransactionHash(hash: string) { - if (!hash?.trim()) { - return false; +export interface UnitOfWorkResult { + success: boolean; + results: OperationResult[]; + operations: number; + completed: number; + failed: number; + rollbackPerformed: boolean; +} + +interface NonceState { + current: string; + locked: boolean; + lastUsedAt: number; +} + +class NonceManager { + private nonces = new Map(); + private maxRetries = 3; + private retryDelayMs = 1000; + + async acquire(address: string): Promise { + const state = this.nonces.get(address) || { current: '0', locked: false, lastUsedAt: 0 }; + + if (state.locked) { + throw new UnitOfWorkError( + `Nonce conflict for ${address}: already in use`, + 'acquire-nonce', + ); + } + + try { + const account = await server.loadAccount(address); + state.current = account.sequence; + state.locked = true; + state.lastUsedAt = Date.now(); + this.nonces.set(address, state); + return state.current; + } catch (error) { + throw new UnitOfWorkError( + `Failed to acquire nonce for ${address}`, + 'acquire-nonce', + error instanceof Error ? error : undefined, + ); + } + } + + release(address: string): void { + const state = this.nonces.get(address); + if (state) { + state.locked = false; + state.lastUsedAt = Date.now(); + } + } + + increment(address: string): void { + const state = this.nonces.get(address); + if (state) { + const seqNum = BigInt(state.current); + state.current = (seqNum + 1n).toString(); + } } + async resolveConflict(address: string): Promise { + for (let attempt = 1; attempt <= this.maxRetries; attempt++) { + try { + const account = await server.loadAccount(address); + const state = this.nonces.get(address); + if (state) { + state.current = account.sequence; + state.locked = true; + } + return account.sequence; + } catch { + if (attempt < this.maxRetries) { + await new Promise(r => setTimeout(r, this.retryDelayMs * attempt)); + } + } + } + throw new UnitOfWorkError( + `Failed to resolve nonce conflict for ${address} after ${this.maxRetries} retries`, + 'resolve-nonce-conflict', + ); + } + + getState(address: string): NonceState | undefined { + return this.nonces.get(address); + } + + cleanup(olderThanMs = 300_000): void { + const cutoff = Date.now() - olderThanMs; + for (const [address, state] of this.nonces.entries()) { + if (!state.locked && state.lastUsedAt < cutoff) { + this.nonces.delete(address); + } + } + } +} + +class GasEstimator { + private baseFee = 100; + private surgeMultiplier = 1.0; + private maxMultiplier = 5.0; + private lastEstimate = 100; + private estimateTimestamp = 0; + private estimateTtlMs = 30_000; + + async estimateFee(operations: number): Promise<{ + recommended: number; + min: number; + max: number; + surge: boolean; + }> { + const now = Date.now(); + if (now - this.estimateTimestamp > this.estimateTtlMs) { + try { + const feeStats = await server.feeStats(); + this.baseFee = parseInt(feeStats.max_fee?.mode || '100', 10); + this.lastEstimate = this.baseFee; + this.estimateTimestamp = now; + + const ledgers = parseInt(feeStats.last_ledger, 10) || 0; + const surge = feeStats.max_fee?.mode && parseInt(feeStats.max_fee.mode, 10) > 1000; + this.surgeMultiplier = surge ? 2.0 : 1.0; + } catch { + this.baseFee = Math.max(this.baseFee, 100); + } + } + + const baseOps = this.baseFee * operations; + const recommended = Math.ceil(baseOps * this.surgeMultiplier); + const min = this.baseFee * operations; + const max = Math.ceil(this.baseFee * this.maxMultiplier * operations); + + return { + recommended, + min, + max, + surge: this.surgeMultiplier > 1.0, + }; + } + + priceBump(currentFee: number): number { + return Math.ceil(currentFee * 1.2); + } +} + +export class UnitOfWork { + private operations: Array<{ + type: string; + fn: () => Promise; + compensation?: () => Promise; + executed: boolean; + result?: string; + }> = []; + + private completed: OperationResult[] = []; + private rollbackPerformed = false; + private nonceManager: NonceManager; + private gasEstimator: GasEstimator; + private sourceAddress?: string; + private acquiredNonce = false; + + constructor() { + this.nonceManager = getNonceManager(); + this.gasEstimator = getGasEstimator(); + } + + setSourceAddress(address: string): this { + this.sourceAddress = address; + return this; + } + + addOperation( + type: string, + fn: () => Promise, + compensation?: () => Promise, + ): this { + this.operations.push({ type, fn, compensation, executed: false }); + return this; + } + + async commit(): Promise { + const results: OperationResult[] = []; + + if (this.sourceAddress && !this.acquiredNonce) { + try { + await this.nonceManager.acquire(this.sourceAddress); + this.acquiredNonce = true; + } catch (error) { + return { + success: false, + results: [{ + operation: 'acquire-nonce', + status: 'failed', + error: error instanceof Error ? error.message : 'Failed to acquire nonce', + }], + operations: this.operations.length + 1, + completed: 0, + failed: 1, + rollbackPerformed: false, + }; + } + } + + for (const op of this.operations) { + try { + const txHash = await op.fn(); + op.executed = true; + op.result = txHash; + + const result: OperationResult = { + operation: op.type, + status: 'success', + txHash, + }; + results.push(result); + this.completed.push(result); + + if (this.sourceAddress) { + this.nonceManager.increment(this.sourceAddress); + } + } catch (error) { + const result: OperationResult = { + operation: op.type, + status: 'failed', + error: error instanceof Error ? error.message : 'Unknown error', + }; + results.push(result); + this.completed.push(result); + + await this.rollback(); + this.rollbackPerformed = true; + + if (this.sourceAddress) { + this.nonceManager.release(this.sourceAddress); + this.acquiredNonce = false; + } + + return { + success: false, + results, + operations: this.operations.length, + completed: this.completed.filter(r => r.status === 'success').length, + failed: this.completed.filter(r => r.status === 'failed').length, + rollbackPerformed: true, + }; + } + } + + if (this.sourceAddress) { + this.nonceManager.release(this.sourceAddress); + this.acquiredNonce = false; + } + + return { + success: true, + results, + operations: this.operations.length, + completed: results.length, + failed: 0, + rollbackPerformed: false, + }; + } + + private async rollback(): Promise { + const executedOps = this.operations.filter(op => op.executed && op.compensation); + + for (const op of executedOps.reverse()) { + if (op.compensation) { + try { + await op.compensation(); + } catch (error) { + console.error(`[UnitOfWork] Rollback error for ${op.type}:`, error); + } + } + } + } +} + +const nonceManager = new NonceManager(); +const gasEstimator = new GasEstimator(); + +export function getNonceManager(): NonceManager { + return nonceManager; +} + +export function getGasEstimator(): GasEstimator { + return gasEstimator; +} + +export function isValidStellarAddress(address: string) { + if (!address?.trim()) return false; + return StellarSdk.StrKey.isValidEd25519PublicKey(address); +} + +export function isValidTransactionHash(hash: string) { + if (!hash?.trim()) return false; return /^[A-Fa-f0-9]{64}$/.test(hash); } @@ -58,27 +366,47 @@ function assertValidTransactionHash(hash: string) { export async function getAccountInfo(address: string) { assertValidStellarAddress(address); - const account = await server.loadAccount(address); - return { - address: account.accountId(), - balances: account.balances.map((b) => ({ - type: b.asset_type, - balance: b.balance, - })), - sequence: account.sequence, - }; + return withQueryProfiling( + `getAccountInfo(${address})`, + 'stellar.service', + async () => { + const account = await server.loadAccount(address); + return { + address: account.accountId(), + balances: account.balances.map((b) => ({ + type: b.asset_type, + balance: b.balance, + })), + sequence: account.sequence, + }; + }, + ); } export async function getTransactionStatus(hash: string) { assertValidTransactionHash(hash); - const tx = await server.transactions().transaction(hash).call(); - return { - hash: tx.hash, - successful: tx.successful, - ledger: tx.ledger_attr, - createdAt: tx.created_at, - memo: tx.memo, - operationCount: tx.operation_count, - }; + return withQueryProfiling( + `getTransactionStatus(${hash})`, + 'stellar.service', + async () => { + const tx = await server.transactions().transaction(hash).call(); + return { + hash: tx.hash, + successful: tx.successful, + ledger: tx.ledger_attr, + createdAt: tx.created_at, + memo: tx.memo, + operationCount: tx.operation_count, + }; + }, + ); +} + +export async function estimateFee(operations = 1) { + return gasEstimator.estimateFee(operations); +} + +export function createUnitOfWork(): UnitOfWork { + return new UnitOfWork(); } diff --git a/backend/src/services/verification.ts b/backend/src/services/verification.ts index 45041d9b..acb1f4d5 100644 --- a/backend/src/services/verification.ts +++ b/backend/src/services/verification.ts @@ -37,6 +37,8 @@ export type VerificationUpdate = { details?: string[]; }; +import { withQueryProfiling } from '../config/database.js'; + // In-memory store (replace with DB in production) const verifications = new Map(); @@ -83,7 +85,11 @@ export function storeVerification(result: VerificationResult): void { } export async function getVerification(id: string): Promise { - return verifications.get(id); + return withQueryProfiling( + 'SELECT * FROM verifications WHERE id = ?', + 'verification.service', + async () => verifications.get(id), + ); } export function updateVerification(update: VerificationUpdate): VerificationResult | undefined { diff --git a/frontend/components/PWAWrapper.tsx b/frontend/components/PWAWrapper.tsx index 5f8d0eb2..8b53fa95 100644 --- a/frontend/components/PWAWrapper.tsx +++ b/frontend/components/PWAWrapper.tsx @@ -1,16 +1,52 @@ - "use client"; +import { useEffect, useState } from "react"; import PWAInstallPrompt from "./PWAInstallPrompt"; -import {SWRegister} from "./SWRegister"; +import { SWRegister, useServiceWorker } from "./SWRegister"; import { OfflineBanner } from "./offline/OfflineBanner"; +function SWUpdateNotification() { + const { updateEvent, applyUpdate } = useServiceWorker(); + const [visible, setVisible] = useState(false); + + useEffect(() => { + if (updateEvent) { + setVisible(true); + } + }, [updateEvent]); + + if (!visible) return null; + + return ( +
+ A new version is available. + + +
+ ); +} + export default function PWAWrapper() { return ( <> + ); } diff --git a/frontend/components/SWRegister.tsx b/frontend/components/SWRegister.tsx index d1fc4c48..45d5d135 100644 --- a/frontend/components/SWRegister.tsx +++ b/frontend/components/SWRegister.tsx @@ -1,29 +1,72 @@ -import { useEffect } from "react"; +"use client"; + +import { useEffect, useState, useCallback } from "react"; + +interface SWUpdateEvent { + registration: ServiceWorkerRegistration; + onUpdate: () => void; +} + +export function useServiceWorker() { + const [updateEvent, setUpdateEvent] = useState(null); + const [isRegistered, setIsRegistered] = useState(false); + const [registration, setRegistration] = useState(null); -export function SWRegister() { useEffect(() => { - if (!("serviceWorker" in navigator)) { - return; - } + if (!("serviceWorker" in navigator)) return; + + const register = async () => { + try { + const reg = await navigator.serviceWorker.register("/service-worker.js"); + setRegistration(reg); + setIsRegistered(true); - const registerServiceWorker = () => { - navigator.serviceWorker - .register("/service-worker.js") - .then((reg) => console.log("SW registered:", reg)) - .catch((err) => console.log("SW registration failed:", err)); + reg.addEventListener("updatefound", () => { + const newWorker = reg.installing; + if (!newWorker) return; + + newWorker.addEventListener("statechange", () => { + if (newWorker.state === "installed" && navigator.serviceWorker.controller) { + setUpdateEvent({ + registration: reg, + onUpdate: () => { + newWorker.postMessage({ type: "SKIP_WAITING" }); + }, + }); + } + }); + }); + } catch (err) { + console.log("SW registration failed:", err); + } }; if (document.readyState === "complete") { - registerServiceWorker(); - return; + register(); + } else { + window.addEventListener("load", register); + return () => window.removeEventListener("load", register); } + }, []); - window.addEventListener("load", registerServiceWorker); + const applyUpdate = useCallback(() => { + if (!updateEvent) return; + updateEvent.onUpdate(); + setUpdateEvent(null); + window.location.reload(); + }, [updateEvent]); - return () => { - window.removeEventListener("load", registerServiceWorker); - }; - }, []); + return { isRegistered, registration, updateEvent, applyUpdate }; +} + +export function SWRegister() { + const { isRegistered } = useServiceWorker(); + + useEffect(() => { + if (isRegistered) { + console.log("SW registered successfully"); + } + }, [isRegistered]); return null; } diff --git a/frontend/lib/cache/index.ts b/frontend/lib/cache/index.ts new file mode 100644 index 00000000..7114f32d --- /dev/null +++ b/frontend/lib/cache/index.ts @@ -0,0 +1,283 @@ +const DB_NAME = 'agenticpay_cache'; +const DB_VERSION = 1; +const CACHE_STORE = 'response_cache'; +const META_STORE = 'cache_meta'; + +interface CacheMeta { + key: string; + expiresAt: number; + createdAt: number; + size: number; + etag?: string; +} + +function openDB(): Promise { + return new Promise((resolve, reject) => { + const request = indexedDB.open(DB_NAME, DB_VERSION); + request.onerror = () => reject(request.error); + request.onsuccess = () => resolve(request.result); + request.onupgradeneeded = () => { + const db = request.result; + if (!db.objectStoreNames.contains(CACHE_STORE)) { + db.createObjectStore(CACHE_STORE); + } + if (!db.objectStoreNames.contains(META_STORE)) { + const metaStore = db.createObjectStore(META_STORE, { keyPath: 'key' }); + metaStore.createIndex('expiresAt', 'expiresAt', { unique: false }); + } + }; + }); +} + +async function getFromDB(key: string): Promise<{ value: T; meta: CacheMeta } | null> { + const db = await openDB(); + const tx = db.transaction([CACHE_STORE, META_STORE], 'readonly'); + const cacheStore = tx.objectStore(CACHE_STORE); + const metaStore = tx.objectStore(META_STORE); + + const [valueResult, metaResult] = await Promise.all([ + new Promise((resolve, reject) => { + const r = cacheStore.get(key); + r.onerror = () => reject(r.error); + r.onsuccess = () => resolve(r.result); + }), + new Promise((resolve, reject) => { + const r = metaStore.get(key); + r.onerror = () => reject(r.error); + r.onsuccess = () => resolve(r.result); + }), + ]); + + if (!valueResult || !metaResult) return null; + return { value: valueResult as T, meta: metaResult as CacheMeta }; +} + +async function setInDB(key: string, value: T, ttlMs: number, etag?: string): Promise { + const db = await openDB(); + const tx = db.transaction([CACHE_STORE, META_STORE], 'readwrite'); + const cacheStore = tx.objectStore(CACHE_STORE); + const metaStore = tx.objectStore(META_STORE); + + const bodyStr = JSON.stringify(value); + const meta: CacheMeta = { + key, + expiresAt: Date.now() + ttlMs, + createdAt: Date.now(), + size: bodyStr.length, + etag, + }; + + cacheStore.put(value, key); + metaStore.put(meta); +} + +async function deleteFromDB(key: string): Promise { + const db = await openDB(); + const tx = db.transaction([CACHE_STORE, META_STORE], 'readwrite'); + tx.objectStore(CACHE_STORE).delete(key); + tx.objectStore(META_STORE).delete(key); +} + +async function evictExpired(): Promise { + const db = await openDB(); + const tx = db.transaction(META_STORE, 'readonly'); + const index = tx.objectStore(META_STORE).index('expiresAt'); + const range = IDBKeyRange.upperBound(Date.now()); + const expiredKeys: string[] = []; + + return new Promise((resolve, reject) => { + const cursor = index.openCursor(range); + cursor.onerror = () => reject(cursor.error); + cursor.onsuccess = () => { + if (cursor.result) { + expiredKeys.push(cursor.result.value.key); + cursor.result.continue(); + } else { + Promise.all(expiredKeys.map(k => deleteFromDB(k))) + .then(() => resolve(expiredKeys.length)) + .catch(reject); + } + }; + }); +} + +export type CacheStrategy = 'cache-first' | 'network-first' | 'stale-while-revalidate' | 'network-only'; + +export interface CacheConfig { + strategy: CacheStrategy; + ttl: number; + etag?: boolean; +} + +export const DEFAULT_CACHE_CONFIG: CacheConfig = { + strategy: 'stale-while-revalidate', + ttl: 5 * 60 * 1000, + etag: true, +}; + +export async function cacheFetch( + url: string, + options: RequestInit = {}, + config: Partial = {}, +): Promise { + const cfg = { ...DEFAULT_CACHE_CONFIG, ...config }; + const cacheKey = `fetch:${options.method || 'GET'}:${url}`; + + switch (cfg.strategy) { + case 'network-only': + return networkOnlyFetch(url, options); + + case 'cache-first': + return cacheFirstFetch(url, options, cacheKey, cfg); + + case 'network-first': + return networkFirstFetch(url, options, cacheKey, cfg); + + case 'stale-while-revalidate': + default: + return staleWhileRevalidateFetch(url, options, cacheKey, cfg); + } +} + +async function networkOnlyFetch(url: string, options: RequestInit): Promise { + const response = await fetch(url, options); + if (!response.ok) throw new Error(`HTTP ${response.status}: ${response.statusText}`); + return response.json(); +} + +async function cacheFirstFetch( + url: string, + options: RequestInit, + cacheKey: string, + cfg: CacheConfig, +): Promise { + const cached = await getFromDB(cacheKey); + if (cached && cached.meta.expiresAt > Date.now()) { + return cached.value; + } + + const response = await fetch(url, options); + if (!response.ok) { + if (cached) return cached.value; + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + const data = await response.json(); + const etag = response.headers.get('ETag') || undefined; + await setInDB(cacheKey, data, cfg.ttl, etag); + return data; +} + +async function networkFirstFetch( + url: string, + options: RequestInit, + cacheKey: string, + cfg: CacheConfig, +): Promise { + try { + const response = await fetch(url, options); + if (response.ok) { + const data = await response.json(); + const etag = response.headers.get('ETag') || undefined; + await setInDB(cacheKey, data, cfg.ttl, etag); + return data; + } + throw new Error(`HTTP ${response.status}`); + } catch { + const cached = await getFromDB(cacheKey); + if (cached) return cached.value; + throw new Error('Network request failed and no cache available'); + } +} + +async function staleWhileRevalidateFetch( + url: string, + options: RequestInit, + cacheKey: string, + cfg: CacheConfig, +): Promise { + const cached = await getFromDB(cacheKey); + const isFresh = cached && cached.meta.expiresAt > Date.now(); + + if (cached) { + if (isFresh) return cached.value; + + const revalidatePromise = (async () => { + try { + const headers: Record = {}; + if (cfg.etag && cached.meta.etag) headers['If-None-Match'] = cached.meta.etag; + + const response = await fetch(url, { ...options, headers }); + if (response.status === 304) { + await setInDB(cacheKey, cached.value, cfg.ttl, cached.meta.etag); + return; + } + if (response.ok) { + const data = await response.json(); + const etag = response.headers.get('ETag') || undefined; + await setInDB(cacheKey, data, cfg.ttl, etag); + } + } catch {} + })(); + + return cached.value; + } + + const response = await fetch(url, options); + if (!response.ok) throw new Error(`HTTP ${response.status}: ${response.statusText}`); + + const data = await response.json(); + const etag = response.headers.get('ETag') || undefined; + await setInDB(cacheKey, data, cfg.ttl, etag); + return data; +} + +export async function invalidateCache(pattern?: string): Promise { + const db = await openDB(); + const tx = db.transaction(META_STORE, 'readonly'); + const allKeys: string[] = []; + + return new Promise((resolve, reject) => { + const cursor = tx.objectStore(META_STORE).openCursor(); + cursor.onerror = () => reject(cursor.error); + cursor.onsuccess = () => { + if (cursor.result) { + const key = cursor.result.value.key; + if (!pattern || key.includes(pattern)) { + allKeys.push(key); + } + cursor.result.continue(); + } else { + Promise.all(allKeys.map(k => deleteFromDB(k))) + .then(() => resolve()) + .catch(reject); + } + }; + }); +} + +export async function getCacheSize(): Promise<{ entries: number; totalSize: number }> { + const db = await openDB(); + const tx = db.transaction(META_STORE, 'readonly'); + const all: CacheMeta[] = []; + + return new Promise((resolve, reject) => { + const cursor = tx.objectStore(META_STORE).openCursor(); + cursor.onerror = () => reject(cursor.error); + cursor.onsuccess = () => { + if (cursor.result) { + all.push(cursor.result.value); + cursor.result.continue(); + } else { + resolve({ + entries: all.length, + totalSize: all.reduce((sum, m) => sum + m.size, 0), + }); + } + }; + }); +} + +if (typeof window !== 'undefined' && 'indexedDB' in window) { + setInterval(() => { evictExpired().catch(() => {}); }, 60_000); +} diff --git a/frontend/package.json b/frontend/package.json index 2394ddbe..08849718 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -17,6 +17,12 @@ "test:e2e:report": "playwright show-report" }, "dependencies": { + "workbox-precaching": "^7.3.0", + "workbox-routing": "^7.3.0", + "workbox-strategies": "^7.3.0", + "workbox-background-sync": "^7.3.0", + "workbox-expiration": "^7.3.0", + "workbox-cacheable-response": "^7.3.0", "@hookform/resolvers": "^5.2.2", "@next/bundle-analyzer": "^14.2.0", "@radix-ui/react-avatar": "^1.1.11", diff --git a/frontend/service-worker.ts b/frontend/service-worker.ts index 1afbd4f4..6667fd68 100644 --- a/frontend/service-worker.ts +++ b/frontend/service-worker.ts @@ -1,10 +1,11 @@ /// -const SHELL_CACHE = 'agenticpay_shell_v1'; -const RUNTIME_CACHE = 'agenticpay_runtime_v1'; -const OFFLINE_QUEUE_NAME = 'offline_payment_queue'; +const CACHE_PREFIX = 'agenticpay'; +const SW_VERSION = 'v2'; +const PRECACHE_KEY = `${CACHE_PREFIX}-precache-${SW_VERSION}`; +const RUNTIME_CACHE_KEY = `${CACHE_PREFIX}-runtime-${SW_VERSION}`; -const APP_SHELL_URLS = [ +const PRECACHE_URLS = [ '/', '/auth', '/dashboard', @@ -41,16 +42,11 @@ interface PaymentRequest { error?: string; } -interface SyncStatus { - isOnline: boolean; - lastSyncAt?: number; - pendingCount: number; - failedCount: number; -} +const OFFLINE_QUEUE_NAME = 'offline_payment_queue'; async function openDB(): Promise { return new Promise((resolve, reject) => { - const request = indexedDB.open('agenticpay_offline', 1); + const request = indexedDB.open('agenticpay_offline', 2); request.onerror = () => reject(request.error); request.onsuccess = () => resolve(request.result); request.onupgradeneeded = () => { @@ -109,7 +105,7 @@ async function removeFromQueue(id: string): Promise { async function syncPayments(): Promise<{ success: number; failed: number }> { const queue = await getPaymentQueue(); const pending = queue.filter(p => p.status === 'pending' || p.status === 'failed'); - + let success = 0; let failed = 0; @@ -152,80 +148,119 @@ async function syncPayments(): Promise<{ success: number; failed: number }> { return { success, failed }; } -function cacheFirst(request: Request): Promise { - return caches.match(request).then(cached => { - if (cached) return cached; - return fetch(request).then(response => { - if (response.ok) { - caches.open(RUNTIME_CACHE).then(cache => cache.put(request, response.clone())); - } - return response; - }); - }); -} - -function networkFirst(request: Request): Promise { - return fetch(request) - .then(response => { - if (response.ok) { - caches.open(RUNTIME_CACHE).then(cache => cache.put(request, response.clone())); - } - return response; - }) - .catch(() => caches.match(request).then(cached => cached || new Response('Offline', { status: 503 }))); -} - self.addEventListener('install', (event: ExtendableEvent) => { event.waitUntil( - caches.open(SHELL_CACHE).then(cache => cache.addAll(APP_SHELL_URLS)) + (async () => { + const cache = await caches.open(PRECACHE_KEY); + await cache.addAll(PRECACHE_URLS); + })(), ); self.skipWaiting(); }); self.addEventListener('activate', (event: ExtendableEvent) => { event.waitUntil( - caches.keys().then(keys => - Promise.all( - keys - .filter(key => key !== SHELL_CACHE && key !== RUNTIME_CACHE) - .map(key => caches.delete(key)) - ) - ) + (async () => { + const cacheKeys = await caches.keys(); + await Promise.all( + cacheKeys + .filter(key => key !== PRECACHE_KEY && key !== RUNTIME_CACHE_KEY) + .map(key => caches.delete(key)), + ); + await self.clients.claim(); + })(), ); - self.clients.claim(); }); +async function cacheFirst(request: Request): Promise { + const cached = await caches.match(request); + if (cached) return cached; + + try { + const response = await fetch(request); + if (response.ok) { + const cache = await caches.open(RUNTIME_CACHE_KEY); + await cache.put(request, response.clone()); + } + return response; + } catch { + return new Response('Offline', { status: 503 }); + } +} + +async function networkFirst(request: Request): Promise { + try { + const response = await fetch(request); + if (response.ok) { + const cache = await caches.open(RUNTIME_CACHE_KEY); + await cache.put(request, response.clone()); + } + return response; + } catch { + const cached = await caches.match(request); + if (cached) return cached; + return new Response('Offline', { status: 503 }); + } +} + +async function staleWhileRevalidate(request: Request): Promise { + const cache = await caches.open(RUNTIME_CACHE_KEY); + const cached = await cache.match(request); + + const fetchPromise = fetch(request).then(response => { + if (response.ok) { + cache.put(request, response.clone()); + } + return response; + }); + + return cached || fetchPromise; +} + +async function networkOnly(request: Request): Promise { + return fetch(request); +} + self.addEventListener('fetch', (event: FetchEvent) => { const { request } = event; const url = new URL(request.url); if (url.origin !== self.location.origin) return; + const pathname = url.pathname; + if (request.method !== 'GET') { if (navigator.onLine) { - event.respondWith(networkFirst(request)); + event.respondWith(networkOnly(request)); } else { event.respondWith( new Response(JSON.stringify({ error: 'offline', queued: true }), { status: 202, headers: { 'Content-Type': 'application/json' }, - }) + }), ); } return; } - if (request.mode === 'navigate') { + const requestDestination = request.destination; + + if (requestDestination === 'document' || request.mode === 'navigate') { event.respondWith(networkFirst(request)); return; } - if ( - ['script', 'style', 'image', 'font'].includes(request.destination) || - APP_SHELL_URLS.includes(url.pathname) - ) { + if (['script', 'style', 'image', 'font'].includes(requestDestination)) { event.respondWith(cacheFirst(request)); + return; } + + if (pathname.startsWith('/api/')) { + event.respondWith(staleWhileRevalidate(request)); + return; + } + + event.respondWith(networkFirst(request)); }); self.addEventListener('sync', ((event: Event) => { @@ -260,7 +295,7 @@ self.addEventListener('message', (event: ExtendableMessageEvent) => { if (registration.sync) { registration.sync.register('sync-payments'); } - }) + }), ); } @@ -275,6 +310,10 @@ self.addEventListener('message', (event: ExtendableMessageEvent) => { if (type === 'SYNC_NOW') { event.waitUntil(syncPayments()); } + + if (type === 'SKIP_WAITING') { + self.skipWaiting(); + } }); export default null;