Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c6bcc4b
account for equivalent and sub properties in property joins
zonotope Oct 24, 2025
478274b
add min-* functions; add docstrings
zonotope Oct 24, 2025
ab785ce
move class-ids function closer to where it's used
zonotope Oct 24, 2025
1b9f9c7
prefer go-loop over (go (loop ...))
zonotope Oct 24, 2025
09d200e
add function to compute the outer join of a sequence of channels
zonotope Oct 24, 2025
8f49d8c
function to compute cartesian product of a sequence of collections
zonotope Oct 27, 2025
8b3af72
add explicit function to compute the first sid of a collection
zonotope Oct 27, 2025
c5a4d4c
use outer-join to account for multi-cardinality properties
zonotope Oct 28, 2025
4743870
update docstrings
zonotope Oct 28, 2025
7873fc8
keep related triples as a set of triple groups for comparison
zonotope Oct 28, 2025
4f49574
Merge branch 'feature/conditional-subject-joins' into feature/group-s…
zonotope Oct 30, 2025
68b59bd
Merge branch 'feature/conditional-subject-joins' into feature/group-s…
zonotope Oct 31, 2025
5b39b63
add dedicated function to sort triples for optimization
zonotope Oct 31, 2025
4d9659b
add try-coerce-triple fn
zonotope Oct 31, 2025
654abc0
Move compare-component definition to where its used
zonotope Nov 4, 2025
55cd007
temporarily remove simple-property-join check
zonotope Nov 4, 2025
be2b199
post parse property join detection
zonotope Nov 4, 2025
f7eeb5b
add functions for detecting pattern types
zonotope Nov 4, 2025
5f0717e
use variables and iris when determining property join candidates
zonotope Nov 4, 2025
c4ddf13
remove redundant clause
zonotope Nov 4, 2025
69d5728
skip virtual graph parsing (for now)
zonotope Nov 5, 2025
00e1171
more restrictive determination of pattern types
zonotope Nov 5, 2025
9424cc7
explicitly check for a sequence of triple patterns
zonotope Nov 5, 2025
9f62746
add functions to extract subject variables and predicate iris
zonotope Nov 5, 2025
b1e920c
collect all triples with the same subject var, not just consecutive
zonotope Nov 5, 2025
ef10220
remove redundant conditions
zonotope Nov 6, 2025
c32c754
simplify triple grouping
zonotope Nov 6, 2025
365c823
remove unnecessary property-join-candidate? check
zonotope Nov 6, 2025
da11836
add explicit predicate fn for groups of triples
zonotope Nov 6, 2025
4bb6efe
more descriptive name for property-join function; cleanup
zonotope Nov 6, 2025
26cc1b4
Merge branch 'refactor/flake-db-optimize' into feature/group-subject-…
zonotope Nov 11, 2025
b321267
run triple-group check first to avoid compound pattern for groups
zonotope Nov 12, 2025
233c41b
include entire pattern in unknown pattern type error message
zonotope Nov 12, 2025
19e5485
include subclasses for class patterns in property joins
zonotope Nov 12, 2025
4280c02
don't repeat original property when including subproperties
zonotope Nov 17, 2025
8b1c11c
don't merge property groups
zonotope Nov 17, 2025
0051945
wip: aggressive optimization
zonotope Nov 18, 2025
ade6d1e
leave single element triple groups as raw triples
zonotope Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/fluree/db/async_db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@
(>! error-ch e))))
match-ch))

(-match-properties [_ tracker solution triples error-ch]
(let [match-ch (async/chan)]
(go
(try*
(let [db (<? db-chan)]
(-> db
(where/-match-properties tracker solution triples error-ch)
(async/pipe match-ch)))
(catch* e
(log/error e "Error loading database")
(>! error-ch e))))
match-ch))

