From d02a0a451bc4cee29062ccade9cddee11307468b Mon Sep 17 00:00:00 2001 From: collinsadi Date: Wed, 27 May 2026 11:33:23 +0000 Subject: [PATCH 1/5] Add WebSocket channels and session ordering --- backend/src/websocket/managedConnection.ts | 65 +++++++++++++++- backend/src/websocket/scaling.ts | 41 ++++++++++ backend/src/websocket/server.ts | 90 ++++++++++++++++++++-- backend/src/websocket/types.ts | 18 ++++- 4 files changed, 202 insertions(+), 12 deletions(-) create mode 100644 backend/src/websocket/scaling.ts diff --git a/backend/src/websocket/managedConnection.ts b/backend/src/websocket/managedConnection.ts index 19d890da..63ffc983 100644 --- a/backend/src/websocket/managedConnection.ts +++ b/backend/src/websocket/managedConnection.ts @@ -1,9 +1,16 @@ import type WebSocket from 'ws'; -import type { WebSocketOutboundMessage, WebSocketServerMetrics } from './types.js'; +import { randomUUID } from 'node:crypto'; +import type { + WebSocketChannel, + WebSocketOutboundMessage, + WebSocketServerMetrics, + WebSocketWireMessage, +} from './types.js'; type QueueItem = { json: string; priority: 'high' | 'normal' }; export class ManagedConnection { + readonly sessionId = randomUUID(); private readonly ws: WebSocket; private readonly metrics: WebSocketServerMetrics; private readonly maxQueueSize: number; @@ -11,6 +18,9 @@ export class ManagedConnection { private readonly maxBatchSize: number; private readonly queueHigh: QueueItem[] = []; private readonly queueNormal: QueueItem[] = []; + private readonly channels = new Set(); + private sequence = 0; + private authExpiresAtMs: number | undefined; constructor(params: { ws: WebSocket; @@ -18,17 +28,36 @@ export class ManagedConnection { maxQueueSize: number; maxBufferedAmountBytes: number; maxBatchSize: number; + defaultChannels: WebSocketChannel[]; + authExpiresAtMs?: number; }) { this.ws = params.ws; this.metrics = params.metrics; this.maxQueueSize = params.maxQueueSize; this.maxBufferedAmountBytes = params.maxBufferedAmountBytes; this.maxBatchSize = params.maxBatchSize; + this.authExpiresAtMs = params.authExpiresAtMs; + + for (const channel of params.defaultChannels) { + this.subscribe(channel); + } } enqueue(message: WebSocketOutboundMessage): { accepted: boolean; reason?: string } { + if (message.channel && !this.channels.has(message.channel)) { + return { accepted: false, reason: 'NOT_SUBSCRIBED' }; + } + const priority = message.priority === 'high' ? 'high' : 'normal'; - const json = JSON.stringify({ ...message, priority: undefined }); + const wireMessage: WebSocketWireMessage = { + type: message.type, + channel: message.channel, + payload: message.payload, + sessionId: this.sessionId, + sequence: ++this.sequence, + emittedAt: new Date().toISOString(), + }; + const json = JSON.stringify(wireMessage); const totalSize = this.queueHigh.length + this.queueNormal.length; if (totalSize >= this.maxQueueSize) { @@ -47,6 +76,37 @@ export class ManagedConnection { return { accepted: true }; } + subscribe(channel: WebSocketChannel): void { + if (this.channels.has(channel)) return; + this.channels.add(channel); + this.metrics.subscribedChannels[channel] = (this.metrics.subscribedChannels[channel] ?? 0) + 1; + } + + unsubscribe(channel: WebSocketChannel): void { + if (!this.channels.delete(channel)) return; + const next = Math.max(0, (this.metrics.subscribedChannels[channel] ?? 0) - 1); + if (next === 0) delete this.metrics.subscribedChannels[channel]; + else this.metrics.subscribedChannels[channel] = next; + } + + hasChannel(channel: WebSocketChannel): boolean { + return this.channels.has(channel); + } + + refreshAuth(expiresAtMs?: number): void { + this.authExpiresAtMs = expiresAtMs; + } + + isAuthExpired(now = Date.now()): boolean { + return this.authExpiresAtMs !== undefined && now >= this.authExpiresAtMs; + } + + close(): void { + for (const channel of [...this.channels]) { + this.unsubscribe(channel); + } + } + flush(): void { if (this.ws.readyState !== this.ws.OPEN) return; if (this.ws.bufferedAmount > this.maxBufferedAmountBytes) return; @@ -70,4 +130,3 @@ export class ManagedConnection { return this.queueHigh.length + this.queueNormal.length; } } - diff --git a/backend/src/websocket/scaling.ts b/backend/src/websocket/scaling.ts new file mode 100644 index 00000000..47b641c5 --- /dev/null +++ b/backend/src/websocket/scaling.ts @@ -0,0 +1,41 @@ +import type { WebSocketOutboundMessage } from './types.js'; + +export type WebSocketScalingAdapter = { + publish(message: WebSocketOutboundMessage): Promise | void; + subscribe(handler: (message: WebSocketOutboundMessage) => void): Promise<() => void> | (() => void); +}; + +type RedisLikePublisher = { + publish(channel: string, message: string): Promise | unknown; +}; + +type RedisLikeSubscriber = { + subscribe(channel: string, handler: (message: string) => void): Promise | unknown; + unsubscribe(channel: string): Promise | unknown; +}; + +export class RedisWebSocketScalingAdapter implements WebSocketScalingAdapter { + constructor( + private readonly publisher: RedisLikePublisher, + private readonly subscriber: RedisLikeSubscriber, + private readonly channel = 'agenticpay:websocket:broadcast' + ) {} + + publish(message: WebSocketOutboundMessage): Promise | unknown { + return this.publisher.publish(this.channel, JSON.stringify(message)); + } + + async subscribe(handler: (message: WebSocketOutboundMessage) => void): Promise<() => void> { + await this.subscriber.subscribe(this.channel, (raw) => { + try { + handler(JSON.parse(raw) as WebSocketOutboundMessage); + } catch { + // Ignore malformed cross-node messages. + } + }); + + return () => { + void this.subscriber.unsubscribe(this.channel); + }; + } +} diff --git a/backend/src/websocket/server.ts b/backend/src/websocket/server.ts index 4cc7720f..5d62a151 100644 --- a/backend/src/websocket/server.ts +++ b/backend/src/websocket/server.ts @@ -2,12 +2,20 @@ import type http from 'node:http'; import { WebSocketServer } from 'ws'; import type WebSocket from 'ws'; import { ManagedConnection } from './managedConnection.js'; -import type { WebSocketOutboundMessage, WebSocketServerMetrics, WebSocketServerOptions } from './types.js'; +import type { + WebSocketChannel, + WebSocketClientMessage, + WebSocketOutboundMessage, + WebSocketServerMetrics, + WebSocketServerOptions, +} from './types.js'; +import type { WebSocketScalingAdapter } from './scaling.js'; export type AgenticPayWebSocketServer = { wss: WebSocketServer; metrics: WebSocketServerMetrics; broadcast: (message: WebSocketOutboundMessage) => void; + broadcastToChannel: (channel: WebSocketChannel, message: Omit) => void; close: () => Promise; }; @@ -20,12 +28,32 @@ function createMetrics(): WebSocketServerMetrics { enqueuedMessages: 0, droppedMessages: 0, sentMessages: 0, + subscribedChannels: {}, }; } +function parseAuthExpiry(value: string | null, maxAuthAgeMs: number): number { + if (!value) return Date.now() + maxAuthAgeMs; + const parsed = Number(value); + if (Number.isFinite(parsed)) return parsed < 10_000_000_000 ? parsed * 1000 : parsed; + const ms = Date.parse(value); + return Number.isFinite(ms) ? ms : Date.now() + maxAuthAgeMs; +} + +function parseClientMessage(raw: WebSocket.RawData): WebSocketClientMessage | null { + try { + const value = JSON.parse(raw.toString()) as WebSocketClientMessage; + if (value && typeof value.type === 'string') return value; + } catch { + return null; + } + return null; +} + export function attachWebSocketServer(params: { server: http.Server; options?: Partial; + scaling?: WebSocketScalingAdapter; }): AgenticPayWebSocketServer { const options: WebSocketServerOptions = { path: '/ws', @@ -36,6 +64,8 @@ export function attachWebSocketServer(params: { maxBatchSize: 50, pingIntervalMs: 30_000, pongTimeoutMs: 10_000, + defaultChannels: ['payment.events', 'dispute.updates', 'analytics.updates'], + maxAuthAgeMs: 60 * 60 * 1000, ...params.options, }; @@ -43,6 +73,7 @@ export function attachWebSocketServer(params: { const wss = new WebSocketServer({ noServer: true }); const connections = new Map(); const lastPongAt = new Map(); + let unsubscribeScaling: (() => void) | undefined; params.server.on('upgrade', (req, socket, head) => { try { @@ -65,9 +96,10 @@ export function attachWebSocketServer(params: { } }); - wss.on('connection', (ws: WebSocket) => { + wss.on('connection', (ws: WebSocket, req) => { metrics.activeConnections += 1; metrics.acceptedConnections += 1; + const url = new URL(req.url || '', `http://${req.headers.host || 'localhost'}`); const managed = new ManagedConnection({ ws, @@ -75,6 +107,8 @@ export function attachWebSocketServer(params: { maxQueueSize: options.maxQueueSizePerConnection, maxBufferedAmountBytes: options.maxBufferedAmountBytes, maxBatchSize: options.maxBatchSize, + defaultChannels: options.defaultChannels, + authExpiresAtMs: parseAuthExpiry(url.searchParams.get('expiresAt'), options.maxAuthAgeMs), }); connections.set(ws, managed); @@ -82,12 +116,26 @@ export function attachWebSocketServer(params: { ws.on('pong', () => lastPongAt.set(ws, Date.now())); - ws.on('message', () => { - // Reserved for future client->server messages (e.g. subscriptions / acks). - // Intentionally a no-op to avoid unbounded per-message CPU for now. + ws.on('message', (raw) => { + const message = parseClientMessage(raw); + if (!message) return; + + if (message.type === 'subscribe') { + for (const channel of message.channels.slice(0, 25)) managed.subscribe(channel); + managed.enqueue({ type: 'subscription.updated', payload: { channels: message.channels }, priority: 'high' }); + } else if (message.type === 'unsubscribe') { + for (const channel of message.channels) managed.unsubscribe(channel); + managed.enqueue({ type: 'subscription.updated', payload: { channels: message.channels }, priority: 'high' }); + } else if (message.type === 'auth.refresh') { + managed.refreshAuth(parseAuthExpiry(message.expiresAt ?? null, options.maxAuthAgeMs)); + managed.enqueue({ type: 'auth.refreshed', priority: 'high' }); + } else if (message.type === 'ping') { + managed.enqueue({ type: 'pong', priority: 'high' }); + } }); ws.on('close', () => { + managed.close(); connections.delete(ws); lastPongAt.delete(ws); metrics.activeConnections = Math.max(0, metrics.activeConnections - 1); @@ -110,22 +158,48 @@ export function attachWebSocketServer(params: { ws.terminate(); continue; } + const managed = connections.get(ws); + if (managed?.isAuthExpired(now)) { + managed.enqueue({ type: 'auth.expired', priority: 'high' }); + ws.close(4001, 'Auth token expired'); + continue; + } ws.ping(); } }, options.pingIntervalMs); - const broadcast = (message: WebSocketOutboundMessage) => { + const broadcastLocal = (message: WebSocketOutboundMessage) => { for (const managed of connections.values()) { managed.enqueue(message); } }; + const broadcast = (message: WebSocketOutboundMessage) => { + broadcastLocal(message); + void params.scaling?.publish(message); + }; + + const broadcastToChannel = ( + channel: WebSocketChannel, + message: Omit + ) => broadcast({ ...message, channel }); + + if (params.scaling) { + Promise.resolve(params.scaling.subscribe((message) => broadcastLocal(message))) + .then((unsubscribe) => { + unsubscribeScaling = unsubscribe; + }) + .catch(() => { + metrics.lastOverloadAtMs = Date.now(); + }); + } + const close = async () => { clearInterval(flushTimer); clearInterval(pingTimer); + unsubscribeScaling?.(); await new Promise((resolve) => wss.close(() => resolve())); }; - return { wss, metrics, broadcast, close }; + return { wss, metrics, broadcast, broadcastToChannel, close }; } - diff --git a/backend/src/websocket/types.ts b/backend/src/websocket/types.ts index 3f8a961c..2c0cb440 100644 --- a/backend/src/websocket/types.ts +++ b/backend/src/websocket/types.ts @@ -1,11 +1,25 @@ export type WebSocketEventPriority = 'high' | 'normal'; +export type WebSocketChannel = 'payment.events' | 'dispute.updates' | 'analytics.updates' | string; export type WebSocketOutboundMessage = { type: string; + channel?: WebSocketChannel; payload?: unknown; priority?: WebSocketEventPriority; }; +export type WebSocketClientMessage = + | { type: 'subscribe'; channels: WebSocketChannel[] } + | { type: 'unsubscribe'; channels: WebSocketChannel[] } + | { type: 'auth.refresh'; expiresAt?: string } + | { type: 'ping' }; + +export type WebSocketWireMessage = Omit & { + sequence: number; + sessionId: string; + emittedAt: string; +}; + export type WebSocketServerMetrics = { activeConnections: number; acceptedConnections: number; @@ -14,6 +28,7 @@ export type WebSocketServerMetrics = { enqueuedMessages: number; droppedMessages: number; sentMessages: number; + subscribedChannels: Record; lastOverloadAtMs?: number; }; @@ -26,5 +41,6 @@ export type WebSocketServerOptions = { maxBatchSize: number; pingIntervalMs: number; pongTimeoutMs: number; + defaultChannels: WebSocketChannel[]; + maxAuthAgeMs: number; }; - From ec62f1f88792313867a435052d33fdd155e813fd Mon Sep 17 00:00:00 2001 From: collinsadi Date: Wed, 27 May 2026 11:33:51 +0000 Subject: [PATCH 2/5] Bridge domain events to WebSocket channels --- backend/src/events/event-bus.ts | 22 ++++++++++++++++++++++ backend/src/index.ts | 4 +++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/backend/src/events/event-bus.ts b/backend/src/events/event-bus.ts index 64943203..c254e5f2 100644 --- a/backend/src/events/event-bus.ts +++ b/backend/src/events/event-bus.ts @@ -1,9 +1,21 @@ import type { DomainEventType, EventHandler, StoredEvent } from './event-types.js'; +import type { AgenticPayWebSocketServer } from '../websocket/server.js'; type WildcardHandler = (event: StoredEvent) => void | Promise; const handlers = new Map>(); let wildcardHandlers: Set = new Set(); +let websocketServer: AgenticPayWebSocketServer | undefined; + +const channelByEventPrefix: Array<{ prefix: string; channel: string }> = [ + { prefix: 'payment.', channel: 'payment.events' }, + { prefix: 'dispute.', channel: 'dispute.updates' }, + { prefix: 'project.disputed', channel: 'dispute.updates' }, +]; + +export function bindWebSocketServer(server: AgenticPayWebSocketServer): void { + websocketServer = server; +} export function subscribe(type: DomainEventType, handler: EventHandler): () => void { const set = handlers.get(type) ?? new Set(); @@ -29,9 +41,19 @@ export async function publish(event: StoredEvent): Promise { if (wildcardHandlers.size > 0) { await Promise.all(Array.from(wildcardHandlers).map((h) => h(event))); } + + const channel = channelByEventPrefix.find(({ prefix }) => event.type.startsWith(prefix))?.channel; + if (channel) { + websocketServer?.broadcastToChannel(channel, { + type: event.type, + payload: event, + priority: channel === 'dispute.updates' ? 'high' : 'normal', + }); + } } export function clearHandlers(): void { handlers.clear(); wildcardHandlers = new Set(); + websocketServer = undefined; } diff --git a/backend/src/index.ts b/backend/src/index.ts index f7a1814c..8f9a618b 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -73,6 +73,7 @@ import { disputeService } from './disputes/disputeService.js'; import http from 'node:http'; import { attachWebSocketServer } from './websocket/server.js'; import { createWebSocketRouter } from './routes/websocket.js'; +import { bindWebSocketServer } from './events/event-bus.js'; import { receiptsRouter } from './routes/receipts.js'; import { eventsRouter } from './routes/events.js'; import { threatDetectionRouter } from './routes/threat-detection.js'; @@ -358,12 +359,13 @@ setInterval(async () => { const server = http.createServer(app); const wsServer = attachWebSocketServer({ server, options: { path: '/ws' } }); +bindWebSocketServer(wsServer); app.use('/api/v1/websocket', createWebSocketRouter(wsServer)); app.use('/api/v1/analytics', createAnalyticsRouter(wsServer)); // Broadcast analytics snapshot every 30 seconds to all connected WebSocket clients const analyticsInterval = setInterval(() => { - wsServer.broadcast({ type: 'analytics:update', payload: analyticsService.snapshot() }); + wsServer.broadcastToChannel('analytics.updates', { type: 'analytics:update', payload: analyticsService.snapshot() }); }, 30_000); server.listen(config.server.port, () => { From 0a719bc444b85d896c5cbd054f57b7cc61b238dc Mon Sep 17 00:00:00 2001 From: collinsadi Date: Wed, 27 May 2026 11:34:15 +0000 Subject: [PATCH 3/5] Add reconnecting ordered WebSocket subscriptions --- frontend/lib/websocket/pool.ts | 86 +++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/frontend/lib/websocket/pool.ts b/frontend/lib/websocket/pool.ts index 547780c8..9d62328b 100644 --- a/frontend/lib/websocket/pool.ts +++ b/frontend/lib/websocket/pool.ts @@ -1,5 +1,7 @@ export type WebSocketPoolOptions = { url: string; + channels?: string[]; + authExpiresAt?: string; maxBufferedAmountBytes?: number; maxQueueSize?: number; reconnect?: { @@ -15,9 +17,19 @@ type PoolState = { connected: boolean; reconnecting: boolean; lastError?: string; + lastSequence?: number; + droppedMessages?: number; }; type OutboundItem = { data: string; priority: "high" | "normal" }; +type WireMessage = { + type: string; + channel?: string; + sequence?: number; + sessionId?: string; + emittedAt?: string; + payload?: unknown; +}; export class WebSocketPool { private readonly options: Required; @@ -30,13 +42,17 @@ export class WebSocketPool { private readonly queueHigh: OutboundItem[] = []; private readonly queueNormal: OutboundItem[] = []; + private readonly pendingBySequence = new Map(); private reconnectAttempt = 0; + private expectedSequence = 1; private reconnectTimer: number | null = null; private flushTimer: number | null = null; constructor(options: WebSocketPoolOptions) { this.options = { url: options.url, + channels: options.channels ?? ["payment.events", "dispute.updates"], + authExpiresAt: options.authExpiresAt ?? "", maxBufferedAmountBytes: options.maxBufferedAmountBytes ?? 512 * 1024, maxQueueSize: options.maxQueueSize ?? 500, reconnect: options.reconnect ?? { initialDelayMs: 250, maxDelayMs: 10_000, jitterRatio: 0.25 }, @@ -64,13 +80,16 @@ export class WebSocketPool { ws.onopen = () => { this.reconnectAttempt = 0; + this.expectedSequence = 1; + this.pendingBySequence.clear(); this.setState({ connected: true, reconnecting: false }); + this.sendControl({ type: "subscribe", channels: this.options.channels }); this.startFlushLoop(); }; ws.onmessage = (event) => { const data = typeof event.data === "string" ? event.data : ""; - for (const listener of this.messageListeners) listener(data); + this.deliverOrdered(data); }; ws.onerror = () => { @@ -97,6 +116,18 @@ export class WebSocketPool { return { accepted: true }; } + subscribe(channels: string[]): void { + this.sendControl({ type: "subscribe", channels }); + } + + unsubscribe(channels: string[]): void { + this.sendControl({ type: "unsubscribe", channels }); + } + + refreshAuth(expiresAt: string): void { + this.sendControl({ type: "auth.refresh", expiresAt }); + } + destroy(): void { this.destroyed = true; if (this.reconnectTimer) window.clearTimeout(this.reconnectTimer); @@ -129,6 +160,49 @@ export class WebSocketPool { ws.send(next.data); } + private sendControl(message: unknown): void { + const ws = this.ws; + if (!ws || ws.readyState !== WebSocket.OPEN) return; + ws.send(JSON.stringify(message)); + } + + private deliverOrdered(data: string): void { + const messages = parseWireMessages(data); + for (const message of messages) { + if (typeof message.sequence !== "number") { + this.emitMessage(JSON.stringify(message)); + continue; + } + + this.pendingBySequence.set(message.sequence, JSON.stringify(message)); + } + + let delivered = false; + while (this.pendingBySequence.has(this.expectedSequence)) { + const next = this.pendingBySequence.get(this.expectedSequence)!; + this.pendingBySequence.delete(this.expectedSequence); + this.emitMessage(next); + this.expectedSequence += 1; + delivered = true; + } + + if (!delivered && this.pendingBySequence.size > 100) { + const nextSequence = Math.min(...this.pendingBySequence.keys()); + this.expectedSequence = nextSequence; + this.setState({ + ...this.state, + droppedMessages: (this.state.droppedMessages ?? 0) + 1, + }); + this.deliverOrdered("[]"); + } + + this.setState({ ...this.state, lastSequence: this.expectedSequence - 1 }); + } + + private emitMessage(data: string): void { + for (const listener of this.messageListeners) listener(data); + } + private scheduleReconnect(): void { if (this.destroyed) return; if (this.reconnectTimer) return; @@ -153,6 +227,15 @@ export class WebSocketPool { } } +function parseWireMessages(data: string): WireMessage[] { + try { + const parsed = JSON.parse(data) as WireMessage | WireMessage[]; + return Array.isArray(parsed) ? parsed : [parsed]; + } catch { + return [{ type: "raw", payload: data }]; + } +} + const poolByUrl = new Map(); export function getWebSocketPool(options: WebSocketPoolOptions): WebSocketPool { @@ -162,4 +245,3 @@ export function getWebSocketPool(options: WebSocketPoolOptions): WebSocketPool { poolByUrl.set(options.url, created); return created; } - From bda63a9f83829d44a860645f0e39605f77f0b4a7 Mon Sep 17 00:00:00 2001 From: collinsadi Date: Wed, 27 May 2026 11:35:21 +0000 Subject: [PATCH 4/5] Add receipt proof search archive and PDF APIs --- backend/src/routes/receipts.ts | 69 ++++++++++++ backend/src/schemas/receipts.ts | 9 +- backend/src/services/receipts.ts | 182 ++++++++++++++++++++++++++++++- 3 files changed, 254 insertions(+), 6 deletions(-) diff --git a/backend/src/routes/receipts.ts b/backend/src/routes/receipts.ts index 3841e4b4..b3bb5e0c 100644 --- a/backend/src/routes/receipts.ts +++ b/backend/src/routes/receipts.ts @@ -13,8 +13,14 @@ import { getReceiptByTxHash, getReceiptsByWallet, getAllReceipts, + verifyReceiptProof, + getReceiptByMerkleRoot, + searchReceipts, + archiveReceipts, + generateReceiptPdf, } from '../services/receipts.js'; import { + archiveReceiptSchema, mintReceiptSchema, batchMintReceiptSchema, transferReceiptSchema, @@ -72,6 +78,31 @@ receiptsRouter.delete( }) ); +receiptsRouter.get( + '/search', + cacheControl({ maxAge: CacheTTL.SHORT }), + asyncHandler(async (req, res) => { + res.json(searchReceipts({ + paymentId: req.query.paymentId as string | undefined, + txHash: req.query.txHash as string | undefined, + wallet: req.query.wallet as string | undefined, + currency: req.query.currency as string | undefined, + from: req.query.from as string | undefined, + to: req.query.to as string | undefined, + includeArchived: req.query.includeArchived === 'true', + })); + }) +); + +receiptsRouter.post( + '/archive', + validate(archiveReceiptSchema), + asyncHandler(async (req, res) => { + const archived = archiveReceipts(req.body.retentionBefore); + res.json({ archived, count: archived.length }); + }) +); + receiptsRouter.get( '/', cacheControl({ maxAge: CacheTTL.SHORT }), @@ -91,6 +122,16 @@ receiptsRouter.get( }) ); +receiptsRouter.get( + '/by-root/:root', + cacheControl({ maxAge: CacheTTL.SHORT }), + asyncHandler(async (req, res) => { + const receipt = getReceiptByMerkleRoot(req.params.root); + if (!receipt) throw new AppError(404, 'Receipt not found', 'NOT_FOUND'); + res.json(receipt); + }) +); + receiptsRouter.get( '/by-tx/:txHash', cacheControl({ maxAge: CacheTTL.SHORT }), @@ -119,6 +160,34 @@ receiptsRouter.get( }) ); +receiptsRouter.get( + '/:tokenId/verify', + cacheControl({ maxAge: CacheTTL.SHORT }), + asyncHandler(async (req, res) => { + const receipt = getReceiptByTokenId(req.params.tokenId); + if (!receipt) throw new AppError(404, 'Receipt not found', 'NOT_FOUND'); + res.json({ + tokenId: receipt.tokenId, + merkleRoot: receipt.merkleRoot, + valid: verifyReceiptProof(receipt), + proof: receipt.merkleProof, + }); + }) +); + +receiptsRouter.get( + '/:tokenId/pdf', + cacheControl({ maxAge: CacheTTL.LONG }), + asyncHandler(async (req, res) => { + const receipt = getReceiptByTokenId(req.params.tokenId); + if (!receipt) throw new AppError(404, 'Receipt not found', 'NOT_FOUND'); + const pdf = generateReceiptPdf(receipt); + res.setHeader('Content-Type', 'application/pdf'); + res.setHeader('Content-Disposition', `attachment; filename="${receipt.tokenId}.pdf"`); + res.send(pdf); + }) +); + receiptsRouter.get( '/:tokenId/metadata', cacheControl({ maxAge: CacheTTL.LONG }), diff --git a/backend/src/schemas/receipts.ts b/backend/src/schemas/receipts.ts index f0b12ad7..69386274 100644 --- a/backend/src/schemas/receipts.ts +++ b/backend/src/schemas/receipts.ts @@ -6,7 +6,10 @@ export const mintReceiptSchema = z.object({ sender: z.string().min(1, 'Sender address is required'), recipient: z.string().min(1, 'Recipient address is required'), amount: z.number().positive('Amount must be positive'), - asset: z.string().min(1, 'Asset is required'), + asset: z.string().min(1, 'Asset is required').optional(), + currency: z.string().min(1, 'Currency is required').optional(), + timestamp: z.string().datetime().optional(), + retentionUntil: z.string().datetime().optional(), }); export const batchMintReceiptSchema = z.object({ @@ -16,3 +19,7 @@ export const batchMintReceiptSchema = z.object({ export const transferReceiptSchema = z.object({ newOwner: z.string().min(1, 'New owner address is required'), }); + +export const archiveReceiptSchema = z.object({ + retentionBefore: z.string().datetime('Retention cutoff must be an ISO timestamp'), +}); diff --git a/backend/src/services/receipts.ts b/backend/src/services/receipts.ts index 1a8f736e..099a164f 100644 --- a/backend/src/services/receipts.ts +++ b/backend/src/services/receipts.ts @@ -1,4 +1,6 @@ -import { randomUUID } from 'node:crypto'; +import { createHash, randomUUID } from 'node:crypto'; +import { appendEvent } from '../events/event-store.js'; +import { publish } from '../events/event-bus.js'; export interface ReceiptNFT { id: string; @@ -9,6 +11,13 @@ export interface ReceiptNFT { recipient: string; amount: number; asset: string; + currency: string; + timestamp: string; + merkleRoot: string; + merkleProof: MerkleProofNode[]; + archived: boolean; + archivedAt?: string; + retentionUntil?: string; mintedAt: string; owner: string; burned: boolean; @@ -16,6 +25,11 @@ export interface ReceiptNFT { metadata: ReceiptMetadata; } +export interface MerkleProofNode { + position: 'left' | 'right'; + hash: string; +} + export interface ReceiptMetadata { name: string; description: string; @@ -30,7 +44,10 @@ interface MintReceiptInput { sender: string; recipient: string; amount: number; - asset: string; + asset?: string; + currency?: string; + timestamp?: string; + retentionUntil?: string; } interface BatchMintInput { @@ -41,6 +58,7 @@ const receipts = new Map(); const paymentIndex = new Map(); const walletIndex = new Map>(); const txHashIndex = new Map(); +const receiptRootIndex = new Map(); let tokenCounter = 0; @@ -61,12 +79,54 @@ function buildMetadata(receipt: Omit): ReceiptMetadata { { trait_type: 'Sender', value: receipt.sender }, { trait_type: 'Recipient', value: receipt.recipient }, { trait_type: 'Amount', value: receipt.amount }, - { trait_type: 'Asset', value: receipt.asset }, + { trait_type: 'Currency', value: receipt.currency }, { trait_type: 'Minted At', value: receipt.mintedAt }, + { trait_type: 'Merkle Root', value: receipt.merkleRoot }, ], }; } +function hashReceiptField(value: string | number): string { + return createHash('sha256').update(String(value)).digest('hex'); +} + +function hashPair(left: string, right: string): string { + return createHash('sha256').update(`${left}:${right}`).digest('hex'); +} + +function buildReceiptProof(fields: Array): { root: string; proof: MerkleProofNode[] } { + let layer = fields.map(hashReceiptField); + let index = 0; + const proof: MerkleProofNode[] = []; + + while (layer.length > 1) { + if (layer.length % 2 === 1) layer.push(layer[layer.length - 1]); + const siblingIndex = index % 2 === 0 ? index + 1 : index - 1; + proof.push({ + position: index % 2 === 0 ? 'right' : 'left', + hash: layer[siblingIndex], + }); + + const next: string[] = []; + for (let i = 0; i < layer.length; i += 2) { + next.push(hashPair(layer[i], layer[i + 1])); + } + index = Math.floor(index / 2); + layer = next; + } + + return { root: layer[0], proof }; +} + +export function verifyReceiptProof(receipt: ReceiptNFT): boolean { + const leaf = hashReceiptField(receipt.paymentId); + const root = receipt.merkleProof.reduce((acc, node) => { + return node.position === 'left' ? hashPair(node.hash, acc) : hashPair(acc, node.hash); + }, leaf); + + return root === receipt.merkleRoot; +} + function indexByWallet(walletAddress: string, tokenId: string): void { const existing = walletIndex.get(walletAddress) ?? new Set(); existing.add(tokenId); @@ -89,7 +149,17 @@ export function mintReceipt(input: MintReceiptInput): ReceiptNFT { } const tokenId = nextTokenId(); - const now = new Date().toISOString(); + const now = input.timestamp ?? new Date().toISOString(); + const currency = input.currency ?? input.asset ?? 'USD'; + const proof = buildReceiptProof([ + input.paymentId, + input.transactionHash, + input.sender, + input.recipient, + input.amount, + currency, + now, + ]); const base = { id: randomUUID(), @@ -99,7 +169,13 @@ export function mintReceipt(input: MintReceiptInput): ReceiptNFT { sender: input.sender, recipient: input.recipient, amount: input.amount, - asset: input.asset, + asset: currency, + currency, + timestamp: now, + merkleRoot: proof.root, + merkleProof: proof.proof, + archived: false, + retentionUntil: input.retentionUntil, mintedAt: now, owner: input.recipient, burned: false, @@ -110,8 +186,20 @@ export function mintReceipt(input: MintReceiptInput): ReceiptNFT { receipts.set(tokenId, receipt); paymentIndex.set(input.paymentId, tokenId); txHashIndex.set(input.transactionHash, tokenId); + receiptRootIndex.set(receipt.merkleRoot, tokenId); indexByWallet(input.recipient, tokenId); + const event = appendEvent('receipt', tokenId, 'receipt.minted', { + tokenId, + paymentId: receipt.paymentId, + sender: receipt.sender, + recipient: receipt.recipient, + amount: receipt.amount, + asset: receipt.currency, + merkleRoot: receipt.merkleRoot, + }); + void publish(event); + return receipt; } @@ -142,6 +230,9 @@ export function burnReceipt(tokenId: string): ReceiptNFT { removeFromWalletIndex(receipt.owner, tokenId); receipts.set(tokenId, receipt); + const event = appendEvent('receipt', tokenId, 'receipt.burned', { tokenId }); + void publish(event); + return receipt; } @@ -170,3 +261,84 @@ export function getAllReceipts(includesBurned = false): ReceiptNFT[] { const all = Array.from(receipts.values()); return includesBurned ? all : all.filter((r) => !r.burned); } + +export function searchReceipts(query: { + paymentId?: string; + txHash?: string; + wallet?: string; + currency?: string; + from?: string; + to?: string; + includeArchived?: boolean; +}): ReceiptNFT[] { + const fromMs = query.from ? Date.parse(query.from) : undefined; + const toMs = query.to ? Date.parse(query.to) : undefined; + + return getAllReceipts(false).filter((receipt) => { + if (!query.includeArchived && receipt.archived) return false; + if (query.paymentId && receipt.paymentId !== query.paymentId) return false; + if (query.txHash && receipt.transactionHash !== query.txHash) return false; + if (query.wallet && receipt.sender !== query.wallet && receipt.recipient !== query.wallet && receipt.owner !== query.wallet) return false; + if (query.currency && receipt.currency !== query.currency) return false; + + const timestampMs = Date.parse(receipt.timestamp); + if (fromMs !== undefined && timestampMs < fromMs) return false; + if (toMs !== undefined && timestampMs > toMs) return false; + return true; + }); +} + +export function getReceiptByMerkleRoot(root: string): ReceiptNFT | undefined { + const tokenId = receiptRootIndex.get(root); + return tokenId ? receipts.get(tokenId) : undefined; +} + +export function archiveReceipts(retentionBefore: string): ReceiptNFT[] { + const cutoff = Date.parse(retentionBefore); + const archived: ReceiptNFT[] = []; + + for (const receipt of receipts.values()) { + const retentionMs = receipt.retentionUntil ? Date.parse(receipt.retentionUntil) : undefined; + const timestampMs = Date.parse(receipt.timestamp); + if (receipt.archived) continue; + if ((retentionMs !== undefined && retentionMs <= cutoff) || timestampMs <= cutoff) { + receipt.archived = true; + receipt.archivedAt = new Date().toISOString(); + receipts.set(receipt.tokenId, receipt); + archived.push(receipt); + } + } + + return archived; +} + +export function generateReceiptPdf(receipt: ReceiptNFT): Buffer { + const lines = [ + 'AgenticPay Payment Receipt', + `Receipt: ${receipt.tokenId}`, + `Payment: ${receipt.paymentId}`, + `Amount: ${receipt.amount} ${receipt.currency}`, + `Sender: ${receipt.sender}`, + `Recipient: ${receipt.recipient}`, + `Transaction: ${receipt.transactionHash}`, + `Timestamp: ${receipt.timestamp}`, + `Merkle Root: ${receipt.merkleRoot}`, + ]; + const escaped = lines.join('\\n').replace(/[()]/g, ''); + const content = `BT /F1 12 Tf 72 760 Td (${escaped}) Tj ET`; + const pdf = `%PDF-1.4 +1 0 obj << /Type /Catalog /Pages 2 0 R >> endobj +2 0 obj << /Type /Pages /Kids [3 0 R] /Count 1 >> endobj +3 0 obj << /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Resources << /Font << /F1 4 0 R >> >> /Contents 5 0 R >> endobj +4 0 obj << /Type /Font /Subtype /Type1 /BaseFont /Helvetica >> endobj +5 0 obj << /Length ${content.length} >> stream +${content} +endstream endobj +xref +0 6 +0000000000 65535 f +trailer << /Root 1 0 R /Size 6 >> +%%EOF`; + + return Buffer.from(pdf); +} From 7dbe27364df3d6ed9dbdaa112e038ca6e5fc54ca Mon Sep 17 00:00:00 2001 From: collinsadi Date: Wed, 27 May 2026 11:35:57 +0000 Subject: [PATCH 5/5] Emit on-chain receipts for completed payments --- contracts/src/lib.rs | 77 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/contracts/src/lib.rs b/contracts/src/lib.rs index 5694effb..4ce92173 100644 --- a/contracts/src/lib.rs +++ b/contracts/src/lib.rs @@ -34,10 +34,24 @@ pub struct Project { pub deadline: u64, } +#[contracttype] +#[derive(Clone, Debug)] +pub struct Receipt { + pub id: u64, + pub project_id: u64, + pub amount: i128, + pub currency: String, + pub sender: Address, + pub recipient: Address, + pub timestamp: u64, +} + #[contracttype] pub enum DataKey { Project(u64), ProjectCount, + Receipt(u64), + ReceiptCount, Admin, Metadata(String), } @@ -62,6 +76,7 @@ impl AgenticPayContract { admin.require_auth(); env.storage().instance().set(&DataKey::Admin, &admin); env.storage().instance().set(&DataKey::ProjectCount, &0u64); + env.storage().instance().set(&DataKey::ReceiptCount, &0u64); } fn get_admin(env: &Env) -> Address { @@ -262,8 +277,6 @@ impl AgenticPayContract { "Work must be submitted or verified" ); - // TODO: Transfer deposited funds to freelancer via Stellar token transfer - let amount_released = project.deposited; project.status = ProjectStatus::Completed; project.deposited = 0; @@ -276,6 +289,50 @@ impl AgenticPayContract { (symbol_short!("project"), symbol_short!("payment")), (project_id, amount_released), ); + + Self::record_receipt( + &env, + project_id, + amount_released, + String::from_str(&env, "XLM"), + project.client, + project.freelancer, + ); + } + + fn record_receipt( + env: &Env, + project_id: u64, + amount: i128, + currency: String, + sender: Address, + recipient: Address, + ) -> u64 { + let mut count: u64 = env + .storage() + .instance() + .get(&DataKey::ReceiptCount) + .unwrap_or(0); + count += 1; + + let receipt = Receipt { + id: count, + project_id, + amount, + currency: currency.clone(), + sender: sender.clone(), + recipient: recipient.clone(), + timestamp: env.ledger().timestamp(), + }; + + env.storage().persistent().set(&DataKey::Receipt(count), &receipt); + env.storage().instance().set(&DataKey::ReceiptCount, &count); + env.events().publish( + (symbol_short!("receipt"), symbol_short!("issued")), + (count, project_id, amount, currency, sender, recipient), + ); + + count } /// Raise a dispute on a project @@ -407,6 +464,22 @@ impl AgenticPayContract { .unwrap_or(0) } + /// Get receipt details by on-chain receipt id. + pub fn get_receipt(env: Env, receipt_id: u64) -> Receipt { + env.storage() + .persistent() + .get(&DataKey::Receipt(receipt_id)) + .expect("Receipt not found") + } + + /// Get total receipt count. + pub fn get_receipt_count(env: Env) -> u64 { + env.storage() + .instance() + .get(&DataKey::ReceiptCount) + .unwrap_or(0) + } + /// Store metadata key-value pair (admin only) pub fn set_metadata(env: Env, admin: Address, key: String, value: String) { admin.require_auth();