diff --git a/.gitignore b/.gitignore index 6d7069a..0ccd585 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ coverage/ .agents/ skills-lock.json -postgres/ \ No newline at end of file +postgres/ +CLAUDE.md +plan.md \ No newline at end of file diff --git a/src/agent/loop.ts b/src/agent/loop.ts index 0467a96..bd9fb78 100644 --- a/src/agent/loop.ts +++ b/src/agent/loop.ts @@ -7,7 +7,7 @@ import { logger } from '../utils/logger'; import { scanAllProtocols } from './scanner'; import { executeRebalanceIfNeeded, getThresholds, logAgentAction } from './router'; import { captureAllUserBalances, cleanupOldSnapshots } from './snapshotter'; -import { PrismaClient } from '@prisma/client'; +import db from '../db'; import { updateAgentHeartbeat, updateAgentStatus, @@ -16,8 +16,6 @@ import { recordDbOperation } from '../utils/metrics'; -const prisma = new PrismaClient(); - let isRunning = false; let lastRebalanceAt: Date | null = null; let currentProtocol: string | null = null; @@ -75,7 +73,7 @@ async function rebalanceCheckJob(): Promise { updateAgentHeartbeat(); // Get all active positions - const positions = await prisma.position.findMany({ + const positions = await db.position.findMany({ where: { status: 'ACTIVE', }, @@ -295,9 +293,6 @@ export async function stopAgentLoop(): Promise { }); cronJobs.length = 0; - // Close database connection - await prisma.$disconnect(); - isRunning = false; logger.info('✅ Agent loop stopped gracefully'); } catch (error) { @@ -311,15 +306,6 @@ export async function stopAgentLoop(): Promise { * Setup graceful shutdown handlers */ function setupGracefulShutdown(): void { - const shutdown = async (signal: string) => { - logger.info(`Received ${signal}, shutting down gracefully...`); - await stopAgentLoop(); - process.exit(0); - }; - - process.on('SIGTERM', () => shutdown('SIGTERM')); - process.on('SIGINT', () => shutdown('SIGINT')); - // Handle uncaught exceptions process.on('uncaughtException', error => { logger.error('Uncaught exception in agent', { diff --git a/src/agent/router.ts b/src/agent/router.ts index f6c7975..9fb4461 100644 --- a/src/agent/router.ts +++ b/src/agent/router.ts @@ -2,13 +2,11 @@ * Router - Compares APYs and triggers rebalancing when conditions are met */ -import { PrismaClient } from '@prisma/client'; import { logger } from '../utils/logger'; import { ProtocolComparison, RebalanceDetails, RebalanceThresholds } from './types'; import { scanAllProtocols, getCurrentOnChainApy } from './scanner'; import { triggerRebalance as submitRebalance } from '../stellar/contract'; - -const prisma = new PrismaClient(); +import db from '../db'; const DEFAULT_THRESHOLDS: RebalanceThresholds = { minimumImprovement: 0.5, // Must improve by at least 0.5% @@ -153,7 +151,7 @@ export async function triggerRebalance( ); if (positionIds.length > 0) { - const representativePosition = await prisma.position.findFirst({ + const representativePosition = await db.position.findFirst({ where: { id: { in: positionIds }, }, @@ -167,7 +165,7 @@ export async function triggerRebalance( }); if (representativePosition) { - await prisma.transaction.create({ + await db.transaction.create({ data: { userId: representativePosition.userId, positionId: representativePosition.id, @@ -290,7 +288,7 @@ export async function logAgentAction( ): Promise { try { // Log to all users for now - in production, could be per-user - const users = await prisma.user.findMany({ + const users = await db.user.findMany({ select: { id: true }, take: 1, // For now, just log to first user }); @@ -302,7 +300,7 @@ export async function logAgentAction( const userId = users[0].id; - await prisma.agentLog.create({ + await db.agentLog.create({ data: { userId, action: action as any, diff --git a/src/agent/scanner.ts b/src/agent/scanner.ts index 2b5eba7..7e8103d 100644 --- a/src/agent/scanner.ts +++ b/src/agent/scanner.ts @@ -4,9 +4,7 @@ import { logger } from '../utils/logger'; import { YieldProtocol, ProtocolRate } from './types'; -import { PrismaClient } from '@prisma/client'; - -const prisma = new PrismaClient(); +import db from '../db'; const PROTOCOLS = ['Blend', 'Stellar DEX', 'Luma']; const ASSET_SYMBOL = 'USDC'; @@ -161,7 +159,7 @@ async function saveProtocolRates(protocols: YieldProtocol[]): Promise { const networkLabel = normalizeNetwork() for (const protocol of protocols) { - await prisma.protocolRate.create({ + await db.protocolRate.create({ data: { protocolName: protocol.name, assetSymbol: protocol.assetSymbol, @@ -185,7 +183,7 @@ async function saveProtocolRates(protocols: YieldProtocol[]): Promise { */ export async function getCurrentOnChainApy(protocolName: string): Promise { try { - const latestRate = await prisma.protocolRate.findFirst({ + const latestRate = await db.protocolRate.findFirst({ where: { protocolName, assetSymbol: ASSET_SYMBOL, diff --git a/src/agent/snapshotter.ts b/src/agent/snapshotter.ts index b4f083c..f54faac 100644 --- a/src/agent/snapshotter.ts +++ b/src/agent/snapshotter.ts @@ -2,11 +2,9 @@ * Snapshotter - Captures user balance snapshots for historical charting */ -import { PrismaClient } from '@prisma/client'; import { logger } from '../utils/logger'; import { UserBalance } from './types'; - -const prisma = new PrismaClient(); +import db from '../db'; /** * Capture all user balance snapshots @@ -14,7 +12,7 @@ const prisma = new PrismaClient(); */ export async function captureAllUserBalances(): Promise { try { - const positions = await prisma.position.findMany({ + const positions = await db.position.findMany({ where: { status: 'ACTIVE', }, @@ -56,7 +54,7 @@ export async function captureAllUserBalances(): Promise { // Single batch insert is much faster than individual creates if (snapshotData.length > 0) { - await prisma.yieldSnapshot.createMany({ + await db.yieldSnapshot.createMany({ data: snapshotData, skipDuplicates: false, }); @@ -103,7 +101,7 @@ export async function getPositionHistory( const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - days); - const snapshots = await prisma.yieldSnapshot.findMany({ + const snapshots = await db.yieldSnapshot.findMany({ where: { positionId, snapshotAt: { @@ -151,7 +149,7 @@ export async function cleanupOldSnapshots(retentionDays: number = 90): Promise { try { - const snapshot = await prisma.yieldSnapshot.findFirst({ + const snapshot = await db.yieldSnapshot.findFirst({ where: { positionId, }, diff --git a/src/jobs/sessionCleanup.ts b/src/jobs/sessionCleanup.ts index 904145a..5d3f6e8 100644 --- a/src/jobs/sessionCleanup.ts +++ b/src/jobs/sessionCleanup.ts @@ -1,16 +1,14 @@ -import { PrismaClient } from '@prisma/client'; +import db from '../db'; import { logger } from '../utils/logger'; import { config } from '../config/env'; -const prisma = new PrismaClient(); - /** * Delete all sessions whose expiration timestamp is in the past. * Safe to call multiple times — it is idempotent. */ export async function cleanupExpiredSessions(): Promise { try { - const result = await prisma.session.deleteMany({ + const result = await db.session.deleteMany({ where: { expiresAt: { lt: new Date() } }, }); if (result.count > 0) { diff --git a/src/stellar/__tests__/events.helpers.test.ts b/src/stellar/__tests__/events.helpers.test.ts index b575fa6..b8892f9 100644 --- a/src/stellar/__tests__/events.helpers.test.ts +++ b/src/stellar/__tests__/events.helpers.test.ts @@ -20,14 +20,11 @@ jest.mock('@stellar/stellar-sdk', () => { }; }); -// Avoid spinning a real PrismaClient on import. -jest.mock('@prisma/client', () => { - const enums = jest.requireActual('@prisma/client'); - return { - ...enums, - PrismaClient: jest.fn().mockImplementation(() => ({})), - }; -}); +// Keep enum values from @prisma/client; no PrismaClient instantiation needed. +jest.mock('@prisma/client', () => jest.requireActual('@prisma/client')); + +// Prevent the db singleton from opening a real connection. +jest.mock('../../db', () => ({ default: {} })); // Avoid the dlq module pulling in a Prisma connection too. jest.mock('../dlq', () => ({ diff --git a/src/stellar/events.ts b/src/stellar/events.ts index 2238f39..31ec37c 100644 --- a/src/stellar/events.ts +++ b/src/stellar/events.ts @@ -1,5 +1,6 @@ import { rpc, scValToNative, xdr } from '@stellar/stellar-sdk'; -import { PrismaClient, TransactionType, TransactionStatus, Network } from '@prisma/client'; +import { TransactionType, TransactionStatus, Network } from '@prisma/client'; +import db from '../db'; import { Decimal } from '@prisma/client/runtime/library'; import { getRpcServer } from './client'; import { ContractEvent, DepositEvent, WithdrawEvent, RebalanceEvent, EventMetrics } from './types'; @@ -25,8 +26,6 @@ import { const VAULT_CONTRACT_ID = config.stellar.vaultContractId; const POLL_INTERVAL_MS = 5000; -const prisma = new PrismaClient(); - let lastProcessedLedger = 0; let isListening = false; @@ -201,7 +200,7 @@ function parseRebalanceEvent(event: ContractEvent): RebalanceEvent { /** * Handle deposit event - persist to database */ -async function handleDepositEvent(depositData: DepositEvent, event: ContractEvent, tx: any = prisma): Promise { +async function handleDepositEvent(depositData: DepositEvent, event: ContractEvent, tx: any = db): Promise { const user = await timedDbOperation(() => tx.user.findUnique({ where: { walletAddress: depositData.user } }) ) as any; @@ -270,7 +269,7 @@ async function handleDepositEvent(depositData: DepositEvent, event: ContractEven /** * Handle withdraw event - persist to database */ -async function handleWithdrawEvent(withdrawData: WithdrawEvent, event: ContractEvent, tx: any = prisma): Promise { +async function handleWithdrawEvent(withdrawData: WithdrawEvent, event: ContractEvent, tx: any = db): Promise { const user = await timedDbOperation(() => tx.user.findUnique({ where: { walletAddress: withdrawData.user } }) ) as any; @@ -322,7 +321,7 @@ async function handleWithdrawEvent(withdrawData: WithdrawEvent, event: ContractE /** * Handle rebalance event - persist to database */ -async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: ContractEvent, tx: any = prisma): Promise { +async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: ContractEvent, tx: any = db): Promise { await timedDbOperation(() => tx.protocolRate.create({ data: { @@ -341,7 +340,7 @@ async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: Contra /** * Handle contract event with persistence, idempotency, and validation (Issue #53) */ -export async function handleEvent(event: ContractEvent, tx: any = prisma): Promise { +export async function handleEvent(event: ContractEvent, tx: any = db): Promise { const startTime = Date.now(); try { logger.info(`[Event] ${event.type} detected at ledger ${event.ledger}, tx: ${event.txHash}`); @@ -437,7 +436,7 @@ export async function processEventBatch(events: ContractEvent[]): Promise try { // Multiple events processed in a single transaction - await prisma.$transaction(async (tx) => { + await db.$transaction(async (tx) => { for (const event of events) { await handleEvent(event, tx); processedCount++; @@ -450,7 +449,7 @@ export async function processEventBatch(events: ContractEvent[]): Promise // Fallback: Process individually so robust events succeed for (const event of events) { try { - await handleEvent(event, prisma); + await handleEvent(event, db); } catch (individualError) { logger.error(`[Batch Fallback Error] Event processing completely failed for ${event.txHash}`); } @@ -462,7 +461,7 @@ export async function processEventBatch(events: ContractEvent[]): Promise * Load last processed ledger from database */ async function loadLastProcessedLedger(): Promise { - const cursor = await prisma.eventCursor.findUnique({ + const cursor = await db.eventCursor.findUnique({ where: { contractId: VAULT_CONTRACT_ID }, }); @@ -483,7 +482,7 @@ async function loadLastProcessedLedger(): Promise { * Update last processed ledger in database */ async function persistLastProcessedLedger(ledger: number): Promise { - await prisma.eventCursor.upsert({ + await db.eventCursor.upsert({ where: { contractId: VAULT_CONTRACT_ID }, update: { lastProcessedLedger: ledger, @@ -605,7 +604,7 @@ export async function backfillEvents(startLedger: number, endLedger?: number): P export async function retryDeadLetterEvents(): Promise { logger.info(`[DLQ] Starting manual intervention retry for all DLQ events`); await DeadLetterQueue.retryAll(async (eventPayload) => { - await handleEvent(eventPayload, prisma); + await handleEvent(eventPayload, db); }); } diff --git a/tests/unit/stellar/events.test.ts b/tests/unit/stellar/events.test.ts index 77e2cd3..b038776 100644 --- a/tests/unit/stellar/events.test.ts +++ b/tests/unit/stellar/events.test.ts @@ -10,6 +10,9 @@ jest.mock('@prisma/client', () => { }; }); +// Point the db singleton at the mock so events.ts uses it instead of a real connection. +jest.mock('../../../src/db', () => ({ default: mockPrisma })); + jest.mock('../../../src/stellar/client'); jest.mock('../../../src/utils/logger'); jest.mock('../../../src/config', () => ({