diff --git a/sdk/src/client.ts b/sdk/src/client.ts index 5afb91c..8481d1b 100644 --- a/sdk/src/client.ts +++ b/sdk/src/client.ts @@ -29,6 +29,7 @@ import { } from './utils'; import { SimulationError, RPCError } from './errors'; +import { EventParser, EventFilter, bcForgeEvent } from './events'; // ─── Types ─────────────────────────────────────────────────────────────────── @@ -635,6 +636,32 @@ export class bcForgeClient { return response.events; } + /** + * Get event history with filtering. + */ + async getEventHistory(filter: EventFilter = {}): Promise { + const parser = new EventParser(); + const response = await this.server.getEvents({ + startLedger: filter.startLedger, + filters: [ + { + contractIds: filter.contractIds || [this.contractId], + type: 'contract', + }, + ], + }); + let events = parser.parseEvents(response.events); + // Apply event type filter + if (filter.eventTypes && filter.eventTypes.length > 0) { + events = events.filter((e) => filter.eventTypes!.includes(e.type)); + } + // Apply end ledger filter + if (filter.endLedger !== undefined) { + events = events.filter((e) => e.ledger <= filter.endLedger!); + } + return events; + } + /** * Update the token symbol. Admin-only. * diff --git a/sdk/src/events.test.ts b/sdk/src/events.test.ts new file mode 100644 index 0000000..375133b --- /dev/null +++ b/sdk/src/events.test.ts @@ -0,0 +1,218 @@ +/** + * @bc-forge/sdk — Tests for EventParser and EventStream classes + */ + +import { EventParser, EventStream, bcForgeEventType, bcForgeEvent } from './events'; +import { SorobanRpc, scValToNative, nativeToScVal } from '@stellar/stellar-sdk'; + +// Mock SorobanRpc.Server +jest.mock('@stellar/stellar-sdk', () => { + const original = jest.requireActual('@stellar/stellar-sdk'); + return { + ...original, + SorobanRpc: { + ...original.SorobanRpc, + Server: jest.fn().mockImplementation(() => ({ + getLatestLedger: jest.fn(), + getEvents: jest.fn(), + })), + }, + }; +}); + +describe('EventParser', () => { + let parser: EventParser; + + beforeEach(() => { + parser = new EventParser(); + }); + + describe('parseEvent', () => { + it('should parse valid event correctly', () => { + const mockEvent = { + topic: [nativeToScVal('mint')], + ledger: 12345, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({ amount: 100, to: 'GABC123' }), + } as unknown as SorobanRpc.Api.EventResponse; + + const result = parser.parseEvent(mockEvent); + expect(result).not.toBeNull(); + if (result) { + expect(result.type).toBe(bcForgeEventType.MINT); + expect(result.ledger).toBe(12345); + expect(result.contractId).toBe('CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526'); + } + }); + + it('should return null for invalid event type', () => { + const mockEvent = { + topic: [nativeToScVal('invalid-type')], + ledger: 12345, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({}), + } as unknown as SorobanRpc.Api.EventResponse; + + const result = parser.parseEvent(mockEvent); + expect(result).toBeNull(); + }); + + it('should return null for event without topic', () => { + const mockEvent = { + topic: [], + ledger: 12345, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({}), + } as unknown as SorobanRpc.Api.EventResponse; + + const result = parser.parseEvent(mockEvent); + expect(result).toBeNull(); + }); + }); + + describe('parseEvents', () => { + it('should parse multiple events and filter out invalid ones', () => { + const mockEvents = [ + { + topic: [nativeToScVal('mint')], + ledger: 12345, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({ amount: 100, to: 'GABC123' }), + }, + { + topic: [nativeToScVal('invalid-type')], + ledger: 12346, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({}), + }, + { + topic: [nativeToScVal('xfer')], + ledger: 12347, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({ from: 'GABC123', to: 'GDEF456', amount: 50 }), + }, + ] as unknown as SorobanRpc.Api.EventResponse[]; + + const results = parser.parseEvents(mockEvents); + expect(results).toHaveLength(2); + expect(results[0].type).toBe(bcForgeEventType.MINT); + expect(results[1].type).toBe(bcForgeEventType.TRANSFER); + }); + }); +}); + +describe('EventStream', () => { + let mockServer: any; + let eventStream: EventStream; + + beforeEach(() => { + mockServer = { + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), + getEvents: jest.fn(), + }; + (SorobanRpc.Server as jest.Mock).mockImplementation(() => mockServer); + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + jest.clearAllMocks(); + }); + + describe('subscribe and unsubscribe', () => { + it('should subscribe and call callback on new events', async () => { + const callback = jest.fn(); + const mockEvents = [ + { + topic: [nativeToScVal('mint')], + ledger: 1001, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({ amount: 100, to: 'GABC123' }), + }, + ] as unknown as SorobanRpc.Api.EventResponse[]; + mockServer.getEvents.mockResolvedValue({ + events: mockEvents, + latestLedger: 1001, + }); + + eventStream = new EventStream('https://mock.rpc', 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526'); + await eventStream.subscribe(callback); + // Fast-forward time to trigger poll + jest.runAllTimers(); + // Wait for promises to resolve + await Promise.resolve(); + + expect(callback).toHaveBeenCalled(); + }); + + it('should unsubscribe and stop calling callback', async () => { + const callback = jest.fn(); + eventStream = new EventStream('https://mock.rpc', 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526'); + await eventStream.subscribe(callback); + eventStream.unsubscribe(); + + mockServer.getEvents.mockResolvedValue({ + events: [], + latestLedger: 1001, + }); + + jest.runAllTimers(); + expect(callback).not.toHaveBeenCalled(); + }); + }); + + describe('error handling and auto-reconnect', () => { + it('should call error callback on poll failure and retry', async () => { + const errorCallback = jest.fn(); + const callback = jest.fn(); + mockServer.getEvents.mockRejectedValue(new Error('RPC failed')); + + eventStream = new EventStream('https://mock.rpc', 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526'); + eventStream.onError(errorCallback); + await eventStream.subscribe(callback); + + jest.runAllTimers(); + await Promise.resolve(); + + expect(errorCallback).toHaveBeenCalled(); + }); + }); + + describe('filtering', () => { + it('should only emit events matching filter', async () => { + const callback = jest.fn(); + const mockEvents = [ + { + topic: [nativeToScVal('mint')], + ledger: 1001, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({ amount: 100, to: 'GABC123' }), + }, + { + topic: [nativeToScVal('xfer')], + ledger: 1002, + contractId: 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + value: nativeToScVal({ from: 'GABC123', to: 'GDEF456', amount: 50 }), + }, + ] as unknown as SorobanRpc.Api.EventResponse[]; + mockServer.getEvents.mockResolvedValue({ + events: mockEvents, + latestLedger: 1002, + }); + + eventStream = new EventStream( + 'https://mock.rpc', + 'CAAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQCAIBAEAQC526', + { eventTypes: [bcForgeEventType.MINT] } + ); + + await eventStream.subscribe(callback); + jest.runAllTimers(); + await Promise.resolve(); + + expect(callback).toHaveBeenCalledTimes(1); + const event = callback.mock.calls[0][0]; + expect(event.type).toBe(bcForgeEventType.MINT); + }); + }); +}); diff --git a/sdk/src/events.ts b/sdk/src/events.ts index 7aa47a0..117c2fb 100644 --- a/sdk/src/events.ts +++ b/sdk/src/events.ts @@ -32,12 +32,24 @@ export interface bcForgeEvent { data: any; } +/** + * Event filter for filtering events. + */ +export interface EventFilter { + contractIds?: string[]; + eventTypes?: bcForgeEventType[]; + startLedger?: number; + endLedger?: number; +} + /** * Options for event subscriptions. */ export interface SubscriptionOptions { pollingIntervalMs?: number; startLedger?: number; + maxRetryAttempts?: number; + retryDelayMs?: number; } /** @@ -92,67 +104,207 @@ export function decodeDiagnosticEvent(rawEvent: xdr.DiagnosticEvent): bcForgeEve } /** - * Subscribes to real-time events for a given bc-forge contract. - * - * @param rpcUrl - Soroban RPC endpoint - * @param contractId - Target contract ID - * @param callback - Function called for every new decoded event - * @param options - Polking and ledger range options - * @returns An unsubscribe function to stop polling. + * EventParser class for parsing Soroban events into bcForgeEvent objects. */ -export async function subscribeEvents( - rpcUrl: string, - contractId: string, - callback: (event: bcForgeEvent) => void, - options: SubscriptionOptions = {}, -): Promise<() => void> { - const server = new SorobanRpc.Server(rpcUrl); +export class EventParser { + /** + * Parses raw event responses into bcForgeEvent objects. + * @param events Raw Soroban event responses. + * @returns Array of parsed bcForgeEvent objects, filtering out invalid ones. + */ + parseEvents(events: SorobanRpc.Api.EventResponse[]): bcForgeEvent[] { + return events.map((event) => decodeEvent(event)).filter((e): e is bcForgeEvent => e !== null); + } - // Default to starting from the latest ledger if not specified - let lastLedger = options.startLedger; - if (!lastLedger) { - const latest = await server.getLatestLedger(); - lastLedger = latest.sequence; + /** + * Parses a single event response into a bcForgeEvent object. + * @param event Raw Soroban event response. + * @returns Parsed bcForgeEvent or null if invalid. + */ + parseEvent(event: SorobanRpc.Api.EventResponse): bcForgeEvent | null { + return decodeEvent(event); } - let active = true; + /** + * Parses diagnostic events into bcForgeEvent objects. + * @param rawEvents Array of raw xdr.DiagnosticEvent. + * @returns Array of parsed bcForgeEvent objects. + */ + parseDiagnosticEvents(rawEvents: xdr.DiagnosticEvent[]): bcForgeEvent[] { + return rawEvents + .map((event) => decodeDiagnosticEvent(event)) + .filter((e): e is bcForgeEvent => e !== null); + } +} - const poll = async () => { - if (!active) return; +/** + * EventStream class for managing real-time event subscriptions. + */ +export class EventStream { + private rpcUrl: string; + private server: SorobanRpc.Server; + private contractId: string; + private filter: EventFilter; + private options: SubscriptionOptions; + private active: boolean = false; + private lastLedger: number | null = null; + private pollTimeout: NodeJS.Timeout | null = null; + private retryCount: number = 0; + private callback: ((event: bcForgeEvent) => void) | null = null; + private errorCallback: ((error: Error) => void) | null = null; + private parser: EventParser; + + constructor( + rpcUrl: string, + contractId: string, + filter: EventFilter = {}, + options: SubscriptionOptions = {}, + ) { + this.rpcUrl = rpcUrl; + this.server = new SorobanRpc.Server(rpcUrl); + this.contractId = contractId; + this.filter = filter; + this.options = { + pollingIntervalMs: 3000, + maxRetryAttempts: 5, + retryDelayMs: 1000, + ...options, + }; + this.parser = new EventParser(); + } + + /** + * Subscribes to real-time events. + * @param callback Function called for every new decoded event. + */ + async subscribe(callback: (event: bcForgeEvent) => void): Promise { + if (this.active) { + throw new Error('Already subscribed'); + } + + this.active = true; + this.callback = callback; + this.retryCount = 0; + + // Initialize lastLedger + if (this.filter.startLedger) { + this.lastLedger = this.filter.startLedger; + } else if (this.options.startLedger) { + this.lastLedger = this.options.startLedger; + } else { + const latest = await this.server.getLatestLedger(); + this.lastLedger = latest.sequence; + } + + await this.poll(); + } + + /** + * Unsubscribes from real-time events. + */ + unsubscribe(): void { + this.active = false; + if (this.pollTimeout) { + clearTimeout(this.pollTimeout); + this.pollTimeout = null; + } + } + + /** + * Registers an error callback for stream errors. + * @param callback Function called when an error occurs. + */ + onError(callback: (error: Error) => void): void { + this.errorCallback = callback; + } + + private async poll(): Promise { + if (!this.active || !this.lastLedger) return; try { - const response = await server.getEvents({ - startLedger: lastLedger!, - filters: [ - { - contractIds: [contractId], - type: 'contract', - }, - ], + const rpcFilters: SorobanRpc.Api.EventFilter[] = [ + { + contractIds: this.filter.contractIds || [this.contractId], + type: 'contract', + }, + ]; + + const response = await this.server.getEvents({ + startLedger: this.lastLedger, + filters: rpcFilters, }); - for (const event of response.events) { - const decoded = decodeEvent(event); - if (decoded) { - callback(decoded); + this.retryCount = 0; + + const parsedEvents = this.parser.parseEvents(response.events); + for (const event of parsedEvents) { + // Apply filter + if (this.matchesFilter(event)) { + this.callback?.(event); } - if (event.ledger >= lastLedger!) { - lastLedger = event.ledger + 1; + if (event.ledger >= this.lastLedger!) { + this.lastLedger = event.ledger + 1; } } - } catch { - // Retry in the next poll cycle on failure + + // If no events, just increment lastLedger to avoid re-polling the same ledger + if (response.events.length === 0 && response.latestLedger) { + this.lastLedger = response.latestLedger + 1; + } + } catch (error) { + this.retryCount++; + if (this.retryCount <= (this.options.maxRetryAttempts || 5)) { + const delay = (this.options.retryDelayMs || 1000) * this.retryCount; + this.errorCallback?.( + new Error(`Poll failed (attempt ${this.retryCount}), retrying in ${delay}ms`), + ); + this.pollTimeout = setTimeout(() => this.poll(), delay); + return; + } else { + this.active = false; + this.errorCallback?.(new Error('Max retry attempts exceeded, stopping stream')); + return; + } + } + + if (this.active) { + this.pollTimeout = setTimeout(() => this.poll(), this.options.pollingIntervalMs || 3000); } + } - if (active) { - setTimeout(poll, options.pollingIntervalMs || 3000); + private matchesFilter(event: bcForgeEvent): boolean { + // Check event type filter + if (this.filter.eventTypes && this.filter.eventTypes.length > 0) { + if (!this.filter.eventTypes.includes(event.type)) { + return false; + } } - }; - poll(); + // Check end ledger filter + if (this.filter.endLedger && event.ledger > this.filter.endLedger) { + return false; + } - // Return unsubscribe closure - return () => { - active = false; - }; + return true; + } +} + +/** + * Subscribes to real-time events for a given bc-forge contract. + * + * @param rpcUrl - Soroban RPC endpoint + * @param contractId - Target contract ID + * @param callback - Function called for every new decoded event + * @param options - Polking and ledger range options + * @returns An unsubscribe function to stop polling. + */ +export async function subscribeEvents( + rpcUrl: string, + contractId: string, + callback: (event: bcForgeEvent) => void, + options: SubscriptionOptions = {}, +): Promise<() => void> { + const stream = new EventStream(rpcUrl, contractId, {}, options); + await stream.subscribe(callback); + return () => stream.unsubscribe(); } diff --git a/sdk/src/index.ts b/sdk/src/index.ts index d88a55e..46c4e00 100644 --- a/sdk/src/index.ts +++ b/sdk/src/index.ts @@ -21,6 +21,6 @@ export { bcForgeClient } from './client'; export type { BatchMintRecipient, bcForgeClientConfig, TransactionResult } from './client'; export { buildInvokeTransaction, submitTransaction, scValToNative } from './utils'; -export { bcForgeEventType, decodeEvent, decodeDiagnosticEvent, subscribeEvents } from './events'; -export type { bcForgeEvent, SubscriptionOptions } from './events'; +export { bcForgeEventType, decodeEvent, decodeDiagnosticEvent, subscribeEvents, EventParser, EventStream } from './events'; +export type { bcForgeEvent, SubscriptionOptions, EventFilter } from './events'; export * from './mockClient';