From 201d3a22c23e0d6261e0d7fdde43290ee7ff96b8 Mon Sep 17 00:00:00 2001 From: Soil King <157099073+soilking@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:12:21 -0800 Subject: [PATCH] Rate limit by endpoint --- proxy-api/src/datasources/subgraph-clients.js | 2 +- proxy-api/src/scheduled/tasks/status.js | 2 +- .../src/services/subgraph-status-service.js | 9 +++- proxy-api/src/utils/env.js | 5 ++ .../src/utils/load/bottleneck-limiters.js | 49 ++++++++++++------- proxy-api/src/utils/load/endpoint-balance.js | 7 ++- 6 files changed, 50 insertions(+), 24 deletions(-) diff --git a/proxy-api/src/datasources/subgraph-clients.js b/proxy-api/src/datasources/subgraph-clients.js index 325895a..0e22311 100644 --- a/proxy-api/src/datasources/subgraph-clients.js +++ b/proxy-api/src/datasources/subgraph-clients.js @@ -20,7 +20,7 @@ class SubgraphClients { const response = await client.request(query, variables); return response; }; - const limiterWrapped = await BottleneckLimiters.wrap(endpointIndex, callableClient); + const limiterWrapped = await BottleneckLimiters.wrap(endpointIndex, subgraphName, callableClient); return limiterWrapped; } } diff --git a/proxy-api/src/scheduled/tasks/status.js b/proxy-api/src/scheduled/tasks/status.js index 208d9c9..49726fe 100644 --- a/proxy-api/src/scheduled/tasks/status.js +++ b/proxy-api/src/scheduled/tasks/status.js @@ -8,7 +8,7 @@ class StatusTask { static async checkAll() { for (const subgraphName of EnvUtil.getEnabledSubgraphs()) { for (const endpointIndex of EnvUtil.endpointsForSubgraph(subgraphName)) { - const utilization = await BottleneckLimiters.getUtilization(endpointIndex); + const utilization = await BottleneckLimiters.getUtilization(endpointIndex, subgraphName); if (utilization <= EnvUtil.getStatusCheckMaxUtilization()) { try { await SubgraphStatusService.checkFatalError(endpointIndex, subgraphName); diff --git a/proxy-api/src/services/subgraph-status-service.js b/proxy-api/src/services/subgraph-status-service.js index 0d84f2d..83fd27a 100644 --- a/proxy-api/src/services/subgraph-status-service.js +++ b/proxy-api/src/services/subgraph-status-service.js @@ -15,7 +15,7 @@ class SubgraphStatusService { const alchemyStatus = await this._getAlchemyStatus(endpointIndex, subgraphName); fatalError = alchemyStatus.data.data.indexingStatusForCurrentVersion.fatalError?.message; break; - case 'goldsky': + case 'ormi': return undefined; case 'graph': // Graph status endpoint changed, I couldnt find a realiable alternative. @@ -47,7 +47,11 @@ class SubgraphStatusService { static async _getAlchemyStatus(endpointIndex, subgraphName) { const statusUrl = EnvUtil.underlyingUrl(endpointIndex, subgraphName).replace('/api', '/status'); - const status = await BottleneckLimiters.schedule(endpointIndex, async () => await axios.post(statusUrl)); + const status = await BottleneckLimiters.schedule( + endpointIndex, + subgraphName, + async () => await axios.post(statusUrl) + ); return status; } @@ -62,6 +66,7 @@ class SubgraphStatusService { const status = await BottleneckLimiters.schedule( endpointIndex, + subgraphName, async () => await axios.post(statusUrl, { operationName: 'SubgraphIndexingStatusFatalError', diff --git a/proxy-api/src/utils/env.js b/proxy-api/src/utils/env.js index 1e688a4..b737613 100644 --- a/proxy-api/src/utils/env.js +++ b/proxy-api/src/utils/env.js @@ -7,6 +7,7 @@ const ENDPOINT_TYPES = process.env.ENDPOINT_TYPES?.split('|'); const ENDPOINT_RATE_LIMITS = process.env.ENDPOINT_RATE_LIMITS?.split('|').map((sg) => sg.split(',').map((s) => parseInt(s)) ); +const RATE_LIMIT_TYPE = process.env.RATE_LIMIT_TYPE?.split('|'); const ENDPOINT_UTILIZATION_PREFERENCE = process.env.ENDPOINT_UTILIZATION_PREFERENCE?.split('|').map((s) => parseFloat(s) ); @@ -83,6 +84,10 @@ class EnvUtil { return ENDPOINT_RATE_LIMITS; } + static getEndpointRateLimitType(endpointIndex) { + return RATE_LIMIT_TYPE[endpointIndex]; + } + static getEndpointUtilizationPreference() { return ENDPOINT_UTILIZATION_PREFERENCE; } diff --git a/proxy-api/src/utils/load/bottleneck-limiters.js b/proxy-api/src/utils/load/bottleneck-limiters.js index fb70968..1fc03d6 100644 --- a/proxy-api/src/utils/load/bottleneck-limiters.js +++ b/proxy-api/src/utils/load/bottleneck-limiters.js @@ -4,7 +4,7 @@ const EnvUtil = require('../env'); class BottleneckLimiters { static bottleneckLimiters = []; - static maxPeriodicRquests = []; + static maxPeriodicRequests = []; static maxReservoirSizes = []; // Create a limiter for each configured endpoint @@ -15,7 +15,7 @@ class BottleneckLimiters { throw new Error('Invalid .env configuration: bottleneck requires rate limit interval divisible by 250.'); } - this.bottleneckLimiters.push( + const limiterFactory = () => new Bottleneck({ reservoir: maxBurst, reservoirIncreaseAmount: rqPerInterval, @@ -23,39 +23,52 @@ class BottleneckLimiters { reservoirIncreaseMaximum: maxBurst, maxConcurrent: maxBurst, minTime: Math.ceil(interval / rqPerInterval) - }) - ); - this.maxPeriodicRquests.push(rqPerInterval); + }); + + let makeSubgraphLimiter; + if (EnvUtil.getEndpointRateLimitType(i) === 'per-subgraph') { + makeSubgraphLimiter = () => limiterFactory(); + } else { + const lim = limiterFactory(); + makeSubgraphLimiter = () => lim; + } + + for (const sgName of EnvUtil.subgraphsForEndpoint(i)) { + (this.bottleneckLimiters[i] ??= {})[sgName] = makeSubgraphLimiter(); + } + + this.bottleneckLimiters.push(); + this.maxPeriodicRequests.push(rqPerInterval); this.maxReservoirSizes.push(maxBurst); } } - static async wrap(endpointIndex, fnToWrap) { - if (await this.isBurstDepleted(endpointIndex)) { - throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}.`); + static async wrap(endpointIndex, subgraphName, fnToWrap) { + if (await this.isBurstDepleted(endpointIndex, subgraphName)) { + throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}-${subgraphName}.`); } - return this.bottleneckLimiters[endpointIndex].wrap(fnToWrap); + return this.bottleneckLimiters[endpointIndex][subgraphName].wrap(fnToWrap); } - static async schedule(endpointIndex, fnToSchedule) { - if (await this.isBurstDepleted(endpointIndex)) { - throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}.`); + static async schedule(endpointIndex, subgraphName, fnToSchedule) { + if (await this.isBurstDepleted(endpointIndex, subgraphName)) { + throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}-${subgraphName}.`); } - return await this.bottleneckLimiters[endpointIndex].schedule(fnToSchedule); + return await this.bottleneckLimiters[endpointIndex][subgraphName].schedule(fnToSchedule); } - static async isBurstDepleted(endpointIndex) { - return (await this.bottleneckLimiters[endpointIndex].currentReservoir()) === 0; + static async isBurstDepleted(endpointIndex, subgraphName) { + return (await this.bottleneckLimiters[endpointIndex][subgraphName].currentReservoir()) === 0; } // Returns the utilization as a ratio of current active requests / max rq per interval. // Can exceed 100% - static async getUtilization(endpointIndex) { - const currentReservoir = await this.bottleneckLimiters[endpointIndex].currentReservoir(); + static async getUtilization(endpointIndex, subgraphName) { + const currentReservoir = await this.bottleneckLimiters[endpointIndex][subgraphName].currentReservoir(); // These aren't necessarily still executing, but they are considered "active" in that they // were either scheduled recently or are queued to be executed. const activeRequests = this.maxReservoirSizes[endpointIndex] - currentReservoir; - return activeRequests / this.maxPeriodicRquests[endpointIndex]; + return activeRequests / this.maxPeriodicRequests[endpointIndex]; } } diff --git a/proxy-api/src/utils/load/endpoint-balance.js b/proxy-api/src/utils/load/endpoint-balance.js index 3469863..ab023b1 100644 --- a/proxy-api/src/utils/load/endpoint-balance.js +++ b/proxy-api/src/utils/load/endpoint-balance.js @@ -46,7 +46,7 @@ class EndpointBalanceUtil { static async getSubgraphUtilization(subgraphName) { const utilization = {}; for (const endpointIndex of EnvUtil.endpointsForSubgraph(subgraphName)) { - utilization[endpointIndex] = await BottleneckLimiters.getUtilization(endpointIndex); + utilization[endpointIndex] = await BottleneckLimiters.getUtilization(endpointIndex, subgraphName); } return utilization; } @@ -56,7 +56,10 @@ class EndpointBalanceUtil { let options = []; // Remove blacklisted/overutilized endpoints for (const endpointIndex of subgraphEndpoints) { - if (!(await BottleneckLimiters.isBurstDepleted(endpointIndex)) && !blacklist.includes(endpointIndex)) { + if ( + !(await BottleneckLimiters.isBurstDepleted(endpointIndex, subgraphName)) && + !blacklist.includes(endpointIndex) + ) { options.push(endpointIndex); } }