From 5d216f8f9d5c995587bb5bd869b7d9c360a22571 Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Fri, 1 May 2026 00:13:22 -0400 Subject: [PATCH 1/3] feat(clickhouse): add configurable request timeout with abort signal - Add CLICKHOUSE_REQUEST_TIMEOUT_MS config (default 30s) - Apply timeout to all ClickHouse client instances (query, insert, setup) - Implement AbortController-based timeout for query() operations - Add launch.json to .gitignore --- .gitignore | 2 ++ lib/clickhouse.ts | 16 ++++++++++++++++ lib/config.ts | 13 +++++++++++++ 3 files changed, 31 insertions(+) diff --git a/.gitignore b/.gitignore index 9d4fe4c..a062b4c 100644 --- a/.gitignore +++ b/.gitignore @@ -139,3 +139,5 @@ dist # Vite logs files vite.config.js.timestamp-* vite.config.ts.timestamp-* + +launch.json diff --git a/lib/clickhouse.ts b/lib/clickhouse.ts index 4c1af84..b26f49b 100644 --- a/lib/clickhouse.ts +++ b/lib/clickhouse.ts @@ -1,4 +1,5 @@ import { type ClickHouseClient, createClient } from '@clickhouse/client'; +import { CLICKHOUSE_REQUEST_TIMEOUT_MS } from './config'; import { createLogger } from './logger'; import { trackClickHouseOperation } from './prometheus'; @@ -31,6 +32,7 @@ function getClient(): ClickHouseClient { username: process.env.CLICKHOUSE_USERNAME || 'default', password: process.env.CLICKHOUSE_PASSWORD || '', database: process.env.CLICKHOUSE_DATABASE, + request_timeout: CLICKHOUSE_REQUEST_TIMEOUT_MS, }); } return _client; @@ -47,6 +49,7 @@ function getInsertClient(): ClickHouseClient { username: process.env.CLICKHOUSE_USERNAME || 'default', password: process.env.CLICKHOUSE_PASSWORD || '', database: getInsertDatabase(), + request_timeout: CLICKHOUSE_REQUEST_TIMEOUT_MS, }); } return _insertClient; @@ -64,6 +67,7 @@ function getSetupClient(): ClickHouseClient { username: process.env.CLICKHOUSE_USERNAME || 'default', password: process.env.CLICKHOUSE_PASSWORD || '', database: getInsertDatabase(), + request_timeout: CLICKHOUSE_REQUEST_TIMEOUT_MS, }); } return _setupClient; @@ -149,6 +153,15 @@ export async function query( // Track total operation time const startTime = performance.now(); + const controller = new AbortController(); + const timeoutHandle = setTimeout(() => { + controller.abort( + new Error( + `ClickHouse query timed out after ${CLICKHOUSE_REQUEST_TIMEOUT_MS}ms`, + ), + ); + }, CLICKHOUSE_REQUEST_TIMEOUT_MS); + try { // Track query execution time const queryStartTime = performance.now(); @@ -156,6 +169,7 @@ export async function query( query, query_params, format: 'JSONEachRow', + abort_signal: controller.signal, }); trackClickHouseOperation('read', 'success', startTime); const queryEndTime = performance.now(); @@ -182,6 +196,7 @@ export async function query( totalTimeMs, }); + clearTimeout(timeoutHandle); return { data, metrics: { @@ -191,6 +206,7 @@ export async function query( }, }; } catch (error: unknown) { + clearTimeout(timeoutHandle); trackClickHouseOperation('read', 'error', startTime); const url = process.env.CLICKHOUSE_URL || 'http://localhost:8123'; const urlObj = new URL(url); diff --git a/lib/config.ts b/lib/config.ts index 29b8977..263ecfe 100644 --- a/lib/config.ts +++ b/lib/config.ts @@ -40,6 +40,9 @@ export const DEFAULT_CONFIG = { // Auto-restart AUTO_RESTART_DELAY: 10, + + // ClickHouse request timeout + CLICKHOUSE_REQUEST_TIMEOUT_MS: 30_000, } as const; // ============================================================================ @@ -112,6 +115,16 @@ export const CLICKHOUSE_PASSWORD = process.env.CLICKHOUSE_PASSWORD || DEFAULT_CONFIG.CLICKHOUSE_PASSWORD; export const CLICKHOUSE_DATABASE = process.env.CLICKHOUSE_DATABASE; +/** + * Timeout in milliseconds for ClickHouse HTTP requests + * Default: 30000ms (30 seconds) + */ +export const CLICKHOUSE_REQUEST_TIMEOUT_MS = parseInt( + process.env.CLICKHOUSE_REQUEST_TIMEOUT_MS || + String(DEFAULT_CONFIG.CLICKHOUSE_REQUEST_TIMEOUT_MS), + 10, +); + /** * Database name for insert operations (INSERT, DDL) * Optional override - falls back to CLICKHOUSE_DATABASE if not set From 94d7a0e30defe8d1744ebff2198ffa5b06d8a380 Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Fri, 1 May 2026 00:13:28 -0400 Subject: [PATCH 2/3] lint --- lib/logger.ts | 5 +- services/polymarket/fetch-gamma.test.ts | 7 +- services/polymarket/index.test.ts | 211 ++++++++++++++++++------ services/polymarket/index.ts | 4 +- 4 files changed, 173 insertions(+), 54 deletions(-) diff --git a/lib/logger.ts b/lib/logger.ts index b7661ea..41511f9 100644 --- a/lib/logger.ts +++ b/lib/logger.ts @@ -36,7 +36,10 @@ function transportJSON(logObj: Record): void { logger: meta.name, msg, }; - if (payload !== undefined && (payload === null || typeof payload !== 'object')) { + if ( + payload !== undefined && + (payload === null || typeof payload !== 'object') + ) { flat.data = payload; } process.stdout.write(JSON.stringify(flat) + '\n'); diff --git a/services/polymarket/fetch-gamma.test.ts b/services/polymarket/fetch-gamma.test.ts index 8e69cad..42138da 100644 --- a/services/polymarket/fetch-gamma.test.ts +++ b/services/polymarket/fetch-gamma.test.ts @@ -12,8 +12,7 @@ const mockFetch = mock(() => ); globalThis.fetch = mockFetch as unknown as typeof fetch; -const conditionId = (i: number) => - `0x${i.toString(16).padStart(64, '0')}`; +const conditionId = (i: number) => `0x${i.toString(16).padStart(64, '0')}`; const marketStub = (id: number) => ({ id: String(id), @@ -31,7 +30,9 @@ describe('fetchGammaApi', () => { Promise.resolve({ ok: true, json: () => - Promise.resolve({ markets: [marketStub(1), marketStub(2)] }), + Promise.resolve({ + markets: [marketStub(1), marketStub(2)], + }), }), ); const result = await fetchGammaApi( diff --git a/services/polymarket/index.test.ts b/services/polymarket/index.test.ts index 769ba28..93b3aa4 100644 --- a/services/polymarket/index.test.ts +++ b/services/polymarket/index.test.ts @@ -58,29 +58,86 @@ mock.module('../../lib/batch-insert', () => ({ globalThis.fetch = mockFetch as unknown as typeof fetch; const baseMockMarket = { - id: '1', conditionId: '0xaaa', question: 'Test?', description: '', - slug: 'test', outcomes: '["Yes", "No"]', outcomePrices: '["0.5", "0.5"]', - resolutionSource: '', image: '', icon: '', questionID: '', - clobTokenIds: '["111", "222"]', submitted_by: '', marketMakerAddress: '', - enableOrderBook: true, orderPriceMinTickSize: 0.001, orderMinSize: 5, - negRisk: false, negRiskRequestID: '', negRiskOther: false, - archived: false, new: false, featured: false, resolvedBy: '', - restricted: false, hasReviewedDates: false, umaBond: '', umaReward: '', - customLiveness: 0, acceptingOrders: true, ready: true, funded: true, - acceptingOrdersTimestamp: '', cyom: false, competitive: 0, - pagerDutyNotificationEnabled: false, approved: true, rewardsMinSize: 0, - rewardsMaxSpread: 0, spread: 0, automaticallyActive: true, - clearBookOnStart: false, manualActivation: false, pendingDeployment: false, - deploying: false, deployingTimestamp: '', rfqEnabled: false, - eventStartTime: '', holdingRewardsEnabled: false, feesEnabled: false, - requiresTranslation: false, startDate: '', endDate: '', startDateIso: '', - endDateIso: '', umaEndDate: '', createdAt: '', events: [] as any[], - liquidity: '', volume: '', volumeNum: 0, liquidityNum: 0, - volume24hr: 0, volume1wk: 0, volume1mo: 0, volume1yr: 0, - volume24hrClob: 0, volume1wkClob: 0, volume1moClob: 0, volume1yrClob: 0, - volumeClob: 0, liquidityClob: 0, active: true, closed: false, - oneDayPriceChange: 0, oneHourPriceChange: 0, lastTradePrice: 0, - bestBid: 0, bestAsk: 0, umaResolutionStatuses: '', + id: '1', + conditionId: '0xaaa', + question: 'Test?', + description: '', + slug: 'test', + outcomes: '["Yes", "No"]', + outcomePrices: '["0.5", "0.5"]', + resolutionSource: '', + image: '', + icon: '', + questionID: '', + clobTokenIds: '["111", "222"]', + submitted_by: '', + marketMakerAddress: '', + enableOrderBook: true, + orderPriceMinTickSize: 0.001, + orderMinSize: 5, + negRisk: false, + negRiskRequestID: '', + negRiskOther: false, + archived: false, + new: false, + featured: false, + resolvedBy: '', + restricted: false, + hasReviewedDates: false, + umaBond: '', + umaReward: '', + customLiveness: 0, + acceptingOrders: true, + ready: true, + funded: true, + acceptingOrdersTimestamp: '', + cyom: false, + competitive: 0, + pagerDutyNotificationEnabled: false, + approved: true, + rewardsMinSize: 0, + rewardsMaxSpread: 0, + spread: 0, + automaticallyActive: true, + clearBookOnStart: false, + manualActivation: false, + pendingDeployment: false, + deploying: false, + deployingTimestamp: '', + rfqEnabled: false, + eventStartTime: '', + holdingRewardsEnabled: false, + feesEnabled: false, + requiresTranslation: false, + startDate: '', + endDate: '', + startDateIso: '', + endDateIso: '', + umaEndDate: '', + createdAt: '', + events: [] as any[], + liquidity: '', + volume: '', + volumeNum: 0, + liquidityNum: 0, + volume24hr: 0, + volume1wk: 0, + volume1mo: 0, + volume1yr: 0, + volume24hrClob: 0, + volume1wkClob: 0, + volume1moClob: 0, + volume1yrClob: 0, + volumeClob: 0, + liquidityClob: 0, + active: true, + closed: false, + oneDayPriceChange: 0, + oneHourPriceChange: 0, + lastTradePrice: 0, + bestBid: 0, + bestAsk: 0, + umaResolutionStatuses: '', }; describe('Polymarket markets service', () => { @@ -108,7 +165,11 @@ describe('Polymarket markets service', () => { mockQuery.mockReturnValue( Promise.resolve({ data: [], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); }); @@ -831,7 +892,8 @@ describe('Polymarket markets service', () => { test('should find closed market on retry with closed=true', async () => { const registeredTokens = [ { - condition_id: '0xclosed111111111111111111111111111111111111111111111111111111111', + condition_id: + '0xclosed111111111111111111111111111111111111111111111111111111111', token0: '111', token1: '222', timestamp: '2025-06-01 00:00:00', @@ -843,7 +905,8 @@ describe('Polymarket markets service', () => { const closedMarket = { ...baseMockMarket, id: '999', - conditionId: '0xclosed111111111111111111111111111111111111111111111111111111111', + conditionId: + '0xclosed111111111111111111111111111111111111111111111111111111111', question: 'Resolved market?', slug: 'resolved-market', closed: true, @@ -853,17 +916,27 @@ describe('Polymarket markets service', () => { mockQuery.mockReturnValueOnce( Promise.resolve({ data: registeredTokens, - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); // First fetch (open markets) returns empty, retry with closed=true finds it mockFetch .mockReturnValueOnce( - Promise.resolve({ ok: true, json: () => Promise.resolve({ markets: [] }) }), + Promise.resolve({ + ok: true, + json: () => Promise.resolve({ markets: [] }), + }), ) .mockReturnValueOnce( - Promise.resolve({ ok: true, json: () => Promise.resolve({ markets: [closedMarket] }) }), + Promise.resolve({ + ok: true, + json: () => Promise.resolve({ markets: [closedMarket] }), + }), ); const { run } = await import('./index'); @@ -874,7 +947,8 @@ describe('Polymarket markets service', () => { expect(mockInsertRow).toHaveBeenCalledWith( 'polymarket_markets', expect.objectContaining({ - condition_id: '0xclosed111111111111111111111111111111111111111111111111111111111', + condition_id: + '0xclosed111111111111111111111111111111111111111111111111111111111', closed: true, }), expect.any(String), @@ -1084,21 +1158,33 @@ describe('Polymarket markets service', () => { mockQuery.mockReturnValueOnce( Promise.resolve({ data: [], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); // Second query: one event slug to enrich mockQuery.mockReturnValueOnce( Promise.resolve({ data: [{ event_slug: 'test-event' }], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); // Third query: batch existence check returns empty (no existing markets) mockQuery.mockReturnValueOnce( Promise.resolve({ data: [], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); @@ -1114,8 +1200,14 @@ describe('Polymarket markets service', () => { slug: 'test-event', title: 'Test Event', markets: [ - { conditionId: '0xaaa', question: 'Market A?' }, - { conditionId: '0xbbb', question: 'Market B?' }, + { + conditionId: '0xaaa', + question: 'Market A?', + }, + { + conditionId: '0xbbb', + question: 'Market B?', + }, ], }, ], @@ -1124,17 +1216,26 @@ describe('Polymarket markets service', () => { ); // Second fetch: batch /markets/keyset returns both child markets in one call - const mockMarketA = { ...baseMockMarket, conditionId: '0xaaa', question: 'Market A?', slug: 'market-a' }; + const mockMarketA = { + ...baseMockMarket, + conditionId: '0xaaa', + question: 'Market A?', + slug: 'market-a', + }; const mockMarketB = { ...baseMockMarket, - id: '2', conditionId: '0xbbb', question: 'Market B?', - slug: 'market-b', clobTokenIds: '["333", "444"]', + id: '2', + conditionId: '0xbbb', + question: 'Market B?', + slug: 'market-b', + clobTokenIds: '["333", "444"]', }; mockFetch.mockReturnValueOnce( Promise.resolve({ ok: true, - json: () => Promise.resolve({ markets: [mockMarketA, mockMarketB] }), + json: () => + Promise.resolve({ markets: [mockMarketA, mockMarketB] }), }), ); @@ -1142,9 +1243,7 @@ describe('Polymarket markets service', () => { await run(); // Should insert: market A, market B, enrichment tracking record - const insertCalls = mockInsertRow.mock.calls.map( - (call) => call[0], - ); + const insertCalls = mockInsertRow.mock.calls.map((call) => call[0]); expect(insertCalls).toContain('polymarket_markets'); expect(insertCalls).toContain('polymarket_events_enriched'); @@ -1166,21 +1265,33 @@ describe('Polymarket markets service', () => { mockQuery.mockReturnValueOnce( Promise.resolve({ data: [], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); // Event slugs to enrich mockQuery.mockReturnValueOnce( Promise.resolve({ data: [{ event_slug: 'existing-event' }], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); // Batch existence check: one market already exists mockQuery.mockReturnValueOnce( Promise.resolve({ data: [{ condition_id: '0xaaa' }], - metrics: { httpRequestTimeMs: 0, dataFetchTimeMs: 0, totalTimeMs: 0 }, + metrics: { + httpRequestTimeMs: 0, + dataFetchTimeMs: 0, + totalTimeMs: 0, + }, }), ); @@ -1196,8 +1307,14 @@ describe('Polymarket markets service', () => { slug: 'existing-event', title: 'Existing Event', markets: [ - { conditionId: '0xaaa', question: 'Already scraped' }, - { conditionId: '0xbbb', question: 'New market' }, + { + conditionId: '0xaaa', + question: 'Already scraped', + }, + { + conditionId: '0xbbb', + question: 'New market', + }, ], }, ], diff --git a/services/polymarket/index.ts b/services/polymarket/index.ts index 70c9a62..6229d7e 100644 --- a/services/polymarket/index.ts +++ b/services/polymarket/index.ts @@ -527,9 +527,7 @@ async function refreshOpenMarkets(): Promise { } const stale = await query( - await Bun.file( - __dirname + '/get_stale_markets_for_refresh.sql', - ).text(), + await Bun.file(__dirname + '/get_stale_markets_for_refresh.sql').text(), { db: CLICKHOUSE_DATABASE_INSERT, limit: REFRESH_BATCH_SIZE, From 5a2a1900a0208c3bfc5a8434e26a5849333d0f7c Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Fri, 1 May 2026 00:14:29 -0400 Subject: [PATCH 3/3] docs(config): document CLICKHOUSE_REQUEST_TIMEOUT_MS setting --- docs/CONFIGURATION.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index edd0b65..972eacc 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -25,6 +25,11 @@ cp .env.example .env - **`CLICKHOUSE_DATABASE`** - ClickHouse database name - Default: `default` +- **`CLICKHOUSE_REQUEST_TIMEOUT_MS`** - Timeout in milliseconds for ClickHouse HTTP requests + - Default: `30000` (30 seconds) + - Guards against indefinitely hung queries — particularly important when running under Bun, which does not send TCP keepalive probes via `http.Agent`, meaning a silently dead connection (e.g. NAT table expiry in Kubernetes) would otherwise hang forever + - Increase if your ClickHouse queries legitimately take longer than 30 seconds + ### RPC Configuration - **`NODE_URL`** - EVM RPC node URL (required)