From 4db27052114cd2544719c19ec173b80df07f9b13 Mon Sep 17 00:00:00 2001 From: Gabriel Tozatti Date: Mon, 9 Mar 2026 14:03:11 -0300 Subject: [PATCH 1/2] feat: create schema and service for managing UTXOs --- .../src/queues/gaslessUtxos/constants.ts | 2 ++ .../worker/src/queues/gaslessUtxos/index.ts | 18 +++++++++++++ .../worker/src/queues/gaslessUtxos/types.ts | 26 ++++++++++++++++++ .../gaslessUtxos/utils/findAvailable.ts | 8 ++++++ .../src/queues/gaslessUtxos/utils/getStats.ts | 27 +++++++++++++++++++ .../queues/gaslessUtxos/utils/markSpent.ts | 17 ++++++++++++ .../src/queues/gaslessUtxos/utils/release.ts | 16 +++++++++++ .../src/queues/gaslessUtxos/utils/reserve.ts | 22 +++++++++++++++ 8 files changed, 136 insertions(+) create mode 100644 packages/worker/src/queues/gaslessUtxos/constants.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/index.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/types.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/utils/getStats.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/utils/release.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/utils/reserve.ts diff --git a/packages/worker/src/queues/gaslessUtxos/constants.ts b/packages/worker/src/queues/gaslessUtxos/constants.ts new file mode 100644 index 000000000..6354af8e8 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/constants.ts @@ -0,0 +1,2 @@ +export const COLLECTION_GASLESS_UTXOS = "gasless_utxos"; +export const DEFAULT_TTL_SECONDS = 60 * 60; diff --git a/packages/worker/src/queues/gaslessUtxos/index.ts b/packages/worker/src/queues/gaslessUtxos/index.ts new file mode 100644 index 000000000..5208685c9 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/index.ts @@ -0,0 +1,18 @@ +import { Collection } from 'mongodb'; +import { GaslessUtxo, type ReserveUtxoOptions, GaslessUtxoStats } from './types'; +import { findAvailable } from './utils/findAvailable'; +import { reserve } from './utils/reserve'; +import { release } from './utils/release'; +import { markSpent } from './utils/markSpent'; +import { getStats } from './utils/getStats'; + +export const gaslessUtxosCollection = (collection: Collection) => ({ + findAvailable: (): Promise => findAvailable(collection), + reserve: (options: ReserveUtxoOptions): Promise => reserve(collection, options), + release: (utxoId: string): Promise => release(collection, utxoId), + markSpent: (utxoId: string, spentTxHash: string): Promise => markSpent(collection, utxoId, spentTxHash), + getStats: (): Promise => getStats(collection), +}); + +export * from './types'; +export * from './constants'; \ No newline at end of file diff --git a/packages/worker/src/queues/gaslessUtxos/types.ts b/packages/worker/src/queues/gaslessUtxos/types.ts new file mode 100644 index 000000000..130db3dc4 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/types.ts @@ -0,0 +1,26 @@ +import { ObjectId } from "mongodb"; + +export interface GaslessUtxo { + _id?: ObjectId; + utxoId: string; + txId: string; + outputIndex: number; + amount: string; + status: "available" | "reserved" | "spent"; + reservedAt?: Date; + reservedBy?: string; + spentTxHash?: string; + createdAt: Date; +} + +export interface GaslessUtxoStats { + available: number; + reserved: number; + spent: number; + total: number; +} + +export interface ReserveUtxoOptions { + reservedBy: string; + ttlSeconds?: number; +} diff --git a/packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts b/packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts new file mode 100644 index 000000000..4542ae4bf --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/findAvailable.ts @@ -0,0 +1,8 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "../types"; + +export const findAvailable = async ( + collection: Collection +): Promise => { + return collection.find({ status: "available" }).toArray(); +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/getStats.ts b/packages/worker/src/queues/gaslessUtxos/utils/getStats.ts new file mode 100644 index 000000000..4b1f52415 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/getStats.ts @@ -0,0 +1,27 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo, GaslessUtxoStats } from "../types"; + +export const getStats = async ( + collection: Collection +): Promise => { + const rows = await collection + .aggregate<{ _id: string; count: number }>([ + { $group: { _id: "$status", count: { $sum: 1 } } }, + ]) + .toArray(); + + const stats: GaslessUtxoStats = { + available: 0, + reserved: 0, + spent: 0, + total: 0, + }; + + for (const row of rows) { + const key = row._id as keyof Omit; + if (key in stats) stats[key] = row.count; + stats.total += row.count; + } + + return stats; +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts b/packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts new file mode 100644 index 000000000..538fe799a --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/markSpent.ts @@ -0,0 +1,17 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "../types"; + +export const markSpent = async ( + collection: Collection, + utxoId: string, + spentTxHash: string +): Promise => { + return collection.findOneAndUpdate( + { utxoId, status: "reserved" }, + { + $set: { status: "spent", spentTxHash }, + $unset: { reservedBy: "", reservedAt: "" }, + }, + { returnDocument: "after" } + ); +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/release.ts b/packages/worker/src/queues/gaslessUtxos/utils/release.ts new file mode 100644 index 000000000..840203a48 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/release.ts @@ -0,0 +1,16 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "../types"; + +export const release = async ( + collection: Collection, + utxoId: string +): Promise => { + return collection.findOneAndUpdate( + { utxoId, status: "reserved" }, + { + $set: { status: "available" }, + $unset: { reservedBy: "", reservedAt: "" }, + }, + { returnDocument: "after" } + ); +}; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/reserve.ts b/packages/worker/src/queues/gaslessUtxos/utils/reserve.ts new file mode 100644 index 000000000..ce322a04b --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/reserve.ts @@ -0,0 +1,22 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo, ReserveUtxoOptions } from "../types"; +import { DEFAULT_TTL_SECONDS } from "@/queues/gaslessUtxos/constants"; + +export const reserve = async ( + collection: Collection, + options: ReserveUtxoOptions +): Promise => { + const { reservedBy, ttlSeconds = DEFAULT_TTL_SECONDS } = options; + + return collection.findOneAndUpdate( + { status: "available" }, + { + $set: { + status: "reserved", + reservedBy, + reservedAt: new Date(), + }, + }, + { returnDocument: "after" } + ); +}; From a75ec1ab1df16837879646a0507e1a694594dec3 Mon Sep 17 00:00:00 2001 From: Gabriel Tozatti Date: Tue, 10 Mar 2026 12:32:39 -0300 Subject: [PATCH 2/2] feat: implement automatic cleanup of expired UTXO reservations --- packages/worker/src/index.ts | 8 +++- .../src/queues/gaslessUtxos/constants.ts | 2 + .../queues/gaslessUtxos/gaslessUtxoCleanup.ts | 43 +++++++++++++++++++ .../worker/src/queues/gaslessUtxos/index.ts | 2 + .../gaslessUtxos/utils/releaseExpired.ts | 19 ++++++++ 5 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts create mode 100644 packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index f04e0d111..4c02e4f80 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -8,7 +8,12 @@ import AssetCron from "./queues/assetsValue/scheduler"; import assetQueue from "./queues/assetsValue/queue"; import { MongoDatabase } from "./clients/mongoClient"; import { PsqlClient } from "./clients"; -import { userBlockSyncQueue, userLogoutSyncQueue, UserBlockSyncCron } from "./queues/userBlockSync"; +import { + userBlockSyncQueue, + userLogoutSyncQueue, + UserBlockSyncCron, +} from "./queues/userBlockSync"; +import { GaslessUtxoCleanup } from "@/queues/gaslessUtxos/gaslessUtxoCleanup"; const { WORKER_PORT, @@ -73,6 +78,7 @@ PsqlClient.connect(); BalanceCron.create(); AssetCron.create(); UserBlockSyncCron.create(); +GaslessUtxoCleanup.start(); app.listen(WORKER_PORT ?? 3063, () => console.log(`Server running on ${WORKER_PORT}`) diff --git a/packages/worker/src/queues/gaslessUtxos/constants.ts b/packages/worker/src/queues/gaslessUtxos/constants.ts index 6354af8e8..221e2da35 100644 --- a/packages/worker/src/queues/gaslessUtxos/constants.ts +++ b/packages/worker/src/queues/gaslessUtxos/constants.ts @@ -1,2 +1,4 @@ export const COLLECTION_GASLESS_UTXOS = "gasless_utxos"; export const DEFAULT_TTL_SECONDS = 60 * 60; +export const CLEANUP_INTERVAL_MS = 60 * 1000; +export const RESERVE_TTLS_MS = 5 * 60 * 1000; diff --git a/packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts b/packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts new file mode 100644 index 000000000..5d9884ed5 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/gaslessUtxoCleanup.ts @@ -0,0 +1,43 @@ +import { MongoDatabase } from "@/clients/mongoClient"; +import { + COLLECTION_GASLESS_UTXOS, + CLEANUP_INTERVAL_MS, +} from "@/queues/gaslessUtxos/constants"; +import { GaslessUtxo } from "@/queues/gaslessUtxos/types"; +import { gaslessUtxosCollection } from "@/queues/gaslessUtxos"; + +export class GaslessUtxoCleanup { + private static instance?: GaslessUtxoCleanup; + private static intervalRef?: NodeJS.Timeout; + + private constructor() {} + + static start(): GaslessUtxoCleanup { + if (!GaslessUtxoCleanup.instance) { + GaslessUtxoCleanup.instance = new GaslessUtxoCleanup(); + + GaslessUtxoCleanup.intervalRef = setInterval(async () => { + const db = await MongoDatabase.connect(); + const utxos = gaslessUtxosCollection( + db.getCollection(COLLECTION_GASLESS_UTXOS) + ); + const released = await utxos.releaseExpired(); + if (released > 0) { + console.log( + `[GASLESS_UTXO_CLEANUP]: Released ${released} expired reservation(s).` + ); + } + }, CLEANUP_INTERVAL_MS); + } + + return GaslessUtxoCleanup.instance; + } + + static stop(): void { + if (GaslessUtxoCleanup.intervalRef) { + clearInterval(GaslessUtxoCleanup.intervalRef); + GaslessUtxoCleanup.intervalRef = undefined; + console.log("[GASLESS_UTXO_CLEANUP]: Stopped."); + } + } +} diff --git a/packages/worker/src/queues/gaslessUtxos/index.ts b/packages/worker/src/queues/gaslessUtxos/index.ts index 5208685c9..749282ddb 100644 --- a/packages/worker/src/queues/gaslessUtxos/index.ts +++ b/packages/worker/src/queues/gaslessUtxos/index.ts @@ -5,6 +5,7 @@ import { reserve } from './utils/reserve'; import { release } from './utils/release'; import { markSpent } from './utils/markSpent'; import { getStats } from './utils/getStats'; +import { releaseExpired } from './utils/releaseExpired'; export const gaslessUtxosCollection = (collection: Collection) => ({ findAvailable: (): Promise => findAvailable(collection), @@ -12,6 +13,7 @@ export const gaslessUtxosCollection = (collection: Collection) => ( release: (utxoId: string): Promise => release(collection, utxoId), markSpent: (utxoId: string, spentTxHash: string): Promise => markSpent(collection, utxoId, spentTxHash), getStats: (): Promise => getStats(collection), + releaseExpired: (): Promise => releaseExpired(collection), }); export * from './types'; diff --git a/packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts b/packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts new file mode 100644 index 000000000..ed65ee135 --- /dev/null +++ b/packages/worker/src/queues/gaslessUtxos/utils/releaseExpired.ts @@ -0,0 +1,19 @@ +import { Collection } from "mongodb"; +import { GaslessUtxo } from "@/queues/gaslessUtxos"; +import { RESERVE_TTLS_MS } from "@/queues/gaslessUtxos"; + +export const releaseExpired = async ( + collection: Collection +): Promise => { + const expiredBefore = new Date(Date.now() - RESERVE_TTLS_MS); + + const result = await collection.updateMany( + { status: "reserved", reservedAt: { $lte: expiredBefore } }, + { + $set: { status: "available" }, + $unset: { reservedBy: "", reservedAt: "" }, + } + ); + + return result.modifiedCount; +};