(-activate-alias [_ alias']
(go-try
(let [db (<? db-chan)]
Expand Down
7 changes: 7 additions & 0 deletions src/fluree/db/dataset.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@
(where/-match-class active-graph tracker solution triple error-ch))
empty-channel))

(-match-properties [ds tracker solution patterns error-ch]
(if-let [active-graph (get-active-graph ds)]
(if (sequential? active-graph)
(where/match-patterns ds tracker solution patterns error-ch)
(where/-match-properties active-graph tracker solution patterns error-ch))
empty-channel))

(-activate-alias [ds alias]
(go-try
(activate ds alias)))
Expand Down
9 changes: 9 additions & 0 deletions src/fluree/db/flake.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,15 @@
(cmp-op (op f1) (op f2))
(cmp-meta (m f1) (m f2))))

(defn cmp-flakes-psot [f1 f2]
(combine-cmp
(cmp-pred (p f1) (p f2))
(cmp-subj (s f1) (s f2))
(cmp-obj (o f1) (dt f1) (o f2) (dt f2))
(cmp-tx (t f1) (t f2))
(cmp-op (op f1) (op f2))
(cmp-meta (m f1) (m f2))))

(defn cmp-flakes-post [f1 f2]
(combine-cmp
(cmp-pred (p f1) (p f2))
Expand Down
91 changes: 59 additions & 32 deletions src/fluree/db/flake/commit_data.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[fluree.db.constants :as const]
[fluree.db.datatype :as datatype]
[fluree.db.flake :as flake]
[fluree.db.flake.index :as index]
[fluree.db.json-ld.iri :as iri]
[fluree.db.query.exec.update :as update]
[fluree.db.query.exec.where :as where]
Expand Down Expand Up @@ -177,16 +178,17 @@
issuer (assoc :issuer {:id (get-id issuer)}))))

(defn update-index-roots
[commit-map {:keys [spot post opst tspo]}]
[commit-map index-map]
(if (contains? commit-map :index)
(update commit-map :index assoc :spot spot, :post post, :opst opst, :tspo tspo)
(let [index-map* (index/select-roots index-map)]
(update commit-map :index merge index-map*))
commit-map))

(defn json-ld->map
([commit-jsonld index-roots]
(json-ld->map commit-jsonld nil index-roots))
([commit-jsonld index-map]
(json-ld->map commit-jsonld nil index-map))

([commit-jsonld fallback-address index-roots]
([commit-jsonld fallback-address index-map]
(let [commit-map (jsonld->clj commit-jsonld)]
(cond-> commit-map
(and (some? fallback-address)
Expand All @@ -195,7 +197,7 @@
; IPFS, is empty string

true
(update-index-roots index-roots)))))
(update-index-roots index-map)))))

(defn hash->commit-id
[hsh]
Expand Down Expand Up @@ -351,60 +353,85 @@
add (+ (flake/size-bytes add))
rem (- (flake/size-bytes rem))))

(defn revise-novelty-set
[novelty add rem]
#?(:clj (future (flake/revise novelty add rem))
:cljs (flake/revise novelty add rem)))

(defn revise-index-novelty
[novelty add rem]
(let [index-novelty (index/select-roots novelty)]
(reduce-kv (fn [m idx novelty]
(let [novelty* (if (= :opst idx)
(revise-novelty-set novelty (ref-flakes add) (ref-flakes rem))
(revise-novelty-set novelty add rem))]
(assoc m idx novelty*)))
{} index-novelty)))

(defn merge-index-novelty
[db index-novelty]
(reduce-kv (fn [db* idx novelty]
(assoc-in db* [:novelty idx] #?(:clj @novelty
:cljs novelty)))
db index-novelty))

