DashKite River provides a unified functional interface for iterators and reactors (asynchronous iterators). Every function is curried and automatically dispatches to the most efficient implementation.
These internal classes handle the buffering and coordination required for multi-consumer operations like tee and partition.
A synchronous FIFO queue that can be consumed as an iterator.
An asynchronous FIFO queue that can be consumed as a reactor.
Composes a buffer and an iterator, allowing for shared iterators that push values into a buffer until they’re ready to be consumed.
Composes a buffer and a reactor, allowing for shared reactors that push values into a buffer until they’re ready to be consumed.
map function, iterator → iterator
map function, reactor ⇢ reactor
Given a mapping function and a stream, returns a new stream of the same type where each item has been transformed by the function.
assert.deepEqual [ 2, 4, 6 ], collect map double, [ 1..3 ]resolve iterator → reactor
Converts an iterator of Promises into a reactor that yields the resolved values.
assert.deepEqual [ 1..5 ], await collect resolve promised [ 1..5 ]spread function, iterator → iterator
spread function, reactor ⇢ reactor
Maps a function over a stream where the function returns an iterable, then flattens the result.
clone = ( x ) -> [ x, x ]
assert.deepEqual [ 1, 1, 2, 2 ], collect spread clone, [ 1, 2 ]tap function, iterator → iterator
tap function, reactor ⇢ reactor
Executes a side-effect for each item in the stream, but returns a stream of the original items.
x = 0
f = -> x++
collect tap f, [ 1..5 ]
assert.equal 5, xtee iterator → [ iterator, iterator ]
tee reactor ⇢ [ reactor, reactor ]
Splits a single source into two identical streams. Uses internal buffering to ensure both consumers see all items.
[ a, b ] = tee [ 1..3 ]
assert.deepEqual [ 1..3 ], collect a
assert.deepEqual [ 1..3 ], collect bpartition predicate, iterator → [ iterator, iterator ]
partition predicate, reactor ⇢ [ reactor, reactor ]
Splits a stream into two: the first yielding items that pass the predicate, and the second yielding those that do not.
[ odds, evens ] = partition odd, [ 1..4 ]
assert.deepEqual [ 1, 3 ], collect odds
assert.deepEqual [ 2, 4 ], collect evensreject predicate, iterator → iterator
reject predicate, reactor ⇢ reactor
Produces a stream containing only the items that do not satisfy the predicate.
assert.deepEqual [ 1, 3, 5 ], collect reject even, [ 1..5 ]select predicate, iterator → iterator
select predicate, reactor ⇢ reactor
Produces a stream containing only the items that satisfy the predicate.
assert.deepEqual [ 1, 3, 5 ], collect select odd, [ 1..5 ]unique iterator → iterator
unique reactor ⇢ reactor
Filters out duplicate items from the stream.
assert.deepEqual [ 1, 2 ], collect unique [ 1, 1, 2, 2 ]uniquely selector, iterator → iterator
uniquely selector, reactor ⇢ reactor
Filters items based on a uniqueness key produced by the selector function.
list = [ {id: 1, name: 'a'}, {id: 1, name: 'b'} ]
getId = (obj) -> obj.id
assert.deepEqual [ list[0] ], collect uniquely getId, listdrop n, iterator → iterator drop predicate, iterator → iterator
drop n, reactor ⇢ reactor drop predicate, reactor ⇢ reactor
Discards the first n items or the first items that satisfy the predicate, and returns a stream of the remainder.
assert.deepEqual [ 3, 4 ], collect drop 2, [ 1..4 ]take n, iterator → iterator take predicate, iterator → iterator
take n, reactor ⇢ reactor drop predicate, reactor ⇢ reactor
Returns a stream consisting of only the first n items or the first items that satisfy the predicate.
assert.deepEqual [ 1, 2 ], collect take 2, [ 1..5 ]merge iterator, iterator → iterator
merge reactor, reactor ⇢ reactor
Interleaves items from two streams into a single stream.
assert.deepEqual [ 1, 3, 2, 4 ], collect merge [ 1, 2 ], [ 3, 4 ]zip iterator, iterator → iterator
zip reactor, reactor ⇢ reactor
Combines two streams into a stream of pairs (2-element arrays).
assert.deepEqual [[ 1, 3 ], [ 2, 4 ]], collect zip [ 1, 2 ], [ 3, 4 ]all predicate, iterator → boolean
all predicate, reactor ⇢ boolean
Returns true if every item passes the predicate. Short-circuits on first failure.
assert.equal false, all odd, [ 1..5 ]any predicate, iterator → boolean
any predicate, reactor ⇢ boolean
Returns true if at least one item passes the predicate. Short-circuits on first success.
assert.equal true, any even, [ 1..3 ]collect iterator → array
collect reactor ⇢ array
Exhausts the stream and returns all items in an array.
assert.deepEqual [ 1, 2 ], collect [ 1, 2 ].values()each function, iterator → undefined
each function, reactor ⇢ undefined
Iterates over the stream and applies the function to each item for side effects.
x = 0
each ((y) -> x += y), [ 1..3 ]
assert.equal 6, xfind predicate, iterator → value | undefined
find predicate, reactor ⇢ value | undefined
Returns the first item that satisfies the predicate.
assert.equal 2, find even, [ 1..3 ]group selector, iterator → Map
group selector, reactor ⇢ Map
Categorizes items into a Map where keys are generated by the selector.
result = group parity, [ 1..4 ]reduce initial, accumulator, iterator → value
reduce initial, accumulator, reactor ⇢ value
Standard reduction of a stream to a single value.
assert.equal 6, reduce 0, sum, [ 1..3 ]start iterator → undefined
start reactor ⇢ undefined
Drives a stream to completion without yielding products. Useful for lazy streams with side effects.
start logger