Use the native browser streams API, but with a nicer wrapper.
See experimental-fast-webstreams for use in Node.
npm i -S @substrate-system/stream

import { S } from '@substrate-system/stream';
// Chain operations like array methods
const result = await S.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.skip(2) // skip first 2: [3, 4, 5, 6, 7, 8, 9, 10]
.filter(x => x % 2 === 0) // keep evens: [4, 6, 8, 10]
.map(x => x * 2) // double: [8, 12, 16, 20]
.take(3) // first 3: [8, 12, 16]
.toArray();
console.log(result);
// [8, 12, 16]
// Terminal methods
const sum = await S.from([1, 2, 3, 4])
.reduce((acc, x) => acc + x, 0);
// 10
const found = await S.from([1, 2, 3, 4, 5])
.find(x => x > 3);
// 4
const hasEven = await S.from([1, 3, 5, 6])
.some(x => x % 2 === 0);
// true
const allPositive = await S.from([1, 2, 3])
.every(x => x > 0);
// true
// scan - like reduce but emits intermediate values
const runningTotals = await S.from([1, 2, 3])
.scan((acc, x) => acc + x, 0)
.toArray();
// [1, 3, 6]
const withInitial = await S.from([1, 2, 3])
.scan((acc, x) => acc + x, 10)
.toArray();
// [11, 13, 16]

Wrap a ReadableStream with chainable array-like methods. This provides a
fluent, array-like API for streams. The predicate functions
can all be async too.
function S<T> (readable:ReadableStream<T>):EnhancedStream<T>

Create an EnhancedStream from an array or iterable:

S.from<T>(iterable:Iterable<T>|AsyncIterable<T>):EnhancedStream<T>

const result = await S.from([1, 2, 3])
.filter(x => x > 1)
.map(x => x * 2)
.toArray();
// [4, 6]
// Works with async iterables too
async function* generate() {
yield 1;
yield 2;
yield 3;
}
const asyncResult = await S.from(generate())
.map(x => x * 10)
.toArray();
// [10, 20, 30]

Like array.map. Transform each chunk using a mapping function.
The function can be sync or async.
map<U> (fn:(item:T) => U|Promise<U>):EnhancedStream<U>

const doubled = await S.from([1, 2, 3])
.map(x => x * 2)
.toArray();
// [2, 4, 6]
// Async mapping works too
const parsed = await S.from(['1', '2', '3'])
.map(async s => JSON.parse(s))
.toArray();
// [1, 2, 3]

Like array.filter — keep only chunks that satisfy a predicate.
filter (predicate:(item:T) => boolean|Promise<boolean>):EnhancedStream<T>

const evens = await S.from([1, 2, 3, 4, 5, 6])
.filter(x => x % 2 === 0)
.toArray();
// [2, 4, 6]

For side effects.
forEach (fn:(item:T) => void|Promise<void>):EnhancedStream<T>

const result = await S.from([1, 2, 3])
.forEach(x => console.log('processing', x))
.map(x => x * 10)
.toArray();
// logs: processing 1, processing 2, processing 3
// [10, 20, 30]

Skip the first n chunks and pass through the rest.
skip (n:number):EnhancedStream<T>

const skipped = await S.from([1, 2, 3, 4, 5])
.skip(2)
.toArray();
// [3, 4, 5]

Take the first n chunks from the stream and then terminate it. Useful for
limiting output or short-circuiting a long or infinite stream.
take (n:number):EnhancedStream<T>

// First 3 items
const first3 = await S.from([10, 20, 30, 40, 50])
.take(3)
.toArray();
// [10, 20, 30]
// Composable with other methods
const result = await S.from([1, 2, 3, 4, 5, 6, 7, 8])
.filter(x => x % 2 === 0) // evens: [2, 4, 6, 8]
.take(2) // first 2 evens: [2, 4]
.toArray();
// [2, 4]

Like reduce, but emits each intermediate accumulated value instead of only
the final result. Useful for state machines, or any case where
you need to see intermediate states. See
reactivex.io/scan
scan<U>(fn:(acc:U, item:T) => U|Promise<U>, initial:U):EnhancedStream<U>

