diff --git a/package.json b/package.json index 7bad8da..128a690 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ }, "dependencies": { "@cowprotocol/cow-sdk": "^7.3.8", + "drizzle-orm": "0.41.0", "@hono/swagger-ui": "^0.5.3", "@hono/zod-openapi": "^0.19.10", "hono": "^4.5.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d9206a1..b4eb45f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -17,6 +17,9 @@ importers: '@hono/zod-openapi': specifier: ^0.19.10 version: 0.19.10(hono@4.12.3)(zod@3.25.76) + drizzle-orm: + specifier: 0.41.0 + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0)(kysely@0.26.3)(pg@8.19.0) hono: specifier: ^4.5.0 version: 4.12.3 diff --git a/src/api/endpoints/execution-summary.ts b/src/api/endpoints/execution-summary.ts index ae8f557..908b3a7 100644 --- a/src/api/endpoints/execution-summary.ts +++ b/src/api/endpoints/execution-summary.ts @@ -1,6 +1,7 @@ import type { RouteHandler } from "@hono/zod-openapi"; import { db } from "ponder:api"; -import { sql } from "ponder"; +import { discreteOrder } from "ponder:schema"; +import { and, count, eq } from "ponder"; import type { executionSummaryRoute } from "../routes"; export const executionSummaryHandler: RouteHandler< @@ -9,17 +10,20 @@ export const executionSummaryHandler: RouteHandler< const { eventId } = c.req.valid("param"); const { chainId } = c.req.valid("query"); - const rows = await db.execute<{ status: string; count: string }>( - sql`SELECT status, COUNT(*)::text AS count - FROM discrete_order - WHERE conditional_order_generator_id = ${eventId} - AND chain_id = ${chainId} - GROUP BY status`, - ); + const rows = await db + .select({ status: discreteOrder.status, count: count() }) + .from(discreteOrder) + .where( + and( + eq(discreteOrder.conditionalOrderGeneratorId, eventId), + eq(discreteOrder.chainId, chainId), + ), + ) + .groupBy(discreteOrder.status); const counts: Record = {}; - for (const row of rows.rows) { - counts[row.status] = Number(row.count); + for (const row of rows) { + counts[row.status] = row.count; } const filledParts = counts["fulfilled"] ?? 0; diff --git a/src/application/handlers/blockHandler.ts b/src/application/handlers/blockHandler.ts index 817453c..6e660e9 100644 --- a/src/application/handlers/blockHandler.ts +++ b/src/application/handlers/blockHandler.ts @@ -16,7 +16,7 @@ import { ponder } from "ponder:registry"; import { bootstrapRetryQueue, candidateDiscreteOrder, conditionalOrderGenerator, discreteOrder } from "ponder:schema"; -import { and, asc, eq, inArray, lte, or, sql } from "ponder"; +import { and, asc, eq, inArray, isNull, lte, or, sql } from "ponder"; import type { Hex } from "viem"; import { COMPOSABLE_COW_ADDRESS_BY_CHAIN_ID, @@ -409,7 +409,7 @@ ponder.on("CandidateConfirmer:block", async ({ event, context }) => { and( eq(candidateDiscreteOrder.chainId, chainId), or( - sql`${candidateDiscreteOrder.possibleValidAfterTimestamp} IS NULL`, + isNull(candidateDiscreteOrder.possibleValidAfterTimestamp), lte(candidateDiscreteOrder.possibleValidAfterTimestamp, event.block.timestamp), ), ), @@ -723,7 +723,7 @@ ponder.on("HistoricalBootstrap:block", async ({ event, context }) => { eq(conditionalOrderGenerator.chainId, chainId), eq(conditionalOrderGenerator.status, "Active"), inArray(conditionalOrderGenerator.orderType, [...NON_DETERMINISTIC_TYPES]), - sql`${discreteOrder.orderUid} IS NULL`, + isNull(discreteOrder.orderUid), ), ) as { generatorId: string; @@ -810,7 +810,7 @@ ponder.on("DeterministicCancellationSweeper:block", async ({ event, context }) = eq(conditionalOrderGenerator.status, "Active"), eq(conditionalOrderGenerator.allCandidatesKnown, true), or( - sql`${conditionalOrderGenerator.nextCheckBlock} IS NULL`, + isNull(conditionalOrderGenerator.nextCheckBlock), lte(conditionalOrderGenerator.nextCheckBlock, currentBlock), ), ), diff --git a/src/application/handlers/setup.ts b/src/application/handlers/setup.ts index bc4b219..95a9ea6 100644 --- a/src/application/handlers/setup.ts +++ b/src/application/handlers/setup.ts @@ -2,13 +2,13 @@ import { ponder } from "ponder:registry"; import { sql } from "ponder"; /** - * Creates the cow_cache schema and orderbook_cache table on startup. + * Creates the cow_cache schema and persistent cache tables on startup. * * The cow_cache schema is separate from Ponder's per-deployment schema, so it * survives `ponder start` redeployments (which create a new namespace each time). * Ponder's `user` pool does not restrict search_path, so fully qualified names - * (cow_cache.orderbook_cache) work from event handlers. The `readonly` pool used - * by the API layer also works with fully qualified names. + * work from event handlers. The `readonly` pool used by the API layer also works + * with fully qualified names. * * Cache semantics (enforced by consumers, not here): * - Terminal states (fulfilled/expired/cancelled): cached indefinitely (cannot change) @@ -18,15 +18,6 @@ ponder.on("ComposableCow:setup", async ({ context }) => { // Create a separate schema that Ponder's per-deployment schema management won't touch. await context.db.sql.execute(sql`CREATE SCHEMA IF NOT EXISTS cow_cache`); - // Legacy per-owner cache (kept for backward compat, no longer actively used) - await context.db.sql.execute(sql` - CREATE TABLE IF NOT EXISTS cow_cache.orderbook_cache ( - cache_key TEXT PRIMARY KEY, - response_json TEXT NOT NULL, - fetched_at BIGINT NOT NULL - ) - `); - // Per-UID cache for terminal order statuses + executed amounts await context.db.sql.execute(sql` CREATE TABLE IF NOT EXISTS cow_cache.order_uid_cache ( diff --git a/src/application/helpers/orderbookClient.ts b/src/application/helpers/orderbookClient.ts index 225b21a..16a6cf6 100644 --- a/src/application/helpers/orderbookClient.ts +++ b/src/application/helpers/orderbookClient.ts @@ -18,7 +18,8 @@ import { conditionalOrderGenerator, discreteOrder, } from "ponder:schema"; -import { and, eq, sql } from "ponder"; +import { and, eq, inArray } from "ponder"; +import { pgSchema, integer, text } from "drizzle-orm/pg-core"; import { encodeAbiParameters, keccak256, type Hex } from "viem"; import { COMPOSABLE_COW_HANDLER_ADDRESSES, ORDERBOOK_API_URLS } from "../../data"; import { ORDERBOOK_HTTP_TIMEOUT_MS, SIGNING_SCHEME_EIP1271 } from "../../constants"; @@ -459,7 +460,16 @@ async function filterAndProcess( } // ─── Per-UID cache helpers ────────────────────────────────────────────────── -// cow_cache.order_uid_cache is created by setup.ts. Fully qualified names required. +// cow_cache.order_uid_cache is created by setup.ts. Table defined here for typed queries. +const cowCacheSchema = pgSchema("cow_cache"); +const orderUidCache = cowCacheSchema.table("order_uid_cache", { + chainId: integer("chain_id").notNull(), + orderUid: text("order_uid").notNull(), + status: text("status").notNull(), + fetchedAt: integer("fetched_at").notNull(), + executedSellAmount: text("executed_sell_amount"), + executedBuyAmount: text("executed_buy_amount"), +}); /** Cached order data returned by getCachedUidStatuses. */ interface CachedOrderData { @@ -483,19 +493,25 @@ async function getCachedUidStatuses( const batchSize = 500; for (let i = 0; i < uids.length; i += batchSize) { const batch = uids.slice(i, i + batchSize); - const placeholders = batch.map((uid) => `'${uid.replace(/'/g, "''")}'`).join(","); - const rows = (await context.db.sql.execute( - sql.raw( - `SELECT order_uid, status, executed_sell_amount, executed_buy_amount - FROM cow_cache.order_uid_cache - WHERE chain_id = ${chainId} AND order_uid IN (${placeholders})`, - ), - )) as { order_uid: string; status: string; executed_sell_amount: string | null; executed_buy_amount: string | null }[]; + const rows = await context.db.sql + .select({ + orderUid: orderUidCache.orderUid, + status: orderUidCache.status, + executedSellAmount: orderUidCache.executedSellAmount, + executedBuyAmount: orderUidCache.executedBuyAmount, + }) + .from(orderUidCache) + .where( + and( + eq(orderUidCache.chainId, chainId), + inArray(orderUidCache.orderUid, batch), + ), + ); for (const row of rows) { - result.set(row.order_uid, { + result.set(row.orderUid, { status: row.status, - executedSellAmount: row.executed_sell_amount, - executedBuyAmount: row.executed_buy_amount, + executedSellAmount: row.executedSellAmount, + executedBuyAmount: row.executedBuyAmount, }); } } @@ -516,17 +532,25 @@ async function cacheUidStatuses( const now = Math.floor(Date.now() / 1000); for (const order of orders) { try { - await context.db.sql.execute( - sql`INSERT INTO cow_cache.order_uid_cache - (chain_id, order_uid, status, fetched_at, executed_sell_amount, executed_buy_amount) - VALUES (${chainId}, ${order.uid}, ${order.status}, ${now}, - ${order.executedSellAmount}, ${order.executedBuyAmount}) - ON CONFLICT (chain_id, order_uid) DO UPDATE SET - status = EXCLUDED.status, - fetched_at = EXCLUDED.fetched_at, - executed_sell_amount = EXCLUDED.executed_sell_amount, - executed_buy_amount = EXCLUDED.executed_buy_amount`, - ); + await context.db.sql + .insert(orderUidCache) + .values({ + chainId, + orderUid: order.uid, + status: order.status, + fetchedAt: now, + executedSellAmount: order.executedSellAmount, + executedBuyAmount: order.executedBuyAmount, + }) + .onConflictDoUpdate({ + target: [orderUidCache.chainId, orderUidCache.orderUid], + set: { + status: order.status, + fetchedAt: now, + executedSellAmount: order.executedSellAmount, + executedBuyAmount: order.executedBuyAmount, + }, + }); } catch { // Best-effort cache write } diff --git a/tests/api/execution-summary.test.ts b/tests/api/execution-summary.test.ts index 1712208..e496771 100644 --- a/tests/api/execution-summary.test.ts +++ b/tests/api/execution-summary.test.ts @@ -3,12 +3,14 @@ import { OpenAPIHono } from "@hono/zod-openapi"; import { z } from "zod"; // Mock virtual modules before any ponder-importing source files are loaded. -vi.mock("ponder:api", () => ({ db: { execute: vi.fn() } })); +vi.mock("ponder:api", () => ({ db: { select: vi.fn() } })); vi.mock("ponder", () => ({ - sql: Object.assign( - (_s: TemplateStringsArray, ..._v: unknown[]) => ({}), - { raw: (_s: string) => ({}) }, - ), + and: (..._args: unknown[]) => ({}), + eq: (..._args: unknown[]) => ({}), + count: () => ({}), +})); +vi.mock("ponder:schema", () => ({ + discreteOrder: { status: "status", conditionalOrderGeneratorId: "conditionalOrderGeneratorId", chainId: "chainId" }, })); import { db } from "ponder:api"; @@ -17,7 +19,7 @@ import { executionSummaryHandler } from "../../src/api/endpoints/execution-summa import { DiscreteOrderStatusQuery } from "../../src/api/schemas/common"; const Status = DiscreteOrderStatusQuery.enum; -type StatusRow = { status: z.infer; count: string }; +type StatusRow = { status: z.infer; count: number }; function buildApp() { const app = new OpenAPIHono(); @@ -31,13 +33,20 @@ function makeUrl(eventId = EVENT_ID, chainId = 1) { return `http://localhost/generator/${eventId}/execution-summary?chainId=${chainId}`; } +function makeSelectChain(rows: unknown[] = []) { + const groupBy = vi.fn().mockResolvedValue(rows); + const where = vi.fn().mockReturnValue({ groupBy }); + const from = vi.fn().mockReturnValue({ where }); + return { from }; +} + beforeEach(() => { - vi.mocked(db.execute).mockReset(); + vi.mocked(db.select).mockReset(); }); describe("GET /api/generator/:eventId/execution-summary", () => { it("returns all-zero counts when no discrete orders exist", async () => { - vi.mocked(db.execute).mockResolvedValue({ rows: [] } as never); + vi.mocked(db.select).mockReturnValueOnce(makeSelectChain([]) as never); const res = await buildApp().request(makeUrl()); expect(res.status).toBe(200); @@ -53,11 +62,11 @@ describe("GET /api/generator/:eventId/execution-summary", () => { it("maps fulfilled, expired, open, unfilled, cancelled to the right fields", async () => { const rows: StatusRow[] = [ - { status: Status.fulfilled, count: "3" }, - { status: Status.expired, count: "7" }, - { status: Status.open, count: "2" }, + { status: Status.fulfilled, count: 3 }, + { status: Status.expired, count: 7 }, + { status: Status.open, count: 2 }, ]; - vi.mocked(db.execute).mockResolvedValue({ rows } as never); + vi.mocked(db.select).mockReturnValueOnce(makeSelectChain(rows) as never); const body = await (await buildApp().request(makeUrl())).json() as Record; @@ -71,18 +80,18 @@ describe("GET /api/generator/:eventId/execution-summary", () => { it("totalParts is the sum of all status counts", async () => { const rows: StatusRow[] = [ - { status: Status.fulfilled, count: "10" }, - { status: Status.cancelled, count: "5" }, - { status: Status.unfilled, count: "3" }, + { status: Status.fulfilled, count: 10 }, + { status: Status.cancelled, count: 5 }, + { status: Status.unfilled, count: 3 }, ]; - vi.mocked(db.execute).mockResolvedValue({ rows } as never); + vi.mocked(db.select).mockReturnValueOnce(makeSelectChain(rows) as never); const body = await (await buildApp().request(makeUrl())).json() as Record; expect(body["totalParts"]).toBe(18); }); it("echoes back the generatorId and chainId", async () => { - vi.mocked(db.execute).mockResolvedValue({ rows: [] } as never); + vi.mocked(db.select).mockReturnValueOnce(makeSelectChain([]) as never); const body = await (await buildApp().request(makeUrl(EVENT_ID, 100))).json() as Record; expect(body["generatorId"]).toBe(EVENT_ID); @@ -90,15 +99,17 @@ describe("GET /api/generator/:eventId/execution-summary", () => { }); it("returns 400 when chainId query param is missing", async () => { - const app = buildApp(); - const res = await app.request( + const res = await buildApp().request( `http://localhost/generator/${EVENT_ID}/execution-summary`, ); expect(res.status).toBe(400); }); it("returns 500 when the DB throws", async () => { - vi.mocked(db.execute).mockRejectedValueOnce(new Error("db error")); + const groupBy = vi.fn().mockRejectedValueOnce(new Error("db error")); + const where = vi.fn().mockReturnValue({ groupBy }); + const from = vi.fn().mockReturnValue({ where }); + vi.mocked(db.select).mockReturnValueOnce({ from } as never); const res = await buildApp().request(makeUrl()); expect(res.status).toBe(500);