diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index f04e0d111..4c02e4f80 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -8,7 +8,12 @@ import AssetCron from "./queues/assetsValue/scheduler"; import assetQueue from "./queues/assetsValue/queue"; import { MongoDatabase } from "./clients/mongoClient"; import { PsqlClient } from "./clients"; -import { userBlockSyncQueue, userLogoutSyncQueue, UserBlockSyncCron } from "./queues/userBlockSync"; +import { + userBlockSyncQueue, + userLogoutSyncQueue, + UserBlockSyncCron, +} from "./queues/userBlockSync"; +import { GaslessUtxoCleanup } from "@/queues/gaslessUtxos/gaslessUtxoCleanup"; const { WORKER_PORT, @@ -73,6 +78,7 @@ PsqlClient.connect(); BalanceCron.create(); AssetCron.create(); UserBlockSyncCron.create(); +GaslessUtxoCleanup.start(); app.listen(WORKER_PORT ?? 3063, () => console.log(`Server running on ${WORKER_PORT}`) diff --git a/packages/worker/src/queues/gaslessUtxos/constants.ts b/packages/worker/src/queues/gaslessUtxos/constants.ts new file mode 100644 index 000000000..221e2da35 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/constants.ts @@ -0,0 +1,4 @@ +export const COLLECTION_GASLESS_UTXOS = "gasless_utxos"; +export const DEFAULT_TTL_SECONDS = 60 * 60; +export const CLEANUP_INTERVAL_MS = 60 * 1000; +export const RESERVE_TTLS_MS = 5 * 60 * 1000; diff --git a/packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts b/packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts new file mode 100644 index 000000000..5d9884ed5 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts @@ -0,0 +1,43 @@ +import { MongoDatabase } from "@/clients/mongoClient"; +import { + COLLECTION_GASLESS_UTXOS, + CLEANUP_INTERVAL_MS, +} from "@/queues/gaslessUtxos/constants"; +import { GaslessUtxo } from "@/queues/gaslessUtxos/types"; +import { gaslessUtxosCollection } from "@/queues/gaslessUtxos"; + +export class GaslessUtxoCleanup { + private static instance?: GaslessUtxoCleanup; + private static intervalRef?: NodeJS.Timeout; + + private constructor() {} + + static start(): GaslessUtxoCleanup { + if (!GaslessUtxoCleanup.instance) { + GaslessUtxoCleanup.instance = new GaslessUtxoCleanup(); + + GaslessUtxoCleanup.intervalRef = setInterval(async () => { + const db = await MongoDatabase.connect(); + const utxos = gaslessUtxosCollection( + db.getCollection(COLLECTION_GASLESS_UTXOS) + ); + const released = await utxos.releaseExpired(); + if (released > 0) { + console.log( + `[GASLESS_UTXO_CLEANUP]: Released ${released} expired reservation(s).` + ); + } + }, CLEANUP_INTERVAL_MS); + } + + return GaslessUtxoCleanup.instance; + } + + static stop(): void { + if (GaslessUtxoCleanup.intervalRef) { + clearInterval(GaslessUtxoCleanup.intervalRef); + GaslessUtxoCleanup.intervalRef = undefined; + console.log("[GASLESS_UTXO_CLEANUP]: Stopped."); + } + } +} diff --git a/packages/worker/src/queues/gaslessUtxos/index.ts b/packages/worker/src/queues/gaslessUtxos/index.ts new file mode 100644 index 000000000..749282ddb --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/index.ts @@ -0,0 +1,20 @@ +import { Collection } from 'mongodb'; +import { GaslessUtxo, type ReserveUtxoOptions, GaslessUtxoStats } from './types'; +import { findAvailable } from './utils/findAvailable'; +import { reserve } from './utils/reserve'; +import { release } from './utils/release'; +import { markSpent } from './utils/markSpent'; +import { getStats } from './utils/getStats'; +import { releaseExpired } from './utils/releaseExpired'; + +export const gaslessUtxosCollection = (collection: Collection) => ({ + findAvailable: (): Promise => findAvailable(collection), + reserve: (options: ReserveUtxoOptions): Promise => reserve(collection, options), + release: (utxoId: string): Promise => release(collection, utxoId), + markSpent: (utxoId: string, spentTxHash: string): Promise => markSpent(collection, utxoId, spentTxHash), + getStats: (): Promise => getStats(collection), + releaseExpired: (): Promise => releaseExpired(collection), +}); + +export * from './types'; +export * from './constants'; \ No newline at end of file diff --git a/packages/worker/src/queues/gaslessUtxos/types.ts b/packages/worker/src/queues/gaslessUtxos/types.ts new file mode 100644 index 000000000..130db3dc4 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/types.ts @@ -0,0 +1,26 @@ +import { ObjectId } from "mongodb"; + +export interface GaslessUtxo { + _id?: ObjectId; + utxoId: string; + txId: string; + outputIndex: number; + amount: string; + status: "available" | "reserved" | "spent"; + reservedAt?: Date; + reservedBy?: string; + spentTxHash?: string; + createdAt: Date; +} + +export interface GaslessUtxoStats { + available: number; + reserved: number; + spent: number; + total: number; +} + +export interface ReserveUtxoOptions { + reservedBy: string; + ttlSeconds?: number; +} diff --git a/packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts b/packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts new file mode 100644 index 000000000..4542ae4bf --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts @@ -0,0 +1,8 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "../types"; + +export const findAvailable = async ( + collection: Collection +): Promise => { + return collection.find({ status: "available" }).toArray(); +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/getStats.ts b/packages/worker/src/queues/gaslessUtxos/utils/getStats.ts new file mode 100644 index 000000000..4b1f52415 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/getStats.ts @@ -0,0 +1,27 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo, GaslessUtxoStats } from "../types"; + +export const getStats = async ( + collection: Collection +): Promise => { + const rows = await collection + .aggregate<{ _id: string; count: number }>([ + { $group: { _id: "$status", count: { $sum: 1 } } }, + ]) + .toArray(); + + const stats: GaslessUtxoStats = { + available: 0, + reserved: 0, + spent: 0, + total: 0, + }; + + for (const row of rows) { + const key = row._id as keyof Omit; + if (key in stats) stats[key] = row.count; + stats.total += row.count; + } + + return stats; +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts b/packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts new file mode 100644 index 000000000..538fe799a --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts @@ -0,0 +1,17 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "../types"; + +export const markSpent = async ( + collection: Collection, + utxoId: string, + spentTxHash: string +): Promise => { + return collection.findOneAndUpdate( + { utxoId, status: "reserved" }, + { + $set: { status: "spent", spentTxHash }, + $unset: { reservedBy: "", reservedAt: "" }, + }, + { returnDocument: "after" } + ); +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/release.ts b/packages/worker/src/queues/gaslessUtxos/utils/release.ts new file mode 100644 index 000000000..840203a48 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/release.ts @@ -0,0 +1,16 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "../types"; + +export const release = async ( + collection: Collection, + utxoId: string +): Promise => { + return collection.findOneAndUpdate( + { utxoId, status: "reserved" }, + { + $set: { status: "available" }, + $unset: { reservedBy: "", reservedAt: "" }, + }, + { returnDocument: "after" } + ); +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts b/packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts new file mode 100644 index 000000000..ed65ee135 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts @@ -0,0 +1,19 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "@/queues/gaslessUtxos"; +import { RESERVE_TTLS_MS } from "@/queues/gaslessUtxos"; + +export const releaseExpired = async ( + collection: Collection +): Promise => { + const expiredBefore = new Date(Date.now() - RESERVE_TTLS_MS); + + const result = await collection.updateMany( + { status: "reserved", reservedAt: { $lte: expiredBefore } }, + { + $set: { status: "available" }, + $unset: { reservedBy: "", reservedAt: "" }, + } + ); + + return result.modifiedCount; +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/reserve.ts b/packages/worker/src/queues/gaslessUtxos/utils/reserve.ts new file mode 100644 index 000000000..ce322a04b --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/reserve.ts @@ -0,0 +1,22 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo, ReserveUtxoOptions } from "../types"; +import { DEFAULT_TTL_SECONDS } from "@/queues/gaslessUtxos/constants"; + +export const reserve = async ( + collection: Collection, + options: ReserveUtxoOptions +): Promise => { + const { reservedBy, ttlSeconds = DEFAULT_TTL_SECONDS } = options; + + return collection.findOneAndUpdate( + { status: "available" }, + { + $set: { + status: "reserved", + reservedBy, + reservedAt: new Date(), + }, + }, + { returnDocument: "after" } + ); +};