diff --git a/docs/architecture.md b/docs/architecture.md index 69b6e80..3cd93bf 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,7 +6,7 @@ This document covers how the indexer works, from on-chain events to the GraphQL The system is a Ponder 0.16.x indexer that watches the ComposableCoW contract on Ethereum mainnet and Gnosis Chain. When a user creates a programmatic order (TWAP, Stop Loss, etc.), the contract emits a `ConditionalOrderCreated` event. The indexer picks that up, decodes the order parameters, resolves the actual owner (which may be behind a proxy), and writes the result to Postgres. A Hono HTTP server exposes the data through GraphQL and a SQL passthrough endpoint. -Ponder registers handlers for three independent on-chain event streams: `ComposableCow` (conditional order creation), `CoWShedFactory` (proxy wallet deployment), and `GPv2Settlement` (Aave adapter detection via `Settlement` events — `Trade` logs in the receipt identify the adapter address). During live sync, additional block handlers in `blockHandler.ts` poll contract state and the CoW orderbook API. See `blockHandler.ts` for the current handler list and responsibilities. +Ponder registers handlers for three independent on-chain event streams: `ComposableCow` (conditional order creation), `CoWShedFactory` (proxy wallet deployment), and `GPv2Settlement` (Aave adapter detection via `Settlement` events — `Trade` logs in the receipt identify the adapter address). During live sync, additional block handlers in `blockHandler.ts` poll contract state and the CoW orderbook API. See `blockHandler.ts` for the current handler list and responsibilities. `settlement.ts` detects Aave flash loan adapters via a queue-based approach: the `GPv2Settlement:Settlement` event handler enqueues tx hashes, and `SettlementResolver:block` drains the queue and does all RPC work so errors never crash the event handler. ## Contracts and Chains @@ -144,22 +144,32 @@ This is the primary event handler. When a `ConditionalOrderCreated` fires: When a CoWShed proxy wallet is deployed, this handler stores the mapping from the proxy address (`shed`) to the deploying user address in `ownerMapping`. This mapping is then available for the composableCow handler to resolve owners. -### settlement.ts -- GPv2Settlement Settlement +### settlement.ts -- Flash Loan Adapter Detection -This handler detects Aave V3 flash loan adapter contracts. The GPv2Settlement contract is filtered (in `ponder.config.ts`) to only index settlements from the FlashLoanRouter solver, which keeps the event volume low. +This file detects Aave V3 flash loan adapter contracts using a queue-based two-stage approach. The GPv2Settlement contract is filtered (in `ponder.config.ts`) to only index settlements from the FlashLoanRouter solver, so the event volume is very low. -For each Settlement event: +**Stage 1 — `GPv2Settlement:Settlement` event handler (enqueue only):** -1. Fetch the full transaction receipt and iterate over all logs. -2. Filter for Trade logs emitted by the settlement contract (matching the Trade event topic). -3. For each trade, extract the `owner` from the indexed topic. -4. Skip if already mapped, skip if the address is an EOA (no bytecode). -5. Call `FACTORY()` on the address using raw `eth_call` (not `readContract`, which would log warnings on reverts). If the returned address matches the known AaveV3AdapterFactory address, this is a flash loan adapter. -6. Call `owner()` on the adapter to get the EOA, then write the `ownerMapping` entry. +When a Settlement event fires, the handler writes the transaction hash into the `settlementQueue` table and returns immediately. No RPC calls are made here — errors in RPC never crash the event handler, keeping the indexer stable. -The handler uses raw `eth_call` for the FACTORY() check specifically to avoid Ponder's built-in WARN logging on contract call reverts. Most trade addresses are not Aave adapters, so FACTORY() reverts are the common case, and the warnings would flood the logs. +**Stage 2 — `SettlementResolver:block` block handler (drain and resolve):** -Stats are accumulated and logged every 30 seconds to track throughput without per-event log spam. +Every block, this handler drains up to `MAX_SETTLEMENTS_PER_BLOCK` rows from `settlementQueue` for the current chain. For each queued transaction: + +1. Fetch the full transaction receipt (with timeout). On error, log a warning and delete the queue row (skip it). +2. Iterate over all logs in the receipt. Keep only Trade logs emitted by the GPv2Settlement contract. +3. For each trade, extract the `owner` address from the indexed topic. +4. Skip if already in `ownerMapping` (adapter seen in a prior settlement). +5. Call `getCode` on the address (with timeout). Skip if EOA (no bytecode). +6. Call `FACTORY()` via raw `eth_call` (not `readContract`, which logs a WARN on every revert). If the returned address doesn't match the known AaveV3AdapterFactory address, skip. +7. Call `owner()` on the adapter to retrieve the underlying EOA. +8. Write a row to `ownerMapping` (`address` → `owner`, `addressType = FlashLoanHelper`). +9. Update any `conditionalOrderGenerator` rows owned by the adapter address to set `ownerAddressType = FlashLoanHelper`. +10. Delete the queue row. + +The raw `eth_call` for `FACTORY()` avoids Ponder's built-in WARN logs on reverts — most addresses are not Aave adapters, so reverts are the common case and would flood the logs if `readContract` were used. + +Stats (total settlements, trade logs found, EOA skips, adapter mappings, avg FACTORY() latency) are accumulated and logged every 30 seconds. ### blockHandler.ts -- live block handlers diff --git a/ponder.config.ts b/ponder.config.ts index ed83fdf..a1d18b5 100644 --- a/ponder.config.ts +++ b/ponder.config.ts @@ -122,5 +122,13 @@ export default createConfig({ ), interval: 1, }, + // SettlementResolver — async Aave adapter discovery from queued Settlement events. + // Only runs on chains that have a flash loan router (currently mainnet only). + SettlementResolver: { + chain: Object.fromEntries( + settlementChains.map((c) => [c.name, { startBlock: "latest" as const }]), + ), + interval: 1, + }, }, }); diff --git a/schema/tables.ts b/schema/tables.ts index bebbaa0..ae58d1d 100644 --- a/schema/tables.ts +++ b/schema/tables.ts @@ -149,6 +149,19 @@ export const bootstrapRetryQueue = onchainTable( }) ); +export const settlementQueue = onchainTable( + "settlement_queue", + (t) => ({ + txHash: t.hex().notNull(), + chainId: t.integer().notNull(), + blockNumber: t.bigint().notNull(), + blockTimestamp: t.bigint().notNull(), + }), + (table) => ({ + pk: primaryKey({ columns: [table.chainId, table.txHash] }), + }) +); + export const ownerMapping = onchainTable( "owner_mapping", (t) => ({ diff --git a/src/application/handlers/settlement.ts b/src/application/handlers/settlement.ts index fe4825b..3aad857 100644 --- a/src/application/handlers/settlement.ts +++ b/src/application/handlers/settlement.ts @@ -1,5 +1,11 @@ import { ponder } from "ponder:registry"; -import { AddressType, conditionalOrderGenerator, ownerMapping, transaction } from "ponder:schema"; +import { + AddressType, + conditionalOrderGenerator, + ownerMapping, + settlementQueue, + transaction, +} from "ponder:schema"; import { and, eq } from "ponder"; import { keccak256, toBytes } from "viem"; import { log } from "../helpers/logger"; @@ -8,6 +14,8 @@ import { AAVE_V3_ADAPTER_FACTORY_ADDRESSES, GPV2_SETTLEMENT_DEPLOYMENTS, } from "../../data"; +import { BLOCK_HANDLER_RPC_TIMEOUT_MS, SETTLEMENT_INNER_RPC_TIMEOUT_MS } from "../../constants"; +import { TimeoutError as _TimeoutError, withTimeout } from "../helpers/withTimeout"; // Trade(address,address,address,uint256,uint256,uint256,bytes) — topic0 hash const TRADE_TOPIC = keccak256( @@ -15,10 +23,9 @@ const TRADE_TOPIC = keccak256( ); // ── Stats / timing ──────────────────────────────────────────────────────────── -// Logged every LOG_INTERVAL_MS to measure per-step cost without flooding logs. const stats = { - total: 0, // Settlement events processed - tradeLogsFound: 0, // Trade logs found in receipts + total: 0, + tradeLogsFound: 0, skippedAlreadyMapped: 0, skippedEOA: 0, skippedNotAdapter: 0, @@ -49,19 +56,35 @@ function logStatsIfIntervalPassed() { // which floods the log since non-adapter contracts do not implement FACTORY(). const FACTORY_SELECTOR = "0x2dd31000" as const; +// Max settlements resolved per SettlementResolver block tick. +const MAX_SETTLEMENTS_PER_BLOCK = 20; + +// ── Event handler — enqueue only ───────────────────────────────────────────── +// All RPC work is deferred to SettlementResolver:block so errors in RPC calls +// never propagate to the event handler and crash the indexer. ponder.on("GPv2Settlement:Settlement", async ({ event, context }) => { - // Kill switch: set DISABLE_SETTLEMENT_FACTORY_CHECK=true to skip all RPC - // calls in this handler. Use to benchmark base throughput vs. factory cost. + if (process.env.DISABLE_SETTLEMENT_FACTORY_CHECK === "true") return; + + await context.db + .insert(settlementQueue) + .values({ + txHash: event.transaction.hash, + chainId: context.chain.id, + blockNumber: event.block.number, + blockTimestamp: event.block.timestamp, + }) + .onConflictDoNothing(); +}); + +// ── Block handler — drain queue and resolve adapters ───────────────────────── +ponder.on("SettlementResolver:block", async ({ event: _event, context }) => { if (process.env.DISABLE_SETTLEMENT_FACTORY_CHECK === "true") return; const chainId = context.chain.id; const chainName = context.chain.name; - // Resolve chain-specific addresses — skip safely if chain is not configured const settlementDeployment = - GPV2_SETTLEMENT_DEPLOYMENTS[ - chainName as keyof typeof GPV2_SETTLEMENT_DEPLOYMENTS - ]; + GPV2_SETTLEMENT_DEPLOYMENTS[chainName as keyof typeof GPV2_SETTLEMENT_DEPLOYMENTS]; if (!settlementDeployment) return; const settlementAddress = settlementDeployment.address.toLowerCase(); @@ -71,137 +94,159 @@ ponder.on("GPv2Settlement:Settlement", async ({ event, context }) => { ]?.toLowerCase(); if (!adapterFactoryAddress) return; - stats.total++; + const pending = await context.db.sql + .select() + .from(settlementQueue) + .where(eq(settlementQueue.chainId, chainId)) + .limit(MAX_SETTLEMENTS_PER_BLOCK); - // Fetch the full receipt to access all logs in the transaction. - // Volume is negligible (FlashLoanRouter settlements only), so the extra RPC - // call per settlement is acceptable and much cheaper than the old per-trade approach. - const receipt = await context.client.getTransactionReceipt({ - hash: event.transaction.hash, - }); + if (pending.length === 0) return; - for (const txLog of receipt.logs) { - // Only Trade logs emitted by GPv2Settlement in this same transaction - if (txLog.address.toLowerCase() !== settlementAddress) continue; - if (txLog.topics[0] !== TRADE_TOPIC) continue; - - stats.tradeLogsFound++; - - // Decode owner from topics[1] — ABI-encoded 32-byte padded address - const owner = `0x${txLog.topics[1]!.slice(26)}` as `0x${string}`; - const ownerAddress = owner.toLowerCase() as `0x${string}`; - - // Skip if already mapped (adapter seen in a prior settlement) - const existing = await context.db.sql - .select() - .from(ownerMapping) - .where( - and( - eq(ownerMapping.chainId, chainId), - eq(ownerMapping.address, ownerAddress), - ), - ) - .limit(1); - - if (existing.length > 0) { - stats.skippedAlreadyMapped++; - logStatsIfIntervalPassed(); - continue; - } + for (const item of pending) { + stats.total++; - // Skip if EOA (no bytecode) - const code = await context.client.getCode({ address: owner }); - if (!code || code === "0x") { - stats.skippedEOA++; - logStatsIfIntervalPassed(); + let receipt: Awaited>; + try { + receipt = await withTimeout( + context.client.getTransactionReceipt({ hash: item.txHash }), + BLOCK_HANDLER_RPC_TIMEOUT_MS, + "settlement:getTransactionReceipt", + ); + } catch (err) { + log("warn", "SettlementResolver:receipt_failed", { chainId, txHash: item.txHash, err: err instanceof Error ? err.message : String(err) }); + await context.db.sql + .delete(settlementQueue) + .where(and(eq(settlementQueue.chainId, chainId), eq(settlementQueue.txHash, item.txHash))); continue; } - // Check for Aave adapter via raw eth_call. - // readContract() is intentionally avoided here: Ponder logs a WARN for every - // revert, and FACTORY() reverts on any non-adapter contract. - const t1 = Date.now(); - let factoryData: `0x${string}` | undefined; - try { - const result = await context.client.call({ - to: owner, - data: FACTORY_SELECTOR, - }); - factoryData = result.data; - } catch { + for (const txLog of receipt.logs) { + if (txLog.address.toLowerCase() !== settlementAddress) continue; + if (txLog.topics[0] !== TRADE_TOPIC) continue; + + stats.tradeLogsFound++; + + const owner = `0x${txLog.topics[1]!.slice(26)}` as `0x${string}`; + const ownerAddress = owner.toLowerCase() as `0x${string}`; + + const existing = await context.db.sql + .select() + .from(ownerMapping) + .where(and(eq(ownerMapping.chainId, chainId), eq(ownerMapping.address, ownerAddress))) + .limit(1); + + if (existing.length > 0) { + stats.skippedAlreadyMapped++; + logStatsIfIntervalPassed(); + continue; + } + + let code: `0x${string}` | undefined; + try { + code = await withTimeout( + context.client.getCode({ address: owner }), + SETTLEMENT_INNER_RPC_TIMEOUT_MS, + "settlement:getCode", + ); + } catch (err) { + log("warn", "SettlementResolver:getCode_failed", { chainId, owner, err: err instanceof Error ? err.message : String(err) }); + continue; + } + if (!code || code === "0x") { + stats.skippedEOA++; + logStatsIfIntervalPassed(); + continue; + } + + const t1 = Date.now(); + let factoryData: `0x${string}` | undefined; + try { + const result = await withTimeout( + context.client.call({ to: owner, data: FACTORY_SELECTOR }), + SETTLEMENT_INNER_RPC_TIMEOUT_MS, + "settlement:call:FACTORY", + ); + factoryData = result.data; + } catch { + stats.msFactory += Date.now() - t1; + stats.skippedNotAdapter++; + logStatsIfIntervalPassed(); + continue; + } stats.msFactory += Date.now() - t1; - stats.skippedNotAdapter++; - logStatsIfIntervalPassed(); - continue; - } - stats.msFactory += Date.now() - t1; - // ABI-encoded address = 32 bytes = 66 hex chars (including 0x prefix) - if (!factoryData || factoryData.length < 66) { - stats.skippedNotAdapter++; + if (!factoryData || factoryData.length < 66) { + stats.skippedNotAdapter++; + logStatsIfIntervalPassed(); + continue; + } + + const factoryAddress = `0x${factoryData.slice(26)}` as `0x${string}`; + if (factoryAddress.toLowerCase() !== adapterFactoryAddress) { + stats.skippedNotAdapter++; + logStatsIfIntervalPassed(); + continue; + } + + let eoaOwner: `0x${string}`; + try { + eoaOwner = await withTimeout( + context.client.readContract({ + address: owner, + abi: AaveV3AdapterHelperAbi, + functionName: "owner", + }), + BLOCK_HANDLER_RPC_TIMEOUT_MS, + "settlement:readContract:owner", + ); + } catch (err) { + log("warn", "SettlementResolver:readOwner_failed", { chainId, owner, err: err instanceof Error ? err.message : String(err) }); + continue; + } + + await context.db + .insert(transaction) + .values({ + hash: item.txHash, + chainId, + blockNumber: item.blockNumber, + blockTimestamp: item.blockTimestamp, + }) + .onConflictDoNothing(); + + await context.db + .insert(ownerMapping) + .values({ + chainId, + address: ownerAddress, + owner: eoaOwner.toLowerCase() as `0x${string}`, + addressType: AddressType.FlashLoanHelper, + txHash: item.txHash, + blockNumber: item.blockNumber, + resolutionDepth: 1, + }) + .onConflictDoNothing(); + + await context.db.sql + .update(conditionalOrderGenerator) + .set({ ownerAddressType: AddressType.FlashLoanHelper }) + .where( + and( + eq(conditionalOrderGenerator.chainId, chainId), + eq(conditionalOrderGenerator.owner, ownerAddress), + ), + ); + + stats.mapped++; logStatsIfIntervalPassed(); - continue; - } - - // Decode padded address: 0x + 24 zero-padding hex chars + 40 address hex chars - const factoryAddress = `0x${factoryData.slice(26)}` as `0x${string}`; - if (factoryAddress.toLowerCase() !== adapterFactoryAddress) { - stats.skippedNotAdapter++; - logStatsIfIntervalPassed(); - continue; + log("info", "SettlementResolver:aave_adapter_mapped", { chainId, adapter: ownerAddress, eoa: eoaOwner.toLowerCase(), block: String(item.blockNumber) }); } - // Resolve EOA via owner() — this call should always succeed at this point - const eoaOwner = await context.client.readContract({ - address: owner, - abi: AaveV3AdapterHelperAbi, - functionName: "owner", - }); - - await context.db - .insert(transaction) - .values({ - hash: event.transaction.hash, - chainId, - blockNumber: event.block.number, - blockTimestamp: event.block.timestamp, - }) - .onConflictDoNothing(); - - await context.db - .insert(ownerMapping) - .values({ - chainId, - address: ownerAddress, - owner: eoaOwner.toLowerCase() as `0x${string}`, - addressType: AddressType.FlashLoanHelper, - txHash: event.transaction.hash, - blockNumber: event.block.number, - resolutionDepth: 1, - }) - .onConflictDoNothing(); - await context.db.sql - .update(conditionalOrderGenerator) - .set({ ownerAddressType: AddressType.FlashLoanHelper }) - .where( - and( - eq(conditionalOrderGenerator.chainId, chainId), - eq(conditionalOrderGenerator.owner, ownerAddress), - ), - ); + .delete(settlementQueue) + .where(and(eq(settlementQueue.chainId, chainId), eq(settlementQueue.txHash, item.txHash))); - stats.mapped++; logStatsIfIntervalPassed(); - - log("info", "settlement:aave_adapter_mapped", { - block: String(event.block.number), - chainId, - adapter: ownerAddress, - eoa: eoaOwner.toLowerCase(), - }); } - - logStatsIfIntervalPassed(); }); diff --git a/src/constants.ts b/src/constants.ts index 946f879..51c17d8 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -72,6 +72,10 @@ export const ORDERBOOK_HTTP_TIMEOUT_MS = 10_000; */ export const BLOCK_HANDLER_RPC_TIMEOUT_MS = 15_000; +// Tighter cap for cheap inner-loop calls (getCode, eth_call) in the settlement handler. +// The outer receipt fetch and readContract(owner()) keep the full 15 s. +export const SETTLEMENT_INNER_RPC_TIMEOUT_MS = 5_000; + /** * Hard wall-clock cap for the whole per-owner bootstrap fetch in OwnerBackfill * (account pagination + by_uids refresh). Owners that exceed this are skipped;