Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/fluree/db/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,53 @@
(throw (ex-info "No nameservice available for querying"
{:status 400 :error :db/no-nameservice})))))))

;; Streaming Query APIs

(defn query-stream
"Streaming version of `query`. Returns a core.async channel that emits
individual results as produced, rather than collecting into a vector.

See `query` for query syntax and options. Key differences:
- Returns core.async channel instead of promise
- Emits results individually (reduces memory for large result sets)
- Caching not supported
- CONSTRUCT queries emit individual graph nodes (not grouped by subject)
- When :meta enabled, final item is {:_fluree-meta {...}}"
([ds q]
(query-stream ds q {}))
([ds q opts]
(cond
(util/exception? ds)
(async/to-chan! [ds])

(:cache opts)
(async/to-chan! [(ex-info "Streaming queries do not support caching"
{:status 400
:error :db/invalid-query
:message "Remove :cache option or use non-streaming query API"})])

:else
(query-api/query-stream ds q opts))))

(defn query-connection-stream
"Streaming version of `query-connection`. Returns a core.async channel that
emits individual results as produced.

See `query-connection` for query syntax and options. Key differences:
- Returns core.async channel instead of promise
- Emits results individually (reduces memory for large result sets)
- Caching not supported
- When :meta enabled, final item is {:_fluree-meta {...}}"
([conn q] (query-connection-stream conn q {}))
([conn q opts]
(validate-connection conn)
(if (:cache opts)
(async/to-chan! [(ex-info "Streaming queries do not support caching"
{:status 400
:error :db/invalid-query
:message "Remove :cache option or use non-streaming query API"})])
(query-api/query-connection-stream conn q opts))))

(defn history
"Queries the history of entities across commits.

Expand Down
133 changes: 132 additions & 1 deletion src/fluree/db/query/api.cljc
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns fluree.db.query.api
"Primary API ns for any user-invoked actions. Wrapped by language & use specific APIS
that are directly exposed"
(:require [fluree.db.connection :as connection]
(:require [clojure.core.async :as async]
[fluree.db.connection :as connection]
[fluree.db.dataset :as dataset :refer [dataset?]]
[fluree.db.json-ld.policy :as perm]
[fluree.db.ledger :as ledger]
Expand Down Expand Up @@ -288,3 +289,133 @@
(case format
:fql (query-connection-fql conn query override-opts)
:sparql (query-connection-sparql conn query override-opts)))

(defn query-fql-stream
"Internal streaming query implementation. Handles db restriction, policy tracking,
and metadata emission. Returns channel emitting individual results + optional
final :_fluree-meta map."
([ds query]
(query-fql-stream ds query nil))
([ds query override-opts]
(let [out-ch (async/chan)
track-init track/init
track-query? track/track-query?
track-tally track/tally]
(async/go
(try*
(let [{:keys [opts] :as query*} (-> query
syntax/coerce-query
(sanitize-query-options override-opts))
tracker (track-init opts)
track-meta? (track-query? opts)
ds* (if (dataset? ds)
ds
(async/<! (restrict-db ds tracker query*)))
query** (update query* :opts dissoc :meta :max-fuel)]

(if (util/exception? ds*)
(do
(async/>! out-ch ds*)
(async/close! out-ch))
(let [result-ch (async/<! (fql/query-stream ds* tracker query**))]
(if (util/exception? result-ch)
(do
(async/>! out-ch result-ch)
(async/close! out-ch))
(do
(loop []
(when-some [result (async/<! result-ch)]
(async/>! out-ch result)
(recur)))

(when track-meta?
(let [tally (track-tally tracker)]
(async/>! out-ch {:_fluree-meta (assoc tally :status 200)})))

(async/close! out-ch))))))
(catch* e
(async/>! out-ch e)
(async/close! out-ch))))
out-ch)))

(defn query-sparql-stream
"Converts SPARQL to FQL and delegates to query-fql-stream."
[db query override-opts]
(let [fql (sparql/->fql query)]
(query-fql-stream db fql override-opts)))

(defn query-stream
"Dispatches to query-fql-stream or query-sparql-stream based on :format option."
[db query {:keys [format] :as override-opts :or {format :fql}}]
(case format
:fql (query-fql-stream db query override-opts)
:sparql (query-sparql-stream db query override-opts)))

(defn query-connection-fql-stream
"Loads ledger(s) from connection and executes streaming query. Like
query-connection-fql but streams individual results instead of collecting."
[conn query override-opts]
(let [out-ch (async/chan)
track-init track/init
track-query? track/track-query?
track-tally track/tally]
(async/go
(try*
(let [{:keys [opts] :as sanitized-query} (-> query
syntax/coerce-query
(sanitize-query-options override-opts))
tracker (track-init opts)
track-meta? (track-query? opts)
default-aliases (some-> sanitized-query :from util/sequential)
named-aliases (some-> sanitized-query :from-named util/sequential)]

(log/debug "query-connection-fql-stream - from:" default-aliases
"from-named:" named-aliases)

(if (or (seq default-aliases) (seq named-aliases))
(let [ds (async/<! (load-dataset conn tracker default-aliases
named-aliases sanitized-query))
trimmed-query (update sanitized-query :opts dissoc :meta :max-fuel)]
(if (util/exception? ds)
(do
(async/>! out-ch ds)
(async/close! out-ch))
(let [result-ch (async/<! (fql/query-stream ds tracker trimmed-query))]
(if (util/exception? result-ch)
(do
(async/>! out-ch result-ch)
(async/close! out-ch))
(do
(loop []
(when-some [result (async/<! result-ch)]
(async/>! out-ch result)
(recur)))

(when track-meta?
(let [tally (track-tally tracker)]
(async/>! out-ch {:_fluree-meta (assoc tally :status 200)})))

(async/close! out-ch))))))

(throw (ex-info "Missing ledger specification in connection query"
{:status 400, :error :db/invalid-query}))))
(catch* e
(async/>! out-ch e)
(async/close! out-ch))))
out-ch))

(defn query-connection-sparql-stream
"Converts SPARQL to FQL and delegates to query-connection-fql-stream."
[conn query override-opts]
(let [fql (sparql/->fql query)]
(log/debug "query-connection-sparql-stream fql:" fql
"override-opts:" override-opts)
(query-connection-fql-stream conn fql override-opts)))

(defn query-connection-stream
"Dispatches to query-connection-fql-stream or query-connection-sparql-stream
based on :format option."
[conn query {:keys [format] :as override-opts :or {format :fql}}]
(case format
:fql (query-connection-fql-stream conn query override-opts)
:sparql (query-connection-sparql-stream conn query override-opts)))
29 changes: 29 additions & 0 deletions src/fluree/db/query/exec.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@
(paginate q)
(collect-results q)))