// Running totals
const totals = await S.from([1, 2, 3, 4])
.scan((acc, x) => acc + x, 0)
.toArray();
// [1, 3, 6, 10]
// Step by step: 0+1=1, 1+2=3, 3+3=6, 6+4=10
// With different initial value
const fromTen = await S.from([1, 2, 3])
.scan((acc, x) => acc + x, 10)
.toArray();
// [11, 13, 16]
// Building up an array
const accumulated = await S.from(['a', 'b', 'c'])
.scan((acc, x) => [...acc, x], [] as string[])
.toArray();
// [['a'], ['a', 'b'], ['a', 'b', 'c']]
// Can be chained with other methods
const filtered = await S.from([1, 2, 3, 4, 5])
.scan((acc, x) => acc + x, 0)
.filter(x => x > 5)
.toArray();
// [6, 10, 15]

Reduce the stream to a single value, like Array.prototype.reduce. The
function can be async.
reduce<U> (fn:(acc:U, item:T) => U|Promise<U>, initial:U):Promise<U>

const sum = await S.from([1, 2, 3, 4])
.reduce((acc, x) => acc + x, 0);
// 10

Return the first chunk that satisfies the predicate, or undefined if none
match. The predicate can be async.
find (predicate:(item:T) => boolean|Promise<boolean>):Promise<T|undefined>

const found = await S.from([1, 2, 3, 4, 5])
.find(x => x > 3);
// 4

Return true if any chunk satisfies the predicate. Short-circuits on the first
match. The predicate can be async.
some (predicate:(item:T) => boolean|Promise<boolean>):Promise<boolean>

const hasEven = await S.from([1, 3, 5, 6])
.some(x => x % 2 === 0);
// true

Return true if every chunk satisfies the predicate. Short-circuits on the
first failure. The predicate can be async.
every (predicate:(item:T) => boolean|Promise<boolean>):Promise<boolean>

const allPositive = await S.from([1, 2, 3])
.every(x => x > 0);
// true

Collect all chunks into an array.
toArray ():Promise<T[]>

const arr = await S.from([1, 2, 3])
.map(x => x * 2)
.toArray();
// [2, 4, 6]

Collect chunks and auto-concatenate based on type. Typed arrays (e.g.
Uint8Array) are concatenated into a single typed array, strings are joined,
and everything else is returned as an array.
collect ():Promise<any>

// Strings are joined
const text = await S.from(['hello', ' ', 'world'])
.collect();
// 'hello world'
// Typed arrays are concatenated
const buf = await S.from([new Uint8Array([1, 2]), new Uint8Array([3])])
.collect();
// Uint8Array [1, 2, 3]

Return the underlying ReadableStream. Useful for interop with native stream
APIs. The readable property provides the same access.
toStream ():ReadableStream<T>

const stream = S.from([1, 2, 3]).toStream();
// ReadableStream<number>

Node-specific helpers are available from the @substrate-system/stream/node
subpath.
import { fromFile, toFile } from '@substrate-system/stream/node'

Create a ReadableStream from a file path or an existing FileHandle. The
returned stream has a fileHandle property so you can close it when done.
export type StreamWithHandle = ReadableStream<Uint8Array> & {
fileHandle:FileHandle;
};
fromFile (input:string|FileHandle):Promise<StreamWithHandle>

import { fromFile } from '@substrate-system/stream/node'
// From a file path
const stream = await fromFile('./data.bin')
await stream.pipeTo(someWritableSink)
await stream.fileHandle.close()
// From an existing FileHandle
import { open } from 'node:fs/promises'
const fh = await open('./data.bin')
const stream = await fromFile(fh)
await stream.pipeTo(someWritableSink)
await fh.close()

Create a WritableStream that writes bytes to a file. Accepts either a file
path string or a FileHandleLike object. On close, the file is truncated to
the number of bytes written. On abort, the handle is closed without truncating.
toFile (input:string|FileHandleLike):Promise<WritableStream<Uint8Array>>

import { toFile } from '@substrate-system/stream/node'
// From a file path
await someReadableStream.pipeTo(await toFile('./result.bin'))
// From an existing FileHandle
import { open } from 'node:fs/promises'
const fh = await open('./result.bin', 'w')
await someReadableStream.pipeTo(await toFile(fh))

This exposes ESM and CommonJS via the
package.json exports field.
import { S, EnhancedStream } from '@substrate-system/stream'
import { fromFile, toFile } from '@substrate-system/stream/node'

require('@substrate-system/stream')
require('@substrate-system/stream/node')

This package exposes minified JS files too. Copy them to a location that is accessible to your web server, then link to them in HTML.
cp ./node_modules/@substrate-system/stream/dist/index.min.js ./public/stream.min.js

<script type="module" src="./stream.min.js"></script>