(defn update-novelty
([db add]
(update-novelty db add []))

([{:keys [t] :as db} add rem]
([{:keys [t novelty] :as db} add rem]
(try*
(let [flake-count (cond-> 0
add (+ (count add))
rem (- (count rem)))

;; launch futures for parallellism on JVM
flake-size #?(:clj (future (calc-flake-size add rem))
:cljs (calc-flake-size add rem))
post #?(:clj (future (flake/revise (get-in db [:novelty :post]) add rem))
:cljs (flake/revise (get-in db [:novelty :post]) add rem))
opst #?(:clj (future (flake/revise (get-in db [:novelty :opst]) (ref-flakes add) (ref-flakes rem)))
:cljs (flake/revise (get-in db [:novelty :opst]) (ref-flakes add) (ref-flakes rem)))]
flake-size #?(:clj (future (calc-flake-size add rem))
:cljs (calc-flake-size add rem))
index-novelty (revise-index-novelty novelty add rem)]
(-> db
(update-in [:novelty :spot] flake/revise add rem)
(update-in [:novelty :tspo] flake/revise add rem)
(assoc-in [:novelty :post] #?(:clj @post
:cljs post))
(assoc-in [:novelty :opst] #?(:clj @opst
:cljs opst))
(update-in [:novelty :size] + #?(:clj @flake-size
:cljs flake-size))
(assoc-in [:novelty :t] t)
(update-in [:stats :size] + #?(:clj @flake-size
:cljs flake-size))
(update-in [:stats :flakes] + flake-count)))
(update-in [:stats :flakes] + flake-count)
(merge-index-novelty index-novelty)))
(catch* e
(log/error (str "Update novelty unexpected error while attempting to updated db: "
(pr-str db) " due to exception: " (ex-message e))
{:add-flakes add
:rem-flakes rem})
(throw e)))))

(defn add-child-tt-ids
[children tt-id]
(reduce-kv (fn [children* k child]
(assoc children* k (assoc child :tt-id tt-id)))
(empty children) children))

(defn add-root-tt-id
[root tt-id]
(-> root
(update :children add-child-tt-ids tt-id)
(assoc :tt-id tt-id)))

(defn add-index-tt-ids
[db tt-id]
(let [indexes (index/indexes-for db)]
(reduce (fn [db* idx]
(update db* idx add-root-tt-id tt-id))
db indexes)))

