-
Notifications
You must be signed in to change notification settings - Fork 33
feat(wallet-sdk,core-acs-reader): add caching to acs reader #1650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
mateuszpiatkowski-da
wants to merge
17
commits into
main
Choose a base branch
from
mateuszpiatkowski-da/add-caching-to-acs-reader
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
9d3ddc1
refactor(wallet-sdk): remove unused method
mateuszpiatkowski-da b201c8d
feat(wallet-sdk): add cache to acs read method
mateuszpiatkowski-da 4c6ab4e
feat(wallet-sdk): :construction: statrt working on acs cache
mateuszpiatkowski-da 490992d
feat(wallet-sdk): :construction: add private methods
mateuszpiatkowski-da 18f0320
feat(wallet-sdk): :construction: add prune method
mateuszpiatkowski-da fbe1ae2
feat(wallet-sdk): :construction: add query helpers
mateuszpiatkowski-da 89c5fd3
feat(wallet-sdk,core-acs-reader): continue working on acs cache
mateuszpiatkowski-da 23681d1
fix(wallet-sdk): fix running acs cache script
mateuszpiatkowski-da 3491145
feat: finalize stress test
mateuszpiatkowski-da 0ce573f
Merge branch 'main' into mateuszpiatkowski-da/add-caching-to-acs-reader
mateuszpiatkowski-da 9ee60a2
Merge branch 'main' into mateuszpiatkowski-da/add-caching-to-acs-reader
mateuszpiatkowski-da aa4f413
feat(wallet-sdk): update jsdoc, update ts type
mateuszpiatkowski-da 7df26d6
Merge branch 'main' into mateuszpiatkowski-da/add-caching-to-acs-reader
mateuszpiatkowski-da be357df
feat(wallet-sdk,core-acs-reader): move acs logic to acs-reader
mateuszpiatkowski-da ccb5b93
Merge branch 'main' into mateuszpiatkowski-da/add-caching-to-acs-reader
mateuszpiatkowski-da aa3aa23
fix: fix build
mateuszpiatkowski-da c7dd4a7
Merge branch 'main' into mateuszpiatkowski-da/add-caching-to-acs-reader
mateuszpiatkowski-da File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,312 @@ | ||
| // Copyright (c) 2025-2026 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import { ContractId } from '@canton-network/core-token-standard' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| import { ACEvent, ACS_UPDATE_CONFIG, ACSState } from '../types' | ||
| import { | ||
| AbstractLedgerProvider, | ||
| Ops, | ||
| } from '@canton-network/core-provider-ledger' | ||
| import { LRUCacheOptions } from 'typescript-lru-cache' | ||
| import { LedgerCommonSchemas } from '@canton-network/core-ledger-client-types' | ||
| import pino from 'pino' | ||
| import { | ||
| ResolvedAcsOptions, | ||
| AcsService, | ||
| buildActiveContractFilter, | ||
| } from '../service' | ||
|
|
||
| export type ACSCacheOptions = Pick< | ||
| LRUCacheOptions<string, ACSCache>, | ||
| 'maxSize' | 'entryExpirationTimeInMS' | ||
| > | ||
|
|
||
| const logger = pino({ name: 'acs-reader/cache' }) | ||
|
|
||
| export class ACSCache { | ||
| private readonly state: ACSState = { | ||
| initial: { | ||
| offset: 0, | ||
| acs: [], | ||
| }, | ||
| updates: { | ||
| offset: 0, | ||
| acs: [], | ||
| }, | ||
| archivedACs: new Set(), | ||
| } | ||
| private readonly service: AcsService | ||
|
|
||
| constructor(private readonly ledger: AbstractLedgerProvider) { | ||
| this.service = new AcsService(ledger) | ||
| } | ||
|
|
||
| /** | ||
| * Returns the initial snapshot of the active contract set. | ||
| * Contains the base state captured at a specific ledger offset. | ||
| */ | ||
| private get initial() { | ||
| return this.state.initial | ||
| } | ||
|
|
||
| /** | ||
| * Returns the incremental updates applied after the initial snapshot. | ||
| * Contains created and archived events that occurred since the initial state. | ||
| */ | ||
| private get updates() { | ||
| return this.state.updates | ||
| } | ||
|
|
||
| /** | ||
| * Updates the cache to include ledger changes up to the specified offset. | ||
| * Fetches and applies incremental updates from the ledger, initializing the cache if needed. | ||
| * Automatically prunes old events when the update buffer exceeds configured thresholds. | ||
| */ | ||
| public async update(options: ResolvedAcsOptions) { | ||
| if (!this.initial.acs.length || this.initial.offset > options.offset) { | ||
| await this.initState(options) | ||
| } | ||
|
|
||
| const builtFilter = buildActiveContractFilter(options) | ||
|
|
||
| const updates = await this.fetchUpdates({ | ||
| beginExclusive: this.updates.offset, | ||
| endInclusive: options.offset, | ||
| eventFormat: { | ||
| verbose: Boolean(builtFilter.verbose), | ||
| ...builtFilter.filter, | ||
| }, | ||
| }) | ||
|
|
||
| // in practise length should never be > maxUpdatesToFetch only equal (server should never return more than limit in query). This is just a safeguard. | ||
| if (updates.length >= ACS_UPDATE_CONFIG.maxUpdatesToFetch) | ||
| void this.update(options) | ||
|
|
||
| const { newEvents, newOffset } = this.extractEvents({ | ||
| offset: this.updates.offset, | ||
| updates, | ||
| }) | ||
|
|
||
| if (newOffset > this.updates.offset) { | ||
| this.updates.offset = newOffset | ||
| this.updates.acs = this.updates.acs.concat(newEvents) | ||
| } else this.updates.offset = options.offset | ||
|
|
||
| if (this.updates.acs.length >= ACS_UPDATE_CONFIG.maxEventsBeforePrune) { | ||
| this.prune() | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Calculates the active contract set at a specific ledger offset. | ||
| * Applies cached updates to the initial snapshot and filters out archived contracts. | ||
| * Throws an error if the cache is not initialized or the requested offset is too old. | ||
| */ | ||
| public calculateAt(offset: number) { | ||
| if (!this.initial.acs) | ||
| throw Error('No ACS initialized. Call `.update()` first') | ||
| if (this.initial.offset > offset) | ||
| throw Error('Provided offset cannot be smaller than ACS offset') | ||
|
|
||
| const newContracts: LedgerCommonSchemas['JsGetActiveContractsResponse'][] = | ||
| [] | ||
| const newArchivedContracts: Set<ContractId<string>> = new Set() | ||
|
|
||
| this.updates.acs | ||
| .filter((ac) => ac.offset <= offset) | ||
| .map((ac) => { | ||
| if (isCreatedEvent(ac)) { | ||
| newContracts.push({ | ||
| workflowId: ac.workflowId ?? '', | ||
| contractEntry: { | ||
| JsActiveContract: { | ||
| createdEvent: ac.event, | ||
| synchronizerId: ac.synchronizerId ?? '', | ||
| reassignmentCounter: 0, | ||
| }, | ||
| }, | ||
| }) | ||
| } else { | ||
| newArchivedContracts.add( | ||
| ac.event.contractId as ContractId<string> | ||
| ) | ||
| } | ||
| }) | ||
|
|
||
| const allContracts = this.initial.acs.concat(newContracts) | ||
| this.state.archivedACs = | ||
| this.state.archivedACs.union(newArchivedContracts) | ||
|
|
||
| return allContracts.filter(({ contractEntry }) => { | ||
| if (!contractEntry) return false | ||
| const id = ( | ||
| 'JsActiveContract' in contractEntry | ||
| ? contractEntry.JsActiveContract.createdEvent.contractId | ||
| : '' | ||
| ) as ContractId<string> | ||
|
|
||
| return !this.state.archivedACs.has(id) | ||
| }) | ||
| } | ||
|
|
||
| /** | ||
| * Initializes the cache state by fetching the active contract set at the specified offset. | ||
| * Clears any existing updates and archived contract tracking. | ||
| */ | ||
| private async initState(options: ResolvedAcsOptions) { | ||
| const initialAcs = await this.service.getActiveContracts(options) | ||
| this.state.initial = { | ||
| offset: options.offset, | ||
| acs: initialAcs, | ||
| } | ||
| this.state.updates = { | ||
| offset: options.offset, | ||
| acs: [], | ||
| } | ||
| this.state.archivedACs = new Set() | ||
| } | ||
|
|
||
| /** | ||
| * Compacts the cache by moving the initial snapshot forward to a more recent offset. | ||
| * Applies accumulated updates to create a new initial state and discards old events. | ||
| * Improves performance by reducing the number of updates to process on each query. | ||
| */ | ||
| private prune() { | ||
| const newOffset = Math.max( | ||
| this.initial.offset, | ||
| this.updates.offset - ACS_UPDATE_CONFIG.safeOffsetDeltaForPrune | ||
| ) | ||
|
|
||
| if (newOffset > this.initial.offset) { | ||
| const responses = this.calculateAt(newOffset) | ||
|
|
||
| this.state.initial = { | ||
| offset: newOffset, | ||
| acs: responses, | ||
| } | ||
| this.state.updates = { | ||
| offset: this.updates.offset, | ||
| acs: this.updates.acs.filter((ac) => ac.offset > newOffset), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Fetches ledger updates between two offsets. | ||
| * Queries the ledger API for transactions containing contract create and archive events. | ||
| */ | ||
| private async fetchUpdates(args: { | ||
| beginExclusive: number | ||
| endInclusive: number | ||
| eventFormat: LedgerCommonSchemas['EventFormat'] | ||
| filter?: LedgerCommonSchemas['TransactionFilter'] | ||
| }) { | ||
| const { beginExclusive, endInclusive, eventFormat, filter } = args | ||
| const updateFormat: Ops.PostV2UpdatesFlats['ledgerApi']['params']['body']['updateFormat'] = | ||
| { | ||
| includeTransactions: { | ||
| eventFormat, | ||
| transactionShape: 'TRANSACTION_SHAPE_ACS_DELTA', | ||
| }, | ||
| } | ||
|
|
||
| return await this.ledger.request<Ops.PostV2Updates>({ | ||
| method: 'ledgerApi', | ||
| params: { | ||
| resource: '/v2/updates', | ||
| requestMethod: 'post', | ||
| body: { | ||
| beginExclusive, | ||
| endInclusive, | ||
| updateFormat, | ||
| verbose: false, | ||
| ...(filter ? { filter } : {}), | ||
| }, | ||
| query: { | ||
| limit: ACS_UPDATE_CONFIG.maxUpdatesToFetch, | ||
| stream_idle_timeout_ms: 1000, | ||
| }, | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| /** | ||
| * Extracts contract creation and archival events from raw ledger updates. | ||
| * Processes transaction and checkpoint updates to build a list of relevant contract events. | ||
| * Tracks the highest offset seen across all updates. | ||
| */ | ||
| private extractEvents(args: { | ||
| updates: Awaited<ReturnType<ACSCache['fetchUpdates']>> | ||
| offset: number | ||
| }) { | ||
| const { updates, offset } = args | ||
| const newEvents: Array<ACEvent> = [] | ||
| let newOffset = offset | ||
| updates.forEach((update) => { | ||
| if (!update || !update.update) { | ||
| return | ||
| } | ||
| if ('Transaction' in update.update) { | ||
| const transaction = update.update.Transaction | ||
| const trOffset = transaction?.value?.offset | ||
| if (trOffset && trOffset > newOffset) { | ||
| const events: Array<LedgerCommonSchemas['Event']> = | ||
| transaction?.value?.events ?? [] | ||
| events.forEach((event) => { | ||
| if (!event) { | ||
| return | ||
| } | ||
| if ( | ||
| 'CreatedEvent' in event || | ||
| 'ArchivedEvent' in event | ||
| ) { | ||
| const eventData = | ||
| 'CreatedEvent' in event | ||
| ? event.CreatedEvent | ||
| : event.ArchivedEvent | ||
|
|
||
| const acUpdate: ACEvent = { | ||
| event: eventData, | ||
| offset: trOffset, | ||
| workflowId: | ||
| transaction?.value?.workflowId ?? null, | ||
| synchronizerId: | ||
| transaction?.value?.synchronizerId ?? null, | ||
| ...('ArchivedEvent' in event && { | ||
| archived: true, | ||
| }), | ||
| } | ||
| newEvents.push(acUpdate) | ||
| newOffset = trOffset | ||
| } | ||
| }) | ||
| } | ||
| } else if ('OffsetCheckpoint' in update.update) { | ||
| const checkpoint = update.update.OffsetCheckpoint | ||
| const offset = checkpoint?.value?.offset | ||
| if (offset) { | ||
| newOffset = offset | ||
| } | ||
| } else { | ||
| logger.warn( | ||
| { | ||
| value: JSON.stringify(update.update), | ||
| }, | ||
| 'ACS update got unknown update type' | ||
| ) | ||
| } | ||
| }) | ||
| return { newEvents, newOffset } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Checks if an event represents a contract creation. | ||
| * Used to distinguish between created and archived events when processing cache updates. | ||
| */ | ||
| function isCreatedEvent(event: ACEvent): event is ACEvent & { | ||
| archived: true | ||
| event: LedgerCommonSchemas['CreatedEvent'] | ||
| } { | ||
| return !event.archived | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does the acs reader need token standard ?