From 4a269da19bad04506f62432f4da3fd6c221793e9 Mon Sep 17 00:00:00 2001 From: bdj Date: Wed, 17 Jun 2026 09:30:33 -0700 Subject: [PATCH 1/4] feat(payments): relocate settlement to session close (phase 1) Introduce an implicit, request-scoped PaymentSession and move payment settlement off the inbound express middleware and onto session close at the end of the response. - @atxp/server: new PaymentSession (cap/spent/charge); ALS context holds the session; requirePayment charges it locally and falls back to paymentServer.charge only when no session is open (preserves Cloudflare + direct-caller behavior). Settlement happens once at close, idempotently. - @atxp/express: middleware stashes the credential and opens the session instead of settling eagerly; settle is awaited inside the existing res.end interception (before the response finishes), so it stays observable and runs after the route's requirePayment charges accumulate. - Fixed amounts only; cap is best-effort per protocol. No auth/accounts changes. Backwards-compatible. Co-Authored-By: Claude Opus 4.8 --- .../atxp-cloudflare/src/requirePayment.ts | 8 + packages/atxp-express/src/atxpExpress.test.ts | 110 ++++++++++++- packages/atxp-express/src/atxpExpress.ts | 101 +++++++----- .../atxp-express/src/omniChallenge.test.ts | 30 ++-- packages/atxp-server/src/atxpContext.ts | 49 ++++++ packages/atxp-server/src/index.ts | 8 + .../atxp-server/src/paymentSession.test.ts | 98 ++++++++++++ packages/atxp-server/src/paymentSession.ts | 149 ++++++++++++++++++ .../atxp-server/src/requirePayment.test.ts | 75 ++++++++- packages/atxp-server/src/requirePayment.ts | 27 +++- 10 files changed, 593 insertions(+), 62 deletions(-) create mode 100644 packages/atxp-server/src/paymentSession.test.ts create mode 100644 packages/atxp-server/src/paymentSession.ts diff --git a/packages/atxp-cloudflare/src/requirePayment.ts b/packages/atxp-cloudflare/src/requirePayment.ts index 82c3592f..2908c256 100644 --- a/packages/atxp-cloudflare/src/requirePayment.ts +++ b/packages/atxp-cloudflare/src/requirePayment.ts @@ -2,6 +2,14 @@ import { RequirePaymentConfig } from "@atxp/common"; import { ATXPArgs, buildServerConfig, requirePayment as requirePaymentSDK, withATXPContext } from "@atxp/server"; import { ATXPMCPAgentProps } from "./types.js"; +// TODO(phase-1): This Cloudflare path does NOT detect a payment credential, +// does NOT open an implicit PaymentSession, and does NOT call +// ProtocolSettlement.settle — it never has. With no session open, the SDK +// requirePayment() falls back to debiting the auth ledger via +// paymentServer.charge (its prior behavior), so this remains correct and +// unchanged by Phase 1. Settle-at-close is wired only into @atxp/express for +// now; bringing this path onto the session model (credential detection + +// settle at response close) is deferred to a later phase. export async function requirePayment(paymentConfig: RequirePaymentConfig, configOpts: ATXPArgs, {resource, tokenCheck}: ATXPMCPAgentProps): Promise { const config = buildServerConfig(configOpts); diff --git a/packages/atxp-express/src/atxpExpress.test.ts b/packages/atxp-express/src/atxpExpress.test.ts index 227c77c3..7d1ebb5d 100644 --- a/packages/atxp-express/src/atxpExpress.test.ts +++ b/packages/atxp-express/src/atxpExpress.test.ts @@ -1,7 +1,9 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import { atxpExpress } from './atxpExpress.js'; import { MemoryOAuthDb } from '@atxp/common'; +import { requirePayment } from '@atxp/server'; import * as TH from '@atxp/server/serverTestHelpers'; +import { BigNumber } from 'bignumber.js'; import express from 'express'; import request from 'supertest'; @@ -220,7 +222,11 @@ describe('ATXP', () => { const app = express(); app.use(express.json()); app.use(router); - app.post('/', (_req, res) => res.json({ ok: true })); + // requirePayment charges the implicit session; settlement fires at close. + app.post('/', async (_req, res) => { + await requirePayment({ price: BigNumber(0.01) }); + res.json({ ok: true }); + }); await sendMcpToolCall(app).expect(200); @@ -241,7 +247,10 @@ describe('ATXP', () => { const app = express(); app.use(express.json()); app.use(router); - app.post('/', (_req, res) => res.json({ ok: true })); + app.post('/', async (_req, res) => { + await requirePayment({ price: BigNumber(0.01) }); + res.json({ ok: true }); + }); await sendMcpToolCall(app).expect(200); @@ -255,4 +264,101 @@ describe('ATXP', () => { } }); }); + + // Phase 1: settlement moved off the inbound request and onto session close. + describe('settlement happens once at session close (not inbound)', () => { + const mockFetch = vi.fn(); + + beforeEach(() => { + mockFetch.mockReset(); + mockFetch.mockResolvedValue({ + ok: true, + json: async () => ({ txHash: '0xabc', settledAmount: '0.01' }), + text: async () => '', + }); + vi.stubGlobal('fetch', mockFetch); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + const atxpCredential = JSON.stringify({ + sourceAccountId: 'atxp_acct_test123', + sourceAccountToken: 'tok_abc', + }); + + const settleCalls = () => mockFetch.mock.calls.filter( + ([url]) => typeof url === 'string' && url.includes('/settle/'), + ); + + const sendPaidMcpCall = (app: express.Application) => + request(app) + .post('/') + .set('Content-Type', 'application/json') + .set('Authorization', 'Bearer test-access-token') + .set('X-ATXP-PAYMENT', atxpCredential) + .send(TH.mcpToolRequest()); + + it('settles exactly once, AFTER the route ran, for a single paid tool call', async () => { + const order: string[] = []; + mockFetch.mockImplementation(async (url: string | URL) => { + if (String(url).includes('/settle/')) order.push('settle'); + return { ok: true, json: async () => ({ txHash: '0xabc', settledAmount: '0.01' }), text: async () => '' }; + }); + + const router = atxpExpress(TH.config({ + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + + const app = express(); + app.use(express.json()); + app.use(router); + app.post('/', async (_req, res) => { + order.push('route'); + await requirePayment({ price: BigNumber(0.01) }); + res.json({ ok: true }); + }); + + await sendPaidMcpCall(app).expect(200); + + expect(settleCalls()).toHaveLength(1); + // Settle is deferred until response close, so it runs after the route. + expect(order).toEqual(['route', 'settle']); + }); + + it('does NOT settle when the route never calls requirePayment (nothing charged)', async () => { + const router = atxpExpress(TH.config({ + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + + const app = express(); + app.use(express.json()); + app.use(router); + app.post('/', (_req, res) => res.json({ ok: true })); + + await sendPaidMcpCall(app).expect(200); + + expect(settleCalls()).toHaveLength(0); + }); + + it('settles once even when the route calls requirePayment multiple times', async () => { + const router = atxpExpress(TH.config({ + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + + const app = express(); + app.use(express.json()); + app.use(router); + app.post('/', async (_req, res) => { + await requirePayment({ price: BigNumber(0.01) }); + await requirePayment({ price: BigNumber(0.01) }); + res.json({ ok: true }); + }); + + await sendPaidMcpCall(app).expect(200); + + expect(settleCalls()).toHaveLength(1); + }); + }); }); \ No newline at end of file diff --git a/packages/atxp-express/src/atxpExpress.ts b/packages/atxp-express/src/atxpExpress.ts index 3f683ebb..b81bb4ff 100644 --- a/packages/atxp-express/src/atxpExpress.ts +++ b/packages/atxp-express/src/atxpExpress.ts @@ -14,13 +14,15 @@ import { detectProtocol, getPendingPaymentChallenge, setPaymentRequestId, + setDetectedCredential, + openPaymentSession, + closePaymentSession, type PaymentProtocol, type ATXPConfig, type TokenCheck, type PendingPaymentChallenge, verifyOpaqueIdentity, parseCredentialBase64, - ProtocolSettlement, } from "@atxp/server"; export function atxpExpress(args: ATXPArgs): Router { @@ -107,12 +109,14 @@ export function atxpExpress(args: ATXPArgs): Router { return; } - // Set up ATXP context, settle any payment credential, then run route. - // Settlement happens HERE (in middleware) rather than in requirePayment() - // so the ledger is credited before any route code runs. This avoids - // footguns where tool handlers call requirePayment() multiple times - // (e.g., pre-flight balance check + post-generation charge) and the - // first call consumes the credential, leaving nothing for the second. + // Set up ATXP context, stash any payment credential, then run route. + // Settlement no longer happens here. Instead the middleware opens an + // implicit, request-scoped PaymentSession holding the credential and the + // settlement context; requirePayment() charges the session locally, and + // the session settles exactly once at response close (see below). This + // keeps a single credential usable across multiple requirePayment() calls + // in one request while deferring the actual settle until the route's + // charges have accumulated. return withATXPContext(config, resource, tokenCheck, async () => { if (detected) { // Resolve identity for the settlement ledger credit. @@ -120,16 +124,7 @@ export function atxpExpress(args: ATXPArgs): Router { ? user : resolveIdentitySync(config, req, detected.protocol, detected.credential) || user || undefined; - // Settle the credential immediately — credits the auth server's - // ledger so subsequent charge() calls in requirePayment() succeed. const destinationAccountId = await config.destination.getAccountId(); - const settlement = new ProtocolSettlement( - config.server, - logger, - fetch.bind(globalThis), - destinationAccountId, - { appName: config.appName }, - ); // For X402: the credential's parsed payload contains `accepted` — the // exact payment requirement the client signed off on. Pass it directly @@ -151,24 +146,15 @@ export function atxpExpress(args: ATXPArgs): Router { } } - try { - const result = await settlement.settle( - detected.protocol, - detected.credential, - context as Parameters[2], - ); - logger.info(`Settled ${detected.protocol} in middleware: txHash=${result.txHash ?? ''}, amount=${result.settledAmount}`); - } catch (error) { - logger.error(`Middleware settlement failed for ${detected.protocol}: ${error instanceof Error ? error.message : String(error)}`); - // Don't store the credential — it's already consumed/invalid. - // requirePayment() will see no credential, charge will fail, - // and a fresh payment challenge will be issued. - } + setDetectedCredential({ ...detected, ...(sourceAccountId && { sourceAccountId }) }); + openPaymentSession(detected, context as Parameters[1]); } - // Intercept the response to rewrite McpServer's wrapped payment errors - // back into proper JSON-RPC errors with full challenge data. - installPaymentResponseRewriter(res, logger); + // Intercept the response to (1) settle the payment session at close — + // after the route has run and all requirePayment() charges have + // accumulated, but before the response finishes — and (2) rewrite + // McpServer's wrapped payment errors into proper JSON-RPC errors. + installPaymentResponseRewriter(res, logger, closePaymentSession); return next(); }); @@ -198,8 +184,19 @@ export function atxpExpress(args: ATXPArgs): Router { * Old clients: see JSON-RPC error with code -30402 → Branch 1 matches * New clients: see JSON-RPC error with code -30402 + full error.data → x402/mpp works */ -/** @internal Exported for testing only. */ -export function installPaymentResponseRewriter(res: Response, logger: import("@atxp/common").Logger): void { +/** + * @internal Exported for testing only. + * + * @param onBeforeEnd Optional async hook run once, before the response body is + * sent, when res.end is first invoked. Used to settle the payment session at + * close (after the route ran, before the response finishes). When omitted, + * res.end stays fully synchronous. + */ +export function installPaymentResponseRewriter( + res: Response, + logger: import("@atxp/common").Logger, + onBeforeEnd?: () => Promise, +): void { const origEnd = res.end; const origWrite = res.write; const origWriteHead = res.writeHead; @@ -246,9 +243,10 @@ export function installPaymentResponseRewriter(res: Response, logger: import("@a return (origWrite as any).apply(this, args); } as any; - // Hook res.end for non-SSE (enableJsonResponse) responses. + // Finalize the response: rewrite the body, flush the deferred writeHead with + // a corrected Content-Length, restore the original methods, and call origEnd. // eslint-disable-next-line @typescript-eslint/no-explicit-any - res.end = function endWithPaymentRewrite(this: Response, ...args: any[]): any { + function finalizeEnd(self: Response, args: any[]): any { res.end = origEnd; res.write = origWrite; res.writeHead = origWriteHead; @@ -272,11 +270,38 @@ export function installPaymentResponseRewriter(res: Response, logger: import("@a } } } - (origWriteHead as any).apply(this, deferredWriteHead); + (origWriteHead as any).apply(self, deferredWriteHead); deferredWriteHead = null; } - return (origEnd as any).apply(this, args); + return (origEnd as any).apply(self, args); + } + + // Guard so the onBeforeEnd hook (session settle) runs at most once even if + // res.end is invoked multiple times. + let closed = false; + + // Hook res.end for non-SSE (enableJsonResponse) responses. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + res.end = function endWithPaymentRewrite(this: Response, ...args: any[]): any { + if (!onBeforeEnd) { + return finalizeEnd(this, args); + } + // Settle before finishing the response. Awaiting here (rather than firing + // on res.on('finish')) guarantees the settle completes before the client + // sees the response. Idempotent via `closed`. + if (closed) { + return finalizeEnd(this, args); + } + closed = true; + onBeforeEnd() + .catch((error) => { + logger.error(`onBeforeEnd hook failed: ${error instanceof Error ? error.message : String(error)}`); + }) + .finally(() => { + finalizeEnd(this, args); + }); + return this; } as any; } diff --git a/packages/atxp-express/src/omniChallenge.test.ts b/packages/atxp-express/src/omniChallenge.test.ts index fcf33182..091713ec 100644 --- a/packages/atxp-express/src/omniChallenge.test.ts +++ b/packages/atxp-express/src/omniChallenge.test.ts @@ -272,11 +272,16 @@ describe('credential detection Express middleware', () => { expect(storedCredential).toBeNull(); }); - // Settlement now happens in the middleware, not in requirePayment(). - // The credential is NOT stored — it's settled immediately, so - // getDetectedCredential() returns null in route handlers. - it('should settle ATXP credential in middleware (not store for later)', async () => { + // Phase 1: the middleware no longer settles on the inbound request. It + // stashes the credential (via setDetectedCredential) and opens an implicit + // PaymentSession; settlement happens at response close once requirePayment() + // has charged the session. So getDetectedCredential() now returns the + // credential in route handlers, and fetch (/settle/*) is NOT called inbound. + it('should store ATXP credential in context without settling inbound', async () => { let storedCredential: DetectedCredential | null = null; + mockFetch.mockImplementation(async () => { + throw new Error('fetch should not be called — middleware does not settle inbound'); + }); const atxpCredential = JSON.stringify({ sourceAccountId: 'atxp_acct_raw123', @@ -303,12 +308,14 @@ describe('credential detection Express middleware', () => { .send(TH.mcpToolRequest()); expect(response.status).toBe(200); - // Credential was settled in middleware, not stored for requirePayment() - expect(storedCredential).toBeNull(); + expect(storedCredential).toMatchObject({ protocol: 'atxp', credential: atxpCredential }); }); - it('should settle X402 credential in middleware (not store for later)', async () => { + it('should store X402 credential in context without settling inbound', async () => { let storedCredential: DetectedCredential | null = null; + mockFetch.mockImplementation(async () => { + throw new Error('fetch should not be called — middleware does not settle inbound'); + }); const router = atxpExpress(TH.config({ oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'atxp:atxp_acct_x402user' }) }), @@ -330,11 +337,14 @@ describe('credential detection Express middleware', () => { .send(TH.mcpToolRequest()); expect(response.status).toBe(200); - expect(storedCredential).toBeNull(); + expect(storedCredential).toMatchObject({ protocol: 'x402', credential: 'x402-payment-credential' }); }); - it('should settle base64-encoded ATXP credential in middleware (not store for later)', async () => { + it('should store base64-encoded ATXP credential in context without settling inbound', async () => { let storedCredential: DetectedCredential | null = null; + mockFetch.mockImplementation(async () => { + throw new Error('fetch should not be called — middleware does not settle inbound'); + }); const atxpCredential = Buffer.from(JSON.stringify({ sourceAccountId: 'atxp_acct_b64_456', @@ -361,7 +371,7 @@ describe('credential detection Express middleware', () => { .send(TH.mcpToolRequest()); expect(response.status).toBe(200); - expect(storedCredential).toBeNull(); + expect(storedCredential).toMatchObject({ protocol: 'atxp', credential: atxpCredential }); }); }); }); diff --git a/packages/atxp-server/src/atxpContext.ts b/packages/atxp-server/src/atxpContext.ts index 2b99311c..cdb2c7a4 100644 --- a/packages/atxp-server/src/atxpContext.ts +++ b/packages/atxp-server/src/atxpContext.ts @@ -1,6 +1,13 @@ import { TokenData, AccountId, type PaymentProtocol } from "@atxp/common"; import { ATXPConfig, TokenCheck } from "./types.js"; import { AsyncLocalStorage } from "async_hooks"; +import { SettlementContext } from "./protocol.js"; +import { + PaymentSession, + PaymentSessionState, + buildPaymentSession, + settlePaymentSession, +} from "./paymentSession.js"; const contextStorage = new AsyncLocalStorage(); @@ -39,6 +46,9 @@ type ATXPContext = { paymentRequestId?: string; /** Payment challenge pending response rewrite (set by omniChallengeMcpError) */ pendingPaymentChallenge?: PendingPaymentChallenge; + /** Implicit request-scoped payment session, opened by the middleware when a + * credential is detected. requirePayment() charges it; it settles at close. */ + paymentSession?: PaymentSessionState; } export function getATXPConfig(): ATXPConfig | null { @@ -85,6 +95,45 @@ export function setDetectedCredential(credential: DetectedCredential): void { } } +/** + * Open an implicit payment session for the detected credential (called by + * middleware). Derives the authorized cap from the credential and stores the + * session + settlement context in the ALS context. + */ +export function openPaymentSession(credential: DetectedCredential, context: SettlementContext): void { + const ctx = contextStorage.getStore(); + if (ctx) { + ctx.paymentSession = buildPaymentSession(credential, context, ctx.config.logger); + } +} + +/** + * Get the current request's payment session, if one was opened. + */ +export function paymentSession(): PaymentSession | null { + const context = contextStorage.getStore(); + return context?.paymentSession ?? null; +} + +/** + * Settle the current request's payment session if it was charged. Idempotent. + * Called at response close (by the express response interceptor). + */ +export async function closePaymentSession(): Promise { + const context = contextStorage.getStore(); + const session = context?.paymentSession; + if (!context || !session) return; + const config = context.config; + const destinationAccountId = await config.destination.getAccountId(); + await settlePaymentSession( + session, + config.server, + destinationAccountId, + config.appName, + config.logger, + ); +} + export function getPaymentRequestId(): string | null { const context = contextStorage.getStore(); return context?.paymentRequestId ?? null; diff --git a/packages/atxp-server/src/index.ts b/packages/atxp-server/src/index.ts index d646f81e..e21ecb0e 100644 --- a/packages/atxp-server/src/index.ts +++ b/packages/atxp-server/src/index.ts @@ -30,10 +30,18 @@ export { getDetectedCredential, setDetectedCredential, getPendingPaymentChallenge, + openPaymentSession, + paymentSession, + closePaymentSession, type DetectedCredential, type PendingPaymentChallenge, } from './atxpContext.js'; +// Payment session (request-scoped local charging + settle-at-close) +export { + type PaymentSession, +} from './paymentSession.js'; + // Core platform-agnostic business logic (no I/O dependencies) export { checkTokenCore, diff --git a/packages/atxp-server/src/paymentSession.test.ts b/packages/atxp-server/src/paymentSession.test.ts new file mode 100644 index 00000000..308d031e --- /dev/null +++ b/packages/atxp-server/src/paymentSession.test.ts @@ -0,0 +1,98 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { BigNumber } from 'bignumber.js'; +import { PaymentSessionState, settlePaymentSession } from './paymentSession.js'; +import { ProtocolSettlement } from './protocol.js'; +import * as TH from './serverTestHelpers.js'; + +const logger = TH.logger(); + +describe('PaymentSession.charge', () => { + it('accumulates charges across multiple calls', () => { + // x402 cap of 1.00 USDC (atomic 1_000_000 / 1e6). + const session = new PaymentSessionState('x402', 'cred', { paymentRequirements: { amount: '1000000' } }, logger); + expect(session.cap.toNumber()).toBe(1); + + expect(session.charge(BigNumber(0.3))).toBe(true); + expect(session.charge(BigNumber(0.2))).toBe(true); + expect(session.spent.toNumber()).toBeCloseTo(0.5); + }); + + it('returns false and does not record when a charge would exceed the cap', () => { + const session = new PaymentSessionState('x402', 'cred', { paymentRequirements: { amount: '500000' } }, logger); + expect(session.cap.toNumber()).toBe(0.5); + + expect(session.charge(BigNumber(0.4))).toBe(true); + // 0.4 + 0.2 = 0.6 > 0.5 → rejected, spent unchanged + expect(session.charge(BigNumber(0.2))).toBe(false); + expect(session.spent.toNumber()).toBeCloseTo(0.4); + }); + + it('allows a charge that exactly equals the cap', () => { + const session = new PaymentSessionState('x402', 'cred', { paymentRequirements: { amount: '100000' } }, logger); + expect(session.charge(BigNumber(0.1))).toBe(true); + expect(session.charge(BigNumber(0.0001))).toBe(false); + }); + + it('derives cap from mpp credential challenge.request.amount', () => { + const credential = Buffer.from(JSON.stringify({ + challenge: { id: 'ch_1', request: { amount: '250000' } }, + })).toString('base64'); + const session = new PaymentSessionState('mpp', credential, {}, logger); + expect(session.cap.toNumber()).toBe(0.25); + }); + + it('derives cap from atxp credential options[].amount (human-readable)', () => { + const credential = JSON.stringify({ + sourceAccountId: 'atxp_acct_1', + options: [{ amount: '0.05' }], + }); + const session = new PaymentSessionState('atxp', credential, {}, logger); + expect(session.cap.toNumber()).toBe(0.05); + }); + + it('defaults cap to Infinity when the amount cannot be parsed (best-effort)', () => { + // atxp credential with no amount → no limit; single-charge path still works. + const credential = JSON.stringify({ sourceAccountId: 'atxp_acct_1', sourceAccountToken: 'tok' }); + const session = new PaymentSessionState('atxp', credential, {}, logger); + expect(session.cap.isEqualTo(Infinity)).toBe(true); + expect(session.charge(BigNumber(999))).toBe(true); + }); +}); + +describe('settlePaymentSession', () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it('does not settle when the session was never charged (spent == 0)', async () => { + const settleSpy = vi.spyOn(ProtocolSettlement.prototype, 'settle').mockResolvedValue({ txHash: '0x', settledAmount: '0' }); + const session = new PaymentSessionState('atxp', '{}', {}, logger); + await settlePaymentSession(session, 'https://auth.atxp.ai', 'base:dest', undefined, logger); + expect(settleSpy).not.toHaveBeenCalled(); + expect(session.settled).toBe(false); + }); + + it('settles once when charged, and is idempotent across repeat calls', async () => { + const settleSpy = vi.spyOn(ProtocolSettlement.prototype, 'settle').mockResolvedValue({ txHash: '0xabc', settledAmount: '0.01' }); + const session = new PaymentSessionState('atxp', '{}', {}, logger); + session.charge(BigNumber(0.01)); + + await settlePaymentSession(session, 'https://auth.atxp.ai', 'base:dest', undefined, logger); + await settlePaymentSession(session, 'https://auth.atxp.ai', 'base:dest', undefined, logger); + + expect(settleSpy).toHaveBeenCalledTimes(1); + expect(session.settled).toBe(true); + }); + + it('marks settled even if settle throws, so it is not retried at close', async () => { + const settleSpy = vi.spyOn(ProtocolSettlement.prototype, 'settle').mockRejectedValue(new Error('settle failed')); + const session = new PaymentSessionState('atxp', '{}', {}, logger); + session.charge(BigNumber(0.01)); + + await settlePaymentSession(session, 'https://auth.atxp.ai', 'base:dest', undefined, logger); + await settlePaymentSession(session, 'https://auth.atxp.ai', 'base:dest', undefined, logger); + + expect(settleSpy).toHaveBeenCalledTimes(1); + expect(session.settled).toBe(true); + }); +}); diff --git a/packages/atxp-server/src/paymentSession.ts b/packages/atxp-server/src/paymentSession.ts new file mode 100644 index 00000000..b63b2a28 --- /dev/null +++ b/packages/atxp-server/src/paymentSession.ts @@ -0,0 +1,149 @@ +import { BigNumber } from "bignumber.js"; +import { Logger, type PaymentProtocol } from "@atxp/common"; +import { ProtocolSettlement, SettlementContext, parseCredentialBase64 } from "./protocol.js"; +import type { DetectedCredential } from "./atxpContext.js"; + +/** + * Request-scoped payment session. + * + * Opened implicitly by the middleware when a payment credential is detected, + * charged locally by requirePayment(), and settled once at response close. + * Replaces the prior flow where the middleware settled eagerly on the inbound + * request and requirePayment() debited the auth ledger via paymentServer.charge. + */ +export interface PaymentSession { + /** Authorized amount derived from the credential. */ + readonly cap: BigNumber; + /** Accumulated charges recorded against this session. */ + readonly spent: BigNumber; + /** Record a charge locally. Returns false if it would exceed the cap or there is no credential. */ + charge(cost: BigNumber): boolean; +} + +/** Atomic-unit divisor for USDC (6 decimals). */ +const USDC_ATOMIC = 1e6; + +/** + * Derive the authorized cap from a detected credential. + * + * Best-effort for Phase 1 (fixed amounts): if the amount cannot be parsed + * reliably for a protocol, returns Infinity and logs a warning so the + * single-charge path always works. Settlement still settles the credential in + * full; cap enforcement is a guard, not the source of the settled amount. + */ +function deriveCap( + protocol: PaymentProtocol, + credential: string, + context: SettlementContext, + logger: Logger, +): BigNumber { + try { + if (protocol === 'x402') { + // context.paymentRequirements is the parsed `accepted` object the client + // signed. Its `amount` is atomic USDC units (6 decimals). + const reqs = context.paymentRequirements as { amount?: string | number } | undefined; + if (reqs?.amount != null) { + return new BigNumber(reqs.amount).dividedBy(USDC_ATOMIC); + } + } else if (protocol === 'mpp') { + const parsed = parseCredentialBase64(credential); + const challenge = parsed?.challenge as Record | undefined; + // mppx reads amount from challenge.request.amount; fall back to challenge.amount. + const request = challenge?.request as Record | undefined; + const amount = (request?.amount ?? challenge?.amount) as string | number | undefined; + if (amount != null) { + return new BigNumber(amount).dividedBy(USDC_ATOMIC); + } + } else if (protocol === 'atxp') { + const parsed = parseCredentialBase64(credential); + // ATXP credentials carry authorized amounts as human-readable USDC. + const options = parsed?.options as Array<{ amount?: string | number }> | undefined; + const optionAmount = options?.find(o => o.amount != null)?.amount; + const amount = optionAmount ?? (parsed?.amount as string | number | undefined); + if (amount != null) { + return new BigNumber(amount); + } + } + } catch (error) { + logger.warn(`PaymentSession: failed to derive cap for ${protocol}: ${error instanceof Error ? error.message : String(error)}`); + } + + logger.warn(`PaymentSession: could not derive cap for ${protocol} credential, defaulting to no limit`); + return new BigNumber(Infinity); +} + +/** + * Internal session state. Holds the detected credential and settlement context + * so closePaymentSession() can settle exactly the credential the client signed. + */ +export class PaymentSessionState implements PaymentSession { + cap: BigNumber; + spent: BigNumber = new BigNumber(0); + settled = false; + + constructor( + readonly protocol: PaymentProtocol, + readonly credential: string, + readonly context: SettlementContext, + logger: Logger, + ) { + this.cap = deriveCap(protocol, credential, context, logger); + } + + charge(cost: BigNumber): boolean { + const next = this.spent.plus(cost); + if (next.isGreaterThan(this.cap)) { + return false; + } + this.spent = next; + return true; + } +} + +/** + * Open a payment session for a detected credential. + * Returns the session state; the caller stores it in the ALS context. + */ +export function buildPaymentSession( + detected: DetectedCredential, + context: SettlementContext, + logger: Logger, +): PaymentSessionState { + return new PaymentSessionState(detected.protocol, detected.credential, context, logger); +} + +/** + * Settle the session if it was charged and not already settled. Idempotent: + * subsequent calls (e.g. if res.end fires more than once) are no-ops. Builds + * the ProtocolSettlement from config exactly as the middleware did previously. + */ +export async function settlePaymentSession( + session: PaymentSessionState, + authServer: import("@atxp/common").AuthorizationServerUrl, + destinationAccountId: string | undefined, + appName: string | undefined, + logger: Logger, +): Promise { + if (session.settled) return; + if (session.spent.isLessThanOrEqualTo(0)) return; + session.settled = true; + + const settlement = new ProtocolSettlement( + authServer, + logger, + fetch.bind(globalThis), + destinationAccountId, + { appName }, + ); + + try { + const result = await settlement.settle( + session.protocol, + session.credential, + session.context, + ); + logger.info(`Settled ${session.protocol} at session close: txHash=${result.txHash ?? ''}, amount=${result.settledAmount}`); + } catch (error) { + logger.error(`Session close settlement failed for ${session.protocol}: ${error instanceof Error ? error.message : String(error)}`); + } +} diff --git a/packages/atxp-server/src/requirePayment.test.ts b/packages/atxp-server/src/requirePayment.test.ts index f60f6fd9..67b52475 100644 --- a/packages/atxp-server/src/requirePayment.test.ts +++ b/packages/atxp-server/src/requirePayment.test.ts @@ -2,7 +2,7 @@ import { describe, it, expect, vi } from 'vitest'; import { requirePayment } from './index.js'; import * as TH from './serverTestHelpers.js'; import { BigNumber } from 'bignumber.js'; -import { withATXPContext } from './atxpContext.js'; +import { withATXPContext, openPaymentSession, paymentSession } from './atxpContext.js'; import { PAYMENT_REQUIRED_ERROR_CODE } from '@atxp/common'; import { ProtocolSettlement } from './protocol.js'; @@ -340,10 +340,11 @@ describe('requirePayment', () => { }); }); - // Settlement is now handled by the middleware (atxpExpress), not requirePayment(). - // See atxpExpress.test.ts for settlement tests. - describe('requirePayment does not settle (settlement moved to middleware)', () => { - it('should charge directly without settling — middleware handles settlement before route code runs', async () => { + // Settlement is now handled at response close via the implicit PaymentSession, + // not by requirePayment() itself. requirePayment() only records local charges. + // See atxpExpress.test.ts for the settle-at-close integration tests. + describe('requirePayment does not settle directly', () => { + it('should charge directly without settling — settlement happens at session close', async () => { const mockSettle = vi.fn(); vi.spyOn(ProtocolSettlement.prototype, 'settle').mockImplementation(mockSettle); @@ -360,4 +361,68 @@ describe('requirePayment', () => { }); }); + // When the middleware has opened an implicit PaymentSession, requirePayment() + // charges it locally instead of debiting the auth ledger via paymentServer.charge. + describe('implicit PaymentSession charging', () => { + const atxpCredential = { + protocol: 'atxp' as const, + // No amount in the credential → cap is Infinity (best-effort Phase 1), + // so the local charge always succeeds. + credential: JSON.stringify({ sourceAccountId: 'test-user', sourceAccountToken: 'tok' }), + }; + + it('charges the session locally and does NOT call paymentServer.charge', async () => { + const paymentServer = TH.paymentServer({ charge: vi.fn().mockResolvedValue(true) }); + const config = TH.config({ paymentServer }); + + await withATXPContext(config, new URL('https://example.com'), TH.tokenCheck(), async () => { + openPaymentSession(atxpCredential, {}); + await expect(requirePayment({ price: BigNumber(0.01) })).resolves.not.toThrow(); + + expect(paymentServer.charge).not.toHaveBeenCalled(); + expect(paymentSession()!.spent.toNumber()).toBeCloseTo(0.01); + }); + }); + + it('accumulates multiple charges into one session', async () => { + const paymentServer = TH.paymentServer({ charge: vi.fn().mockResolvedValue(true) }); + const config = TH.config({ paymentServer }); + + await withATXPContext(config, new URL('https://example.com'), TH.tokenCheck(), async () => { + openPaymentSession(atxpCredential, {}); + await requirePayment({ price: BigNumber(0.01) }); + await requirePayment({ price: BigNumber(0.02) }); + + expect(paymentServer.charge).not.toHaveBeenCalled(); + expect(paymentSession()!.spent.toNumber()).toBeCloseTo(0.03); + }); + }); + + it('creates a payment request and throws when a charge would exceed the cap', async () => { + const paymentServer = TH.paymentServer({ charge: vi.fn().mockResolvedValue(true) }); + const config = TH.config({ paymentServer }); + + // x402 session capped at 0.01 USDC; a 0.02 charge exceeds it. + const x402Credential = { + protocol: 'x402' as const, + credential: 'x402-cred', + }; + + await withATXPContext(config, new URL('https://example.com'), TH.tokenCheck(), async () => { + openPaymentSession(x402Credential, { paymentRequirements: { amount: '10000' } }); + try { + await requirePayment({ price: BigNumber(0.02) }); + throw new Error('expected requirePayment to throw a payment challenge'); + } catch (err: any) { + expect(err.code).toBe(PAYMENT_REQUIRED_ERROR_CODE); + // Did not fall through to ledger charge — session path was used. + expect(paymentServer.charge).not.toHaveBeenCalled(); + expect(paymentServer.createPaymentRequest).toHaveBeenCalled(); + // Nothing recorded against the session. + expect(paymentSession()!.spent.toNumber()).toBe(0); + } + }); + }); + }); + }); \ No newline at end of file diff --git a/packages/atxp-server/src/requirePayment.ts b/packages/atxp-server/src/requirePayment.ts index dda32771..deb8af88 100644 --- a/packages/atxp-server/src/requirePayment.ts +++ b/packages/atxp-server/src/requirePayment.ts @@ -1,6 +1,6 @@ import { RequirePaymentConfig, extractNetworkFromAccountId, extractAddressFromAccountId, Network, AuthorizationServerUrl } from "@atxp/common"; import { BigNumber } from "bignumber.js"; -import { getATXPConfig, atxpAccountId, atxpToken, getPaymentRequestId, setPendingPaymentChallenge } from "./atxpContext.js"; +import { getATXPConfig, atxpAccountId, atxpToken, getPaymentRequestId, setPendingPaymentChallenge, paymentSession } from "./atxpContext.js"; import { buildPaymentOptions, omniChallengeMcpError } from "./omniChallenge.js"; import { getATXPResource } from "./atxpContext.js"; import { signOpaqueIdentity } from "./opaqueIdentity.js"; @@ -45,14 +45,27 @@ export async function requirePayment(paymentConfig: RequirePaymentConfig): Promi ...(paymentRequestId && { paymentRequestId }), }; - // Settlement is handled by the middleware (atxpExpress) before route code runs. - // The ledger is already credited by the time we get here on a retry request. config.logger.debug(`Charging ${paymentConfig.price} to ${charge.options.length} options for source ${user}`); - const chargeSucceeded = await config.paymentServer.charge(charge); - if (chargeSucceeded) { - config.logger.info(`Charged ${paymentConfig.price} for source ${user}`); - return; + // Prefer the implicit request-scoped session: charge locally and let the + // credential settle once at response close. The session is opened by the + // middleware when a payment credential is detected on the request. + // + // When no session is open (e.g. the Cloudflare path, or a direct caller + // outside the express middleware), fall back to debiting the auth ledger via + // paymentServer.charge — preserving the prior behavior for those callers. + const session = paymentSession(); + if (session) { + if (session.charge(paymentConfig.price)) { + config.logger.info(`Charged ${paymentConfig.price} to session for source ${user}`); + return; + } + } else { + const chargeSucceeded = await config.paymentServer.charge(charge); + if (chargeSucceeded) { + config.logger.info(`Charged ${paymentConfig.price} for source ${user}`); + return; + } } // Check for an existing payment ID first (idempotency) — avoids the From 9da28cea92d36965a039bc7df048fd15c0f9b6aa Mon Sep 17 00:00:00 2001 From: bdj Date: Wed, 17 Jun 2026 13:17:50 -0700 Subject: [PATCH 2/4] fix(payments): address phase 1 review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - deriveCap: branch MPP amount scaling on challenge.method — Tempo uses human-readable decimals (no /1e6), Solana uses micro-units (/1e6); unknown method falls back to decimal to avoid under-scaling the cap and falsely re-challenging already-paid requests. (blocking #1) - settle-at-close no longer depends on AsyncLocalStorage at res.end: the express middleware captures the session + server/destination/appName/logger in a closure built inside withATXPContext, so SSE/abort/timeout paths (where res.end fires outside the ALS chain) can't silently skip billing. Adds an integration test against a real McpServer + StreamableHTTPServerTransport. (blocking #2) - settle-failure logs a greppable `settle_failed_at_close protocol=… amount=…` marker; adds an express test that a failed close-time settle still returns 200 and logs. (no retry/outbox yet — tracked as follow-up) (#3) - close-time settle bounded by a 10s timeout so a hung auth can't stall the client; finalizeEnd guards against double origEnd; corrected the buffered-vs-SSE comment; documented the cap as a forward-looking (Phase-1-inert) guard. (minors) Co-Authored-By: Claude Opus 4.8 --- packages/atxp-express/src/atxpExpress.test.ts | 60 +++++++ packages/atxp-express/src/atxpExpress.ts | 87 ++++++++-- .../src/paymentSessionMcp.test.ts | 157 ++++++++++++++++++ packages/atxp-server/src/atxpContext.ts | 4 +- packages/atxp-server/src/index.ts | 2 + .../atxp-server/src/paymentSession.test.ts | 27 ++- packages/atxp-server/src/paymentSession.ts | 31 +++- 7 files changed, 350 insertions(+), 18 deletions(-) create mode 100644 packages/atxp-express/src/paymentSessionMcp.test.ts diff --git a/packages/atxp-express/src/atxpExpress.test.ts b/packages/atxp-express/src/atxpExpress.test.ts index 7d1ebb5d..27bfefa8 100644 --- a/packages/atxp-express/src/atxpExpress.test.ts +++ b/packages/atxp-express/src/atxpExpress.test.ts @@ -361,4 +361,64 @@ describe('ATXP', () => { expect(settleCalls()).toHaveLength(1); }); }); + + // FIX 3: Phase 1 has no retry/outbox. A close-time settle failure must NOT + // fail the already-served request — the route still returns 200 — and the + // failure must be logged with a greppable, metric-able marker. + describe('settle failure at close: route still returns 200 and logs a marker', () => { + const mockFetch = vi.fn(); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + const atxpCredential = JSON.stringify({ + sourceAccountId: 'atxp_acct_test123', + sourceAccountToken: 'tok_abc', + }); + + const sendPaidMcpCall = (app: express.Application) => + request(app) + .post('/') + .set('Content-Type', 'application/json') + .set('Authorization', 'Bearer test-access-token') + .set('X-ATXP-PAYMENT', atxpCredential) + .send(TH.mcpToolRequest()); + + it('returns 200 and logs settle_failed_at_close when /settle/* rejects', async () => { + // Auth /settle/* returns a non-OK status → ProtocolSettlement.settle throws. + mockFetch.mockReset(); + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + json: async () => ({}), + text: async () => 'settle exploded', + }); + vi.stubGlobal('fetch', mockFetch); + + const logger = TH.logger(); + const router = atxpExpress(TH.config({ + logger, + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + + const app = express(); + app.use(express.json()); + app.use(router); + app.post('/', async (_req, res) => { + await requirePayment({ price: BigNumber(0.01) }); + res.json({ ok: true }); + }); + + // The served request still succeeds despite the settle failure. + const response = await sendPaidMcpCall(app).expect(200); + expect(response.body).toMatchObject({ ok: true }); + + // The failure is logged with the actionable marker (protocol + amount). + const errorLog = (logger.error as any).mock.calls.map((c: any[]) => String(c[0])).join('\n'); + expect(errorLog).toContain('settle_failed_at_close'); + expect(errorLog).toContain('protocol=atxp'); + expect(errorLog).toContain('amount=0.01'); + }); + }); }); \ No newline at end of file diff --git a/packages/atxp-express/src/atxpExpress.ts b/packages/atxp-express/src/atxpExpress.ts index b81bb4ff..db727802 100644 --- a/packages/atxp-express/src/atxpExpress.ts +++ b/packages/atxp-express/src/atxpExpress.ts @@ -16,7 +16,8 @@ import { setPaymentRequestId, setDetectedCredential, openPaymentSession, - closePaymentSession, + settlePaymentSession, + type PaymentSessionState, type PaymentProtocol, type ATXPConfig, type TokenCheck, @@ -25,6 +26,27 @@ import { parseCredentialBase64, } from "@atxp/server"; +/** Max time the close-time settle may run before the response is finished anyway. */ +const SETTLE_AT_CLOSE_TIMEOUT_MS = 10_000; + +/** + * Resolve when `promise` settles, or reject after `ms` so a slow/hung auth + * server cannot stall the client's connection at response close. + */ +function withTimeout(promise: Promise, ms: number): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error(`settle timed out after ${ms}ms`)), ms); + // Don't keep the event loop alive solely for this timer. + if (typeof timer === 'object' && typeof (timer as { unref?: () => void }).unref === 'function') { + (timer as { unref: () => void }).unref(); + } + promise.then( + (value) => { clearTimeout(timer); resolve(value); }, + (error) => { clearTimeout(timer); reject(error); }, + ); + }); +} + export function atxpExpress(args: ATXPArgs): Router { const config = buildServerConfig(args); const router = Router(); @@ -118,6 +140,7 @@ export function atxpExpress(args: ATXPArgs): Router { // in one request while deferring the actual settle until the route's // charges have accumulated. return withATXPContext(config, resource, tokenCheck, async () => { + let session: PaymentSessionState | null = null; if (detected) { // Resolve identity for the settlement ledger credit. const sourceAccountId = (detected.protocol === 'mpp' && user) @@ -147,14 +170,35 @@ export function atxpExpress(args: ATXPArgs): Router { } setDetectedCredential({ ...detected, ...(sourceAccountId && { sourceAccountId }) }); - openPaymentSession(detected, context as Parameters[1]); + session = openPaymentSession(detected, context as Parameters[1]); } + // Capture everything settle needs in a closure built HERE, inside + // withATXPContext (the ALS context is guaranteed present). The res.end + // hook can fire OUTSIDE the AsyncLocalStorage causal chain — true SSE + // streaming, client abort, server timeout — at which point + // contextStorage.getStore() returns null and a getStore()-based settle + // would be silently skipped, leaving a served request unbilled. Binding + // the session reference + server/destination/appName/logger now makes + // the close-time settle independent of ALS at res.end. + const destinationAccountIdForSettle = detected ? await config.destination.getAccountId() : undefined; + const onBeforeEnd = session + ? async () => { + await settlePaymentSession( + session!, + config.server, + destinationAccountIdForSettle, + config.appName, + config.logger, + ); + } + : undefined; + // Intercept the response to (1) settle the payment session at close — // after the route has run and all requirePayment() charges have // accumulated, but before the response finishes — and (2) rewrite // McpServer's wrapped payment errors into proper JSON-RPC errors. - installPaymentResponseRewriter(res, logger, closePaymentSession); + installPaymentResponseRewriter(res, logger, onBeforeEnd); return next(); }); @@ -187,10 +231,14 @@ export function atxpExpress(args: ATXPArgs): Router { /** * @internal Exported for testing only. * - * @param onBeforeEnd Optional async hook run once, before the response body is - * sent, when res.end is first invoked. Used to settle the payment session at - * close (after the route ran, before the response finishes). When omitted, - * res.end stays fully synchronous. + * @param onBeforeEnd Optional async hook run once when res.end is first invoked. + * Used to settle the payment session at close (after the route ran). For + * buffered responses (enableJsonResponse) the body is delivered in res.end, so + * the hook completes before the client sees the body. For SSE the body is + * already flushed via res.write before res.end, so the hook only delays the + * stream's terminator — it cannot run before the body is seen. The hook is + * bounded by SETTLE_AT_CLOSE_TIMEOUT_MS so a hung auth server can't stall the + * connection. When omitted, res.end stays fully synchronous. */ export function installPaymentResponseRewriter( res: Response, @@ -243,10 +291,18 @@ export function installPaymentResponseRewriter( return (origWrite as any).apply(this, args); } as any; + // Guard against double-invocation: the async settle window between the first + // res.end and finalizeEnd can let a second res.end through (e.g. client abort + // racing the settle). Without this, origEnd would be applied twice and Node + // throws ERR_STREAM_WRITE_AFTER_END. + let finalized = false; + // Finalize the response: rewrite the body, flush the deferred writeHead with // a corrected Content-Length, restore the original methods, and call origEnd. // eslint-disable-next-line @typescript-eslint/no-explicit-any function finalizeEnd(self: Response, args: any[]): any { + if (finalized) return self; + finalized = true; res.end = origEnd; res.write = origWrite; res.writeHead = origWriteHead; @@ -287,16 +343,23 @@ export function installPaymentResponseRewriter( if (!onBeforeEnd) { return finalizeEnd(this, args); } - // Settle before finishing the response. Awaiting here (rather than firing - // on res.on('finish')) guarantees the settle completes before the client - // sees the response. Idempotent via `closed`. + // The hook runs at most once (idempotent via `closed`); a second res.end + // during the settle window finalizes immediately (guarded again in + // finalizeEnd against double origEnd). if (closed) { return finalizeEnd(this, args); } closed = true; - onBeforeEnd() + // For buffered responses (enableJsonResponse), the body is delivered in this + // res.end call, so awaiting onBeforeEnd here guarantees the settle completes + // before the client sees the body. For SSE the body has already been flushed + // via res.write before res.end, so this only delays the stream's terminator + // (settlement still completes server-side before the connection closes). + // Bound the settle with a timeout so a slow/hung auth server can't stall the + // client's connection indefinitely. + withTimeout(onBeforeEnd(), SETTLE_AT_CLOSE_TIMEOUT_MS) .catch((error) => { - logger.error(`onBeforeEnd hook failed: ${error instanceof Error ? error.message : String(error)}`); + logger.error(`settle_failed_at_close (timeout/error): ${error instanceof Error ? error.message : String(error)}`); }) .finally(() => { finalizeEnd(this, args); diff --git a/packages/atxp-express/src/paymentSessionMcp.test.ts b/packages/atxp-express/src/paymentSessionMcp.test.ts new file mode 100644 index 00000000..88607113 --- /dev/null +++ b/packages/atxp-express/src/paymentSessionMcp.test.ts @@ -0,0 +1,157 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import express, { Request, Response } from 'express'; +import request from 'supertest'; +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import { CallToolResult } from '@modelcontextprotocol/sdk/types.js'; +import { z } from 'zod'; +import { BigNumber } from 'bignumber.js'; +import { atxpExpress } from './atxpExpress.js'; +import { requirePayment } from '@atxp/server'; +import * as TH from '@atxp/server/serverTestHelpers'; + +/** + * Integration test for FIX 2: settlement at response close must fire through a + * REAL McpServer + StreamableHTTPServerTransport (modeled on src/dev/resource.ts), + * not just a synchronous express route handler. The transport's res.end happens + * inside its own async machinery; this proves the captured-closure settle runs + * regardless of where res.end is invoked from. + */ + +// A self-contained ATXP credential (X-ATXP-PAYMENT header → atxp protocol). +const atxpCredential = JSON.stringify({ + sourceAccountId: 'atxp_acct_test123', + sourceAccountToken: 'tok_abc', +}); + +function buildMcpServer(): McpServer { + const server = new McpServer( + { name: 'test-mcp-server', version: '1.0.0' }, + { capabilities: { logging: {} } }, + ); + + server.registerTool( + 'paid-tool', + { + description: 'A tool that requires payment.', + inputSchema: { message: z.string().optional() }, + }, + async ({ message }: { message?: string }): Promise => { + await requirePayment({ price: BigNumber(0.01) }); + return { content: [{ type: 'text', text: `paid: ${message ?? 'ok'}` }] }; + }, + ); + + return server; +} + +function buildApp(): express.Application { + const router = atxpExpress(TH.config({ + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + + const app = express(); + app.use(express.json()); + app.use(router); + + // Stateless streamable HTTP transport per request, exactly as resource.ts does. + app.post('/', async (req: Request, res: Response) => { + const server = buildMcpServer(); + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, + enableJsonResponse: true, + }); + res.on('close', () => { + transport.close(); + server.close(); + }); + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + }); + + return app; +} + +const sendPaidToolCall = (app: express.Application) => + request(app) + .post('/') + .set('Content-Type', 'application/json') + .set('Accept', 'application/json, text/event-stream') + .set('Authorization', 'Bearer test-access-token') + .set('X-ATXP-PAYMENT', atxpCredential) + .send({ + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: { name: 'paid-tool', arguments: { message: 'hi' } }, + }); + +describe('settlement fires through a real McpServer + StreamableHTTPServerTransport', () => { + const mockFetch = vi.fn(); + + beforeEach(() => { + mockFetch.mockReset(); + mockFetch.mockResolvedValue({ + ok: true, + json: async () => ({ txHash: '0xabc', settledAmount: '0.01' }), + text: async () => '', + }); + vi.stubGlobal('fetch', mockFetch); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + const settleCalls = () => mockFetch.mock.calls.filter( + ([url]) => typeof url === 'string' && url.includes('/settle/'), + ); + + it('calls /settle/* exactly once when a paid tool runs through the transport', async () => { + const app = buildApp(); + + const response = await sendPaidToolCall(app); + + expect(response.status).toBe(200); + // The tool result is delivered through the transport. + const body = JSON.stringify(response.body); + expect(body).toContain('paid: hi'); + + // Settlement fired at response close — through the real transport's res.end, + // not a synchronous express route. + expect(settleCalls()).toHaveLength(1); + expect(String(settleCalls()[0][0])).toContain('/settle/atxp'); + }); + + it('does NOT settle through the transport when the tool charges nothing', async () => { + const router = atxpExpress(TH.config({ + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + const app = express(); + app.use(express.json()); + app.use(router); + app.post('/', async (req: Request, res: Response) => { + const server = new McpServer({ name: 'test', version: '1.0.0' }, { capabilities: { logging: {} } }); + server.registerTool( + 'free-tool', + { description: 'free', inputSchema: { message: z.string().optional() } }, + async (): Promise => ({ content: [{ type: 'text', text: 'free' }] }), + ); + const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: undefined, enableJsonResponse: true }); + res.on('close', () => { transport.close(); server.close(); }); + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + }); + + const response = await request(app) + .post('/') + .set('Content-Type', 'application/json') + .set('Accept', 'application/json, text/event-stream') + .set('Authorization', 'Bearer test-access-token') + .set('X-ATXP-PAYMENT', atxpCredential) + .send({ jsonrpc: '2.0', id: 1, method: 'tools/call', params: { name: 'free-tool', arguments: {} } }); + + expect(response.status).toBe(200); + expect(settleCalls()).toHaveLength(0); + }); +}); diff --git a/packages/atxp-server/src/atxpContext.ts b/packages/atxp-server/src/atxpContext.ts index cdb2c7a4..69bec269 100644 --- a/packages/atxp-server/src/atxpContext.ts +++ b/packages/atxp-server/src/atxpContext.ts @@ -100,11 +100,13 @@ export function setDetectedCredential(credential: DetectedCredential): void { * middleware). Derives the authorized cap from the credential and stores the * session + settlement context in the ALS context. */ -export function openPaymentSession(credential: DetectedCredential, context: SettlementContext): void { +export function openPaymentSession(credential: DetectedCredential, context: SettlementContext): PaymentSessionState | null { const ctx = contextStorage.getStore(); if (ctx) { ctx.paymentSession = buildPaymentSession(credential, context, ctx.config.logger); + return ctx.paymentSession; } + return null; } /** diff --git a/packages/atxp-server/src/index.ts b/packages/atxp-server/src/index.ts index e21ecb0e..fe2307ab 100644 --- a/packages/atxp-server/src/index.ts +++ b/packages/atxp-server/src/index.ts @@ -40,6 +40,8 @@ export { // Payment session (request-scoped local charging + settle-at-close) export { type PaymentSession, + type PaymentSessionState, + settlePaymentSession, } from './paymentSession.js'; // Core platform-agnostic business logic (no I/O dependencies) diff --git a/packages/atxp-server/src/paymentSession.test.ts b/packages/atxp-server/src/paymentSession.test.ts index 308d031e..3bd5b4e5 100644 --- a/packages/atxp-server/src/paymentSession.test.ts +++ b/packages/atxp-server/src/paymentSession.test.ts @@ -33,14 +33,37 @@ describe('PaymentSession.charge', () => { expect(session.charge(BigNumber(0.0001))).toBe(false); }); - it('derives cap from mpp credential challenge.request.amount', () => { + it('derives cap from Solana mpp credential as micro-units (challenge.method=solana)', () => { + // Solana MPP amounts are micro-unit integer strings → divide by 1e6. const credential = Buffer.from(JSON.stringify({ - challenge: { id: 'ch_1', request: { amount: '250000' } }, + challenge: { id: 'ch_1', method: 'solana', request: { amount: '250000' } }, })).toString('base64'); const session = new PaymentSessionState('mpp', credential, {}, logger); expect(session.cap.toNumber()).toBe(0.25); }); + it('derives cap from Tempo mpp credential as a decimal (challenge.method=tempo, NO /1e6)', () => { + // Tempo MPP amounts are human-readable decimal strings → use as-is. + // Regression: dividing by 1e6 made the cap 1e-9, falsely re-challenging + // already-paid Tempo requests (infinite loop). + const credential = Buffer.from(JSON.stringify({ + challenge: { id: 'ch_1', method: 'tempo', request: { amount: '0.001' } }, + })).toString('base64'); + const session = new PaymentSessionState('mpp', credential, {}, logger); + expect(session.cap.toNumber()).toBe(0.001); + // A charge at the Tempo price must fit under the cap (not be rejected). + expect(session.charge(BigNumber(0.001))).toBe(true); + }); + + it('treats an mpp credential with no recognized method as a decimal (avoids under-scaling)', () => { + // Missing/unknown method → decimal interpretation (safe: never under-scales). + const credential = Buffer.from(JSON.stringify({ + challenge: { id: 'ch_1', amount: '0.05' }, + })).toString('base64'); + const session = new PaymentSessionState('mpp', credential, {}, logger); + expect(session.cap.toNumber()).toBe(0.05); + }); + it('derives cap from atxp credential options[].amount (human-readable)', () => { const credential = JSON.stringify({ sourceAccountId: 'atxp_acct_1', diff --git a/packages/atxp-server/src/paymentSession.ts b/packages/atxp-server/src/paymentSession.ts index b63b2a28..bf582853 100644 --- a/packages/atxp-server/src/paymentSession.ts +++ b/packages/atxp-server/src/paymentSession.ts @@ -12,7 +12,16 @@ import type { DetectedCredential } from "./atxpContext.js"; * request and requirePayment() debited the auth ledger via paymentServer.charge. */ export interface PaymentSession { - /** Authorized amount derived from the credential. */ + /** + * Authorized amount derived from the credential. + * + * Forward-looking guard only. In Phase 1, settlement uses the credential's + * own amount (see settlePaymentSession), NOT `spent`, so the cap does not yet + * bound the settled amount — it merely rejects local charges that exceed it. + * Full enforcement (settling `spent` up to the cap) arrives with the `upto` + * scheme in a later phase. For ATXP credentials carrying no amount, the cap is + * Infinity so the single-charge path always works. + */ readonly cap: BigNumber; /** Accumulated charges recorded against this session. */ readonly spent: BigNumber; @@ -52,7 +61,20 @@ function deriveCap( const request = challenge?.request as Record | undefined; const amount = (request?.amount ?? challenge?.amount) as string | number | undefined; if (amount != null) { - return new BigNumber(amount).dividedBy(USDC_ATOMIC); + // MPP amount encoding is chain-dependent (see protocol.ts MppChallengeData): + // - Solana: micro-units integer string (e.g. "1000" = 0.001 USDC) → /1e6 + // - Tempo: human-readable decimal string (e.g. "0.001") → as-is + // Branch on challenge.method (the chain). When method is unavailable, + // prefer the decimal interpretation: dividing a decimal by 1e6 under-scales + // the cap to ~1e-8, which would falsely re-challenge an already-paid request. + const method = challenge?.method as string | undefined; + if (method === 'solana') { + return new BigNumber(amount).dividedBy(USDC_ATOMIC); + } + if (method !== 'tempo') { + logger.debug(`PaymentSession: MPP credential has no recognized method ('${method ?? 'undefined'}'); treating amount as decimal to avoid under-scaling the cap`); + } + return new BigNumber(amount); } } else if (protocol === 'atxp') { const parsed = parseCredentialBase64(credential); @@ -144,6 +166,9 @@ export async function settlePaymentSession( ); logger.info(`Settled ${session.protocol} at session close: txHash=${result.txHash ?? ''}, amount=${result.settledAmount}`); } catch (error) { - logger.error(`Session close settlement failed for ${session.protocol}: ${error instanceof Error ? error.message : String(error)}`); + // Phase 1 has no retry/outbox: the request is already served, settled=true + // prevents re-attempt at close. Log a greppable, metric-able marker carrying + // protocol + amount so an unbilled served request can be reconciled later. + logger.error(`settle_failed_at_close protocol=${session.protocol} amount=${session.spent.toString()}: ${error instanceof Error ? error.message : String(error)}`); } } From b4059ee1eb00d051b3cc4c0e27e63dedeaa51d7e Mon Sep 17 00:00:00 2001 From: bdj Date: Wed, 17 Jun 2026 13:39:24 -0700 Subject: [PATCH 3/4] fix(payments): address phase 1 re-review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Tighten the ALS-independence guard: add a test that ends the response from OUTSIDE the withATXPContext run (where getStore() is null) and asserts settle still fires. Verified it fails against the old getStore()-based close and passes against the captured-closure — so it actually guards the #2 regression. - Distinguish the close-time timeout marker (`settle_unconfirmed_at_close`, warn) from a hard settle failure (`settle_failed_at_close`, error): the in-flight settle isn't aborted on timeout and may still complete, so reconciliation must not treat a timeout as definitely-unbilled. - Hoist the duplicate destination getAccountId() — resolved once and reused for the settlement context and the close-time settle closure. - Comment hygiene: drop project-timeline ("Phase 1") framing in favor of stated invariants; anchor forward-looking notes to tracking issues + the design doc (cap → upto work; Cloudflare TODO → #179; retry/outbox → #178). Co-Authored-By: Claude Opus 4.8 --- .../atxp-cloudflare/src/requirePayment.ts | 15 ++++---- packages/atxp-express/src/atxpExpress.test.ts | 38 ++++++++++++++++++- packages/atxp-express/src/atxpExpress.ts | 16 ++++++-- packages/atxp-server/src/paymentSession.ts | 29 +++++++------- 4 files changed, 72 insertions(+), 26 deletions(-) diff --git a/packages/atxp-cloudflare/src/requirePayment.ts b/packages/atxp-cloudflare/src/requirePayment.ts index 2908c256..75622c1e 100644 --- a/packages/atxp-cloudflare/src/requirePayment.ts +++ b/packages/atxp-cloudflare/src/requirePayment.ts @@ -2,14 +2,13 @@ import { RequirePaymentConfig } from "@atxp/common"; import { ATXPArgs, buildServerConfig, requirePayment as requirePaymentSDK, withATXPContext } from "@atxp/server"; import { ATXPMCPAgentProps } from "./types.js"; -// TODO(phase-1): This Cloudflare path does NOT detect a payment credential, -// does NOT open an implicit PaymentSession, and does NOT call -// ProtocolSettlement.settle — it never has. With no session open, the SDK -// requirePayment() falls back to debiting the auth ledger via -// paymentServer.charge (its prior behavior), so this remains correct and -// unchanged by Phase 1. Settle-at-close is wired only into @atxp/express for -// now; bringing this path onto the session model (credential detection + -// settle at response close) is deferred to a later phase. +// TODO(#179): This Cloudflare path does NOT detect a payment credential, open +// an implicit PaymentSession, or call ProtocolSettlement.settle — it never has. +// With no session open, the SDK requirePayment() falls back to debiting the auth +// ledger via paymentServer.charge (its prior behavior), so this stays correct. +// Bringing this path onto the session / settle-at-close model used by +// @atxp/express is tracked in atxp-dev/sdk#179. +// Design: https://github.com/circuitandchisel/accounts/blob/main/docs/STREAMING_PAYMENT_SESSIONS.md export async function requirePayment(paymentConfig: RequirePaymentConfig, configOpts: ATXPArgs, {resource, tokenCheck}: ATXPMCPAgentProps): Promise { const config = buildServerConfig(configOpts); diff --git a/packages/atxp-express/src/atxpExpress.test.ts b/packages/atxp-express/src/atxpExpress.test.ts index 27bfefa8..211d7dc6 100644 --- a/packages/atxp-express/src/atxpExpress.test.ts +++ b/packages/atxp-express/src/atxpExpress.test.ts @@ -360,9 +360,45 @@ describe('ATXP', () => { expect(settleCalls()).toHaveLength(1); }); + + it('settles even when res.end fires OUTSIDE the AsyncLocalStorage context', async () => { + // Guards the ALS-independence fix: the route charges the session in-context, + // captures `res`, and returns WITHOUT ending the response. The test then + // ends it from its OWN context — outside the middleware's withATXPContext + // run — so a getStore()-based settle would see a null store and silently + // skip billing. The captured-closure settle (bound in-context) must still + // fire. (The enableJsonResponse integration test can't reach this: there + // res.end runs in-context, so it passes for both impls — this is the case + // that actually fails against the old getStore() code.) + let capturedRes: import('express').Response | null = null; + let signalCaptured!: () => void; + const captured = new Promise((r) => { signalCaptured = r; }); + + const router = atxpExpress(TH.config({ + oAuthClient: TH.oAuthClient({ introspectResult: TH.tokenData({ active: true, sub: 'test-user' }) }), + })); + const app = express(); + app.use(express.json()); + app.use(router); + app.post('/', async (_req, res) => { + await requirePayment({ price: BigNumber(0.01) }); // charges the session in-context + capturedRes = res; + signalCaptured(); // hand res to the test; do NOT end the response here + }); + + // .then() triggers supertest to send immediately (it otherwise defers + // until awaited). The route captures res and returns without ending, so + // the response stays open until the test ends it below. + const reqPromise = sendPaidMcpCall(app).then((r) => r); + await captured; // resumes in the TEST's context — outside the withATXPContext run + capturedRes!.end(JSON.stringify({ jsonrpc: '2.0', id: 1, result: { ok: true } })); // res.end fires here; ALS store is null + await reqPromise; + + expect(settleCalls()).toHaveLength(1); + }); }); - // FIX 3: Phase 1 has no retry/outbox. A close-time settle failure must NOT + // FIX 3: a close-time settle failure must NOT // fail the already-served request — the route still returns 200 — and the // failure must be logged with a greppable, metric-able marker. describe('settle failure at close: route still returns 200 and logs a marker', () => { diff --git a/packages/atxp-express/src/atxpExpress.ts b/packages/atxp-express/src/atxpExpress.ts index db727802..95f27efd 100644 --- a/packages/atxp-express/src/atxpExpress.ts +++ b/packages/atxp-express/src/atxpExpress.ts @@ -141,13 +141,16 @@ export function atxpExpress(args: ATXPArgs): Router { // charges have accumulated. return withATXPContext(config, resource, tokenCheck, async () => { let session: PaymentSessionState | null = null; + // Resolved once here (inside withATXPContext) and reused for both the + // settlement context and the close-time settle closure below. + let destinationAccountId: string | undefined; if (detected) { // Resolve identity for the settlement ledger credit. const sourceAccountId = (detected.protocol === 'mpp' && user) ? user : resolveIdentitySync(config, req, detected.protocol, detected.credential) || user || undefined; - const destinationAccountId = await config.destination.getAccountId(); + destinationAccountId = await config.destination.getAccountId(); // For X402: the credential's parsed payload contains `accepted` — the // exact payment requirement the client signed off on. Pass it directly @@ -181,13 +184,12 @@ export function atxpExpress(args: ATXPArgs): Router { // would be silently skipped, leaving a served request unbilled. Binding // the session reference + server/destination/appName/logger now makes // the close-time settle independent of ALS at res.end. - const destinationAccountIdForSettle = detected ? await config.destination.getAccountId() : undefined; const onBeforeEnd = session ? async () => { await settlePaymentSession( session!, config.server, - destinationAccountIdForSettle, + destinationAccountId, config.appName, config.logger, ); @@ -357,9 +359,15 @@ export function installPaymentResponseRewriter( // (settlement still completes server-side before the connection closes). // Bound the settle with a timeout so a slow/hung auth server can't stall the // client's connection indefinitely. + // settlePaymentSession catches its own errors (logs `settle_failed_at_close`) + // and never rejects, so the only rejection here is the timeout. The in-flight + // settle is NOT aborted and may still complete after this fires — so this is + // "not confirmed within the timeout", NOT "definitely unbilled". Use a + // distinct marker so reconciliation doesn't treat late-but-successful settles + // as failures. withTimeout(onBeforeEnd(), SETTLE_AT_CLOSE_TIMEOUT_MS) .catch((error) => { - logger.error(`settle_failed_at_close (timeout/error): ${error instanceof Error ? error.message : String(error)}`); + logger.warn(`settle_unconfirmed_at_close: ${error instanceof Error ? error.message : String(error)} (settle may still complete; not confirmed within ${SETTLE_AT_CLOSE_TIMEOUT_MS}ms)`); }) .finally(() => { finalizeEnd(this, args); diff --git a/packages/atxp-server/src/paymentSession.ts b/packages/atxp-server/src/paymentSession.ts index bf582853..cefa4af8 100644 --- a/packages/atxp-server/src/paymentSession.ts +++ b/packages/atxp-server/src/paymentSession.ts @@ -15,12 +15,14 @@ export interface PaymentSession { /** * Authorized amount derived from the credential. * - * Forward-looking guard only. In Phase 1, settlement uses the credential's - * own amount (see settlePaymentSession), NOT `spent`, so the cap does not yet - * bound the settled amount — it merely rejects local charges that exceed it. - * Full enforcement (settling `spent` up to the cap) arrives with the `upto` - * scheme in a later phase. For ATXP credentials carrying no amount, the cap is - * Infinity so the single-charge path always works. + * Guard only: settlement currently settles the credential's own amount (see + * settlePaymentSession), NOT `spent`, so the cap does not bound the settled + * amount — it merely rejects local charges that would exceed it. Settling + * `spent` up to the cap arrives with the `upto` scheme (streaming-payment- + * sessions design doc: + * https://github.com/circuitandchisel/accounts/blob/main/docs/STREAMING_PAYMENT_SESSIONS.md). + * For ATXP credentials carrying no amount, the cap is Infinity so the + * single-charge path always works. */ readonly cap: BigNumber; /** Accumulated charges recorded against this session. */ @@ -35,10 +37,10 @@ const USDC_ATOMIC = 1e6; /** * Derive the authorized cap from a detected credential. * - * Best-effort for Phase 1 (fixed amounts): if the amount cannot be parsed - * reliably for a protocol, returns Infinity and logs a warning so the - * single-charge path always works. Settlement still settles the credential in - * full; cap enforcement is a guard, not the source of the settled amount. + * Best-effort: if the amount cannot be parsed reliably for a protocol, returns + * Infinity and logs a warning so the single-charge path always works. + * Settlement settles the credential's own amount; the cap is a guard, not the + * source of the settled amount. */ function deriveCap( protocol: PaymentProtocol, @@ -166,9 +168,10 @@ export async function settlePaymentSession( ); logger.info(`Settled ${session.protocol} at session close: txHash=${result.txHash ?? ''}, amount=${result.settledAmount}`); } catch (error) { - // Phase 1 has no retry/outbox: the request is already served, settled=true - // prevents re-attempt at close. Log a greppable, metric-able marker carrying - // protocol + amount so an unbilled served request can be reconciled later. + // No retry/outbox yet (tracked in atxp-dev/sdk#178): the request is already + // served and settled=true prevents re-attempt at close. Log a greppable, + // metric-able marker carrying protocol + amount so an unbilled served + // request can be reconciled later. logger.error(`settle_failed_at_close protocol=${session.protocol} amount=${session.spent.toString()}: ${error instanceof Error ? error.message : String(error)}`); } } From 56bb6bf8d322fb283275225712199792bd50e1a6 Mon Sep 17 00:00:00 2001 From: bdj Date: Wed, 17 Jun 2026 13:55:13 -0700 Subject: [PATCH 4/4] fix(payments): bump close-time settle timeout 10s -> 30s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A legitimate ATXP IOU->USDC `/pay` (two on-chain transfers + Base confirmations) runs ~14.5s end-to-end — longer than the 10s timeout, which made every IOU settle log a spurious `settle_unconfirmed_at_close` despite succeeding. 30s sits well above observed on-chain latency so the marker flags genuinely stuck settles, while still bounding a hung auth server. Co-Authored-By: Claude Opus 4.8 --- packages/atxp-express/src/atxpExpress.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/atxp-express/src/atxpExpress.ts b/packages/atxp-express/src/atxpExpress.ts index 95f27efd..a408f679 100644 --- a/packages/atxp-express/src/atxpExpress.ts +++ b/packages/atxp-express/src/atxpExpress.ts @@ -26,8 +26,14 @@ import { parseCredentialBase64, } from "@atxp/server"; -/** Max time the close-time settle may run before the response is finished anyway. */ -const SETTLE_AT_CLOSE_TIMEOUT_MS = 10_000; +/** + * Max time the close-time settle may run before the response is finished anyway. + * Must exceed a legitimate slow settle: an ATXP IOU→USDC conversion `/pay` + * (two on-chain transfers + Base confirmations) was observed at ~14.5s end-to-end. + * Set well above that so `settle_unconfirmed_at_close` flags genuinely stuck + * settles, not normal on-chain latency; still bounds a truly hung auth server. + */ +const SETTLE_AT_CLOSE_TIMEOUT_MS = 30_000; /** * Resolve when `promise` settles, or reject after `ms` so a slow/hung auth