Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
},
"dependencies": {
"@cowprotocol/cow-sdk": "^7.3.8",
"drizzle-orm": "0.41.0",
"@hono/swagger-ui": "^0.5.3",
"@hono/zod-openapi": "^0.19.10",
"hono": "^4.5.0",
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 14 additions & 10 deletions src/api/endpoints/execution-summary.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { RouteHandler } from "@hono/zod-openapi";
import { db } from "ponder:api";
import { sql } from "ponder";
import { discreteOrder } from "ponder:schema";
import { and, count, eq } from "ponder";
import type { executionSummaryRoute } from "../routes";

export const executionSummaryHandler: RouteHandler<
Expand All @@ -9,17 +10,20 @@ export const executionSummaryHandler: RouteHandler<
const { eventId } = c.req.valid("param");
const { chainId } = c.req.valid("query");

const rows = await db.execute<{ status: string; count: string }>(
sql`SELECT status, COUNT(*)::text AS count
FROM discrete_order
WHERE conditional_order_generator_id = ${eventId}
AND chain_id = ${chainId}
GROUP BY status`,
);
const rows = await db
.select({ status: discreteOrder.status, count: count() })
.from(discreteOrder)
.where(
and(
eq(discreteOrder.conditionalOrderGeneratorId, eventId),
eq(discreteOrder.chainId, chainId),
),
)
.groupBy(discreteOrder.status);

const counts: Record<string, number> = {};
for (const row of rows.rows) {
counts[row.status] = Number(row.count);
for (const row of rows) {
counts[row.status] = row.count;
}

const filledParts = counts["fulfilled"] ?? 0;
Expand Down
8 changes: 4 additions & 4 deletions src/application/handlers/blockHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { ponder } from "ponder:registry";
import { bootstrapRetryQueue, candidateDiscreteOrder, conditionalOrderGenerator, discreteOrder } from "ponder:schema";
import { and, asc, eq, inArray, lte, or, sql } from "ponder";
import { and, asc, eq, inArray, isNull, lte, or, sql } from "ponder";
import type { Hex } from "viem";
import {
COMPOSABLE_COW_ADDRESS_BY_CHAIN_ID,
Expand Down Expand Up @@ -409,7 +409,7 @@ ponder.on("CandidateConfirmer:block", async ({ event, context }) => {
and(
eq(candidateDiscreteOrder.chainId, chainId),
or(
sql`${candidateDiscreteOrder.possibleValidAfterTimestamp} IS NULL`,
isNull(candidateDiscreteOrder.possibleValidAfterTimestamp),
lte(candidateDiscreteOrder.possibleValidAfterTimestamp, event.block.timestamp),
),
),
Expand Down Expand Up @@ -723,7 +723,7 @@ ponder.on("HistoricalBootstrap:block", async ({ event, context }) => {
eq(conditionalOrderGenerator.chainId, chainId),
eq(conditionalOrderGenerator.status, "Active"),
inArray(conditionalOrderGenerator.orderType, [...NON_DETERMINISTIC_TYPES]),
sql`${discreteOrder.orderUid} IS NULL`,
isNull(discreteOrder.orderUid),
),
) as {
generatorId: string;
Expand Down Expand Up @@ -810,7 +810,7 @@ ponder.on("DeterministicCancellationSweeper:block", async ({ event, context }) =
eq(conditionalOrderGenerator.status, "Active"),
eq(conditionalOrderGenerator.allCandidatesKnown, true),
or(
sql`${conditionalOrderGenerator.nextCheckBlock} IS NULL`,
isNull(conditionalOrderGenerator.nextCheckBlock),
lte(conditionalOrderGenerator.nextCheckBlock, currentBlock),
),
),
Expand Down
15 changes: 3 additions & 12 deletions src/application/handlers/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import { ponder } from "ponder:registry";
import { sql } from "ponder";

/**
* Creates the cow_cache schema and orderbook_cache table on startup.
* Creates the cow_cache schema and persistent cache tables on startup.
*
* The cow_cache schema is separate from Ponder's per-deployment schema, so it
* survives `ponder start` redeployments (which create a new namespace each time).
* Ponder's `user` pool does not restrict search_path, so fully qualified names
* (cow_cache.orderbook_cache) work from event handlers. The `readonly` pool used
* by the API layer also works with fully qualified names.
* work from event handlers. The `readonly` pool used by the API layer also works
* with fully qualified names.
*
* Cache semantics (enforced by consumers, not here):
* - Terminal states (fulfilled/expired/cancelled): cached indefinitely (cannot change)
Expand All @@ -18,15 +18,6 @@ ponder.on("ComposableCow:setup", async ({ context }) => {
// Create a separate schema that Ponder's per-deployment schema management won't touch.
await context.db.sql.execute(sql`CREATE SCHEMA IF NOT EXISTS cow_cache`);

// Legacy per-owner cache (kept for backward compat, no longer actively used)
await context.db.sql.execute(sql`
CREATE TABLE IF NOT EXISTS cow_cache.orderbook_cache (
cache_key TEXT PRIMARY KEY,
response_json TEXT NOT NULL,
fetched_at BIGINT NOT NULL
)
`);

// Per-UID cache for terminal order statuses + executed amounts
await context.db.sql.execute(sql`
CREATE TABLE IF NOT EXISTS cow_cache.order_uid_cache (
Expand Down
72 changes: 48 additions & 24 deletions src/application/helpers/orderbookClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
conditionalOrderGenerator,
discreteOrder,
} from "ponder:schema";
import { and, eq, sql } from "ponder";
import { and, eq, inArray } from "ponder";
import { pgSchema, integer, text } from "drizzle-orm/pg-core";
import { encodeAbiParameters, keccak256, type Hex } from "viem";
import { COMPOSABLE_COW_HANDLER_ADDRESSES, ORDERBOOK_API_URLS } from "../../data";
import { ORDERBOOK_HTTP_TIMEOUT_MS, SIGNING_SCHEME_EIP1271 } from "../../constants";
Expand Down Expand Up @@ -459,7 +460,16 @@ async function filterAndProcess(
}

// ─── Per-UID cache helpers ──────────────────────────────────────────────────
// cow_cache.order_uid_cache is created by setup.ts. Fully qualified names required.
// cow_cache.order_uid_cache is created by setup.ts. Table defined here for typed queries.
const cowCacheSchema = pgSchema("cow_cache");
const orderUidCache = cowCacheSchema.table("order_uid_cache", {
chainId: integer("chain_id").notNull(),
orderUid: text("order_uid").notNull(),
status: text("status").notNull(),
fetchedAt: integer("fetched_at").notNull(),
executedSellAmount: text("executed_sell_amount"),
executedBuyAmount: text("executed_buy_amount"),
});

/** Cached order data returned by getCachedUidStatuses. */
interface CachedOrderData {
Expand All @@ -483,19 +493,25 @@ async function getCachedUidStatuses(
const batchSize = 500;
for (let i = 0; i < uids.length; i += batchSize) {
const batch = uids.slice(i, i + batchSize);
const placeholders = batch.map((uid) => `'${uid.replace(/'/g, "''")}'`).join(",");
const rows = (await context.db.sql.execute(
sql.raw(
`SELECT order_uid, status, executed_sell_amount, executed_buy_amount
FROM cow_cache.order_uid_cache
WHERE chain_id = ${chainId} AND order_uid IN (${placeholders})`,
),
)) as { order_uid: string; status: string; executed_sell_amount: string | null; executed_buy_amount: string | null }[];
const rows = await context.db.sql
.select({
orderUid: orderUidCache.orderUid,
status: orderUidCache.status,
executedSellAmount: orderUidCache.executedSellAmount,
executedBuyAmount: orderUidCache.executedBuyAmount,
})
.from(orderUidCache)
.where(
and(
eq(orderUidCache.chainId, chainId),
inArray(orderUidCache.orderUid, batch),
),
);
for (const row of rows) {
result.set(row.order_uid, {
result.set(row.orderUid, {
status: row.status,
executedSellAmount: row.executed_sell_amount,
executedBuyAmount: row.executed_buy_amount,
executedSellAmount: row.executedSellAmount,
executedBuyAmount: row.executedBuyAmount,
});
}
}
Expand All @@ -516,17 +532,25 @@ async function cacheUidStatuses(
const now = Math.floor(Date.now() / 1000);
for (const order of orders) {
try {
await context.db.sql.execute(
sql`INSERT INTO cow_cache.order_uid_cache
(chain_id, order_uid, status, fetched_at, executed_sell_amount, executed_buy_amount)
VALUES (${chainId}, ${order.uid}, ${order.status}, ${now},
${order.executedSellAmount}, ${order.executedBuyAmount})
ON CONFLICT (chain_id, order_uid) DO UPDATE SET
status = EXCLUDED.status,
fetched_at = EXCLUDED.fetched_at,
executed_sell_amount = EXCLUDED.executed_sell_amount,
executed_buy_amount = EXCLUDED.executed_buy_amount`,
);
await context.db.sql
.insert(orderUidCache)
.values({
chainId,
orderUid: order.uid,
status: order.status,
fetchedAt: now,
executedSellAmount: order.executedSellAmount,
executedBuyAmount: order.executedBuyAmount,
})
.onConflictDoUpdate({
target: [orderUidCache.chainId, orderUidCache.orderUid],
set: {
status: order.status,
fetchedAt: now,
executedSellAmount: order.executedSellAmount,
executedBuyAmount: order.executedBuyAmount,
},
});
} catch {
// Best-effort cache write
}
Expand Down
51 changes: 31 additions & 20 deletions tests/api/execution-summary.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { OpenAPIHono } from "@hono/zod-openapi";
import { z } from "zod";

// Mock virtual modules before any ponder-importing source files are loaded.
vi.mock("ponder:api", () => ({ db: { execute: vi.fn() } }));
vi.mock("ponder:api", () => ({ db: { select: vi.fn() } }));
vi.mock("ponder", () => ({
sql: Object.assign(
(_s: TemplateStringsArray, ..._v: unknown[]) => ({}),
{ raw: (_s: string) => ({}) },
),
and: (..._args: unknown[]) => ({}),
eq: (..._args: unknown[]) => ({}),
count: () => ({}),
}));
vi.mock("ponder:schema", () => ({
discreteOrder: { status: "status", conditionalOrderGeneratorId: "conditionalOrderGeneratorId", chainId: "chainId" },
}));

import { db } from "ponder:api";
Expand All @@ -17,7 +19,7 @@ import { executionSummaryHandler } from "../../src/api/endpoints/execution-summa
import { DiscreteOrderStatusQuery } from "../../src/api/schemas/common";

const Status = DiscreteOrderStatusQuery.enum;
type StatusRow = { status: z.infer<typeof DiscreteOrderStatusQuery>; count: string };
type StatusRow = { status: z.infer<typeof DiscreteOrderStatusQuery>; count: number };

function buildApp() {
const app = new OpenAPIHono();
Expand All @@ -31,13 +33,20 @@ function makeUrl(eventId = EVENT_ID, chainId = 1) {
return `http://localhost/generator/${eventId}/execution-summary?chainId=${chainId}`;
}

function makeSelectChain(rows: unknown[] = []) {
const groupBy = vi.fn().mockResolvedValue(rows);
const where = vi.fn().mockReturnValue({ groupBy });
const from = vi.fn().mockReturnValue({ where });
return { from };
}

beforeEach(() => {
vi.mocked(db.execute).mockReset();
vi.mocked(db.select).mockReset();
});

describe("GET /api/generator/:eventId/execution-summary", () => {
it("returns all-zero counts when no discrete orders exist", async () => {
vi.mocked(db.execute).mockResolvedValue({ rows: [] } as never);
vi.mocked(db.select).mockReturnValueOnce(makeSelectChain([]) as never);

const res = await buildApp().request(makeUrl());
expect(res.status).toBe(200);
Expand All @@ -53,11 +62,11 @@ describe("GET /api/generator/:eventId/execution-summary", () => {

it("maps fulfilled, expired, open, unfilled, cancelled to the right fields", async () => {
const rows: StatusRow[] = [
{ status: Status.fulfilled, count: "3" },
{ status: Status.expired, count: "7" },
{ status: Status.open, count: "2" },
{ status: Status.fulfilled, count: 3 },
{ status: Status.expired, count: 7 },
{ status: Status.open, count: 2 },
];
vi.mocked(db.execute).mockResolvedValue({ rows } as never);
vi.mocked(db.select).mockReturnValueOnce(makeSelectChain(rows) as never);

const body = await (await buildApp().request(makeUrl())).json() as Record<string, unknown>;

Expand All @@ -71,34 +80,36 @@ describe("GET /api/generator/:eventId/execution-summary", () => {

it("totalParts is the sum of all status counts", async () => {
const rows: StatusRow[] = [
{ status: Status.fulfilled, count: "10" },
{ status: Status.cancelled, count: "5" },
{ status: Status.unfilled, count: "3" },
{ status: Status.fulfilled, count: 10 },
{ status: Status.cancelled, count: 5 },
{ status: Status.unfilled, count: 3 },
];
vi.mocked(db.execute).mockResolvedValue({ rows } as never);
vi.mocked(db.select).mockReturnValueOnce(makeSelectChain(rows) as never);

const body = await (await buildApp().request(makeUrl())).json() as Record<string, unknown>;
expect(body["totalParts"]).toBe(18);
});

it("echoes back the generatorId and chainId", async () => {
vi.mocked(db.execute).mockResolvedValue({ rows: [] } as never);
vi.mocked(db.select).mockReturnValueOnce(makeSelectChain([]) as never);

const body = await (await buildApp().request(makeUrl(EVENT_ID, 100))).json() as Record<string, unknown>;
expect(body["generatorId"]).toBe(EVENT_ID);
expect(body["chainId"]).toBe(100);
});

it("returns 400 when chainId query param is missing", async () => {
const app = buildApp();
const res = await app.request(
const res = await buildApp().request(
`http://localhost/generator/${EVENT_ID}/execution-summary`,
);
expect(res.status).toBe(400);
});

it("returns 500 when the DB throws", async () => {
vi.mocked(db.execute).mockRejectedValueOnce(new Error("db error"));
const groupBy = vi.fn().mockRejectedValueOnce(new Error("db error"));
const where = vi.fn().mockReturnValue({ groupBy });
const from = vi.fn().mockReturnValue({ where });
vi.mocked(db.select).mockReturnValueOnce({ from } as never);

const res = await buildApp().request(makeUrl());
expect(res.status).toBe(500);
Expand Down
Loading