diff --git a/src/task/index/FetchDailyIndexTask.test.ts b/src/task/index/FetchDailyIndexTask.test.ts index b15aa38..ee9e5b5 100644 --- a/src/task/index/FetchDailyIndexTask.test.ts +++ b/src/task/index/FetchDailyIndexTask.test.ts @@ -9,12 +9,11 @@ import { afterAll, beforeAll, describe, expect, it, mock } from "bun:test"; import { FetchDailyIndexTask } from "./FetchDailyIndexTask"; import { TaskFailedError } from "@workglow/task-graph"; import { FetchUrlTaskOutput } from "@workglow/tasks"; -import { JobQueue } from "@workglow/job-queue"; +import { JobQueueServer, JobQueueClient, EvenlySpacedRateLimiter } from "@workglow/job-queue"; import { FetchUrlTaskInput } from "@workglow/tasks"; import { SecFetchJob } from "../../fetch/SecFetchJob"; import { SecJobQueueName } from "../../config/Constants"; import { InMemoryQueueStorage } from "@workglow/storage"; -import { InMemoryRateLimiter } from "@workglow/job-queue"; import { EnvToDI } from "../../config/EnvToDI"; import { readFileSync } from "fs"; import { join } from "path"; @@ -88,26 +87,42 @@ const oldFetch = global.fetch; EnvToDI(); describe("FetchDailyIndexTask", () => { let db: any; + let server: JobQueueServer; beforeAll(() => { (global as any).fetch = mockFetch; - getTaskQueueRegistry().registerQueue( - new JobQueue( - SecJobQueueName, - SecFetchJob, - { - storage: new InMemoryQueueStorage(SecJobQueueName), - limiter: new InMemoryRateLimiter({ maxExecutions: 10, windowSizeInSeconds: 1 }), - waitDurationInMilliseconds: 1, - } - ) + + const storage = new InMemoryQueueStorage(SecJobQueueName); + + server = new JobQueueServer( + SecFetchJob, + { + queueName: SecJobQueueName, + storage: storage, + limiter: new EvenlySpacedRateLimiter({ maxExecutions: 10, windowSizeInSeconds: 1 }), + pollIntervalMs: 1, + } ); - getTaskQueueRegistry().startQueues(); + + const client = new JobQueueClient({ + storage: storage, + queueName: SecJobQueueName, + }); + + client.attach(server); + + getTaskQueueRegistry().registerQueue({ + server: server, + client: client, + storage: storage, + }); + + server.start(); }); afterAll(() => { (global as any).fetch = oldFetch; - getTaskQueueRegistry().stopQueues(); + server.stop(); setTaskQueueRegistry(null); });