(defn execute-stream
"Internal streaming execution. Skips collect-results, emits individual results.
CONSTRUCT emits ungrouped nodes. SELECT ONE applies async/take 1.
ORDER BY, GROUP BY, and aggregates require collecting all results before emitting."
[ds tracker q error-ch]
(let [result-ch (->> (execute* ds tracker q error-ch)
(select/format ds q tracker error-ch)
(paginate q))]
;; For SELECT ONE, take only first result
(if (:select-one q)
(async/take 1 result-ch)
result-ch)))

;; TODO: refactor namespace heirarchy so this isn't necessary
(defn subquery-executor
"Closes over a subquery to allow processing the whole query pipeline from within the
Expand Down Expand Up @@ -115,3 +128,19 @@
(async/alt!
error-ch ([e] (async/close! error-ch) e)
result-ch ([result] result))))))

(defn query-stream
"Execute the parsed query `q` against the database value `db` and return a
channel that emits individual results as they are produced.

Returns an async channel which emits individual results. If an error occurs,
the error will be sent to a separate error channel (and the result channel
will be closed)."
([ds q]
(query-stream ds nil q))
([ds tracker q]
(let [error-ch (async/chan)
prepped-q (prep-subqueries q)
result-ch (execute-stream ds tracker prepped-q error-ch)]
result-ch)))

11 changes: 11 additions & 0 deletions src/fluree/db/query/fql.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@
(let [oq (<? (optimize/optimize ds pq))]
(<? (exec/query ds tracker oq)))))))))

(defn query-stream
"Parses query and delegates to exec/query-stream. Returns channel emitting
individual results. Caching not supported for streaming."
([ds query-map]
(query-stream ds nil query-map))
([ds tracker query-map]
(go-try
(let [pq (parse/parse-query query-map)
oq (<? (optimize/optimize ds pq))]
(exec/query-stream ds tracker oq)))))

(defn explain
"Returns query execution plan without executing the query.
Returns core async channel with query plan or exception."
Expand Down
Loading
Loading