From c055a5dba964f505fafc2705d82e06c28cf201e6 Mon Sep 17 00:00:00 2001 From: Brian Platz Date: Thu, 14 Aug 2025 13:55:27 -0400 Subject: [PATCH 01/13] core branching --- src/fluree/db/api.cljc | 234 +--------------------------------- src/fluree/db/api/branch.cljc | 12 +- src/fluree/db/connection.cljc | 10 +- src/fluree/db/ledger.cljc | 7 +- 4 files changed, 17 insertions(+), 246 deletions(-) diff --git a/src/fluree/db/api.cljc b/src/fluree/db/api.cljc index 59610c8857..9c337314b7 100644 --- a/src/fluree/db/api.cljc +++ b/src/fluree/db/api.cljc @@ -12,7 +12,6 @@ [fluree.db.json-ld.iri :as iri] [fluree.db.json-ld.policy :as policy] [fluree.db.ledger :as ledger] - [fluree.db.merge :as merge] [fluree.db.nameservice.query :as ns-query] [fluree.db.query.api :as query-api] [fluree.db.query.fql.parse :as parse] @@ -186,18 +185,10 @@ * For LocalStack: 'http://localhost:4566' * For MinIO: 'http://localhost:9000' - s3-prefix (optional): The prefix within the bucket - - s3-read-timeout-ms (optional): Per-request read timeout (default 20000) - - s3-write-timeout-ms (optional): Per-request write timeout (default 60000) - - s3-list-timeout-ms (optional): Per-request list timeout (default 20000) - - s3-max-retries (optional): Max retry attempts on transient errors (default 4) - - s3-retry-base-delay-ms (optional): Base backoff delay in ms (default 150) - - s3-retry-max-delay-ms (optional): Max backoff delay in ms (default 2000) - parallelism (optional): Number of parallel operations (default: 4) - cache-max-mb (optional): Maximum memory for caching in MB (default: 1000) - defaults (optional): Default options for ledgers created with this connection" - ([{:keys [s3-bucket s3-prefix s3-endpoint parallelism cache-max-mb defaults - s3-read-timeout-ms s3-write-timeout-ms s3-list-timeout-ms - s3-max-retries s3-retry-base-delay-ms s3-retry-max-delay-ms], + ([{:keys [s3-bucket s3-prefix s3-endpoint parallelism cache-max-mb defaults], :or {parallelism 4, cache-max-mb 1000}}] (when-not s3-bucket (throw (ex-info "S3 bucket name is required for S3 connection" @@ -212,13 +203,7 @@ "@type" "Storage" "s3Bucket" s3-bucket "s3Endpoint" s3-endpoint} - s3-prefix (assoc "s3Prefix" s3-prefix) - s3-read-timeout-ms (assoc "s3ReadTimeoutMs" s3-read-timeout-ms) - s3-write-timeout-ms (assoc "s3WriteTimeoutMs" s3-write-timeout-ms) - s3-list-timeout-ms (assoc "s3ListTimeoutMs" s3-list-timeout-ms) - s3-max-retries (assoc "s3MaxRetries" s3-max-retries) - s3-retry-base-delay-ms (assoc "s3RetryBaseDelayMs" s3-retry-base-delay-ms) - s3-retry-max-delay-ms (assoc "s3RetryMaxDelayMs" s3-retry-max-delay-ms)) + s3-prefix (assoc "s3Prefix" s3-prefix)) (cond-> {"@id" "connection" "@type" "Connection" "parallelism" parallelism @@ -252,10 +237,9 @@ ([conn ledger-alias opts] (validate-connection conn) ;; Disallow branch specification in ledger name during creation - (when (or (str/includes? ledger-alias ":") - (str/includes? ledger-alias "@")) + (when (str/includes? ledger-alias ":") (throw (ex-info (str "Ledger name cannot contain ':' character. " - "'@' is reserved for time travel. " + "Branches must be created separately. " "Provided: " ledger-alias) {:error :db/invalid-ledger-name :ledger-alias ledger-alias}))) @@ -636,216 +620,6 @@ (promise-wrap (api.branch/rename-branch! conn old-branch-spec new-branch-spec))) -;; Branch operations (merge, rebase, reset) - -(defn merge! - "Merges commits from source branch into target branch. - - Updates the target branch with changes from the source branch. - Supports fast-forward, squash, and regular merge modes. - - Parameters: - conn - Connection object - from - Source branch spec (e.g., 'ledger:feature') - to - Target branch spec (e.g., 'ledger:main') - opts - Map with optional merge options: - :ff - Fast-forward behavior (default :auto) - :auto - Fast-forward when possible - :only - Only allow fast-forward (fail otherwise) - :never - Never fast-forward, always create merge commit - :squash? - Combine all commits into one (default false) - :schema-aware - Use schema rules to resolve - function - Custom conflict resolution - :preview? - Dry run without making changes (default false) - - Returns promise resolving to: - {:status :success|:conflict|:error - :operation :merge - :from '...' :to '...' - :strategy '3-way' - :commits {:merged [...] :conflicts [...]} - :new-commit 'sha'} - - Phase 2 - Not yet implemented." - ([conn from to] - (merge! conn from to {})) - ([conn from to opts] - (validate-connection conn) - (promise-wrap - (merge/merge! conn from to opts)))) - -(defn rebase! - "Rebases source branch onto target branch (updates source branch). - - NOTE: True rebase is not yet implemented. Currently redirects to merge! - which updates the target branch instead of source branch. - - Parameters: - conn - Connection object - from - Source branch spec to rebase (will be updated) - to - Target branch spec to rebase onto (unchanged) - opts - Map with optional rebase options: - :ff - Fast-forward behavior (default :auto) - :auto - Fast-forward when possible - :only - Only allow fast-forward (fail otherwise) - :never - Never fast-forward, always replay - :squash? - Combine all commits into one (default false) - :atomic? - All-or-nothing vs apply-until-conflict (default true) - :selector - Which commits to include (default nil = all after LCA) - nil - All commits after LCA - {:t {:from 42 :to 44}} - Specific t-value range - {:t {:from 42 :until :conflict}} - Until conflict - {:shas ['sha1' 'sha2']} - Specific commits - :preview? - Dry run without making changes (default false) - - Returns promise resolving to: - {:status :success|:conflict|:error - :operation :rebase - :from '...' :to '...' - :strategy 'fast-forward'|'squash'|'replay' - :commits {:applied [...] :skipped [...] :conflicts [...]} - :new-commit 'sha'} - - Phase 1 implements: fast-forward and squash modes. - Phase 2 will add: cherry-pick and non-atomic modes." - ([conn from to] - (rebase! conn from to {})) - ([conn from to opts] - (validate-connection conn) - (promise-wrap - (merge/rebase! conn from to opts)))) - -(defn reset-branch! - "Resets a branch to a previous state. - - Two modes available: - - Safe mode (default): Creates new commit reverting to target state - - Hard mode: Moves branch pointer (rewrites history) - - Parameters: - conn - Connection object - branch - Target branch to reset (e.g., 'ledger:main') - to - Target state: - {:t 90} - Reset to transaction t-value - {:sha 'abc123'} - Reset to specific commit SHA - opts - Map with optional reset options: - :mode - Reset mode (default :safe) - :safe - Create revert commit (non-destructive) - :hard - Move branch pointer (destructive) - :archive - How to archive on hard reset (default {:as :tag}) - {:as :tag :name 'backup'} - Create tag at old HEAD - {:as :branch :name 'backup'} - Create branch at old HEAD - {:as :none} - No archive (requires force?) - :force? - Required for hard reset without archive (default false) - :message - Commit message for safe mode (auto-generated if not provided) - :preview? - Dry run without making changes (default false) - - Returns promise resolving to: - {:status :success|:error - :operation :reset - :branch '...' - :mode :safe|:hard - :reset-to {:t 90}|{:sha '...'} - :new-commit 'sha' ; For safe mode - :archived {:type :tag :name '...'} ; For hard mode - :previous-head 'sha'} - - Phase 1 implements: safe reset only. - Phase 2 will add: hard reset with archiving." - ([conn branch to] - (reset-branch! conn branch to {})) - ([conn branch to opts] - (validate-connection conn) - (promise-wrap - (merge/reset-branch! conn branch to opts)))) - -;; Legacy API - Deprecated, use rebase! instead -(defn merge-branches! - "DEPRECATED - Use rebase! instead. - - Legacy merge API maintained for backwards compatibility. - Maps to rebase! with appropriate options." - ([conn source-branch-spec target-branch-spec] - (merge-branches! conn source-branch-spec target-branch-spec {})) - ([conn source-branch-spec target-branch-spec opts] - (validate-connection conn) - ;; Map legacy API to new rebase! API - (let [strategy (:strategy opts :auto) - new-opts (clojure.core/merge - (case strategy - :fast-forward {:ff :only} - :flatten {:squash? true} - :auto {:ff :auto} - :no-ff {:ff :never} - {}) - (dissoc opts :strategy)) - result @(rebase! conn source-branch-spec target-branch-spec new-opts)] - ;; Map new response format to legacy format for backwards compatibility - (promise-wrap - (go-try - (if (= :success (:status result)) - (assoc result - :type (case (:strategy result) - "fast-forward" :fast-forward - "squash" :flatten - "replay" :rebase - :rebase)) - result)))))) - -(defn branch-divergence - "Analyzes divergence between two branches. - - Parameters: - conn - Connection object - branch1-spec - First branch spec - branch2-spec - Second branch spec - - Returns promise resolving to divergence analysis including: - :common-ancestor - Commit ID of common ancestor - :branch1-ahead - Number of commits branch1 is ahead - :branch2-ahead - Number of commits branch2 is ahead - :can-fast-forward - Boolean if one can fast-forward to the other" - [conn branch1-spec branch2-spec] - (validate-connection conn) - (promise-wrap - (merge/branch-divergence conn branch1-spec branch2-spec))) - -(defn branch-graph - "Returns a graph representation of branches and their relationships. - - Useful for visualizing branch history and relationships in UIs. - - Parameters: - conn - Connection object - ledger-spec - Ledger specification (e.g., 'myledger') - opts - Options map: - :format - Output format (default :json) - :json - Structured data for UI rendering - :ascii - ASCII art representation - :depth - Number of commits to show (default 20) - integer - Show N most recent commits - :all - Show entire history - :branches - Which branches to include (default :all) - :all - Include all branches - [\"main\" \"feature\"] - Only specified branches - - Returns promise resolving to: - - For :json format: Map with :branches, :commits, and :merges - - For :ascii format: String with ASCII art graph - - Example: - ;; Get JSON data for UI - @(branch-graph conn \"mydb\" {:format :json :depth 50}) - - ;; Get ASCII visualization - @(branch-graph conn \"mydb\" {:format :ascii :branches [\"main\" \"feature\"]})" - ([conn ledger-spec] - (branch-graph conn ledger-spec {})) - ([conn ledger-spec opts] - (validate-connection conn) - (promise-wrap - (merge/branch-graph conn ledger-spec opts)))) - ;; db operations (defn db diff --git a/src/fluree/db/api/branch.cljc b/src/fluree/db/api/branch.cljc index 336c4ff0da..85c9adcbcc 100644 --- a/src/fluree/db/api/branch.cljc +++ b/src/fluree/db/api/branch.cljc @@ -18,7 +18,7 @@ conn - Connection object new-branch-spec - Full branch spec (e.g., 'ledger:new-branch') from-branch-spec - Source branch spec (e.g., 'ledger:old-branch') - from-commit - (optional) Specific commit id (sha256 URI) to branch from, defaults to latest + from-commit - (optional) Specific commit ID to branch from, defaults to latest Returns the new branch metadata." [conn new-branch-spec from-branch-spec from-commit] @@ -33,14 +33,13 @@ ;; Load source ledger to get its current commit (let [source-ledger ( Date: Wed, 20 Aug 2025 23:03:37 -0400 Subject: [PATCH 02/13] add time-travel support for safe branch reset; implement tests for reset functionality --- src/fluree/db/flake/flake_db.cljc | 67 +++++++++++++------------------ 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/src/fluree/db/flake/flake_db.cljc b/src/fluree/db/flake/flake_db.cljc index 76b61003b5..08d3560f9f 100644 --- a/src/fluree/db/flake/flake_db.cljc +++ b/src/fluree/db/flake/flake_db.cljc @@ -32,8 +32,7 @@ [fluree.db.query.range :as query-range] [fluree.db.reasoner :as reasoner] [fluree.db.time-travel :refer [TimeTravel]] - [fluree.db.util :as util :refer [try* catch* get-id get-types get-list - get-first get-first-value]] + [fluree.db.util :as util :refer [try* catch* get-first get-first-value]] [fluree.db.util.async :refer [ list-val :idx last)}] (assoc list-val ::meta m))) (defn list-value? "returns true if json-ld value is a list object." [v] (and (map? v) - (= const/iri-list (-> v first key)))) + (= :list (-> v first key)))) -(defn subject-node? - "Returns true if a nested value is itself another subject node in the graph. +(defn node? + "Returns true if a nested value is itself another node in the graph. Only need to test maps that have :id - and if they have other properties they are defining then we know it is a node and have additional data to include." [mapx] (cond - (contains? mapx const/iri-value) + (contains? mapx :value) false (list-value? mapx) false (and - (contains? mapx const/iri-set) - (= 1 (count mapx))) + (contains? mapx :set) + (= #{:set :idx} (set (keys mapx)))) false :else @@ -181,9 +180,9 @@ (defn value-map->flake [assert? db sid pid t v-map] - (let [ref-id (get-id v-map) + (let [ref-id (:id v-map) meta (::meta v-map)] - (if (and ref-id (subject-node? v-map)) + (if (and ref-id (node? v-map)) (let [ref-sid (iri/encode-iri db ref-id)] (flake/create sid pid ref-sid const/$id t assert? meta)) (let [[value dt] (datatype/from-expanded db v-map)] @@ -194,28 +193,27 @@ (let [v-maps (util/sequential value)] (mapcat (fn [v-map] (if (list-value? v-map) - (let [list-vals (get-list v-map)] + (let [list-vals (:list v-map)] (into [] - (comp (map-indexed add-list-meta) + (comp (map add-list-meta) (map (partial value-map->flake assert? db sid pid t))) list-vals)) [(value-map->flake assert? db sid pid t v-map)])) v-maps))) (defn- get-type-flakes - [assert? db t sid types] + [assert? db t sid type] (into [] (map (fn [type-item] (let [type-sid (iri/encode-iri db type-item)] (flake/create sid const/$rdf:type type-sid const/$id t assert? nil)))) - types)) + type)) (defn node->flakes [assert? db t node] (log/trace "node->flakes:" node "assert?" assert?) - (let [id (get-id node) - types (get-types node) + (let [{:keys [id type]} node sid (if assert? (iri/encode-iri db id) (or (iri/encode-iri db id) @@ -225,11 +223,11 @@ {:status 400 :error :db/invalid-retraction :iri id})))) - type-assertions (if (seq types) - (get-type-flakes assert? db t sid types) + type-assertions (if (seq type) + (get-type-flakes assert? db t sid type) [])] (into type-assertions - (comp (remove #(-> % key #{const/iri-id const/iri-type})) + (comp (remove #(-> % key keyword?)) (mapcat (fn [[prop value]] (let [pid (if assert? @@ -265,7 +263,7 @@ [db commit-jsonld commit-data-jsonld] (go-try (let [t-new (db-t commit-data-jsonld) - nses (map util/get-value + nses (map :value (get commit-data-jsonld const/iri-namespaces)) db* (with-namespaces db nses) asserted-flakes (->> (db-assert commit-data-jsonld) @@ -382,13 +380,12 @@ (sha->t [db sha] (go-try (log/debug "sha->t looking up commit SHA:" sha) - ;; Normalize the input - use only 'fluree:commit:sha256:b' prefix when present, - ;; otherwise ensure the value starts with 'b' - (let [prefix-b "fluree:commit:sha256:b" - sha-normalized (cond - ;; Input is a full commit IRI with ':b' segment - keep leading 'b' - (str/starts-with? sha prefix-b) - (subs sha (dec (count prefix-b))) + ;; Normalize the input - strip known prefixes if present + ;; Always ensure we end up with 'b' prefix + hash + (let [sha-normalized (cond + ;; Full commit ID prefix + (str/starts-with? sha "fluree:commit:sha256:") + (subs sha 21) ; Extract everything after the prefix ;; Already has correct format (starts with 'b') (str/starts-with? sha "b") @@ -410,14 +407,8 @@ {:status 400 :error :db/invalid-commit-sha :sha sha :normalized sha-normalized :length sha-length})) - ;; Too short to be a useful/efficient prefix (minimum 6) - (< sha-length 6) - (throw (ex-info "SHA prefix must be at least 6 characters" - {:status 400 :error :db/invalid-commit-sha :min 6})) - - ;; Full SHA - use direct efficient lookup (51-52 chars with 'b' prefix) - (or (= 52 sha-length) - (= 51 sha-length)) + ;; Full SHA - use direct efficient lookup (53 chars with 'bb' prefix) + (= 53 sha-length) (let [;; sha-normalized already has 'b' prefix from normalization commit-id (str "fluree:commit:sha256:" sha-normalized) direct-query {:select ["?t"] @@ -589,7 +580,7 @@ (try* (! error-ch e))))) (defn merge-novelty-commits From d1e6dd8340a8a89cb8d0afb145eb2338ba06fd9f Mon Sep 17 00:00:00 2001 From: Brian Platz Date: Tue, 26 Aug 2025 17:19:19 -0400 Subject: [PATCH 03/13] implement cuckoo filter for cross-branch index garbage collection - add cuckoo filter implementation with chain support for 100K+ segments - integrate filters into index refresh and garbage collection processes - use FNV-1a 32-bit hash for cross-platform determinism (CLJ/CLJS) - implement proactive filter growth at 90% capacity threshold - cache other-branch filters during GC to reduce I/O operations - exclude garbage files from filter, only track actual index segments - add comprehensive test suite for filter and chain operations --- docs/cuckoo-filter-gc-strategy.md | 240 ++++++++ src/fluree/db/api/branch.cljc | 19 +- src/fluree/db/connection.cljc | 16 +- src/fluree/db/flake/index/novelty.cljc | 35 +- src/fluree/db/indexer/cuckoo.cljc | 545 +++++++++++++++++++ src/fluree/db/indexer/garbage.cljc | 86 ++- src/fluree/db/ledger.cljc | 8 + test/fluree/db/indexer/cuckoo_chain_test.clj | 160 ++++++ test/fluree/db/indexer/cuckoo_test.clj | 261 +++++++++ test/fluree/db_test.cljc | 4 +- 10 files changed, 1345 insertions(+), 29 deletions(-) create mode 100644 docs/cuckoo-filter-gc-strategy.md create mode 100644 src/fluree/db/indexer/cuckoo.cljc create mode 100644 test/fluree/db/indexer/cuckoo_chain_test.clj create mode 100644 test/fluree/db/indexer/cuckoo_test.clj diff --git a/docs/cuckoo-filter-gc-strategy.md b/docs/cuckoo-filter-gc-strategy.md new file mode 100644 index 0000000000..5c84affc96 --- /dev/null +++ b/docs/cuckoo-filter-gc-strategy.md @@ -0,0 +1,240 @@ +# Cuckoo Filter for Cross-Branch Index Garbage Collection + +## Overview + +Fluree uses cuckoo filters to optimize index garbage collection across database branches. The implementation enables efficient checking of whether index nodes marked as garbage by one branch are still in use by other branches, preventing premature deletion while maintaining high performance with minimal memory overhead. + +## Why Cuckoo Filters? + +### The Cross-Branch Garbage Collection Problem +Fluree's content-addressed storage means multiple branches can share the same index segments. When garbage collection runs for branch A: + +1. **Branch A** has a list of index nodes no longer needed after reindexing +2. **Other branches** (B, C, etc.) may still reference these same nodes +3. **Safety requirement**: Must not delete nodes still in use by other branches +4. **Performance requirement**: Need fast membership testing across potentially large sets +5. **Acceptable tradeoff**: False positives (keeping garbage longer) are acceptable; false negatives (deleting active nodes) are NOT acceptable + +### Index Storage Architecture +- **Content-Addressed Storage**: All index files use SHA-256 hashes as filenames (base32 encoded) +- **Index Types**: 4 different index types (spot, post, opst, tspo) +- **Storage Structure**: `ledger-name/index//.json` +- **Leaf Nodes**: Average ~200KB; overflow threshold 500KB +- **Branch Nodes**: Can hold up to 500 children +- **Shared Segments**: Multiple branches frequently reference identical index segments + +## How It Works + +### Filter Storage and Lifecycle +Each branch maintains its own cuckoo filter stored at: +``` +ledger-name/index/cuckoo/branch-name.json +``` + +**Filter Updates:** +1. **During indexing** (`novelty.cljc`): As new index segments are written, their addresses are automatically added to the branch's cuckoo filter +2. **During garbage collection** (`garbage.cljc`): Segments are removed from the current branch's filter before checking other branches +3. **Branch creation** (`branch.cljc`): New branches copy their parent's filter as a starting point + +### Cross-Branch Checking Process +When garbage collection runs: +1. Load all other branch filters (excluding current branch) +2. Check each garbage segment against other branch filters +3. Retain segments that show up as "possibly in use" by other branches +4. Only delete segments that are confirmed not in use elsewhere + +## Design Parameters + +### Fingerprint Selection +- **Method**: Decodes base32 SHA-256 hash to raw bytes and takes the first 16 bits +- **Size**: 16 bits (first 2 bytes of the decoded SHA-256 hash) +- **Bucket Hashing**: FNV-1a 32-bit hash over first 8 bytes for primary bucket +- **Platform Stability**: FNV-1a ensures identical behavior across JVM and JavaScript +- **Rationale**: Platform-stable implementation using first 16 bits of SHA-256 for low false positive rates + +### Filter Configuration +- **Bucket size**: 4 slots per bucket (standard for cuckoo filters) +- **Max relocations**: 500 attempts before considering filter full +- **Load factor target**: 90-95% +- **Automatic sizing**: Filter size is calculated based on expected item count + +## Performance Characteristics + +### Memory Usage +At ~95% load factor: +- **16-bit fingerprints**: ~6 bytes per index segment (using JSON with EDN-encoded bucket arrays) +- **Future optimization**: Binary packing with base64 encoding could reduce to ~2.8 bytes per segment + +### False Positive Rates +- **16-bit fingerprints**: ~0.012% (1 in ~8,200) + +### Database Size vs Filter Size + +Using realistic index characteristics (200KB average leaf size, ~300 branch fanout): + +| Database Size | Estimated Segments | Filter Size (16-bit) | Expected FP Rate | +|---------------|-------------------|---------------------|------------------| +| 100MB | ~502 | ~3KB | ~0.012% | +| 1GB | ~5,017 | ~29KB | ~0.012% | +| 10GB | ~50,167 | ~293KB | ~0.012% | +| 100GB | ~501,667 | ~2.9MB | ~0.012% | +| 1TB | ~5,016,667 | ~29MB | ~0.012% | + +**Calculations:** +- Segments = `(DB_size / 200KB) + (segments / 300)` (leaves + branches) +- Filter size ≈ `segments × 6 bytes` (current JSON/EDN serialization) + +### Runtime Performance +- **Hash operations**: ~1 microsecond per segment +- **Filter lookups**: O(1) average, O(4) worst case +- **Negligible overhead** compared to actual disk operations +- **I/O impact**: Minimal - filters are small and cached during GC operations + +## Implementation Details + +### Key Components + +The implementation spans three main namespaces: + +#### 1. Core Filter (`fluree.db.indexer.cuckoo`) +```clojure +;; Primary operations +(create-filter expected-items) ; Create new filter +(add-item filter segment-address) ; Add segment to filter +(contains-hash? filter segment-address) ; Check membership +(remove-item filter segment-address) ; Remove segment +(serialize/deserialize filter) ; Persistence +``` + +#### 2. Index Integration (`fluree.db.flake.index.novelty`) +During the index refresh process: +1. Collect all newly written segment addresses +2. Add segments (including root and garbage files) to current branch's filter +3. Write updated filter to storage + +#### 3. Garbage Collection (`fluree.db.indexer.garbage`) +During garbage collection: +1. Remove garbage segments from current branch's filter (prevents self-checking) +2. Load filters from all other branches +3. Check each garbage segment against other branch filters +4. Only delete segments not found in any other branch + +## Design Decisions + +### 1. Safety-First Approach +- **False positives acceptable**: Better to keep garbage longer than delete active segments +- **Missing filter = retain all**: When a branch's filter can't be read, assume all segments are in use +- **Self-exclusion**: Branches never check their own filter during garbage collection + +### 2. Fingerprint Strategy +- **Extract from base32 decoded bytes**: Takes first 16 bits from decoded SHA-256 hash +- **Platform consistent**: Works identically across JVM and JavaScript platforms +- **16-bit fixed size**: Provides good balance of memory efficiency and low false positive rate (~0.012%) + +### 3. Storage Strategy +- **Per-branch filters**: Each branch maintains its own filter at `ledger/index/cuckoo/branch.json` +- **JSON serialization**: Human-readable format with EDN-encoded bucket arrays +- **Atomic updates**: Filters are written atomically during index completion +- **Future optimization**: Binary packing could reduce storage by ~50% + +### 4. Concurrency and Consistency +- **No locking required**: Filters are read-only during garbage collection +- **Sequential updates**: Filter updates happen synchronously during indexing +- **Immutable snapshots**: GC operates on filter snapshots, not live data + +## Monitoring and Observability + +### Log Messages +Garbage collection operations log retention statistics: +``` +INFO: Checking 8 garbage segments from ledger "my-ledger:main" branch "main" t 1 + - Retained 2 segments still in use by other branches + - Deleting 6 segments from disk +``` + +### Filter Statistics +The implementation provides metrics via `filter-stats`: +- **Count**: Number of items in the filter +- **Capacity**: Maximum items before resize needed +- **Load factor**: Current utilization percentage +- **Estimated FPR**: Theoretical false positive rate +- **Fingerprint bits**: Configuration setting + +## Operational Characteristics + +### Failure Handling +- **Corrupted filter**: Logged and treated as missing (retain all segments) +- **Storage errors**: GC conservatively retains segments when in doubt +- **Filter overflow**: Automatically creates larger filter as needed + +### Branch Operations +- **Branch creation**: Copies parent's filter as starting point +- **Branch deletion**: Removes corresponding filter file +- **Branch rename**: Updates filter filename accordingly + +## Implementation Reference + +### Core Algorithm +The cuckoo filter uses two hash functions to determine bucket locations: + +```clojure +;; Extract hash part and decode to bytes +(defn- compute-hashes [address num-buckets] + (let [hash-bytes (address->bytes address) + ;; Extract 16-bit fingerprint from first 2 bytes + fp (bit-or (bit-shift-left (bit-and (first hash-bytes) 0xFF) 8) + (bit-and (second hash-bytes) 0xFF)) + ;; FNV-1a 32-bit hash for primary bucket + ;; FNV-1a prime: 16777619, offset basis: 2166136261 + fnv-prime 16777619 + fnv-offset 2166136261 + b1-hash (reduce (fn [hash b] + ;; FNV-1a: hash = (hash XOR byte) * prime + (bit-and 0xFFFFFFFF + (* (bit-xor hash (bit-and b 0xFF)) + fnv-prime))) + fnv-offset + (take 8 hash-bytes)) + b1 (mod b1-hash num-buckets) + ;; Compute alternate bucket using XOR with fingerprint + b2 (mod (bit-xor b1 (hash fp)) num-buckets)] + [fp b1 b2])) +``` + +### Key Operations +```clojure +;; Create filter chain (supports growth beyond initial capacity) +(create-filter-chain) + +;; Add index segment to filter chain +(add-item-chain chain "fluree:file://ledger/index/spot/abc123.json") + +;; Check if segment might be in filter chain (may have false positives) +(contains-hash-chain? chain "fluree:file://ledger/index/spot/abc123.json") + +;; Remove segment from filter chain +(remove-item-chain chain "fluree:file://ledger/index/spot/abc123.json") + +;; Batch operations for efficiency +(batch-add-chain chain segment-list) +(batch-remove-chain chain segment-list) +``` + +### Cross-Branch Checking +```clojure +;; Load all other branch filters +(defn load-other-branch-filters [index-catalog ledger current-branch] + ;; Discovers branches by scanning storage, excludes current branch + ;; Returns vector of loaded filters) + +;; Check if any other branch uses this segment +(defn any-branch-uses? [other-filters segment-address] + (some #(contains-hash? % segment-address) other-filters)) +``` + +## References + +- [Cuckoo Filter: Practically Better Than Bloom](https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf) +- [Implementation: `fluree.db.indexer.cuckoo`](../src/fluree/db/indexer/cuckoo.cljc) +- [Garbage Collection: `fluree.db.indexer.garbage`](../src/fluree/db/indexer/garbage.cljc) +- [Index Integration: `fluree.db.flake.index.novelty`](../src/fluree/db/flake/index/novelty.cljc) \ No newline at end of file diff --git a/src/fluree/db/api/branch.cljc b/src/fluree/db/api/branch.cljc index 85c9adcbcc..6e374e7fb9 100644 --- a/src/fluree/db/api/branch.cljc +++ b/src/fluree/db/api/branch.cljc @@ -3,6 +3,7 @@ This namespace contains the implementation logic for branch management." (:require [fluree.db.connection :as connection] [fluree.db.flake.commit-data :as commit-data] + [fluree.db.indexer.cuckoo :as cuckoo] [fluree.db.ledger :as ledger] [fluree.db.nameservice :as nameservice] [fluree.db.nameservice.sub :as ns-subscribe] @@ -41,6 +42,14 @@ :created-from {"f:branch" from-branch "f:commit" {"@id" source-commit}}} + ;; Copy cuckoo filter from source branch to new branch (if storage supports it) + index-catalog (:index-catalog source-db) + _ (when (and index-catalog (:storage index-catalog)) + ;; Read the source branch's filter and copy it if it exists + (when-let [source-filter ( source-commit-map @@ -131,12 +140,18 @@ (throw (ex-info (str "Cannot delete protected branch: " branch) {:status 400 :error :db/cannot-delete-protected-branch}))) ;; Now delete the branch from nameservice - primary-publisher (:primary-publisher conn)] + primary-publisher (:primary-publisher conn) + ;; Also delete the cuckoo filter for this branch + index-catalog (:index-catalog ledger) + [ledger-id branch-name] (util.ledger/ledger-parts branch-spec)] (if primary-publisher (do ( (util/get-first latest-commit const/iri-index) - (util/get-first-value const/iri-address))] + (util/get-first-value const/iri-address)) + ;; Extract ledger alias from the commit + ledger-alias (util/get-first-value latest-commit const/iri-alias)] (when index-address (log/debug "Dropping index" index-address) (let [{:keys [spot opst post tspo]} (! go go-loop]] + [clojure.string :as str] [fluree.db.dbproto :as dbproto] [fluree.db.flake :as flake] [fluree.db.flake.commit-data :as commit-data] [fluree.db.flake.index :as index] [fluree.db.flake.index.storage :as storage] + [fluree.db.indexer.cuckoo :as cuckoo] [fluree.db.indexer.garbage :as garbage] [fluree.db.util :as util :refer [try* catch*]] [fluree.db.util.async :refer [ stats (update :novel inc) (assoc-in [:updated-ids (:id node)] (:id written-node)) - (cond-> (not= old-id :empty) (update :garbage conj old-id)))] + (cond-> (not= old-id :empty) (update :garbage conj old-id)) + (cond-> segment-name (update :new-segments conj segment-name)))] (recur stats* written-node)) (recur (update stats :unchanged inc) @@ -355,11 +362,12 @@ ::t t})) (defn tally - [db-status {:keys [idx root garbage] :as _tally-data}] + [db-status {:keys [idx root garbage new-segments] :as _tally-data}] (-> db-status (update :db assoc idx root) (update :indexes conj idx) - (update :garbage into garbage))) + (update :garbage into garbage) + (update :new-segments into new-segments))) (defn refresh-all ([db error-ch] @@ -369,7 +377,7 @@ (map (partial extract-root db)) (map (partial refresh-index db changes-ch error-ch)) async/merge - (async/reduce tally {:db db, :indexes [], :garbage #{}})))) + (async/reduce tally {:db db, :indexes [], :garbage #{}, :new-segments []})))) (defn refresh [{:keys [novelty t alias] :as db} changes-ch max-old-indexes] @@ -390,7 +398,7 @@ (throw e)) refresh-ch - ([{:keys [garbage], refreshed-db :db, :as _status}] + ([{:keys [garbage new-segments], refreshed-db :db, :as _status}] (let [{:keys [index-catalog alias] :as refreshed-db*} (assoc-in refreshed-db [:stats :indexed] t) ;; TODO - ideally issue garbage/root writes to RAFT together @@ -403,6 +411,21 @@ db-root-res ( new-segments + (:address db-root-res) + (conj (last (str/split (:address db-root-res) #"/")))) + updated-filter (if (seq all-segments) + (cuckoo/batch-add-chain filter all-segments) + filter)] + ;; Always write the filter (even if empty) to ensure it exists + ( refreshed-db* :commit :data) diff --git a/src/fluree/db/indexer/cuckoo.cljc b/src/fluree/db/indexer/cuckoo.cljc new file mode 100644 index 0000000000..4320a1495d --- /dev/null +++ b/src/fluree/db/indexer/cuckoo.cljc @@ -0,0 +1,545 @@ +(ns fluree.db.indexer.cuckoo + "Cuckoo filter implementation for cross-branch index garbage collection. + + Uses cuckoo filters to efficiently check if index nodes marked as garbage + by one branch are still in use by other branches." + (:require [alphabase.core :as alphabase] + [clojure.string :as str] + [fluree.db.storage :as store] + [fluree.db.util :refer [try* catch*]] + [fluree.db.util.async :refer [go-try address + (str/split #"/") + last + (str/replace #"\.json$" "")) + address)) + +(defn- address->bytes + "Convert an address to bytes for hashing. + Expects valid base32-encoded SHA-256 hashes." + [address] + (let [hash-part (extract-hash-part address)] + (try* + ;; Decode base32 SHA-256 hash to raw bytes + (alphabase/base32->bytes hash-part) + (catch* e + ;; Log error and throw - addresses should always be valid base32 SHA-256 hashes + (log/error e "Failed to decode base32 address:" address + "hash-part:" hash-part) + (throw (ex-info "Invalid base32 address for cuckoo filter" + {:address address + :hash-part hash-part + :error (str e)})))))) + +(defn- compute-hashes + "Compute fingerprint and bucket indices from an address. + Decodes base32 once and returns [fingerprint bucket1 bucket2]. + Uses FNV-1a 32-bit hash for cross-platform consistency between CLJ and CLJS." + [address num-buckets] + (let [hash-bytes (address->bytes address) + ;; Extract 16-bit fingerprint from first 2 bytes + fp (bit-or (bit-shift-left (bit-and (first hash-bytes) 0xFF) 8) + (bit-and (second hash-bytes) 0xFF)) + ;; FNV-1a 32-bit hash for primary bucket + ;; Uses first 8 bytes for hashing + ;; FNV-1a prime: 16777619, offset basis: 2166136261 + fnv-prime 16777619 + fnv-offset 2166136261 + b1-hash (reduce (fn [hash b] + ;; FNV-1a: hash = (hash XOR byte) * prime + ;; Keep in 32-bit range using unsigned-bit-shift-right + (bit-and 0xFFFFFFFF + (* (bit-xor hash (bit-and b 0xFF)) + fnv-prime))) + fnv-offset + (take 8 hash-bytes)) + b1 (mod b1-hash num-buckets) + ;; Compute alternate bucket using XOR with fingerprint hash + ;; Simple XOR ensures deterministic b2 calculation + b2 (mod (bit-xor b1 (hash fp)) num-buckets)] + [fp b1 b2])) + +(defn- bucket-full? + "Check if a bucket has no empty slots." + [bucket] + (>= (count (remove nil? bucket)) bucket-size)) + +(defn- add-to-bucket + "Try to add fingerprint to bucket. Returns updated bucket or nil if full." + [bucket fingerprint] + (when-not (bucket-full? bucket) + (let [empty-idx (first (keep-indexed (fn [idx val] + (when (nil? val) idx)) + bucket))] + (when empty-idx + (assoc bucket empty-idx fingerprint))))) + +(defn- remove-from-bucket + "Remove fingerprint from bucket if present." + [bucket fingerprint] + (if-let [idx (first (keep-indexed (fn [idx val] + (when (= val fingerprint) idx)) + bucket))] + (assoc bucket idx nil) + bucket)) + +(defn- bucket-contains? + "Check if bucket contains the fingerprint." + [bucket fingerprint] + (some #(= % fingerprint) bucket)) + +(defn- pick-random-entry + "Pick a random entry from a bucket for eviction." + [bucket] + (let [entries (remove nil? bucket)] + (when (seq entries) + (rand-nth entries)))) + +(defrecord CuckooFilter [buckets num-buckets fingerprint-bits count]) + +(defn create-filter + "Create a new cuckoo filter with 16-bit fingerprints. + + Parameters: + - expected-items: Expected number of items to store" + [expected-items] + (let [;; Size for ~95% load factor + num-buckets (-> expected-items + (/ (* bucket-size 0.95)) + Math/ceil + long + (max 16)) ; Minimum 16 buckets + ;; Initialize buckets with empty vectors + buckets (vec (repeat num-buckets + (vec (repeat bucket-size nil))))] + (->CuckooFilter buckets num-buckets 16 0))) ; Always use 16-bit fingerprints + +(defn- relocate-and-add + "Try to relocate existing items to make room for new fingerprint." + [{:keys [buckets num-buckets] :as filter} bucket-idx fingerprint] + (loop [kicks 0 + fp fingerprint + idx bucket-idx + buckets' buckets] + (if (>= kicks max-kicks) + nil ; Failed to insert + (let [bucket (get buckets' idx) + victim (pick-random-entry bucket)] + (if-not victim + ;; Found empty slot + (let [updated-bucket (add-to-bucket bucket fp)] + (when updated-bucket + (assoc filter :buckets (assoc buckets' idx updated-bucket) + :count (inc (:count filter))))) + ;; Evict victim and try to relocate it + (let [updated-bucket (-> bucket + (remove-from-bucket victim) + (add-to-bucket fp)) + buckets'' (assoc buckets' idx updated-bucket) + alt-idx (mod (bit-xor idx (hash victim)) num-buckets)] + (if-let [alt-bucket' (add-to-bucket (get buckets'' alt-idx) victim)] + ;; Successfully relocated victim + (assoc filter :buckets (assoc buckets'' alt-idx alt-bucket') + :count (inc (:count filter))) + ;; Continue relocating + (recur (inc kicks) victim alt-idx buckets'')))))))) + +(defn- add-item-internal + "Internal add-item for single filter." + [{:keys [buckets num-buckets] :as filter} sha256-hash] + (let [[fp b1 b2] (compute-hashes sha256-hash num-buckets) + b1-bucket (get buckets b1) + b2-bucket (get buckets b2)] + (cond + ;; Try primary bucket + (not (bucket-full? b1-bucket)) + (let [updated (add-to-bucket b1-bucket fp)] + (assoc filter :buckets (assoc buckets b1 updated) + :count (inc (:count filter)))) + + ;; Try alternate bucket + (not (bucket-full? b2-bucket)) + (let [updated (add-to-bucket b2-bucket fp)] + (assoc filter :buckets (assoc buckets b2 updated) + :count (inc (:count filter)))) + + ;; Try relocating + :else + (relocate-and-add filter b1 fp)))) + +(defn- contains-hash-internal? + "Internal contains-hash? for single filter." + [{:keys [buckets num-buckets]} sha256-hash] + (let [[fp b1 b2] (compute-hashes sha256-hash num-buckets)] + (or (bucket-contains? (get buckets b1) fp) + (bucket-contains? (get buckets b2) fp)))) + +(defn- remove-item-internal + "Internal remove-item for single filter." + [{:keys [buckets num-buckets count] :as filter} sha256-hash] + (let [[fp b1 b2] (compute-hashes sha256-hash num-buckets) + b1-bucket (get buckets b1) + b2-bucket (get buckets b2)] + (cond + (bucket-contains? b1-bucket fp) + (assoc filter :buckets (assoc buckets b1 (remove-from-bucket b1-bucket fp)) + :count (dec count)) + + (bucket-contains? b2-bucket fp) + (assoc filter :buckets (assoc buckets b2 (remove-from-bucket b2-bucket fp)) + :count (dec count)) + + :else filter))) + +;; Serialization for persistence + +(defn- encode-buckets + "Encode buckets to a compact EDN format." + [buckets _fingerprint-bits] + {:buckets buckets + :format :edn}) + +(defn- decode-buckets + "Decode buckets from persisted format." + [encoded] + (:buckets encoded)) + +;; Chain management functions + +(defn serialize-single + "Serialize a single filter. Public for testing." + [{:keys [buckets num-buckets fingerprint-bits count]}] + {:f fingerprint-bits + :buckets (encode-buckets buckets fingerprint-bits) + :num-buckets num-buckets + :count count}) + +(defn single-filter->chain + "Convert a single filter to chain format. Public for testing." + [filter] + {:version 2 + :t nil ; Will be set when persisting + :filters [(serialize-single filter)]}) + +(defn- deserialize-single + "Deserialize a single filter." + [{:keys [buckets num-buckets f count]}] + (let [decoded-buckets (decode-buckets buckets)] + (->CuckooFilter decoded-buckets num-buckets f count))) + +(defn add-item-chain + "Add an item to the filter chain, creating new filter if needed. + Proactively creates new filter when current filter reaches 90% capacity." + [{:keys [filters] :as filter-chain} sha256-hash] + (loop [idx 0] + (if (< idx (count filters)) + (let [current-filter (deserialize-single (nth filters idx))] + (if-let [updated (add-item-internal current-filter sha256-hash)] + (let [updated-serialized (serialize-single updated) + ;; Check if this filter is now at 90% capacity + load-factor (/ (double (:count updated)) + (* (:num-buckets updated) bucket-size)) + ;; If last filter and at 90% capacity, proactively add new empty filter + filters' (if (and (= idx (dec (count filters))) + (>= load-factor 0.9)) + (conj (assoc filters idx updated-serialized) + (serialize-single (create-filter default-filter-capacity))) + (assoc filters idx updated-serialized))] + (assoc filter-chain :filters filters')) + (recur (inc idx)))) + ;; All full, add new filter + (let [new-filter (create-filter default-filter-capacity) + updated (add-item-internal new-filter sha256-hash)] + (if updated + (assoc filter-chain :filters (conj filters (serialize-single updated))) + filter-chain))))) + +(defn contains-hash-chain? + "Check if hash exists in any filter in the chain." + [{:keys [filters]} sha256-hash] + (some #(contains-hash-internal? (deserialize-single %) sha256-hash) filters)) + +(defn remove-item-chain + "Remove item from whichever filter contains it, removing empty filters." + [{:keys [filters] :as filter-chain} sha256-hash] + (let [updated-filters + (vec (for [f filters] + (let [filter (deserialize-single f)] + (if (contains-hash-internal? filter sha256-hash) + (serialize-single (remove-item-internal filter sha256-hash)) + f)))) + ;; Remove empty filters (count = 0), but keep at least one + cleaned-filters (vec (remove #(zero? (:count %)) updated-filters)) + ;; Ensure we always have at least one filter + final-filters (if (empty? cleaned-filters) + [(serialize-single (create-filter default-filter-capacity))] + cleaned-filters)] + (assoc filter-chain :filters final-filters))) + +(defn batch-add-chain + "Add multiple items to the filter chain." + [filter-chain sha256-hashes] + (reduce add-item-chain filter-chain sha256-hashes)) + +(defn batch-remove-chain + "Remove multiple items from the filter chain." + [filter-chain sha256-hashes] + (reduce remove-item-chain filter-chain sha256-hashes)) + +(defn create-filter-chain + "Create a new filter chain with an initial empty filter." + [] + {:version 2 + :t nil + :filters [(serialize-single (create-filter default-filter-capacity))]}) + +(defn serialize + "Serialize filter chain for storage." + [filter-chain] + filter-chain) + +(defn deserialize + "Deserialize filter from storage." + [data] + (cond + (= (:version data) 2) data + (nil? (:version data)) data ; Support test data without version + :else (throw (ex-info "Unsupported filter version" {:version (:version data)})))) + +;; Storage integration + +(defn filter-storage-path + "Get the storage path for a branch's cuckoo filter. + Returns path like 'ledger-name/index/cuckoo/branch.json'." + [ledger-alias branch-name] + (str ledger-alias "/index/cuckoo/" branch-name ".json")) + +(defn write-filter + "Persist a cuckoo filter to storage using explicit filename." + [index-catalog ledger-alias branch-name t filter] + (go-try + (when (and index-catalog (:storage index-catalog) filter) + (let [serialized (-> (serialize filter) + (assoc :t t)) + json-str (json/stringify serialized) + bytes (bytes/string->UTF8 json-str) + filename (filter-storage-path ledger-alias branch-name) + ;; Get the actual store from catalog (usually the default store) + storage (:storage index-catalog) + store (if (satisfies? store/ByteStore storage) + storage + (store/get-content-store storage ::store/default))] + (string bytes)) + data (json/parse json-str true)] + (deserialize data))) + (catch* e + ;; Filter doesn't exist or error reading it + (log/debug "Error reading filter:" (ex-message e)) + nil))) + (log/debug "Skipping filter read - missing requirements" + "catalog:" (boolean index-catalog) + "storage:" (boolean (:storage index-catalog)) + "ledger:" ledger-alias + "branch:" branch-name)))) + +;; Cleanup functions + +(defn delete-filter + "Delete a cuckoo filter file for a branch." + [index-catalog ledger-alias branch-name] + (go-try + (when (and index-catalog (:storage index-catalog)) + (let [filename (filter-storage-path ledger-alias branch-name) + ;; Get the actual store from catalog (usually the default store) + storage (:storage index-catalog) + store (if (satisfies? store/EraseableStore storage) + storage + (store/get-content-store storage ::store/default))] + (try* + (> files + (filter #(str/ends-with? % ".json")) + (map #(-> % + (str/replace cuckoo-path "") + (str/replace ".json" ""))) + distinct + vec))))) + +(defn load-other-branch-filters + "Load all branch filters except the current one." + [index-catalog ledger-alias current-branch] + (go-try + (let [storage (:storage index-catalog) + branches ( s + crypto/sha2-256 ; Returns hex string + alphabase/hex->bytes ; Convert hex to bytes + alphabase/bytes->base32)) ; Already lowercase from fluree crypto + +(deftest chain-growth-test + (testing "Chain grows when capacity is exceeded" + ;; Create a filter chain with very small capacity for testing + ;; Note: minimum bucket count is 16, so actual capacity is 16*4*0.95 = ~60 items + (let [small-capacity 10 + small-filter (cuckoo/create-filter small-capacity) + chain (cuckoo/single-filter->chain small-filter) + items (map #(test-hash (str "item-" %)) (range 100)) ; Use 100 items to ensure overflow + ;; Add items until we exceed capacity and force chain growth + chain-with-items (reduce cuckoo/add-item-chain chain items) + filter-count (count (:filters chain-with-items))] + + (testing "Multiple filters created when capacity exceeded" + (is (> filter-count 1) + (str "Should have multiple filters, got " filter-count))) + + (testing "All items are findable across chain" + (doseq [item items] + (is (cuckoo/contains-hash-chain? chain-with-items item) + (str "Should find " item " in chain")))) + + (testing "Items not added are not found" + (is (not (cuckoo/contains-hash-chain? chain-with-items (test-hash "not-in-chain"))))) + + (testing "Remove items across multiple filters" + (let [items-to-remove (take 50 items) + items-to-keep (drop 50 items) + chain-after-remove (reduce cuckoo/remove-item-chain + chain-with-items + items-to-remove)] + + ;; Removed items should not be found + (doseq [item items-to-remove] + (is (not (cuckoo/contains-hash-chain? chain-after-remove item)) + (str item " should be removed"))) + + ;; Kept items should still be found + (doseq [item items-to-keep] + (is (cuckoo/contains-hash-chain? chain-after-remove item) + (str item " should still be present"))) + + ;; Empty filters should be removed + (testing "Empty filters are cleaned up" + (let [filters-after (:filters chain-after-remove)] + (is (every? #(pos? (:count %)) filters-after) + "No empty filters should remain")))))))) + +(deftest collision-handling-test + (testing "Handle items with similar patterns" + (let [chain (cuckoo/create-filter-chain) + ;; Create items that might collide + base-items [(test-hash "aaaa") (test-hash "aaab") (test-hash "aaac") + (test-hash "aaad") (test-hash "aaae") + (test-hash "baaa") (test-hash "baab") (test-hash "baac") + (test-hash "baad") (test-hash "baae")] + chain-with-items (reduce cuckoo/add-item-chain chain base-items)] + + (testing "All similar items are stored" + (doseq [item base-items] + (is (cuckoo/contains-hash-chain? chain-with-items item) + (str item " should be found")))) + + (testing "Removing one item doesn't affect similar items" + (let [item-to-remove (test-hash "aaaa") + similar-items (remove #{item-to-remove} base-items) + chain-after-remove (cuckoo/remove-item-chain chain-with-items item-to-remove)] + + (is (not (cuckoo/contains-hash-chain? chain-after-remove item-to-remove)) + "Removed item should not be found") + + (doseq [item similar-items] + (is (cuckoo/contains-hash-chain? chain-after-remove item) + (str item " should still be found after removing similar item")))))))) + +(deftest chain-serialization-test + (testing "Chain serialization and deserialization" + (let [small-filter (cuckoo/create-filter 5) + chain (cuckoo/single-filter->chain small-filter) + items (map #(test-hash (str "serialize-" %)) (range 20)) + chain-with-items (reduce cuckoo/add-item-chain chain items) + serialized (cuckoo/serialize chain-with-items) + deserialized (cuckoo/deserialize serialized)] + + (testing "Deserialized chain has correct structure" + (is (= (:version deserialized) 2))) + + (testing "All items findable after deserialization" + (doseq [item items] + (is (cuckoo/contains-hash-chain? deserialized item) + (str item " should be found after deserialization")))) + + (testing "Items can be added to deserialized chain" + (let [new-item (test-hash "after-deserialize") + updated (cuckoo/add-item-chain deserialized new-item)] + (is (cuckoo/contains-hash-chain? updated new-item)) + (is (every? #(cuckoo/contains-hash-chain? updated %) items) + "Original items still present")))))) + +(deftest proactive-growth-test + (testing "Chain grows proactively at 90% capacity" + (let [;; Create chain with small filter for testing + ;; Minimum is 16 buckets * 4 = 64 capacity + ;; 90% of 64 = 57.6, so should trigger growth at 58 items + small-filter (cuckoo/create-filter 20) + chain (cuckoo/single-filter->chain small-filter) + ;; Add items to get close to 90% capacity (57 items) + items (map #(test-hash (str "proactive-" %)) (range 57)) + chain-with-items (reduce cuckoo/add-item-chain chain items)] + + (testing "Should have only one filter before 90%" + (is (= 1 (count (:filters chain-with-items))))) + + ;; Add one more item to trigger proactive growth (58th item = >90%) + (let [chain-after (cuckoo/add-item-chain chain-with-items (test-hash "trigger-growth"))] + (testing "Should proactively create second filter at 90% capacity" + (is (= 2 (count (:filters chain-after)))) + (is (= 58 (-> chain-after cuckoo/get-chain-stats :total-count)))) + + (testing "All items still findable after growth" + (doseq [item items] + (is (cuckoo/contains-hash-chain? chain-after item))) + (is (cuckoo/contains-hash-chain? chain-after (test-hash "trigger-growth")))))))) + +(deftest edge-cases-test + (testing "Edge cases for chain operations" + (testing "Empty chain operations" + (let [empty-chain (cuckoo/create-filter-chain)] + (is (not (cuckoo/contains-hash-chain? empty-chain (test-hash "anything")))) + ;; Removing from empty chain should return the same chain structure + (let [after-remove (cuckoo/remove-item-chain empty-chain (test-hash "anything"))] + (is (= (:version after-remove) (:version empty-chain)) + "Version should be unchanged") + (is (= (count (:filters after-remove)) (count (:filters empty-chain))) + "Filter count should be unchanged")))) + + (testing "Single item chain" + (let [chain (cuckoo/create-filter-chain) + single-chain (cuckoo/add-item-chain chain (test-hash "only-item"))] + (is (cuckoo/contains-hash-chain? single-chain (test-hash "only-item"))) + + (let [empty-again (cuckoo/remove-item-chain single-chain (test-hash "only-item"))] + ;; After removing the only item, should have empty filter + (is (or (empty? (:filters empty-again)) + (zero? (-> empty-again :filters first :count))) + "Chain should be empty after removing only item")))))) \ No newline at end of file diff --git a/test/fluree/db/indexer/cuckoo_test.clj b/test/fluree/db/indexer/cuckoo_test.clj new file mode 100644 index 0000000000..a26da48a6d --- /dev/null +++ b/test/fluree/db/indexer/cuckoo_test.clj @@ -0,0 +1,261 @@ +(ns fluree.db.indexer.cuckoo-test + "Test suite for cuckoo filter functionality." + (:require [alphabase.core :as alphabase] + [babashka.fs :refer [with-temp-dir]] + [clojure.java.io :as io] + [clojure.string :as str] + [clojure.test :refer [deftest testing is]] + [fluree.crypto :as crypto] + [fluree.db.api :as fluree] + [fluree.db.indexer.cuckoo :as cuckoo] + [fluree.db.util.json :as json])) + +(defn- test-hash + "Create a valid base32 hash for testing from a string." + [s] + (-> s + crypto/sha2-256 ; Returns hex string + alphabase/hex->bytes ; Convert hex to bytes + alphabase/bytes->base32)) ; Already lowercase from fluree crypto + +(deftest create-filter-test + (testing "Create filter with expected capacity" + (let [filter (cuckoo/create-filter 1000)] + (is (= 16 (:fingerprint-bits filter))) + (is (> (:num-buckets filter) 0)) + (is (= 0 (:count filter)))))) + +(deftest add-and-contains-test + (testing "Add item and check membership" + (let [filter (cuckoo/create-filter 100) + hash1 (test-hash "abc123def456") + filter' (cuckoo/add-item filter hash1)] + (is filter') + (is (= 1 (:count filter'))) + (is (cuckoo/contains-hash? filter' hash1)) + (is (not (cuckoo/contains-hash? filter (test-hash "nonexistent")))))) + + (testing "Batch add items" + (let [filter (cuckoo/create-filter 100) + hashes [(test-hash "hash1") (test-hash "hash2") (test-hash "hash3")] + filter' (cuckoo/batch-add filter hashes)] + (is (= 3 (:count filter'))) + (is (every? #(cuckoo/contains-hash? filter' %) hashes))))) + +(deftest remove-item-test + (testing "Remove existing item" + (let [filter (cuckoo/create-filter 100) + hash1 (test-hash "removeme") + filter' (-> filter + (cuckoo/add-item hash1) + (cuckoo/remove-item hash1))] + (is (= 0 (:count filter'))) + (is (not (cuckoo/contains-hash? filter' hash1))))) + + (testing "Remove non-existent item" + (let [filter (cuckoo/create-filter 100) + filter' (cuckoo/remove-item filter (test-hash "nonexistent"))] + (is (= filter filter'))))) + +(deftest serialization-test + (testing "Serialize and deserialize filter" + (let [filter (-> (cuckoo/create-filter 100) + (cuckoo/add-item (test-hash "hash1")) + (cuckoo/add-item (test-hash "hash2"))) + chain (cuckoo/single-filter->chain filter) + serialized (cuckoo/serialize chain) + restored (cuckoo/deserialize serialized)] + ;; Restored is in chain format, get stats from it + (let [restored-stats (cuckoo/get-chain-stats restored)] + (is (= (:count filter) (:total-count restored-stats))) + (is (= (:fingerprint-bits filter) (:fingerprint-bits restored-stats)))) + ;; contains-hash? works with both formats + (is (cuckoo/contains-hash? restored (test-hash "hash1"))) + (is (cuckoo/contains-hash? restored (test-hash "hash2")))))) + +(deftest metrics-test + (testing "Load factor calculation" + (let [filter (-> (cuckoo/create-filter 100) + (cuckoo/add-item (test-hash "item1")) + (cuckoo/add-item (test-hash "item2"))) + load (cuckoo/load-factor filter)] + (is (> load 0)) + (is (< load 1)))) + + (testing "Filter statistics" + (let [filter (cuckoo/create-filter 100) + stats (cuckoo/filter-stats filter)] + (is (contains? stats :count)) + (is (contains? stats :capacity)) + (is (contains? stats :load-factor)) + (is (contains? stats :estimated-fpr))))) + +(deftest realistic-address-test + (testing "Works with realistic Fluree index addresses" + (let [filter (cuckoo/create-filter 1000) + ;; Simulate realistic index segment addresses with real base32 hashes + hash1 (test-hash "segment1") + hash2 (test-hash "segment2") + hash3 (test-hash "segment3") + hash4 (test-hash "segment4") + hash5 (test-hash "segment5") + addresses [(str "fluree:file://ledger/index/spot/" hash1 ".json") + (str "fluree:file://ledger/index/post/" hash2 ".json") + (str "fluree:file://ledger/index/opst/" hash3 ".json") + (str "ledger/index/tspo/" hash4 ".json") + hash5] ; Just the hash itself + filter' (reduce cuckoo/add-item filter addresses)] + (is (= 5 (:count filter'))) + (is (every? #(cuckoo/contains-hash? filter' %) addresses)) + (let [not-in-filter-hash (test-hash "notinfilter")] + (is (not (cuckoo/contains-hash? filter' + (str "fluree:file://ledger/index/spot/" not-in-filter-hash ".json"))))))) + + (testing "Filter capacity handling" + (let [small-filter (cuckoo/create-filter 10) + ;; Try to add more items than initial capacity + many-items (map #(test-hash (str "item-" %)) (range 50)) + filter' (reduce (fn [f item] + (or (cuckoo/add-item f item) f)) + small-filter many-items) + added-count (:count filter')] + ;; Should be able to add items up to ~95% capacity + (is (> added-count 8)) + ;; All successfully added items should be found + (let [added-items (take added-count many-items)] + (is (every? #(cuckoo/contains-hash? filter' %) added-items)))))) + +(deftest false-positive-rate-test + (testing "No false negatives with moderate dataset" + (let [filter (cuckoo/create-filter 500) + items (map #(test-hash (str "item" %)) (range 100)) + filter' (reduce cuckoo/add-item filter items)] + ;; All added items must be found (no false negatives) + (is (every? #(cuckoo/contains-hash? filter' %) items)))) + + (testing "Acceptable false positive rate" + (let [filter (cuckoo/create-filter 500) + items (map #(test-hash (str "item" %)) (range 250)) + filter' (reduce cuckoo/add-item filter items) + non-items (map #(test-hash (str "nonitem" %)) (range 1000)) + fps (count (clojure.core/filter #(cuckoo/contains-hash? filter' %) non-items)) + fp-rate (/ fps 1000.0)] + ;; With 16-bit fingerprints, expect low FPR + (is (< fp-rate 0.05) "False positive rate should be less than 5%")))) + +(deftest cross-branch-checking-test + (testing "Multiple filters can check for shared items" + (let [;; Simulate three branch filters using chains + main-chain (cuckoo/create-filter-chain) + branch1-chain (cuckoo/create-filter-chain) + branch2-chain (cuckoo/create-filter-chain) + + ;; Shared segments (in all branches) + shared [(test-hash "segment1") (test-hash "segment2") (test-hash "segment3")] + + ;; Branch-specific segments + main-only [(test-hash "main1") (test-hash "main2")] + branch1-only [(test-hash "branch1-1") (test-hash "branch1-2")] + + ;; Add to filters + main-chain' (cuckoo/batch-add-chain main-chain (concat shared main-only)) + branch1-chain' (cuckoo/batch-add-chain branch1-chain (concat shared branch1-only)) + branch2-chain' (cuckoo/batch-add-chain branch2-chain shared) + + other-filters [branch1-chain' branch2-chain']] + + ;; Verify main filter has the expected items + (is (= 5 (-> main-chain' cuckoo/get-chain-stats :total-count))) + + ;; Shared segments should be found in other branches + (is (every? #(cuckoo/any-branch-uses? other-filters %) shared)) + + ;; Main-only segments should not be found in other branches + (is (not-any? #(cuckoo/any-branch-uses? other-filters %) main-only))))) + +(deftest ^:integration cuckoo-filter-file-integration-test + (testing "Cuckoo filter file is created with index and correctly tracks segments" + (with-temp-dir [storage-path {}] + (let [storage-str (str storage-path) + ;; Create connection with file storage and low indexing threshold + conn @(fluree/connect-file {:storage-path storage-str + :defaults {:indexing {:reindex-min-bytes 100 + :indexing-disabled false}}}) + _ @(fluree/create conn "cuckoo-test")] + + (testing "Initial state - cuckoo filter should exist even for empty ledger" + (let [filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.json")] + (is (.exists filter-path) "Cuckoo filter file should exist after ledger creation"))) + + (testing "After adding data and triggering index" + ;; Add some data to trigger indexing + (let [_ @(fluree/insert! conn "cuckoo-test" + [{"@context" {"ex" "http://example.org/"} + "@id" "ex:alice" + "@type" "ex:Person" + "ex:name" "Alice"} + {"@id" "ex:bob" + "@type" "ex:Person" + "ex:name" "Bob"}]) + ;; Trigger manual indexing to ensure index files are created + _ @(fluree/trigger-index conn "cuckoo-test" {:block? true}) + + ;; List all index segment files + index-dir (io/file storage-str "cuckoo-test" "index") + index-files (when (.exists index-dir) + (->> (file-seq index-dir) + (filter #(.isFile ^java.io.File %)) + (filter #(str/ends-with? (.getName ^java.io.File %) ".json")) + ;; Exclude cuckoo filter directory + (remove #(str/includes? (.getPath ^java.io.File %) "/cuckoo/")) + (map #(.getName ^java.io.File %)) + (remove #(str/includes? % "root")) ; Exclude root files + (remove #(str/includes? % "commit")) ; Exclude commit files + vec)) + + ;; Load the cuckoo filter to check contents + filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.json") + filter-data (when (.exists filter-path) + (-> filter-path + slurp + (json/parse true) + cuckoo/deserialize))] + + (is (seq index-files) "Index files should have been created") + (is filter-data "Cuckoo filter should be readable") + + (when (and (seq index-files) filter-data) + (testing "Filter contains actual index segment addresses" + ;; Check that each index file is in the cuckoo filter + ;; Using the file name as the address (simplified check) + (doseq [file-name index-files] + (is (cuckoo/contains-hash? filter-data file-name) + (str "Cuckoo filter should contain index file: " file-name)))) + + (testing "Filter correctly rejects non-existent segments" + ;; Use valid base32 hashes that aren't in the filter + (let [fake-segments [(str (test-hash "fake-segment-1") ".json") + (str (test-hash "nonexistent-2") ".json") + (str (test-hash "imaginary-3") ".json")]] + (doseq [fake-seg fake-segments] + (is (not (cuckoo/contains-hash? filter-data fake-seg)) + (str "Cuckoo filter should not contain fake segment: " fake-seg)))))))) + + (testing "After creating a branch" + (let [;; Create a new branch with full spec + _ @(fluree/create-branch! conn "cuckoo-test:feature" "cuckoo-test:main") + + ;; Check that branch has its own cuckoo filter + branch-filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "feature.json")] + + (is (.exists branch-filter-path) + "Branch should have its own cuckoo filter file") + + (when (.exists branch-filter-path) + (let [main-filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.json") + main-filter (-> main-filter-path slurp cuckoo/deserialize) + branch-filter (-> branch-filter-path slurp cuckoo/deserialize)] + + ;; Branch filter should start as a copy of main's filter + (is (= (:count main-filter) (:count branch-filter)) + "Branch filter should have same count as main filter initially"))))))))) \ No newline at end of file diff --git a/test/fluree/db_test.cljc b/test/fluree/db_test.cljc index 592c3a2a1d..b21c399b1e 100644 --- a/test/fluree/db_test.cljc +++ b/test/fluree/db_test.cljc @@ -1108,7 +1108,7 @@ ;; For now, just check we have some commit files (is (pos? (count commit-files)) (str "Should have commit files, got: " (count commit-files))))) - (is (= ["garbage" "opst" "post" "root" "spot" "tspo"] + (is (= ["cuckoo" "garbage" "opst" "post" "root" "spot" "tspo"] (sort (async/ Date: Tue, 2 Sep 2025 12:51:39 -0400 Subject: [PATCH 04/13] add merge, rebase, and reset operations for branch management; enhance S3 connection options --- src/fluree/db/api.cljc | 234 +++++++++++++++++++++++++++++- src/fluree/db/flake/flake_db.cljc | 67 +++++---- 2 files changed, 268 insertions(+), 33 deletions(-) diff --git a/src/fluree/db/api.cljc b/src/fluree/db/api.cljc index 9c337314b7..59610c8857 100644 --- a/src/fluree/db/api.cljc +++ b/src/fluree/db/api.cljc @@ -12,6 +12,7 @@ [fluree.db.json-ld.iri :as iri] [fluree.db.json-ld.policy :as policy] [fluree.db.ledger :as ledger] + [fluree.db.merge :as merge] [fluree.db.nameservice.query :as ns-query] [fluree.db.query.api :as query-api] [fluree.db.query.fql.parse :as parse] @@ -185,10 +186,18 @@ * For LocalStack: 'http://localhost:4566' * For MinIO: 'http://localhost:9000' - s3-prefix (optional): The prefix within the bucket + - s3-read-timeout-ms (optional): Per-request read timeout (default 20000) + - s3-write-timeout-ms (optional): Per-request write timeout (default 60000) + - s3-list-timeout-ms (optional): Per-request list timeout (default 20000) + - s3-max-retries (optional): Max retry attempts on transient errors (default 4) + - s3-retry-base-delay-ms (optional): Base backoff delay in ms (default 150) + - s3-retry-max-delay-ms (optional): Max backoff delay in ms (default 2000) - parallelism (optional): Number of parallel operations (default: 4) - cache-max-mb (optional): Maximum memory for caching in MB (default: 1000) - defaults (optional): Default options for ledgers created with this connection" - ([{:keys [s3-bucket s3-prefix s3-endpoint parallelism cache-max-mb defaults], + ([{:keys [s3-bucket s3-prefix s3-endpoint parallelism cache-max-mb defaults + s3-read-timeout-ms s3-write-timeout-ms s3-list-timeout-ms + s3-max-retries s3-retry-base-delay-ms s3-retry-max-delay-ms], :or {parallelism 4, cache-max-mb 1000}}] (when-not s3-bucket (throw (ex-info "S3 bucket name is required for S3 connection" @@ -203,7 +212,13 @@ "@type" "Storage" "s3Bucket" s3-bucket "s3Endpoint" s3-endpoint} - s3-prefix (assoc "s3Prefix" s3-prefix)) + s3-prefix (assoc "s3Prefix" s3-prefix) + s3-read-timeout-ms (assoc "s3ReadTimeoutMs" s3-read-timeout-ms) + s3-write-timeout-ms (assoc "s3WriteTimeoutMs" s3-write-timeout-ms) + s3-list-timeout-ms (assoc "s3ListTimeoutMs" s3-list-timeout-ms) + s3-max-retries (assoc "s3MaxRetries" s3-max-retries) + s3-retry-base-delay-ms (assoc "s3RetryBaseDelayMs" s3-retry-base-delay-ms) + s3-retry-max-delay-ms (assoc "s3RetryMaxDelayMs" s3-retry-max-delay-ms)) (cond-> {"@id" "connection" "@type" "Connection" "parallelism" parallelism @@ -237,9 +252,10 @@ ([conn ledger-alias opts] (validate-connection conn) ;; Disallow branch specification in ledger name during creation - (when (str/includes? ledger-alias ":") + (when (or (str/includes? ledger-alias ":") + (str/includes? ledger-alias "@")) (throw (ex-info (str "Ledger name cannot contain ':' character. " - "Branches must be created separately. " + "'@' is reserved for time travel. " "Provided: " ledger-alias) {:error :db/invalid-ledger-name :ledger-alias ledger-alias}))) @@ -620,6 +636,216 @@ (promise-wrap (api.branch/rename-branch! conn old-branch-spec new-branch-spec))) +;; Branch operations (merge, rebase, reset) + +(defn merge! + "Merges commits from source branch into target branch. + + Updates the target branch with changes from the source branch. + Supports fast-forward, squash, and regular merge modes. + + Parameters: + conn - Connection object + from - Source branch spec (e.g., 'ledger:feature') + to - Target branch spec (e.g., 'ledger:main') + opts - Map with optional merge options: + :ff - Fast-forward behavior (default :auto) + :auto - Fast-forward when possible + :only - Only allow fast-forward (fail otherwise) + :never - Never fast-forward, always create merge commit + :squash? - Combine all commits into one (default false) + :schema-aware - Use schema rules to resolve + function - Custom conflict resolution + :preview? - Dry run without making changes (default false) + + Returns promise resolving to: + {:status :success|:conflict|:error + :operation :merge + :from '...' :to '...' + :strategy '3-way' + :commits {:merged [...] :conflicts [...]} + :new-commit 'sha'} + + Phase 2 - Not yet implemented." + ([conn from to] + (merge! conn from to {})) + ([conn from to opts] + (validate-connection conn) + (promise-wrap + (merge/merge! conn from to opts)))) + +(defn rebase! + "Rebases source branch onto target branch (updates source branch). + + NOTE: True rebase is not yet implemented. Currently redirects to merge! + which updates the target branch instead of source branch. + + Parameters: + conn - Connection object + from - Source branch spec to rebase (will be updated) + to - Target branch spec to rebase onto (unchanged) + opts - Map with optional rebase options: + :ff - Fast-forward behavior (default :auto) + :auto - Fast-forward when possible + :only - Only allow fast-forward (fail otherwise) + :never - Never fast-forward, always replay + :squash? - Combine all commits into one (default false) + :atomic? - All-or-nothing vs apply-until-conflict (default true) + :selector - Which commits to include (default nil = all after LCA) + nil - All commits after LCA + {:t {:from 42 :to 44}} - Specific t-value range + {:t {:from 42 :until :conflict}} - Until conflict + {:shas ['sha1' 'sha2']} - Specific commits + :preview? - Dry run without making changes (default false) + + Returns promise resolving to: + {:status :success|:conflict|:error + :operation :rebase + :from '...' :to '...' + :strategy 'fast-forward'|'squash'|'replay' + :commits {:applied [...] :skipped [...] :conflicts [...]} + :new-commit 'sha'} + + Phase 1 implements: fast-forward and squash modes. + Phase 2 will add: cherry-pick and non-atomic modes." + ([conn from to] + (rebase! conn from to {})) + ([conn from to opts] + (validate-connection conn) + (promise-wrap + (merge/rebase! conn from to opts)))) + +(defn reset-branch! + "Resets a branch to a previous state. + + Two modes available: + - Safe mode (default): Creates new commit reverting to target state + - Hard mode: Moves branch pointer (rewrites history) + + Parameters: + conn - Connection object + branch - Target branch to reset (e.g., 'ledger:main') + to - Target state: + {:t 90} - Reset to transaction t-value + {:sha 'abc123'} - Reset to specific commit SHA + opts - Map with optional reset options: + :mode - Reset mode (default :safe) + :safe - Create revert commit (non-destructive) + :hard - Move branch pointer (destructive) + :archive - How to archive on hard reset (default {:as :tag}) + {:as :tag :name 'backup'} - Create tag at old HEAD + {:as :branch :name 'backup'} - Create branch at old HEAD + {:as :none} - No archive (requires force?) + :force? - Required for hard reset without archive (default false) + :message - Commit message for safe mode (auto-generated if not provided) + :preview? - Dry run without making changes (default false) + + Returns promise resolving to: + {:status :success|:error + :operation :reset + :branch '...' + :mode :safe|:hard + :reset-to {:t 90}|{:sha '...'} + :new-commit 'sha' ; For safe mode + :archived {:type :tag :name '...'} ; For hard mode + :previous-head 'sha'} + + Phase 1 implements: safe reset only. + Phase 2 will add: hard reset with archiving." + ([conn branch to] + (reset-branch! conn branch to {})) + ([conn branch to opts] + (validate-connection conn) + (promise-wrap + (merge/reset-branch! conn branch to opts)))) + +;; Legacy API - Deprecated, use rebase! instead +(defn merge-branches! + "DEPRECATED - Use rebase! instead. + + Legacy merge API maintained for backwards compatibility. + Maps to rebase! with appropriate options." + ([conn source-branch-spec target-branch-spec] + (merge-branches! conn source-branch-spec target-branch-spec {})) + ([conn source-branch-spec target-branch-spec opts] + (validate-connection conn) + ;; Map legacy API to new rebase! API + (let [strategy (:strategy opts :auto) + new-opts (clojure.core/merge + (case strategy + :fast-forward {:ff :only} + :flatten {:squash? true} + :auto {:ff :auto} + :no-ff {:ff :never} + {}) + (dissoc opts :strategy)) + result @(rebase! conn source-branch-spec target-branch-spec new-opts)] + ;; Map new response format to legacy format for backwards compatibility + (promise-wrap + (go-try + (if (= :success (:status result)) + (assoc result + :type (case (:strategy result) + "fast-forward" :fast-forward + "squash" :flatten + "replay" :rebase + :rebase)) + result)))))) + +(defn branch-divergence + "Analyzes divergence between two branches. + + Parameters: + conn - Connection object + branch1-spec - First branch spec + branch2-spec - Second branch spec + + Returns promise resolving to divergence analysis including: + :common-ancestor - Commit ID of common ancestor + :branch1-ahead - Number of commits branch1 is ahead + :branch2-ahead - Number of commits branch2 is ahead + :can-fast-forward - Boolean if one can fast-forward to the other" + [conn branch1-spec branch2-spec] + (validate-connection conn) + (promise-wrap + (merge/branch-divergence conn branch1-spec branch2-spec))) + +(defn branch-graph + "Returns a graph representation of branches and their relationships. + + Useful for visualizing branch history and relationships in UIs. + + Parameters: + conn - Connection object + ledger-spec - Ledger specification (e.g., 'myledger') + opts - Options map: + :format - Output format (default :json) + :json - Structured data for UI rendering + :ascii - ASCII art representation + :depth - Number of commits to show (default 20) + integer - Show N most recent commits + :all - Show entire history + :branches - Which branches to include (default :all) + :all - Include all branches + [\"main\" \"feature\"] - Only specified branches + + Returns promise resolving to: + - For :json format: Map with :branches, :commits, and :merges + - For :ascii format: String with ASCII art graph + + Example: + ;; Get JSON data for UI + @(branch-graph conn \"mydb\" {:format :json :depth 50}) + + ;; Get ASCII visualization + @(branch-graph conn \"mydb\" {:format :ascii :branches [\"main\" \"feature\"]})" + ([conn ledger-spec] + (branch-graph conn ledger-spec {})) + ([conn ledger-spec opts] + (validate-connection conn) + (promise-wrap + (merge/branch-graph conn ledger-spec opts)))) + ;; db operations (defn db diff --git a/src/fluree/db/flake/flake_db.cljc b/src/fluree/db/flake/flake_db.cljc index 08d3560f9f..76b61003b5 100644 --- a/src/fluree/db/flake/flake_db.cljc +++ b/src/fluree/db/flake/flake_db.cljc @@ -32,7 +32,8 @@ [fluree.db.query.range :as query-range] [fluree.db.reasoner :as reasoner] [fluree.db.time-travel :refer [TimeTravel]] - [fluree.db.util :as util :refer [try* catch* get-first get-first-value]] + [fluree.db.util :as util :refer [try* catch* get-id get-types get-list + get-first get-first-value]] [fluree.db.util.async :refer [ list-val :idx last)}] + [idx list-val] + (let [m {:i idx}] (assoc list-val ::meta m))) (defn list-value? "returns true if json-ld value is a list object." [v] (and (map? v) - (= :list (-> v first key)))) + (= const/iri-list (-> v first key)))) -(defn node? - "Returns true if a nested value is itself another node in the graph. +(defn subject-node? + "Returns true if a nested value is itself another subject node in the graph. Only need to test maps that have :id - and if they have other properties they are defining then we know it is a node and have additional data to include." [mapx] (cond - (contains? mapx :value) + (contains? mapx const/iri-value) false (list-value? mapx) false (and - (contains? mapx :set) - (= #{:set :idx} (set (keys mapx)))) + (contains? mapx const/iri-set) + (= 1 (count mapx))) false :else @@ -180,9 +181,9 @@ (defn value-map->flake [assert? db sid pid t v-map] - (let [ref-id (:id v-map) + (let [ref-id (get-id v-map) meta (::meta v-map)] - (if (and ref-id (node? v-map)) + (if (and ref-id (subject-node? v-map)) (let [ref-sid (iri/encode-iri db ref-id)] (flake/create sid pid ref-sid const/$id t assert? meta)) (let [[value dt] (datatype/from-expanded db v-map)] @@ -193,27 +194,28 @@ (let [v-maps (util/sequential value)] (mapcat (fn [v-map] (if (list-value? v-map) - (let [list-vals (:list v-map)] + (let [list-vals (get-list v-map)] (into [] - (comp (map add-list-meta) + (comp (map-indexed add-list-meta) (map (partial value-map->flake assert? db sid pid t))) list-vals)) [(value-map->flake assert? db sid pid t v-map)])) v-maps))) (defn- get-type-flakes - [assert? db t sid type] + [assert? db t sid types] (into [] (map (fn [type-item] (let [type-sid (iri/encode-iri db type-item)] (flake/create sid const/$rdf:type type-sid const/$id t assert? nil)))) - type)) + types)) (defn node->flakes [assert? db t node] (log/trace "node->flakes:" node "assert?" assert?) - (let [{:keys [id type]} node + (let [id (get-id node) + types (get-types node) sid (if assert? (iri/encode-iri db id) (or (iri/encode-iri db id) @@ -223,11 +225,11 @@ {:status 400 :error :db/invalid-retraction :iri id})))) - type-assertions (if (seq type) - (get-type-flakes assert? db t sid type) + type-assertions (if (seq types) + (get-type-flakes assert? db t sid types) [])] (into type-assertions - (comp (remove #(-> % key keyword?)) + (comp (remove #(-> % key #{const/iri-id const/iri-type})) (mapcat (fn [[prop value]] (let [pid (if assert? @@ -263,7 +265,7 @@ [db commit-jsonld commit-data-jsonld] (go-try (let [t-new (db-t commit-data-jsonld) - nses (map :value + nses (map util/get-value (get commit-data-jsonld const/iri-namespaces)) db* (with-namespaces db nses) asserted-flakes (->> (db-assert commit-data-jsonld) @@ -380,12 +382,13 @@ (sha->t [db sha] (go-try (log/debug "sha->t looking up commit SHA:" sha) - ;; Normalize the input - strip known prefixes if present - ;; Always ensure we end up with 'b' prefix + hash - (let [sha-normalized (cond - ;; Full commit ID prefix - (str/starts-with? sha "fluree:commit:sha256:") - (subs sha 21) ; Extract everything after the prefix + ;; Normalize the input - use only 'fluree:commit:sha256:b' prefix when present, + ;; otherwise ensure the value starts with 'b' + (let [prefix-b "fluree:commit:sha256:b" + sha-normalized (cond + ;; Input is a full commit IRI with ':b' segment - keep leading 'b' + (str/starts-with? sha prefix-b) + (subs sha (dec (count prefix-b))) ;; Already has correct format (starts with 'b') (str/starts-with? sha "b") @@ -407,8 +410,14 @@ {:status 400 :error :db/invalid-commit-sha :sha sha :normalized sha-normalized :length sha-length})) - ;; Full SHA - use direct efficient lookup (53 chars with 'bb' prefix) - (= 53 sha-length) + ;; Too short to be a useful/efficient prefix (minimum 6) + (< sha-length 6) + (throw (ex-info "SHA prefix must be at least 6 characters" + {:status 400 :error :db/invalid-commit-sha :min 6})) + + ;; Full SHA - use direct efficient lookup (51-52 chars with 'b' prefix) + (or (= 52 sha-length) + (= 51 sha-length)) (let [;; sha-normalized already has 'b' prefix from normalization commit-id (str "fluree:commit:sha256:" sha-normalized) direct-query {:select ["?t"] @@ -580,7 +589,7 @@ (try* (! error-ch e))))) (defn merge-novelty-commits From 7e37622159e4d032d1c14cb9214478a23777c23e Mon Sep 17 00:00:00 2001 From: bplatz Date: Mon, 22 Sep 2025 12:30:33 -0400 Subject: [PATCH 05/13] refactor garbage collection: optimize filter caching and address handling in indexer --- src/fluree/db/connection.cljc | 7 +------ src/fluree/db/indexer/cuckoo.cljc | 8 +++++--- src/fluree/db/indexer/garbage.cljc | 14 +++++++------ test/fluree/db_test.cljc | 33 ++++++++++++------------------ 4 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/fluree/db/connection.cljc b/src/fluree/db/connection.cljc index 8caf298d52..705d647cb1 100644 --- a/src/fluree/db/connection.cljc +++ b/src/fluree/db/connection.cljc @@ -450,12 +450,7 @@ ;; Also clean up cuckoo filter files for all branches cuckoo-ch (when ledger-alias (let [ledger-name (first (str/split ledger-alias #":" 2))] - ;; Dynamic require to avoid circular dependency - (when-let [delete-fn (try - (require 'fluree.db.indexer.cuckoo) - (resolve 'fluree.db.indexer.cuckoo/delete-all-filters) - (catch #?(:clj Exception :cljs js/Error) _e nil))] - (delete-fn index-catalog ledger-name))))] + (cuckoo/delete-all-filters index-catalog ledger-name)))] ( Date: Mon, 22 Sep 2025 13:20:32 -0400 Subject: [PATCH 06/13] refactor nameservice and filesystem: enhance logging and improve file deletion handling --- src/fluree/db/connection.cljc | 30 ++++++++++++++++++++++++-- src/fluree/db/indexer/cuckoo.cljc | 9 +++++--- src/fluree/db/nameservice/storage.cljc | 5 ++++- src/fluree/db/util/filesystem.cljc | 9 +++++--- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/fluree/db/connection.cljc b/src/fluree/db/connection.cljc index 705d647cb1..26e6d56756 100644 --- a/src/fluree/db/connection.cljc +++ b/src/fluree/db/connection.cljc @@ -461,6 +461,27 @@ ( store storage/location (storage/build-address path))] + (log/debug "nameservice.storage/retract start" + {:alias ledger-alias :path path :address address}) (storage/delete store address))) (publishing-address [_ ledger-alias] diff --git a/src/fluree/db/util/filesystem.cljc b/src/fluree/db/util/filesystem.cljc index 1cc950ca9b..a695ff52cd 100644 --- a/src/fluree/db/util/filesystem.cljc +++ b/src/fluree/db/util/filesystem.cljc @@ -117,7 +117,7 @@ #?(:clj (async/thread (try - (io/delete-file (io/file path)) + (io/delete-file (io/file path) true) :deleted (catch Exception e (log/trace (str "Failed to delete file: " path)) @@ -128,8 +128,11 @@ (fs/unlinkSync path) :deleted (catch :default e - (log/trace (str "Failed to delete file: " path)) - e))))) + (if (= (.-code e) "ENOENT") + :deleted + (do + (log/warn e (str "Failed to delete file: " path)) + e))))))) (defn list-files [path] From 472f451f7e68cac0d14b340aa3e72a8aa7418d93 Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 2 Oct 2025 16:52:33 -0400 Subject: [PATCH 07/13] Refactor Cuckoo filter tests and integrate CBOR serialization - Removed outdated cuckoo chain test suite and replaced it with integration tests for garbage collection and round-trip serialization. - Added new tests for CBOR encoding/decoding to ensure data integrity during storage operations. - Updated existing tests to utilize the new filter chain structure, ensuring compatibility with recent changes in the Cuckoo filter implementation. - Enhanced edge case handling and collision detection tests to improve robustness. - Adjusted assertions in the main test suite to reflect changes in filter structure and statistics. --- deps.edn | 17 +- package.json | 1 + src/fluree/db/api/branch.cljc | 8 +- src/fluree/db/connection.cljc | 11 +- src/fluree/db/flake/index/novelty.cljc | 33 +-- src/fluree/db/indexer/cuckoo.cljc | 159 ++++--------- src/fluree/db/indexer/garbage.cljc | 9 +- src/fluree/db/storage.cljc | 9 +- src/fluree/db/storage/file.cljc | 23 +- src/fluree/db/storage/memory.cljc | 13 +- src/fluree/db/storage/s3.clj | 17 +- src/fluree/db/util/cbor.cljc | 89 +++++++ src/fluree/db/util/filesystem.cljc | 53 +++++ test/fluree/db/indexer/cuckoo_chain_test.clj | 160 ------------- .../cuckoo_garbage_integration_test.clj | 114 +++++++++ .../db/indexer/cuckoo_roundtrip_test.clj | 144 ++++++++++++ test/fluree/db/indexer/cuckoo_test.clj | 222 +++++++++++------- test/fluree/db/util/cbor_test.cljc | 55 +++++ test/fluree/db_test.cljc | 20 +- 19 files changed, 748 insertions(+), 409 deletions(-) create mode 100644 src/fluree/db/util/cbor.cljc delete mode 100644 test/fluree/db/indexer/cuckoo_chain_test.clj create mode 100644 test/fluree/db/indexer/cuckoo_garbage_integration_test.clj create mode 100644 test/fluree/db/indexer/cuckoo_roundtrip_test.clj create mode 100644 test/fluree/db/util/cbor_test.cljc diff --git a/deps.edn b/deps.edn index 219c4d6cff..76bca4c1f9 100644 --- a/deps.edn +++ b/deps.edn @@ -20,14 +20,15 @@ com.github.rholder/snowball-stemmer {:mvn/version "1.3.0.581.1"} ;; parsing / serialization - com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git" - :git/sha "74083536c84d77f8cdd4b686b5661714010baad3"} - com.fluree/alphabase {:mvn/version "3.3.0"} - metosin/jsonista {:mvn/version "0.3.13"} - camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} - org.clojure/data.xml {:mvn/version "0.2.0-alpha9"} - instaparse/instaparse {:mvn/version "1.5.0"} - org.clojars.quoll/raphael {:mvn/version "0.3.12"} ;; turtle (TTL) parsing + com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git" + :git/sha "74083536c84d77f8cdd4b686b5661714010baad3"} + com.fluree/alphabase {:mvn/version "3.3.0"} + metosin/jsonista {:mvn/version "0.3.13"} + com.fasterxml.jackson.dataformat/jackson-dataformat-cbor {:mvn/version "2.18.2"} + camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} + org.clojure/data.xml {:mvn/version "0.2.0-alpha9"} + instaparse/instaparse {:mvn/version "1.5.0"} + org.clojars.quoll/raphael {:mvn/version "0.3.12"} ;; turtle (TTL) parsing ;; cryptography com.fluree/crypto {:git/url "https://github.com/fluree/fluree.crypto.git" diff --git a/package.json b/package.json index 9d81a8309b..8db9c82de8 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "@peculiar/webcrypto": "^1.4.5", "axios": "^1.5.1", "bufferutil": "^4.0.7", + "cbor": "^9.0.2", "form-data": "^4.0.0", "jsonld": "8.3.3", "seedrandom": "^3.0.5", diff --git a/src/fluree/db/api/branch.cljc b/src/fluree/db/api/branch.cljc index f20845ec8c..d65c94edd9 100644 --- a/src/fluree/db/api/branch.cljc +++ b/src/fluree/db/api/branch.cljc @@ -26,8 +26,10 @@ Returns the new branch metadata." [conn new-branch-spec from-branch-spec from-commit] (go-try - (let [[ledger-id new-branch] (util.ledger/ledger-parts new-branch-spec) - [from-ledger-id from-branch] (util.ledger/ledger-parts from-branch-spec)] + (let [new-branch-spec* (util.ledger/ensure-ledger-branch new-branch-spec) + from-branch-spec* (util.ledger/ensure-ledger-branch from-branch-spec) + [ledger-id new-branch] (util.ledger/ledger-parts new-branch-spec*) + [from-ledger-id from-branch] (util.ledger/ledger-parts from-branch-spec*)] (when (not= ledger-id from-ledger-id) (throw (ex-info "Cannot create branch across different ledgers" @@ -43,10 +45,8 @@ :source-branch from-branch :source-commit source-commit-id} - ;; Copy cuckoo filter from source branch to new branch (if storage supports it) index-catalog (:index-catalog source-db) _ (when (and index-catalog (:storage index-catalog)) - ;; Read the source branch's filter and copy it if it exists (when-let [source-filter ( segment-hashes + (:address db-root-res) + (conj (cuckoo/extract-hash-part (:address db-root-res)))) + updated-filter (if (seq all-hashes) + (cuckoo/batch-add-chain filter all-hashes) + filter)] + ( new-segments - (:address db-root-res) - (conj (last (str/split (:address db-root-res) #"/")))) - updated-filter (if (seq all-segments) - (cuckoo/batch-add-chain filter all-segments) - filter)] - ;; Always write the filter (even if empty) to ensure it exists - (chain - "Convert a single filter to chain format. Public for testing." - [filter] - {:version 2 - :t nil ; Will be set when persisting - :filters [(serialize-single filter)]}) - -(defn- deserialize-single - "Deserialize a single filter." +(defn- deserialize-filter + "Deserialize a filter from map format back to CuckooFilter record." [{:keys [buckets num-buckets f count]}] (let [decoded-buckets (decode-buckets buckets)] (->CuckooFilter decoded-buckets num-buckets f count))) @@ -248,9 +240,9 @@ [{:keys [filters] :as filter-chain} sha256-hash] (loop [idx 0] (if (< idx (count filters)) - (let [current-filter (deserialize-single (nth filters idx))] + (let [current-filter (deserialize-filter (nth filters idx))] (if-let [updated (add-item-internal current-filter sha256-hash)] - (let [updated-serialized (serialize-single updated) + (let [updated-serialized (serialize-filter updated) ;; Check if this filter is now at 90% capacity load-factor (/ (double (:count updated)) (* (:num-buckets updated) bucket-size)) @@ -258,7 +250,7 @@ filters' (if (and (= idx (dec (count filters))) (>= load-factor 0.9)) (conj (assoc filters idx updated-serialized) - (serialize-single (create-filter default-filter-capacity))) + (serialize-filter (create-filter default-filter-capacity))) (assoc filters idx updated-serialized))] (assoc filter-chain :filters filters')) (recur (inc idx)))) @@ -266,28 +258,28 @@ (let [new-filter (create-filter default-filter-capacity) updated (add-item-internal new-filter sha256-hash)] (if updated - (assoc filter-chain :filters (conj filters (serialize-single updated))) + (assoc filter-chain :filters (conj filters (serialize-filter updated))) filter-chain))))) (defn contains-hash-chain? "Check if hash exists in any filter in the chain." [{:keys [filters]} sha256-hash] - (some #(contains-hash-internal? (deserialize-single %) sha256-hash) filters)) + (some #(contains-hash-internal? (deserialize-filter %) sha256-hash) filters)) (defn remove-item-chain "Remove item from whichever filter contains it, removing empty filters." [{:keys [filters] :as filter-chain} sha256-hash] (let [updated-filters (vec (for [f filters] - (let [filter (deserialize-single f)] + (let [filter (deserialize-filter f)] (if (contains-hash-internal? filter sha256-hash) - (serialize-single (remove-item-internal filter sha256-hash)) + (serialize-filter (remove-item-internal filter sha256-hash)) f)))) ;; Remove empty filters (count = 0), but keep at least one cleaned-filters (vec (remove #(zero? (:count %)) updated-filters)) ;; Ensure we always have at least one filter final-filters (if (empty? cleaned-filters) - [(serialize-single (create-filter default-filter-capacity))] + [(serialize-filter (create-filter default-filter-capacity))] cleaned-filters)] (assoc filter-chain :filters final-filters))) @@ -306,15 +298,10 @@ [] {:version 2 :t nil - :filters [(serialize-single (create-filter default-filter-capacity))]}) - -(defn serialize - "Serialize filter chain for storage." - [filter-chain] - filter-chain) + :filters [(serialize-filter (create-filter default-filter-capacity))]}) (defn deserialize - "Deserialize filter from storage." + "Deserialize filter chain from storage, converting serialized filters back to CuckooFilter records." [data] (cond (= (:version data) 2) data @@ -324,62 +311,43 @@ ;; Storage integration (defn filter-storage-path - "Get the storage path for a branch's cuckoo filter. - Returns path like 'ledger-name/index/cuckoo/branch.json'." + "Returns storage path without extension: 'ledger-alias/index/cuckoo/branch-name'." [ledger-alias branch-name] - (str ledger-alias "/index/cuckoo/" branch-name ".json")) + (str ledger-alias "/index/cuckoo/" branch-name)) (defn write-filter - "Persist a cuckoo filter to storage using explicit filename." + "Writes cuckoo filter to storage as CBOR binary data." [index-catalog ledger-alias branch-name t filter] (go-try (when (and index-catalog (:storage index-catalog) filter) - (let [serialized (-> (serialize filter) - (assoc :t t)) - json-str (json/stringify serialized) - bytes (bytes/string->UTF8 json-str) - filename (filter-storage-path ledger-alias branch-name) - ;; Get the actual store from catalog (usually the default store) + (let [serialized (assoc filter :t t) + cbor-bytes (cbor/encode serialized) + path (filter-storage-path ledger-alias branch-name) storage (:storage index-catalog) store (if (satisfies? store/ByteStore storage) storage (store/get-content-store storage ::store/default))] - (string bytes)) - data (json/parse json-str true)] - (deserialize data))) - (catch* e - ;; Filter doesn't exist or error reading it - (log/debug "Error reading filter:" (ex-message e)) - nil))) - (log/debug "Skipping filter read - missing requirements" - "catalog:" (boolean index-catalog) - "storage:" (boolean (:storage index-catalog)) - "ledger:" ledger-alias - "branch:" branch-name)))) + (when-let [cbor-bytes (> files - (filter #(str/ends-with? % ".json")) + (filter #(str/ends-with? % ".cbor")) (map #(-> % (str/replace cuckoo-path "") - (str/replace ".json" ""))) + (str/replace ".cbor" ""))) distinct vec))))) @@ -499,7 +467,7 @@ "Get comprehensive statistics from a filter chain." [filter-chain] (when (and (:version filter-chain) (seq (:filters filter-chain))) - (let [filters (map deserialize-single (:filters filter-chain)) + (let [filters (map deserialize-filter (:filters filter-chain)) total-count (reduce + (map :count filters)) total-capacity (reduce + (map #(* (:num-buckets %) bucket-size) filters)) filter-stats (map (fn [f] @@ -517,34 +485,3 @@ :filters filter-stats :fingerprint-bits (:fingerprint-bits (first filters))}))) -;; Simplified API functions for testing -;; These are thin wrappers that delegate to chain operations - -(defn add-item - "Add item to filter chain. For testing." - [filter-or-chain sha256-hash] - (if (:version filter-or-chain) - (add-item-chain filter-or-chain sha256-hash) - ;; Support for single filter in tests - (add-item-internal filter-or-chain sha256-hash))) - -(defn contains-hash? - "Check if hash exists in filter. For testing." - [filter-or-chain sha256-hash] - (if (:version filter-or-chain) - (contains-hash-chain? filter-or-chain sha256-hash) - ;; Support for single filter in tests - (contains-hash-internal? filter-or-chain sha256-hash))) - -(defn remove-item - "Remove item from filter chain. For testing." - [filter-or-chain sha256-hash] - (if (:version filter-or-chain) - (remove-item-chain filter-or-chain sha256-hash) - ;; Support for single filter in tests - (remove-item-internal filter-or-chain sha256-hash))) - -(defn batch-add - "Add multiple items to filter." - [filter-or-chain sha256-hashes] - (reduce add-item filter-or-chain sha256-hashes)) \ No newline at end of file diff --git a/src/fluree/db/indexer/garbage.cljc b/src/fluree/db/indexer/garbage.cljc index c9d9763086..e862697337 100644 --- a/src/fluree/db/indexer/garbage.cljc +++ b/src/fluree/db/indexer/garbage.cljc @@ -51,7 +51,10 @@ ;; No other branches, safe to delete all garbage-items ;; Check each item against other branch filters - (remove #(cuckoo/any-branch-uses? filters %) + ;; Extract hash from address for checking + (remove (fn [garbage-address] + (let [hash (cuckoo/extract-hash-part garbage-address)] + (cuckoo/any-branch-uses? filters hash))) garbage-items))))) (defn clean-garbage-record @@ -74,7 +77,9 @@ _ (when (and (:storage index-catalog) (seq garbage)) (let [filter ( root + (full-path path-with-ext) + (fs/write-file final-bytes)))) + + (read-bytes-ext [_ path extension] + (let [path-with-ext (str path "." extension)] + (-> root + (full-path path-with-ext) + (fs/read-binary-file encryption-key)))) + storage/RecursiveListableStore (list-paths-recursive [_ prefix] (go-try @@ -138,9 +153,11 @@ (let [all-files (> @contents keys (filter #(and (str/starts-with? % prefix) - (str/ends-with? % ".json"))) + (or (str/ends-with? % ".json") + (str/ends-with? % ".cbor")))) vec)))) (defn open diff --git a/src/fluree/db/storage/s3.clj b/src/fluree/db/storage/s3.clj index 3856ffe354..0d0f2fc107 100644 --- a/src/fluree/db/storage/s3.clj +++ b/src/fluree/db/storage/s3.clj @@ -431,6 +431,18 @@ (when-let [body (:Body resp)] (.getBytes ^String body)))))) + (write-bytes-ext [this path bytes extension] + (let [path-with-ext (str path "." extension)] + (write-s3-data this path-with-ext bytes))) + + (read-bytes-ext [this path extension] + (go-try + (let [path-with-ext (str path "." extension) + resp (> all-results - (filter #(str/ends-with? % ".json")) + (filter #(or (str/ends-with? % ".json") + (str/ends-with? % ".cbor"))) vec))))) (defn- jitter diff --git a/src/fluree/db/util/cbor.cljc b/src/fluree/db/util/cbor.cljc new file mode 100644 index 0000000000..2b61e7ebc9 --- /dev/null +++ b/src/fluree/db/util/cbor.cljc @@ -0,0 +1,89 @@ +(ns fluree.db.util.cbor + #?(:clj (:import (com.fasterxml.jackson.dataformat.cbor CBORFactory) + (com.fasterxml.jackson.databind ObjectMapper) + (java.io ByteArrayOutputStream ByteArrayInputStream)))) + +#?(:clj + (def ^:private ^ObjectMapper mapper + (ObjectMapper. (CBORFactory.)))) + +#?(:clj + (defn- clj->java + [x] + (cond + (map? x) (let [m (java.util.LinkedHashMap.)] + (doseq [[k v] x] + (.put m (if (keyword? k) (name k) k) (clj->java v))) + m) + (vector? x) (java.util.ArrayList. (map clj->java x)) + (sequential? x) (java.util.ArrayList. (map clj->java x)) + (keyword? x) (name x) + :else x))) + +#?(:clj + (defn- java->clj* + [x] + (cond + (instance? java.util.Map x) + (into {} (for [e (.entrySet ^java.util.Map x)] + [(keyword (.getKey e)) (java->clj* (.getValue e))])) + + (instance? java.util.List x) + (vec (map java->clj* ^java.util.List x)) + + :else x))) + +#?(:cljs + (def ^:private cborjs + (try + (js/require "cbor") + (catch :default _ nil)))) + +#?(:clj (def cbor-available? true) + :cljs (def cbor-available? (boolean cborjs))) + +#?(:cljs + (defn- js-deep->clj + [x] + (cond + (instance? js/Map x) + (into {} + (for [entry (array-seq (js/Array.from (.entries x)))] + (let [k (aget entry 0) + v (aget entry 1)] + [(keyword (str k)) (js-deep->clj v)]))) + + (instance? js/Array x) + (vec (map js-deep->clj (array-seq x))) + + (and (some? x) (identical? (type x) js/Object)) + (let [o (js->clj x :keywordize-keys true)] + (if (map? o) + (into {} (for [[k v] o] [k (js-deep->clj v)])) + o)) + + :else x))) + +(defn encode + "Encode Clojure data to CBOR bytes." + [_data] + #?(:clj + (let [baos (ByteArrayOutputStream.)] + (.writeValue mapper baos (clj->java _data)) + (.toByteArray baos)) + :cljs + (if cborjs + (.encode cborjs (clj->js _data)) + (throw (ex-info "CBOR encode not available in this CLJS runtime" {}))))) + +(defn decode + "Decode CBOR bytes to Clojure data with keyword keys." + [^bytes _bs] + #?(:clj + (let [bais (ByteArrayInputStream. _bs) + obj (.readValue mapper bais Object)] + (java->clj* obj)) + :cljs + (if cborjs + (js-deep->clj (.decode cborjs _bs)) + (throw (ex-info "CBOR decode not available in this CLJS runtime" {}))))) diff --git a/src/fluree/db/util/filesystem.cljc b/src/fluree/db/util/filesystem.cljc index a695ff52cd..e2203da9ca 100644 --- a/src/fluree/db/util/filesystem.cljc +++ b/src/fluree/db/util/filesystem.cljc @@ -111,6 +111,59 @@ "code" (.-code e) "path" (.-path e)})))))))) +(defn read-binary-file + "Read raw bytes from disk at `path`. Returns nil if file does not exist. + If encryption-key is provided, expects file to be encrypted and will decrypt it." + ([path] (read-binary-file path nil)) + ([path encryption-key] + #?(:clj + (async/thread + (try + (with-open [xin (io/input-stream path) + xout (ByteArrayOutputStream.)] + (io/copy xin xout) + (let [raw-bytes (.toByteArray xout)] + (if encryption-key + (try + (aes/decrypt raw-bytes encryption-key + {:input-format :none + :output-format :none}) + (catch Exception e + (ex-info (str "Failed to decrypt file: " path) + {:status 500 + :error :db/storage-error + :path path} + e))) + raw-bytes))) + (catch FileNotFoundException _ + nil) + (catch Exception e + e))) + :cljs + (async/go + (try + (let [buffer (fs/readFileSync path)] + (if encryption-key + (try + (aes/decrypt buffer encryption-key + {:input-format :none + :output-format :none}) + (catch :default e + (ex-info (str "Failed to decrypt file: " path) + {:status 500 + :error :db/storage-error + :path path} + e))) + buffer)) + (catch :default e + (if (= "ENOENT" (.-code e)) + nil + (ex-info "Error reading file." + {"errno" ^String (.-errno e) + "syscall" ^String (.-syscall e) + "code" (.-code e) + "path" (.-path e)})))))))) + (defn delete-file "Delete the file at `path`." [path] diff --git a/test/fluree/db/indexer/cuckoo_chain_test.clj b/test/fluree/db/indexer/cuckoo_chain_test.clj deleted file mode 100644 index cbae043f10..0000000000 --- a/test/fluree/db/indexer/cuckoo_chain_test.clj +++ /dev/null @@ -1,160 +0,0 @@ -(ns fluree.db.indexer.cuckoo-chain-test - "Test suite for cuckoo filter chain functionality, growth, and collision handling." - (:require [alphabase.core :as alphabase] - [clojure.test :refer [deftest testing is]] - [fluree.crypto :as crypto] - [fluree.db.indexer.cuckoo :as cuckoo])) - -(defn- test-hash - "Create a valid base32 hash for testing from a string." - [s] - (-> s - crypto/sha2-256 ; Returns hex string - alphabase/hex->bytes ; Convert hex to bytes - alphabase/bytes->base32)) ; Already lowercase from fluree crypto - -(deftest chain-growth-test - (testing "Chain grows when capacity is exceeded" - ;; Create a filter chain with very small capacity for testing - ;; Note: minimum bucket count is 16, so actual capacity is 16*4*0.95 = ~60 items - (let [small-capacity 10 - small-filter (cuckoo/create-filter small-capacity) - chain (cuckoo/single-filter->chain small-filter) - items (map #(test-hash (str "item-" %)) (range 100)) ; Use 100 items to ensure overflow - ;; Add items until we exceed capacity and force chain growth - chain-with-items (reduce cuckoo/add-item-chain chain items) - filter-count (count (:filters chain-with-items))] - - (testing "Multiple filters created when capacity exceeded" - (is (> filter-count 1) - (str "Should have multiple filters, got " filter-count))) - - (testing "All items are findable across chain" - (doseq [item items] - (is (cuckoo/contains-hash-chain? chain-with-items item) - (str "Should find " item " in chain")))) - - (testing "Items not added are not found" - (is (not (cuckoo/contains-hash-chain? chain-with-items (test-hash "not-in-chain"))))) - - (testing "Remove items across multiple filters" - (let [items-to-remove (take 50 items) - items-to-keep (drop 50 items) - chain-after-remove (reduce cuckoo/remove-item-chain - chain-with-items - items-to-remove)] - - ;; Removed items should not be found - (doseq [item items-to-remove] - (is (not (cuckoo/contains-hash-chain? chain-after-remove item)) - (str item " should be removed"))) - - ;; Kept items should still be found - (doseq [item items-to-keep] - (is (cuckoo/contains-hash-chain? chain-after-remove item) - (str item " should still be present"))) - - ;; Empty filters should be removed - (testing "Empty filters are cleaned up" - (let [filters-after (:filters chain-after-remove)] - (is (every? #(pos? (:count %)) filters-after) - "No empty filters should remain")))))))) - -(deftest collision-handling-test - (testing "Handle items with similar patterns" - (let [chain (cuckoo/create-filter-chain) - ;; Create items that might collide - base-items [(test-hash "aaaa") (test-hash "aaab") (test-hash "aaac") - (test-hash "aaad") (test-hash "aaae") - (test-hash "baaa") (test-hash "baab") (test-hash "baac") - (test-hash "baad") (test-hash "baae")] - chain-with-items (reduce cuckoo/add-item-chain chain base-items)] - - (testing "All similar items are stored" - (doseq [item base-items] - (is (cuckoo/contains-hash-chain? chain-with-items item) - (str item " should be found")))) - - (testing "Removing one item doesn't affect similar items" - (let [item-to-remove (test-hash "aaaa") - similar-items (remove #{item-to-remove} base-items) - chain-after-remove (cuckoo/remove-item-chain chain-with-items item-to-remove)] - - (is (not (cuckoo/contains-hash-chain? chain-after-remove item-to-remove)) - "Removed item should not be found") - - (doseq [item similar-items] - (is (cuckoo/contains-hash-chain? chain-after-remove item) - (str item " should still be found after removing similar item")))))))) - -(deftest chain-serialization-test - (testing "Chain serialization and deserialization" - (let [small-filter (cuckoo/create-filter 5) - chain (cuckoo/single-filter->chain small-filter) - items (map #(test-hash (str "serialize-" %)) (range 20)) - chain-with-items (reduce cuckoo/add-item-chain chain items) - serialized (cuckoo/serialize chain-with-items) - deserialized (cuckoo/deserialize serialized)] - - (testing "Deserialized chain has correct structure" - (is (= (:version deserialized) 2))) - - (testing "All items findable after deserialization" - (doseq [item items] - (is (cuckoo/contains-hash-chain? deserialized item) - (str item " should be found after deserialization")))) - - (testing "Items can be added to deserialized chain" - (let [new-item (test-hash "after-deserialize") - updated (cuckoo/add-item-chain deserialized new-item)] - (is (cuckoo/contains-hash-chain? updated new-item)) - (is (every? #(cuckoo/contains-hash-chain? updated %) items) - "Original items still present")))))) - -(deftest proactive-growth-test - (testing "Chain grows proactively at 90% capacity" - (let [;; Create chain with small filter for testing - ;; Minimum is 16 buckets * 4 = 64 capacity - ;; 90% of 64 = 57.6, so should trigger growth at 58 items - small-filter (cuckoo/create-filter 20) - chain (cuckoo/single-filter->chain small-filter) - ;; Add items to get close to 90% capacity (57 items) - items (map #(test-hash (str "proactive-" %)) (range 57)) - chain-with-items (reduce cuckoo/add-item-chain chain items)] - - (testing "Should have only one filter before 90%" - (is (= 1 (count (:filters chain-with-items))))) - - ;; Add one more item to trigger proactive growth (58th item = >90%) - (let [chain-after (cuckoo/add-item-chain chain-with-items (test-hash "trigger-growth"))] - (testing "Should proactively create second filter at 90% capacity" - (is (= 2 (count (:filters chain-after)))) - (is (= 58 (-> chain-after cuckoo/get-chain-stats :total-count)))) - - (testing "All items still findable after growth" - (doseq [item items] - (is (cuckoo/contains-hash-chain? chain-after item))) - (is (cuckoo/contains-hash-chain? chain-after (test-hash "trigger-growth")))))))) - -(deftest edge-cases-test - (testing "Edge cases for chain operations" - (testing "Empty chain operations" - (let [empty-chain (cuckoo/create-filter-chain)] - (is (not (cuckoo/contains-hash-chain? empty-chain (test-hash "anything")))) - ;; Removing from empty chain should return the same chain structure - (let [after-remove (cuckoo/remove-item-chain empty-chain (test-hash "anything"))] - (is (= (:version after-remove) (:version empty-chain)) - "Version should be unchanged") - (is (= (count (:filters after-remove)) (count (:filters empty-chain))) - "Filter count should be unchanged")))) - - (testing "Single item chain" - (let [chain (cuckoo/create-filter-chain) - single-chain (cuckoo/add-item-chain chain (test-hash "only-item"))] - (is (cuckoo/contains-hash-chain? single-chain (test-hash "only-item"))) - - (let [empty-again (cuckoo/remove-item-chain single-chain (test-hash "only-item"))] - ;; After removing the only item, should have empty filter - (is (or (empty? (:filters empty-again)) - (zero? (-> empty-again :filters first :count))) - "Chain should be empty after removing only item")))))) \ No newline at end of file diff --git a/test/fluree/db/indexer/cuckoo_garbage_integration_test.clj b/test/fluree/db/indexer/cuckoo_garbage_integration_test.clj new file mode 100644 index 0000000000..b6f733b94e --- /dev/null +++ b/test/fluree/db/indexer/cuckoo_garbage_integration_test.clj @@ -0,0 +1,114 @@ +(ns fluree.db.indexer.cuckoo-garbage-integration-test + "Integration test verifying cuckoo filter garbage collection mechanics." + (:require [babashka.fs :refer [with-temp-dir]] + [clojure.java.io :as io] + [clojure.string :as str] + [clojure.test :refer [deftest testing is]] + [fluree.db.api :as fluree] + [fluree.db.indexer.cuckoo :as cuckoo] + [fluree.db.util.cbor :as cbor]) + (:import (java.io FileInputStream))) + +(defn- read-cuckoo-filter + "Helper to read and decode a cuckoo filter file." + [storage-path ledger branch] + (let [filter-path (io/file storage-path ledger "index" "cuckoo" (str branch ".cbor"))] + (when (.exists filter-path) + (with-open [fis (FileInputStream. filter-path)] + (let [cbor-bytes (.readAllBytes fis)] + (cuckoo/deserialize (cbor/decode cbor-bytes))))))) + +(defn- list-index-segments + "Lists all index segment files (excluding cuckoo, root, garbage)." + [storage-path ledger] + (let [index-dir (io/file storage-path ledger "index")] + (when (.exists index-dir) + (->> (file-seq index-dir) + (filter #(.isFile ^java.io.File %)) + (filter #(str/ends-with? (.getName ^java.io.File %) ".json")) + (remove #(str/includes? (.getPath ^java.io.File %) "/cuckoo/")) + (remove #(str/includes? (.getPath ^java.io.File %) "/garbage/")) + (remove #(str/includes? (.getName ^java.io.File %) "root")) + (map (fn [^java.io.File f] + (let [name (.getName f)] + ;; Extract just the hash part (remove .json extension) + (str/replace name #"\.json$" "")))) + set)))) + +(deftest ^:integration cuckoo-garbage-collection-integration-test + (testing "Cuckoo filter correctly tracks segments for garbage collection" + (with-temp-dir [storage-path {}] + (let [storage-str (str storage-path) + conn @(fluree/connect-file {:storage-path storage-str + :defaults {:indexing {:reindex-min-bytes 100 + :reindex-max-bytes 1000 + :max-old-indexes 2}}}) + _ @(fluree/create conn "gc-test")] + + (testing "Initial index has segments in cuckoo filter" + ;; Add data to trigger indexing + @(fluree/insert! conn "gc-test" + [{"@context" {"ex" "http://example.org/"} + "@id" "ex:alice" + "@type" "ex:Person" + "ex:name" "Alice" + "ex:age" 30}]) + @(fluree/trigger-index conn "gc-test" {:block? true}) + + (let [filter (read-cuckoo-filter storage-str "gc-test" "main") + segments (list-index-segments storage-str "gc-test")] + + (is (some? filter) "Cuckoo filter should exist") + (is (seq segments) "Index segments should exist") + + ;; Verify every segment file is in the cuckoo filter + (testing "All segment files are tracked in cuckoo filter" + (doseq [segment segments] + (is (cuckoo/contains-hash-chain? filter segment) + (str "Segment " segment " should be in cuckoo filter")))))) + + (testing "After garbage collection, filter matches current segments" + ;; Trigger several more indexes to create garbage + @(fluree/insert! conn "gc-test" + [{"@id" "ex:bob" "@type" "ex:Person" "ex:name" "Bob"}]) + @(fluree/trigger-index conn "gc-test" {:block? true}) + + @(fluree/insert! conn "gc-test" + [{"@id" "ex:charlie" "@type" "ex:Person" "ex:name" "Charlie"}]) + @(fluree/trigger-index conn "gc-test" {:block? true}) + + @(fluree/insert! conn "gc-test" + [{"@id" "ex:diana" "@type" "ex:Person" "ex:name" "Diana"}]) + @(fluree/trigger-index conn "gc-test" {:block? true}) + + ;; Wait for GC to complete + (Thread/sleep 100) + + (let [filter (read-cuckoo-filter storage-str "gc-test" "main") + segments (list-index-segments storage-str "gc-test")] + + (is (some? filter) "Cuckoo filter should exist after GC") + (is (seq segments) "Index segments should exist after GC") + + (testing "All current segments exist in cuckoo filter" + (doseq [segment segments] + (is (cuckoo/contains-hash-chain? filter segment) + (str "Segment " segment " should be in filter")))))) + + (testing "Branch isolation with shared segments" + @(fluree/create-branch! conn "gc-test:feature" "gc-test:main") + + (let [main-filter (read-cuckoo-filter storage-str "gc-test" "main") + feature-filter (read-cuckoo-filter storage-str "gc-test" "feature") + current-segments (list-index-segments storage-str "gc-test") + main-stats (cuckoo/get-chain-stats main-filter) + feature-stats (cuckoo/get-chain-stats feature-filter)] + + (is (some? feature-filter) "Feature branch filter should exist") + (is (= (:total-count main-stats) (:total-count feature-stats)) + "Branch filter should have same total count as main") + + (testing "Branch filter contains all current segments" + (doseq [segment current-segments] + (is (cuckoo/contains-hash-chain? feature-filter segment) + (str "Current segment " segment " should be in feature branch filter")))))))))) diff --git a/test/fluree/db/indexer/cuckoo_roundtrip_test.clj b/test/fluree/db/indexer/cuckoo_roundtrip_test.clj new file mode 100644 index 0000000000..8502a11e0e --- /dev/null +++ b/test/fluree/db/indexer/cuckoo_roundtrip_test.clj @@ -0,0 +1,144 @@ +(ns fluree.db.indexer.cuckoo-roundtrip-test + "Tests verifying cuckoo filter storage layer: CBOR serialization, disk I/O, and hash normalization. + + This file focuses on the persistence and storage aspects of cuckoo filters, ensuring: + - Filters can be written to and read from disk without data loss + - CBOR encoding/decoding preserves all filter data + - Hash normalization works correctly across different address formats + - Multiple write-read cycles maintain filter integrity" + (:require [babashka.fs :refer [with-temp-dir]] + [clojure.core.async :refer [ (.length filter-path) 0) "Filter file should have content"))) + + (testing "Read filter from disk and verify complete round-trip" + (let [loaded-filter (read-filter-from-disk storage-str ledger-id branch-name)] + (is (some? loaded-filter) "Should successfully load filter from disk") + + (testing "Filter structure is preserved" + (let [loaded-stats (cuckoo/get-chain-stats loaded-filter)] + (is (= (:version original-stats) (:version loaded-stats)) + "Version should match") + (is (= (:total-count original-stats) (:total-count loaded-stats)) + "Total count should match") + (is (= (:total-buckets original-stats) (:total-buckets loaded-stats)) + "Total buckets should match") + (is (= test-t (:t loaded-filter)) + "Timestamp t should match"))) + + (testing "Filter membership is preserved" + (doseq [hash test-hashes] + (is (cuckoo/contains-hash-chain? loaded-filter hash) + (str "Loaded filter should contain hash: " hash)))) + + (testing "Filter rejects unknown hashes" + (let [unknown-hash "bzunknownhashnotaddedtofilterxxxxxxxxxxxxxxxxx"] + (is (not (cuckoo/contains-hash-chain? loaded-filter unknown-hash)) + "Loaded filter should reject unknown hashes"))))) + + (testing "Verify CBOR encoding by inspecting raw file" + (let [filter-path (io/file storage-str ledger-id "index" "cuckoo" (str branch-name ".cbor"))] + (with-open [fis (FileInputStream. filter-path)] + (let [cbor-bytes (.readAllBytes fis) + decoded-data (cbor/decode cbor-bytes)] + (is (map? decoded-data) "Should decode to a map") + (is (contains? decoded-data :version) "Should contain version") + (is (contains? decoded-data :t) "Should contain timestamp") + (is (= test-t (:t decoded-data)) "Timestamp should match") + (is (< (count cbor-bytes) (count (pr-str decoded-data))) + "CBOR should be more compact than pr-str"))))) + + (testing "Multiple write-read cycles preserve data" + (let [new-hash "bznewhashtoadd22345abcdefghijklmnopqrstuvwxyza" + filter-v2 (cuckoo/batch-add-chain filter-with-data [new-hash])] + ( chain' cuckoo/get-chain-stats :total-count))) + (is (cuckoo/contains-hash-chain? chain' hash1)) + (is (not (cuckoo/contains-hash-chain? chain (test-hash "nonexistent")))))) (testing "Batch add items" - (let [filter (cuckoo/create-filter 100) + (let [chain (cuckoo/create-filter-chain) hashes [(test-hash "hash1") (test-hash "hash2") (test-hash "hash3")] - filter' (cuckoo/batch-add filter hashes)] - (is (= 3 (:count filter'))) - (is (every? #(cuckoo/contains-hash? filter' %) hashes))))) + chain' (cuckoo/batch-add-chain chain hashes)] + (is (= 3 (-> chain' cuckoo/get-chain-stats :total-count))) + (is (every? #(cuckoo/contains-hash-chain? chain' %) hashes))))) (deftest remove-item-test (testing "Remove existing item" - (let [filter (cuckoo/create-filter 100) + (let [chain (cuckoo/create-filter-chain) hash1 (test-hash "removeme") - filter' (-> filter - (cuckoo/add-item hash1) - (cuckoo/remove-item hash1))] - (is (= 0 (:count filter'))) - (is (not (cuckoo/contains-hash? filter' hash1))))) + chain' (-> chain + (cuckoo/add-item-chain hash1) + (cuckoo/remove-item-chain hash1))] + (is (= 0 (-> chain' cuckoo/get-chain-stats :total-count))) + (is (not (cuckoo/contains-hash-chain? chain' hash1))))) (testing "Remove non-existent item" - (let [filter (cuckoo/create-filter 100) - filter' (cuckoo/remove-item filter (test-hash "nonexistent"))] - (is (= filter filter'))))) + (let [chain (cuckoo/create-filter-chain) + chain' (cuckoo/remove-item-chain chain (test-hash "nonexistent"))] + (is (= (-> chain cuckoo/get-chain-stats :total-count) + (-> chain' cuckoo/get-chain-stats :total-count)))))) (deftest serialization-test - (testing "Serialize and deserialize filter" - (let [filter (-> (cuckoo/create-filter 100) - (cuckoo/add-item (test-hash "hash1")) - (cuckoo/add-item (test-hash "hash2"))) - chain (cuckoo/single-filter->chain filter) - serialized (cuckoo/serialize chain) - restored (cuckoo/deserialize serialized)] - ;; Restored is in chain format, get stats from it - (let [restored-stats (cuckoo/get-chain-stats restored)] - (is (= (:count filter) (:total-count restored-stats))) - (is (= (:fingerprint-bits filter) (:fingerprint-bits restored-stats)))) - ;; contains-hash? works with both formats - (is (cuckoo/contains-hash? restored (test-hash "hash1"))) - (is (cuckoo/contains-hash? restored (test-hash "hash2")))))) + (testing "Serialize and deserialize filter chain" + (let [chain (-> (cuckoo/create-filter-chain) + (cuckoo/add-item-chain (test-hash "hash1")) + (cuckoo/add-item-chain (test-hash "hash2"))) + restored (cuckoo/deserialize chain)] + (let [original-stats (cuckoo/get-chain-stats chain) + restored-stats (cuckoo/get-chain-stats restored)] + (is (= (:total-count original-stats) (:total-count restored-stats))) + (is (= (:fingerprint-bits original-stats) (:fingerprint-bits restored-stats)))) + (is (cuckoo/contains-hash-chain? restored (test-hash "hash1"))) + (is (cuckoo/contains-hash-chain? restored (test-hash "hash2")))))) (deftest metrics-test - (testing "Load factor calculation" - (let [filter (-> (cuckoo/create-filter 100) - (cuckoo/add-item (test-hash "item1")) - (cuckoo/add-item (test-hash "item2"))) - load (cuckoo/load-factor filter)] - (is (> load 0)) - (is (< load 1)))) - - (testing "Filter statistics" - (let [filter (cuckoo/create-filter 100) - stats (cuckoo/filter-stats filter)] - (is (contains? stats :count)) - (is (contains? stats :capacity)) - (is (contains? stats :load-factor)) - (is (contains? stats :estimated-fpr))))) + (testing "Chain statistics" + (let [chain (-> (cuckoo/create-filter-chain) + (cuckoo/add-item-chain (test-hash "item1")) + (cuckoo/add-item-chain (test-hash "item2"))) + stats (cuckoo/get-chain-stats chain)] + (is (contains? stats :total-count)) + (is (contains? stats :total-capacity)) + (is (contains? stats :overall-load-factor)) + (is (contains? stats :filter-count)) + (is (> (:overall-load-factor stats) 0)) + (is (< (:overall-load-factor stats) 1))))) (deftest realistic-address-test (testing "Works with realistic Fluree index addresses" - (let [filter (cuckoo/create-filter 1000) + (let [chain (cuckoo/create-filter-chain) ;; Simulate realistic index segment addresses with real base32 hashes hash1 (test-hash "segment1") hash2 (test-hash "segment2") @@ -104,41 +99,35 @@ (str "fluree:file://ledger/index/opst/" hash3 ".json") (str "ledger/index/tspo/" hash4 ".json") hash5] ; Just the hash itself - filter' (reduce cuckoo/add-item filter addresses)] - (is (= 5 (:count filter'))) - (is (every? #(cuckoo/contains-hash? filter' %) addresses)) + chain' (cuckoo/batch-add-chain chain addresses)] + (is (= 5 (-> chain' cuckoo/get-chain-stats :total-count))) + (is (every? #(cuckoo/contains-hash-chain? chain' %) addresses)) (let [not-in-filter-hash (test-hash "notinfilter")] - (is (not (cuckoo/contains-hash? filter' - (str "fluree:file://ledger/index/spot/" not-in-filter-hash ".json"))))))) + (is (not (cuckoo/contains-hash-chain? chain' + (str "fluree:file://ledger/index/spot/" not-in-filter-hash ".json"))))))) - (testing "Filter capacity handling" - (let [small-filter (cuckoo/create-filter 10) - ;; Try to add more items than initial capacity + (testing "Chain auto-expansion with many items" + (let [chain (cuckoo/create-filter-chain) + ;; Add many items to test chain expansion many-items (map #(test-hash (str "item-" %)) (range 50)) - filter' (reduce (fn [f item] - (or (cuckoo/add-item f item) f)) - small-filter many-items) - added-count (:count filter')] - ;; Should be able to add items up to ~95% capacity - (is (> added-count 8)) - ;; All successfully added items should be found - (let [added-items (take added-count many-items)] - (is (every? #(cuckoo/contains-hash? filter' %) added-items)))))) + chain' (cuckoo/batch-add-chain chain many-items)] + (is (= 50 (-> chain' cuckoo/get-chain-stats :total-count))) + (is (every? #(cuckoo/contains-hash-chain? chain' %) many-items))))) (deftest false-positive-rate-test (testing "No false negatives with moderate dataset" - (let [filter (cuckoo/create-filter 500) + (let [chain (cuckoo/create-filter-chain) items (map #(test-hash (str "item" %)) (range 100)) - filter' (reduce cuckoo/add-item filter items)] + chain' (cuckoo/batch-add-chain chain items)] ;; All added items must be found (no false negatives) - (is (every? #(cuckoo/contains-hash? filter' %) items)))) + (is (every? #(cuckoo/contains-hash-chain? chain' %) items)))) (testing "Acceptable false positive rate" - (let [filter (cuckoo/create-filter 500) + (let [chain (cuckoo/create-filter-chain) items (map #(test-hash (str "item" %)) (range 250)) - filter' (reduce cuckoo/add-item filter items) + chain' (cuckoo/batch-add-chain chain items) non-items (map #(test-hash (str "nonitem" %)) (range 1000)) - fps (count (clojure.core/filter #(cuckoo/contains-hash? filter' %) non-items)) + fps (count (clojure.core/filter #(cuckoo/contains-hash-chain? chain' %) non-items)) fp-rate (/ fps 1000.0)] ;; With 16-bit fingerprints, expect low FPR (is (< fp-rate 0.05) "False positive rate should be less than 5%")))) @@ -173,6 +162,55 @@ ;; Main-only segments should not be found in other branches (is (not-any? #(cuckoo/any-branch-uses? other-filters %) main-only))))) +(deftest collision-handling-test + (testing "Handle items with similar patterns that might collide" + (let [chain (cuckoo/create-filter-chain) + ;; Create items with similar patterns + base-items [(test-hash "aaaa") (test-hash "aaab") (test-hash "aaac") + (test-hash "aaad") (test-hash "aaae") + (test-hash "baaa") (test-hash "baab") (test-hash "baac") + (test-hash "baad") (test-hash "baae")] + chain-with-items (reduce cuckoo/add-item-chain chain base-items)] + + (testing "All similar items are stored" + (doseq [item base-items] + (is (cuckoo/contains-hash-chain? chain-with-items item) + (str item " should be found")))) + + (testing "Removing one item doesn't affect similar items" + (let [item-to-remove (test-hash "aaaa") + similar-items (remove #{item-to-remove} base-items) + chain-after-remove (cuckoo/remove-item-chain chain-with-items item-to-remove)] + + (is (not (cuckoo/contains-hash-chain? chain-after-remove item-to-remove)) + "Removed item should not be found") + + (doseq [item similar-items] + (is (cuckoo/contains-hash-chain? chain-after-remove item) + (str item " should still be found after removing similar item")))))))) + +(deftest edge-cases-test + (testing "Edge cases for chain operations" + (testing "Empty chain operations" + (let [empty-chain (cuckoo/create-filter-chain)] + (is (not (cuckoo/contains-hash-chain? empty-chain (test-hash "anything")))) + + (let [after-remove (cuckoo/remove-item-chain empty-chain (test-hash "anything"))] + (is (= (:version after-remove) (:version empty-chain)) + "Version should be unchanged") + (is (= (count (:filters after-remove)) (count (:filters empty-chain))) + "Filter count should be unchanged")))) + + (testing "Single item chain" + (let [chain (cuckoo/create-filter-chain) + single-chain (cuckoo/add-item-chain chain (test-hash "only-item"))] + (is (cuckoo/contains-hash-chain? single-chain (test-hash "only-item"))) + + (let [empty-again (cuckoo/remove-item-chain single-chain (test-hash "only-item"))] + (is (or (empty? (:filters empty-again)) + (zero? (-> empty-again :filters first :count))) + "Chain should be empty after removing only item")))))) + (deftest ^:integration cuckoo-filter-file-integration-test (testing "Cuckoo filter file is created with index and correctly tracks segments" (with-temp-dir [storage-path {}] @@ -184,7 +222,7 @@ _ @(fluree/create conn "cuckoo-test")] (testing "Initial state - cuckoo filter should exist even for empty ledger" - (let [filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.json")] + (let [filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.cbor")] (is (.exists filter-path) "Cuckoo filter file should exist after ledger creation"))) (testing "After adding data and triggering index" @@ -214,12 +252,13 @@ vec)) ;; Load the cuckoo filter to check contents - filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.json") + filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.cbor") filter-data (when (.exists filter-path) - (-> filter-path - slurp - (json/parse true) - cuckoo/deserialize))] + (with-open [fis (FileInputStream. filter-path)] + (let [cbor-bytes (.readAllBytes fis)] + (-> cbor-bytes + cbor/decode + cuckoo/deserialize))))] (is (seq index-files) "Index files should have been created") (is filter-data "Cuckoo filter should be readable") @@ -229,7 +268,7 @@ ;; Check that each index file is in the cuckoo filter ;; Using the file name as the address (simplified check) (doseq [file-name index-files] - (is (cuckoo/contains-hash? filter-data file-name) + (is (cuckoo/contains-hash-chain? filter-data file-name) (str "Cuckoo filter should contain index file: " file-name)))) (testing "Filter correctly rejects non-existent segments" @@ -238,24 +277,33 @@ (str (test-hash "nonexistent-2") ".json") (str (test-hash "imaginary-3") ".json")]] (doseq [fake-seg fake-segments] - (is (not (cuckoo/contains-hash? filter-data fake-seg)) + (is (not (cuckoo/contains-hash-chain? filter-data fake-seg)) (str "Cuckoo filter should not contain fake segment: " fake-seg)))))))) (testing "After creating a branch" (let [;; Create a new branch with full spec _ @(fluree/create-branch! conn "cuckoo-test:feature" "cuckoo-test:main") - ;; Check that branch has its own cuckoo filter - branch-filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "feature.json")] + ;; Check that branch has its own cuckoo filter + branch-filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "feature.cbor")] (is (.exists branch-filter-path) "Branch should have its own cuckoo filter file") (when (.exists branch-filter-path) - (let [main-filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.json") - main-filter (-> main-filter-path slurp cuckoo/deserialize) - branch-filter (-> branch-filter-path slurp cuckoo/deserialize)] + (let [main-filter-path (io/file storage-str "cuckoo-test" "index" "cuckoo" "main.cbor") + main-filter (with-open [fis (FileInputStream. main-filter-path)] + (let [cbor-bytes (.readAllBytes fis)] + (-> cbor-bytes + cbor/decode + cuckoo/deserialize))) + branch-filter (with-open [fis (FileInputStream. branch-filter-path)] + (let [cbor-bytes (.readAllBytes fis)] + (-> cbor-bytes + cbor/decode + cuckoo/deserialize)))] ;; Branch filter should start as a copy of main's filter - (is (= (:count main-filter) (:count branch-filter)) + (is (= (-> main-filter cuckoo/get-chain-stats :total-count) + (-> branch-filter cuckoo/get-chain-stats :total-count)) "Branch filter should have same count as main filter initially"))))))))) \ No newline at end of file diff --git a/test/fluree/db/util/cbor_test.cljc b/test/fluree/db/util/cbor_test.cljc new file mode 100644 index 0000000000..d58a2a6744 --- /dev/null +++ b/test/fluree/db/util/cbor_test.cljc @@ -0,0 +1,55 @@ +(ns fluree.db.util.cbor-test + (:require [clojure.test :refer [deftest testing is]] + [fluree.db.util.cbor :as cbor])) + +(deftest cbor-availability-test + (testing "CBOR is available on this platform" + (is (true? cbor/cbor-available?) + "CBOR should be available on JVM and Node.js"))) + +(deftest cbor-encode-decode-test + (testing "Basic CBOR encoding and decoding" + (let [data {:name "Alice" + :age 30 + :tags ["developer" "clojure"]} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (some? encoded) "Encoding should produce bytes") + (is (= data decoded) "Decoded data should match original")))) + +(deftest cbor-nested-data-test + (testing "CBOR handles nested data structures" + (let [data {:user {:name "Bob" + :email "bob@example.com"} + :prefs {:theme "dark" + :notifications true} + :count 42} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Nested structures should round-trip correctly")))) + +(deftest cbor-arrays-test + (testing "CBOR handles arrays/vectors" + (let [data {:items [1 2 3 4 5] + :names ["alice" "bob" "charlie"]} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Arrays should round-trip correctly")))) + +(deftest cbor-nil-values-test + (testing "CBOR handles nil values" + (let [data {:field1 "value" + :field2 nil + :field3 "other"} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Nil values should be preserved")))) + +(deftest cbor-empty-collections-test + (testing "CBOR handles empty collections" + (let [data {:empty-map {} + :empty-vec [] + :value 123} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Empty collections should round-trip correctly")))) diff --git a/test/fluree/db_test.cljc b/test/fluree/db_test.cljc index 76f952f569..5e1ebe8afe 100644 --- a/test/fluree/db_test.cljc +++ b/test/fluree/db_test.cljc @@ -1118,21 +1118,23 @@ (count (async/ Date: Thu, 2 Oct 2025 17:11:57 -0400 Subject: [PATCH 08/13] eastwood --- src/fluree/db/util/cbor.cljc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fluree/db/util/cbor.cljc b/src/fluree/db/util/cbor.cljc index 2b61e7ebc9..10f4326d92 100644 --- a/src/fluree/db/util/cbor.cljc +++ b/src/fluree/db/util/cbor.cljc @@ -15,8 +15,8 @@ (doseq [[k v] x] (.put m (if (keyword? k) (name k) k) (clj->java v))) m) - (vector? x) (java.util.ArrayList. (map clj->java x)) - (sequential? x) (java.util.ArrayList. (map clj->java x)) + (vector? x) (java.util.ArrayList. ^java.util.Collection (map clj->java x)) + (sequential? x) (java.util.ArrayList. ^java.util.Collection (map clj->java x)) (keyword? x) (name x) :else x))) @@ -25,7 +25,7 @@ [x] (cond (instance? java.util.Map x) - (into {} (for [e (.entrySet ^java.util.Map x)] + (into {} (for [^java.util.Map$Entry e (.entrySet ^java.util.Map x)] [(keyword (.getKey e)) (java->clj* (.getValue e))])) (instance? java.util.List x) From 7a58995dc5d759304d05af0bf388accad92cb0d4 Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 2 Oct 2025 17:22:49 -0400 Subject: [PATCH 09/13] Add handling for corrupted CBOR in cuckoo filter read operation --- src/fluree/db/indexer/cuckoo.cljc | 14 +++++--- .../db/indexer/cuckoo_roundtrip_test.clj | 32 +++++++++++++++++++ 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/fluree/db/indexer/cuckoo.cljc b/src/fluree/db/indexer/cuckoo.cljc index 1c80099e78..95044ae6c6 100644 --- a/src/fluree/db/indexer/cuckoo.cljc +++ b/src/fluree/db/indexer/cuckoo.cljc @@ -342,11 +342,17 @@ (store/get-content-store storage ::store/default))] (try* (when-let [cbor-bytes ( Date: Thu, 2 Oct 2025 17:39:52 -0400 Subject: [PATCH 10/13] noop for cbor in browser environments --- src/fluree/db/indexer/cuckoo.cljc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/fluree/db/indexer/cuckoo.cljc b/src/fluree/db/indexer/cuckoo.cljc index 95044ae6c6..60b9e16436 100644 --- a/src/fluree/db/indexer/cuckoo.cljc +++ b/src/fluree/db/indexer/cuckoo.cljc @@ -316,10 +316,12 @@ (str ledger-alias "/index/cuckoo/" branch-name)) (defn write-filter - "Writes cuckoo filter to storage as CBOR binary data." + "Writes cuckoo filter to storage as CBOR binary data. + No-op in CLJS environments without CBOR support." [index-catalog ledger-alias branch-name t filter] (go-try - (when (and index-catalog (:storage index-catalog) filter) + (when (and cbor/cbor-available? + index-catalog (:storage index-catalog) filter) (let [serialized (assoc filter :t t) cbor-bytes (cbor/encode serialized) path (filter-storage-path ledger-alias branch-name) @@ -331,10 +333,11 @@ (defn read-filter "Reads cuckoo filter from storage, decoding CBOR binary data. - Returns nil if filter doesn't exist." + Returns nil if filter doesn't exist or in CLJS environments without CBOR support." [index-catalog ledger-alias branch-name] (go-try - (when (and index-catalog (:storage index-catalog) ledger-alias branch-name) + (when (and cbor/cbor-available? + index-catalog (:storage index-catalog) ledger-alias branch-name) (let [path (filter-storage-path ledger-alias branch-name) storage (:storage index-catalog) store (if (satisfies? store/ByteStore storage) From b54105f275e6f26b4b0225221f9123f144d8ca21 Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 2 Oct 2025 18:31:50 -0400 Subject: [PATCH 11/13] Refactor CBOR tests to ensure availability checks are consistent across platforms --- test/fluree/db/util/cbor_test.cljc | 85 ++++++++++++++++-------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/test/fluree/db/util/cbor_test.cljc b/test/fluree/db/util/cbor_test.cljc index d58a2a6744..8a0171a6f3 100644 --- a/test/fluree/db/util/cbor_test.cljc +++ b/test/fluree/db/util/cbor_test.cljc @@ -4,52 +4,59 @@ (deftest cbor-availability-test (testing "CBOR is available on this platform" - (is (true? cbor/cbor-available?) - "CBOR should be available on JVM and Node.js"))) + #?(:clj (is (true? cbor/cbor-available?) + "CBOR should be available on JVM") + :cljs (is (boolean? cbor/cbor-available?) + "CBOR availability should be detectable in CLJS")))) (deftest cbor-encode-decode-test - (testing "Basic CBOR encoding and decoding" - (let [data {:name "Alice" - :age 30 - :tags ["developer" "clojure"]} - encoded (cbor/encode data) - decoded (cbor/decode encoded)] - (is (some? encoded) "Encoding should produce bytes") - (is (= data decoded) "Decoded data should match original")))) + (when cbor/cbor-available? + (testing "Basic CBOR encoding and decoding" + (let [data {:name "Alice" + :age 30 + :tags ["developer" "clojure"]} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (some? encoded) "Encoding should produce bytes") + (is (= data decoded) "Decoded data should match original"))))) (deftest cbor-nested-data-test - (testing "CBOR handles nested data structures" - (let [data {:user {:name "Bob" - :email "bob@example.com"} - :prefs {:theme "dark" - :notifications true} - :count 42} - encoded (cbor/encode data) - decoded (cbor/decode encoded)] - (is (= data decoded) "Nested structures should round-trip correctly")))) + (when cbor/cbor-available? + (testing "CBOR handles nested data structures" + (let [data {:user {:name "Bob" + :email "bob@example.com"} + :prefs {:theme "dark" + :notifications true} + :count 42} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Nested structures should round-trip correctly"))))) (deftest cbor-arrays-test - (testing "CBOR handles arrays/vectors" - (let [data {:items [1 2 3 4 5] - :names ["alice" "bob" "charlie"]} - encoded (cbor/encode data) - decoded (cbor/decode encoded)] - (is (= data decoded) "Arrays should round-trip correctly")))) + (when cbor/cbor-available? + (testing "CBOR handles arrays/vectors" + (let [data {:items [1 2 3 4 5] + :names ["alice" "bob" "charlie"]} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Arrays should round-trip correctly"))))) (deftest cbor-nil-values-test - (testing "CBOR handles nil values" - (let [data {:field1 "value" - :field2 nil - :field3 "other"} - encoded (cbor/encode data) - decoded (cbor/decode encoded)] - (is (= data decoded) "Nil values should be preserved")))) + (when cbor/cbor-available? + (testing "CBOR handles nil values" + (let [data {:field1 "value" + :field2 nil + :field3 "other"} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Nil values should be preserved"))))) (deftest cbor-empty-collections-test - (testing "CBOR handles empty collections" - (let [data {:empty-map {} - :empty-vec [] - :value 123} - encoded (cbor/encode data) - decoded (cbor/decode encoded)] - (is (= data decoded) "Empty collections should round-trip correctly")))) + (when cbor/cbor-available? + (testing "CBOR handles empty collections" + (let [data {:empty-map {} + :empty-vec [] + :value 123} + encoded (cbor/encode data) + decoded (cbor/decode encoded)] + (is (= data decoded) "Empty collections should round-trip correctly"))))) From 0ecdc5b8eeb8e47697dab9b3854f6edf506d5b21 Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 2 Oct 2025 19:07:36 -0400 Subject: [PATCH 12/13] Fix typo in error message for unknown ledger address --- docs/cuckoo-filter-gc-strategy.md | 29 ++++++++++++++++------------- src/fluree/db/connection.cljc | 2 +- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/cuckoo-filter-gc-strategy.md b/docs/cuckoo-filter-gc-strategy.md index 5c84affc96..dfa5b8b29b 100644 --- a/docs/cuckoo-filter-gc-strategy.md +++ b/docs/cuckoo-filter-gc-strategy.md @@ -28,7 +28,7 @@ Fluree's content-addressed storage means multiple branches can share the same in ### Filter Storage and Lifecycle Each branch maintains its own cuckoo filter stored at: ``` -ledger-name/index/cuckoo/branch-name.json +ledger-name/index/cuckoo/branch-name.cbor ``` **Filter Updates:** @@ -61,9 +61,11 @@ When garbage collection runs: ## Performance Characteristics ### Memory Usage -At ~95% load factor: -- **16-bit fingerprints**: ~6 bytes per index segment (using JSON with EDN-encoded bucket arrays) -- **Future optimization**: Binary packing with base64 encoding could reduce to ~2.8 bytes per segment +Actual measurements with realistic hash distribution: +- **16-bit fingerprints**: ~4.6 bytes per index segment (measured with 50K well-distributed segments) +- **Storage format**: CBOR binary encoding with bucket arrays +- **Load factor**: Typically 47-95% with good hash distribution +- **Note**: Slightly higher than theoretical 2.8 bytes due to CBOR structure overhead and empty slots ### False Positive Rates - **16-bit fingerprints**: ~0.012% (1 in ~8,200) @@ -74,15 +76,15 @@ Using realistic index characteristics (200KB average leaf size, ~300 branch fano | Database Size | Estimated Segments | Filter Size (16-bit) | Expected FP Rate | |---------------|-------------------|---------------------|------------------| -| 100MB | ~502 | ~3KB | ~0.012% | -| 1GB | ~5,017 | ~29KB | ~0.012% | -| 10GB | ~50,167 | ~293KB | ~0.012% | -| 100GB | ~501,667 | ~2.9MB | ~0.012% | -| 1TB | ~5,016,667 | ~29MB | ~0.012% | +| 100MB | ~502 | ~2.3KB | ~0.012% | +| 1GB | ~5,017 | ~23KB | ~0.012% | +| 10GB | ~50,167 | ~224KB | ~0.012% | +| 100GB | ~501,667 | ~2.2MB | ~0.012% | +| 1TB | ~5,016,667 | ~22MB | ~0.012% | **Calculations:** - Segments = `(DB_size / 200KB) + (segments / 300)` (leaves + branches) -- Filter size ≈ `segments × 6 bytes` (current JSON/EDN serialization) +- Filter size ≈ `segments × 4.6 bytes` (measured with realistic SHA-256 hash distribution) ### Runtime Performance - **Hash operations**: ~1 microsecond per segment @@ -132,10 +134,11 @@ During garbage collection: - **16-bit fixed size**: Provides good balance of memory efficiency and low false positive rate (~0.012%) ### 3. Storage Strategy -- **Per-branch filters**: Each branch maintains its own filter at `ledger/index/cuckoo/branch.json` -- **JSON serialization**: Human-readable format with EDN-encoded bucket arrays +- **Per-branch filters**: Each branch maintains its own filter at `ledger/index/cuckoo/branch.cbor` +- **CBOR serialization**: Binary format with bucket array storage (~68 bytes per segment measured) +- **Filter chains**: Automatically creates new filters as needed to handle growth - **Atomic updates**: Filters are written atomically during index completion -- **Future optimization**: Binary packing could reduce storage by ~50% +- **Platform support**: CBOR available on JVM and Node.js (gracefully degrades on browsers) ### 4. Concurrency and Consistency - **No locking required**: Filters are read-only during garbage collection diff --git a/src/fluree/db/connection.cljc b/src/fluree/db/connection.cljc index 1fa8680278..bff7176278 100644 --- a/src/fluree/db/connection.cljc +++ b/src/fluree/db/connection.cljc @@ -406,7 +406,7 @@ (async/put! ledger-chan ledger) ledger) (throw (ex-info (str "Unable to load. No record of ledger at address: " ledger-address " exists.") - {:status 404, :error :db/unkown-address}))))) + {:status 404, :error :db/unknown-address}))))) (defn load-ledger-address [conn address] From 1ada7f7bab22e15d7a407fa8bb74660882111a9e Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 2 Oct 2025 19:16:20 -0400 Subject: [PATCH 13/13] Refactor cuckoo filter initialization and garbage cleanup comments for clarity --- src/fluree/db/connection.cljc | 2 +- src/fluree/db/indexer/garbage.cljc | 3 +-- src/fluree/db/ledger.cljc | 16 ++++++++++------ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/fluree/db/connection.cljc b/src/fluree/db/connection.cljc index bff7176278..eee319fdff 100644 --- a/src/fluree/db/connection.cljc +++ b/src/fluree/db/connection.cljc @@ -515,7 +515,7 @@ (