(defn add-tt-id
"Associates a unique tt-id for any in-memory staged db in their index roots.
tt-id is used as part of the caching key, by having this in place it means
that even though the 't' value hasn't changed it will cache each stage db
data as its own entity."
[db]
(let [tt-id (random-uuid)
indexes [:spot :post :opst :tspo]]
(-> (reduce
(fn [db* idx]
(let [{:keys [children] :as node} (get db* idx)
children* (reduce-kv
(fn [children* k v]
(assoc children* k (assoc v :tt-id tt-id)))
(empty children) children)]
(assoc db* idx (assoc node :tt-id tt-id
:children children*))))
db indexes)
(let [tt-id (random-uuid)]
(-> db
(add-index-tt-ids tt-id)
(assoc :tt-id tt-id))))

(defn commit-metadata-flakes
Expand Down
62 changes: 33 additions & 29 deletions src/fluree/db/flake/flake_db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@

(defn empty-all-novelty
[db]
(let [cleared (reduce (fn [db* idx]
(update-in db* [:novelty idx] empty))
db index/types)]
(let [cleared (->> (index/indexes-for db)
(reduce (fn [db* idx]
(update-in db* [:novelty idx] empty))
db))]
(assoc-in cleared [:novelty :size] 0)))

(defn novelty-after-t
Expand All @@ -67,19 +68,19 @@
(empty-all-novelty db)

(flake/t-before? t (:t db))
(let [novelty (reduce (fn [acc idx]
(let [indexes (index/indexes-for db)
novelty (reduce (fn [acc idx]
(assoc acc idx
#?(:clj (future (novelty-after-t db t idx))
:cljs (novelty-after-t db t idx))))
{} index/types)
{} indexes)
size (flake/size-bytes #?(:clj @(:spot novelty)
:cljs (:spot novelty)))
db* (reduce
(fn [db* idx]
(assoc-in db* [:novelty idx] #?(:clj @(get novelty idx)
:cljs (get novelty idx))))
(assoc-in db [:novelty :size] size)
index/types)]
db* (reduce (fn [db* idx]
(assoc-in db* [:novelty idx] #?(:clj @(get novelty idx)
:cljs (get novelty idx))))
(assoc-in db [:novelty :size] size)
indexes)]
db*)

:else
Expand All @@ -99,18 +100,15 @@
(defn index-update
"If provided commit-index is newer than db's commit index, updates db by cleaning novelty.
If it is not newer, returns original db."
[{:keys [commit] :as db} {data-map :data, :keys [spot psot post opst tspo] :as index-map}]
[{:keys [commit] :as db} {data-map :data, :as index-map}]
(if (newer-index? commit index-map)
(let [index-t (:t data-map)
commit* (assoc commit :index index-map)]
(let [index-t (:t data-map)
commit* (assoc commit :index index-map)
index-roots (index/select-roots index-map)]
(-> db
(empty-novelty index-t)
(assoc :commit commit*
:spot spot
:psot psot
:post post
:opst opst
:tspo tspo)
(assoc :commit commit*)
(merge index-roots)
(assoc-in [:stats :indexed] index-t)))
db))

Expand Down Expand Up @@ -329,6 +327,9 @@
(-match-triple [db tracker solution triple-mch error-ch]
(match/match-triple db tracker solution triple-mch error-ch))

(-match-properties [db tracker solution triple-mchs error-ch]
(match/match-properties db tracker solution triple-mchs error-ch))

(-match-class [db tracker solution class-mch error-ch]
(match/match-class db tracker solution class-mch error-ch))

Expand Down Expand Up @@ -524,13 +525,10 @@

(defn new-novelty-map
[comparators]
(reduce
(fn [m idx]
(assoc m idx (-> comparators
(get idx)
flake/sorted-set-by)))
{:size 0
:t 0} index/types))
(reduce-kv (fn [m idx cmp]
(assoc m idx (flake/sorted-set-by cmp)))
{:size 0, :t 0}
comparators))

(defn genesis-root-map
[ledger-alias]
Expand Down Expand Up @@ -628,6 +626,11 @@
:reindex-max-bytes reindex-max-bytes
:max-old-indexes max-old-indexes)))

(defn root-comparators
[root-map]
(let [indexes (index/indexes-for root-map)]
(select-keys index/comparators indexes)))

;; TODO - VG - need to reify vg from db-root!!
(defn load
([ledger-alias commit-catalog index-catalog commit-pair]
Expand All @@ -640,6 +643,7 @@
root-map (if-let [{:keys [address]} (:index commit-map)]
(<? (index-storage/read-db-root index-catalog address))
(genesis-root-map ledger-alias))
comparators (root-comparators root-map)
max-ns-code (-> root-map :namespace-codes iri/get-max-namespace-code)
indexed-db (-> root-map
(add-reindex-thresholds indexing-opts)
Expand All @@ -648,9 +652,9 @@
:alias ledger-alias
:commit commit-map
:tt-id nil
:comparators index/comparators
:comparators comparators
:staged nil
:novelty (new-novelty-map index/comparators)
:novelty (new-novelty-map comparators)
:max-namespace-code max-ns-code)
map->FlakeDB
policy/root)
Expand Down
1 change: 1 addition & 0 deletions src/fluree/db/flake/history.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
idx (index/for-components s p o nil)
pattern (case idx
:spot [s p o]
:psot [p s o]
:post [p o s]
:opst [o p s])]
[pattern idx])))
Expand Down
22 changes: 20 additions & 2 deletions src/fluree/db/flake/index.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,36 @@
[fluree.db.util.log :as log :include-macros true]))

(def comparators
"Map of default index comparators for the four index types"
"Map of default index comparators for the five index types"
(array-map ;; when using futures, can base other calcs on :spot (e.g. size), so ensure comes first
:spot flake/cmp-flakes-spot
:psot flake/cmp-flakes-psot
:post flake/cmp-flakes-post
:opst flake/cmp-flakes-opst
:tspo flake/cmp-flakes-block))

(def types
"The four possible index orderings based on the subject, predicate, object,
"The five possible index orderings based on the subject, predicate, object,
and transaction flake attributes"
(-> comparators keys vec))

(defn supports?
[indexed idx]
(-> indexed (get idx) some?))

(defn indexes-for
"Return a sequence of the indexes supported by `indexed`"
[indexed]
(filter (partial supports? indexed)
types))

(defn select-roots
"Return a map with keys of indexes from `types` supported by `indexed`, and
values of the index root node for that index"
[indexed]
(let [index-keys (indexes-for indexed)]
(select-keys indexed index-keys)))

(defn reference?
[dt]
(= dt const/$id))
Expand Down
Loading
Loading