Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ DATABASE_SCHEMA=programmatic_orders
# MAX_GENERATORS_PER_BLOCK_1=200 # mainnet
# MAX_GENERATORS_PER_BLOCK_100=400 # gnosis (shorter block time → higher budget)

# Per-block cap on how many open discrete orders OrderStatusTracker checks per chain.
# Default: 200. Excess orders defer to next block, prioritized by oldest promotedAt first.
# MAX_DISCRETE_ORDERS_PER_BLOCK_1=200
# MAX_DISCRETE_ORDERS_PER_BLOCK_100=400

# eth_getLogs block range cap (optional; default: 1000)
# Increase if your RPC provider supports a larger range to speed up backfill.
# Override per chain with the numeric chain-id suffix:
Expand Down
1 change: 1 addition & 0 deletions docs/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Example: `DATABASE_URL=postgresql://cow_programmatic:secretpass@localhost:5433/c
| `DISABLE_POLL_RESULT_CHECK` | No | Disables the `OrderDiscoveryPoller` block handler. Skips RPC multicalls for non-deterministic generators. Saves RPC calls during initial sync at the cost of not detecting poll results until re-enabled. |
| `DISABLE_DETERMINISTIC_CANCEL_SWEEP` | No | Disables the `CancellationWatcher`. Skips periodic `singleOrders()` reads on deterministic generators. While disabled, on-chain `ComposableCoW.remove()` calls on TWAP/StopLoss/CirclesBackingOrder generators will not be detected and those generators stay `Active`. |
| `MAX_GENERATORS_PER_BLOCK_<chainId>` | No | Per-block cap on how many generators `OrderDiscoveryPoller` and `CancellationWatcher` will touch on the given chain (e.g. `MAX_GENERATORS_PER_BLOCK_1=200`, `MAX_GENERATORS_PER_BLOCK_100=400`). Default is 200. Excess generators defer to the next block, prioritized by oldest `lastCheckBlock` first. |
| `MAX_DISCRETE_ORDERS_PER_BLOCK_<chainId>` | No | Per-block cap on how many open discrete orders `OrderStatusTracker` will check on the given chain (e.g. `MAX_DISCRETE_ORDERS_PER_BLOCK_1=200`). Default is 200. Excess orders are deferred to the next block, prioritised by oldest `promotedAt` first. |
| `DISABLE_SETTLEMENT_FACTORY_CHECK` | No | Skips `getCode` + `FACTORY()` RPC calls in the GPv2Settlement handler. Useful for benchmarking base sync throughput. |
| `PINO_LOG_LEVEL` | No | Log verbosity: `debug`, `info`, `warn`, `error`. Defaults to Ponder's built-in default. |

Expand Down
162 changes: 101 additions & 61 deletions src/application/handlers/blockHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
import {
BLOCK_HANDLER_RPC_TIMEOUT_MS,
BOOTSTRAP_OWNER_FETCH_TIMEOUT_MS,
DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK,
DEFAULT_MAX_GENERATORS_PER_BLOCK,
DETERMINISTIC_CANCEL_SWEEP_INTERVAL,
ORDERBOOK_HTTP_TIMEOUT_MS,
Expand Down Expand Up @@ -82,9 +83,9 @@ ponder.on("OrderDiscoveryPoller:block", async ({ event, context }) => {
const currentBlock = event.block.number;
const currentTimestamp = event.block.timestamp;

const rawGeneratorCap = Number(process.env[`MAX_GENERATORS_PER_BLOCK_${chainId}`]);
const maxGeneratorsPerBlock =
Number(process.env[`MAX_GENERATORS_PER_BLOCK_${chainId}`]) ||
DEFAULT_MAX_GENERATORS_PER_BLOCK;
Number.isFinite(rawGeneratorCap) && rawGeneratorCap > 0 ? rawGeneratorCap : DEFAULT_MAX_GENERATORS_PER_BLOCK;

const dueOrders = await context.db.sql
.select({
Expand Down Expand Up @@ -367,41 +368,46 @@ ponder.on("CandidateConfirmer:block", async ({ event, context }) => {

// onConflictDoNothing: if C3 already promoted this UID with a terminal status
// (e.g. 'fulfilled'), the existing row wins and this insert is a no-op.
// Chunked to avoid PostgreSQL bind-message parameter limits on large cascades.
// preflightKnown counts API hits, not rows actually written.
await context.db.sql
.insert(discreteOrder)
.values(
orphanCandidates.map((c) => {
const apiEntry = preflightStatuses.get(c.orderUid);
return {
orderUid: c.orderUid,
chainId,
conditionalOrderGeneratorId: c.generatorId,
status: (apiEntry?.status ?? "cancelled") as DiscreteStatus,
sellAmount: c.sellAmount,
buyAmount: c.buyAmount,
feeAmount: c.feeAmount,
validTo: c.validTo,
creationDate: c.creationDate,
executedSellAmount: apiEntry?.executedSellAmount ?? null,
executedBuyAmount: apiEntry?.executedBuyAmount ?? null,
promotedAt: event.block.timestamp,
};
}),
)
.onConflictDoNothing();
const CASCADE_CHUNK_SIZE = 500;
for (let i = 0; i < orphanCandidates.length; i += CASCADE_CHUNK_SIZE) {
const chunk = orphanCandidates.slice(i, i + CASCADE_CHUNK_SIZE);
await context.db.sql
.insert(discreteOrder)
.values(
chunk.map((c) => {
const apiEntry = preflightStatuses.get(c.orderUid);
return {
orderUid: c.orderUid,
chainId,
conditionalOrderGeneratorId: c.generatorId,
status: (apiEntry?.status ?? "cancelled") as DiscreteStatus,
sellAmount: c.sellAmount,
buyAmount: c.buyAmount,
feeAmount: c.feeAmount,
validTo: c.validTo,
creationDate: c.creationDate,
executedSellAmount: apiEntry?.executedSellAmount ?? null,
executedBuyAmount: apiEntry?.executedBuyAmount ?? null,
promotedAt: event.block.timestamp,
};
}),
)
.onConflictDoNothing();

await context.db.sql
.delete(candidateDiscreteOrder)
.where(
and(
eq(candidateDiscreteOrder.chainId, chainId),
inArray(
candidateDiscreteOrder.orderUid,
orphanCandidates.map((c) => c.orderUid),
await context.db.sql
.delete(candidateDiscreteOrder)
.where(
and(
eq(candidateDiscreteOrder.chainId, chainId),
inArray(
candidateDiscreteOrder.orderUid,
chunk.map((c) => c.orderUid),
),
),
),
);
);
}

const preflightKnown = preflightStatuses.size;
log("info", "CandidateConfirmer:parent_cancelled", { block: String(event.block.number), chainId, parentCancelled: orphanCandidates.length, preflightKnown });
Expand Down Expand Up @@ -615,48 +621,82 @@ ponder.on("OrderStatusTracker:block", async ({ event, context }) => {
const chainId = context.chain.id as SupportedChainId;
const currentTimestamp = event.block.timestamp;

const rawOrderCap = Number(process.env[`MAX_DISCRETE_ORDERS_PER_BLOCK_${chainId}`]);
const maxOrdersPerBlock =
Number.isFinite(rawOrderCap) && rawOrderCap > 0 ? rawOrderCap : DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK;

const openOrders = await context.db.sql
.select({
orderUid: discreteOrder.orderUid,
conditionalOrderGeneratorId: discreteOrder.conditionalOrderGeneratorId,
sellAmount: discreteOrder.sellAmount,
buyAmount: discreteOrder.buyAmount,
feeAmount: discreteOrder.feeAmount,
validTo: discreteOrder.validTo,
creationDate: discreteOrder.creationDate,
promotedAt: discreteOrder.promotedAt,
})
.from(discreteOrder)
.where(
and(
eq(discreteOrder.chainId, chainId),
eq(discreteOrder.status, "open"),
),
) as { orderUid: string }[];
)
.orderBy(asc(discreteOrder.promotedAt))
.limit(maxOrdersPerBlock) as {
orderUid: string;
conditionalOrderGeneratorId: string;
sellAmount: string;
buyAmount: string;
feeAmount: string;
validTo: number | null;
creationDate: bigint;
promotedAt: bigint | null;
}[];

if (openOrders.length > 0) {
const uids = openOrders.map((o) => o.orderUid);
const statuses = await fetchOrderStatusByUids(context, chainId, uids);

let updated = 0;
for (const [uid, info] of statuses) {
if (VALID_DISCRETE_STATUSES.has(info.status)) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const setFields: Record<string, any> = {
status: info.status as "fulfilled" | "unfilled" | "expired" | "cancelled",
};
if (info.executedSellAmount != null) {
setFields.executedSellAmount = info.executedSellAmount;
setFields.executedBuyAmount = info.executedBuyAmount;
}
await context.db.sql
.update(discreteOrder)
.set(setFields)
.where(
and(
eq(discreteOrder.chainId, chainId),
eq(discreteOrder.orderUid, uid),
),
);
updated++;
}
type DiscreteStatus = "open" | "fulfilled" | "unfilled" | "expired" | "cancelled";
const rowsToUpdate: (typeof discreteOrder.$inferInsert)[] = [];

for (const order of openOrders) {
const info = statuses.get(order.orderUid);
if (!info || !VALID_DISCRETE_STATUSES.has(info.status)) continue;
rowsToUpdate.push({
orderUid: order.orderUid,
chainId,
conditionalOrderGeneratorId: order.conditionalOrderGeneratorId,
status: info.status as DiscreteStatus,
sellAmount: order.sellAmount,
buyAmount: order.buyAmount,
feeAmount: order.feeAmount,
validTo: order.validTo,
creationDate: order.creationDate,
executedSellAmount: info.executedSellAmount ?? null,
executedBuyAmount: info.executedBuyAmount ?? null,
promotedAt: order.promotedAt,
});
}

if (updated > 0) {
log("info", "OrderStatusTracker:DONE", { block: String(event.block.number), chainId, open: openOrders.length, updated });
// One multi-row upsert keeps the block TX open for one round-trip instead of N.
if (rowsToUpdate.length > 0) {
await context.db.sql
.insert(discreteOrder)
.values(rowsToUpdate)
// promotedAt is intentionally omitted — preserve the original promotion timestamp across status updates.
.onConflictDoUpdate({
target: [discreteOrder.chainId, discreteOrder.orderUid],
set: {
status: sql`excluded.status`,
executedSellAmount: sql`excluded.executed_sell_amount`,
executedBuyAmount: sql`excluded.executed_buy_amount`,
},
});

log("info", "OrderStatusTracker:DONE", { block: String(event.block.number), chainId, open: String(openOrders.length), updated: String(rowsToUpdate.length) });
}
}

Expand Down Expand Up @@ -834,9 +874,9 @@ ponder.on("CancellationWatcher:block", async ({ event, context }) => {

const currentBlock = event.block.number;

const rawGeneratorCap2 = Number(process.env[`MAX_GENERATORS_PER_BLOCK_${chainId}`]);
const maxGeneratorsPerBlock =
Number(process.env[`MAX_GENERATORS_PER_BLOCK_${chainId}`]) ||
DEFAULT_MAX_GENERATORS_PER_BLOCK;
Number.isFinite(rawGeneratorCap2) && rawGeneratorCap2 > 0 ? rawGeneratorCap2 : DEFAULT_MAX_GENERATORS_PER_BLOCK;

const dueGenerators = await context.db.sql
.select({
Expand Down
10 changes: 10 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,13 @@ export const SETTLEMENT_INNER_RPC_TIMEOUT_MS = 5_000;
* the normal OrderDiscoveryPoller / CandidateConfirmer path picks them up on subsequent blocks.
*/
export const BOOTSTRAP_OWNER_FETCH_TIMEOUT_MS = 30_000;

/**
* Hard per-block ceiling on how many open discrete orders the C3
* StatusUpdater will check in a single block. Caps the /by_uids batch size
* and keeps block handler transactions short.
*
* Override per chain with env var MAX_DISCRETE_ORDERS_PER_BLOCK_<chainId>, e.g.
* MAX_DISCRETE_ORDERS_PER_BLOCK_1=200, MAX_DISCRETE_ORDERS_PER_BLOCK_100=500.
*/
export const DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK = 200;
135 changes: 135 additions & 0 deletions tests/constants.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { describe, it, expect } from "vitest";
import {
SIGNING_SCHEME_EIP1271,
RECHECK_INTERVAL,
TRY_NEXT_BLOCK_WARMUP_THRESHOLD,
TRY_NEXT_BLOCK_COOLDOWN_THRESHOLD,
TRY_NEXT_BLOCK_BACKOFF_WARMUP,
TRY_NEXT_BLOCK_BACKOFF_MID,
TRY_NEXT_BLOCK_BACKOFF_COLD,
DETERMINISTIC_CANCEL_SWEEP_INTERVAL,
ORDERBOOK_HTTP_TIMEOUT_MS,
BLOCK_HANDLER_RPC_TIMEOUT_MS,
BOOTSTRAP_OWNER_FETCH_TIMEOUT_MS,
DEFAULT_MAX_GENERATORS_PER_BLOCK,
DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK,
} from "../src/constants";

describe("DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK (COW-988)", () => {
it("is 200", () => {
expect(DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK).toBe(200);
});

it("is a positive integer", () => {
expect(Number.isInteger(DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK)).toBe(true);
expect(DEFAULT_MAX_DISCRETE_ORDERS_PER_BLOCK).toBeGreaterThan(0);
});
});

describe("DEFAULT_MAX_GENERATORS_PER_BLOCK", () => {
it("is 200", () => {
expect(DEFAULT_MAX_GENERATORS_PER_BLOCK).toBe(200);
});
});

describe("SIGNING_SCHEME_EIP1271", () => {
it('is the string "eip1271"', () => {
expect(SIGNING_SCHEME_EIP1271).toBe("eip1271");
});

it('is not "erc1271" — the API uses eip1271 spelling', () => {
expect(SIGNING_SCHEME_EIP1271).not.toBe("erc1271");
});
});

describe("RECHECK_INTERVAL", () => {
it("is a bigint", () => {
expect(typeof RECHECK_INTERVAL).toBe("bigint");
});

it("equals BigInt(ORDERBOOK_POLL_INTERVAL) which is 20", () => {
// ORDERBOOK_POLL_INTERVAL = 20 (from data.ts)
expect(RECHECK_INTERVAL).toBe(20n);
});
});

describe("TryNextBlock backoff thresholds", () => {
it("WARMUP_THRESHOLD is 50", () => {
expect(TRY_NEXT_BLOCK_WARMUP_THRESHOLD).toBe(50);
});

it("COOLDOWN_THRESHOLD is 200", () => {
expect(TRY_NEXT_BLOCK_COOLDOWN_THRESHOLD).toBe(200);
});

it("WARMUP < COOLDOWN — thresholds are ordered correctly", () => {
expect(TRY_NEXT_BLOCK_WARMUP_THRESHOLD).toBeLessThan(
TRY_NEXT_BLOCK_COOLDOWN_THRESHOLD,
);
});
});

describe("TryNextBlock backoff block offsets", () => {
it("WARMUP backoff is 1 block", () => {
expect(TRY_NEXT_BLOCK_BACKOFF_WARMUP).toBe(1n);
});

it("MID backoff is 10 blocks", () => {
expect(TRY_NEXT_BLOCK_BACKOFF_MID).toBe(10n);
});

it("COLD backoff is 50 blocks", () => {
expect(TRY_NEXT_BLOCK_BACKOFF_COLD).toBe(50n);
});

it("backoff levels are strictly increasing", () => {
expect(TRY_NEXT_BLOCK_BACKOFF_WARMUP).toBeLessThan(
TRY_NEXT_BLOCK_BACKOFF_MID,
);
expect(TRY_NEXT_BLOCK_BACKOFF_MID).toBeLessThan(
TRY_NEXT_BLOCK_BACKOFF_COLD,
);
});

it("all backoff values are bigints", () => {
expect(typeof TRY_NEXT_BLOCK_BACKOFF_WARMUP).toBe("bigint");
expect(typeof TRY_NEXT_BLOCK_BACKOFF_MID).toBe("bigint");
expect(typeof TRY_NEXT_BLOCK_BACKOFF_COLD).toBe("bigint");
});
});

describe("DETERMINISTIC_CANCEL_SWEEP_INTERVAL", () => {
it("is 100n", () => {
expect(DETERMINISTIC_CANCEL_SWEEP_INTERVAL).toBe(100n);
});

it("is a bigint", () => {
expect(typeof DETERMINISTIC_CANCEL_SWEEP_INTERVAL).toBe("bigint");
});
});

describe("Timeout constants", () => {
it("ORDERBOOK_HTTP_TIMEOUT_MS is 10_000", () => {
expect(ORDERBOOK_HTTP_TIMEOUT_MS).toBe(10_000);
});

it("BLOCK_HANDLER_RPC_TIMEOUT_MS is 15_000", () => {
expect(BLOCK_HANDLER_RPC_TIMEOUT_MS).toBe(15_000);
});

it("BOOTSTRAP_OWNER_FETCH_TIMEOUT_MS is 30_000", () => {
expect(BOOTSTRAP_OWNER_FETCH_TIMEOUT_MS).toBe(30_000);
});

it("RPC timeout is shorter than bootstrap timeout — boot has more slack", () => {
expect(BLOCK_HANDLER_RPC_TIMEOUT_MS).toBeLessThan(
BOOTSTRAP_OWNER_FETCH_TIMEOUT_MS,
);
});

it("HTTP timeout is shorter than RPC timeout", () => {
expect(ORDERBOOK_HTTP_TIMEOUT_MS).toBeLessThan(
BLOCK_HANDLER_RPC_TIMEOUT_MS,
);
});
});
Loading
Loading