@substrate-system/stream

    Use the native browser streams API, but with a nicer wrapper.

    See a live demo

    Install via npm:

    npm i -S @substrate-system/stream
    
    Quick example:

    import { from, through, collect } from '@substrate-system/stream';

    // Create a pipeline that transforms numbers
    const pipeline = from([1, 2, 3, 4, 5])
    .pipe(through(x => x * 2)) // double each number
    .pipe(through(x => x + 1)) // add 1
    .pipe(through(x => `Number: ${x}`)); // convert to string

    const result = await collect(pipeline);

    console.log(result);
    // ['Number: 3', 'Number: 5', 'Number: 7', 'Number: 9', 'Number: 11']

    Wrap a native ReadableStream, such as a fetch response body:

    import { Stream, through } from '@substrate-system/stream';

    // Fetch and process text line by line
    const response = await fetch('data.txt');

    const pipeline = Stream(response.body.pipeThrough(new TextDecoderStream()))
    .pipe(through(text => text.split('\n')))
    .pipe(through(lines => lines.map(line => line.trim())))
    .pipe(through(lines => lines.map(line => line.toUpperCase())));

    await pipeline.pipeTo(new WritableStream({
        write(lines) {
            lines.forEach(line => console.log(line));
        }
    }));

    Use transform for full control over the TransformStream, for example to filter and map in a single step:

    import { from, transform, collect } from '@substrate-system/stream';

    const users = [
        { name: 'Alice', age: 30 },
        { name: 'Bob', age: 25 },
        { name: 'Charlie', age: 35 }
    ];

    // Filter adults and extract names
    const filterAdults = transform<typeof users[0], string>({
        transform(user, controller) {
            if (user.age >= 30) {
                controller.enqueue(user.name);
            }
        }
    });

    const pipeline = from(users).pipe(filterAdults);

    const adults = await collect(pipeline);
    console.log(adults);
    // ['Alice', 'Charlie']

    The through function takes a second callback, or "flush" function, that gets called once when the stream ends. Use it to handle any remaining items, i.e. items at the end that do not fill a complete batch.

    import { through, from, collect } from '@substrate-system/stream';

    // Batch processing with flush
    let batch:string[] = [];

    const batcher = through<string, string[]>(
        (item) => {
            batch.push(item);
            if (batch.length >= 3) {
                const result = [...batch];
                batch = [];
                return result;
            }
            return []; // Don't emit yet
        },
        () => {
            // Flush remaining items
            if (batch.length > 0) {
                console.log('Flushing remaining:', batch);
            }
        }
    );

    const pipeline = from(['a', 'b', 'c', 'd', 'e']).pipe(batcher);
    const batches = await collect(pipeline);

    from(): Create a readable stream from an array or iterable (sync or async).

    function from<T> (iterable:Iterable<T>|AsyncIterable<T>):PipeableStream<T, never>
    
    import { from, collect } from '@substrate-system/stream';

    const pipeline = from([1, 2, 3, 4, 5]);
    const result = await collect(pipeline);
    // [1, 2, 3, 4, 5]

    // Also works with async iterables
    async function* generator() {
        yield 1;
        yield 2;
        yield 3;
    }

    const asyncPipeline = from(generator());
    const asyncResult = await collect(asyncPipeline);
    // [1, 2, 3]

    Stream(): Take a native ReadableStream and wrap it in our API.

    • Adds a .pipe method
    function Stream<R> (readable:ReadableStream<R>):PipeableStream<R, never>
    
    import { Stream, through, collect } from '@substrate-system/stream';

    const response = await fetch('data.txt');
    const pipeline = Stream(response.body)
    .pipe(through(chunk => new TextDecoder().decode(chunk)));

    const result = await collect(pipeline);

    through(): Create a simple transform that applies a function to each chunk.

    function through<I, O> (
        transformFn:(chunk:I)=>O|Promise<O>,
        flushFn?:()=>void|Promise<void>
    ):PipeableStream<O, I>

    import { from, through, collect } from '@substrate-system/stream';

    const pipeline = from([1, 2, 3])
    .pipe(through(x => x * 2))
    .pipe(through(x => x + 1));

    const result = await collect(pipeline);
    // [3, 5, 7]

    // Optional flush callback runs when stream closes
    const withFlush = through(
        (x) => x * 2,
        () => console.log('Stream finished!')
    );
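
    A quick usage sketch, continuing the imports above; the flush callback fires once the input is exhausted:

    const doubled = await collect(from([1, 2, 3]).pipe(withFlush));
    // Logs 'Stream finished!' when the input ends
    // doubled is [2, 4, 6]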

    transform(): Create a custom transform with full control over the TransformStream API.

    function transform<I, O> (transformer:Transformer<I, O>):PipeableStream<O, I>
    
    import { from, transform, collect } from '@substrate-system/stream';

    // One-to-many: emit multiple values per input
    const splitter = transform<string, string>({
        transform(chunk, controller) {
            for (const char of chunk) {
                controller.enqueue(char);
            }
        }
    });

    const pipeline = from(['hello', 'world']).pipe(splitter);
    const result = await collect(pipeline);
    // ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']

    // With flush callback
    const withFlush = transform<number, number>({
        transform(chunk, controller) {
            controller.enqueue(chunk * 2);
        },
        flush(controller) {
            controller.enqueue(999); // Emit final value
        }
    });
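
    A short usage sketch for the flush variant above; the flushed value arrives after the last chunk:

    const result = await collect(from([1, 2, 3]).pipe(withFlush));
    // [2, 4, 6, 999]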

    filter(): Filter stream values based on a predicate function.

    function filter<T> (
        predicate:(item:T) => boolean|Promise<boolean>
    ):PipeableStream<T, T>

    import { from, filter, collect } from '@substrate-system/stream';

    const pipeline = from([1, 2, 3, 4, 5])
    .pipe(filter(x => x > 2));

    const result = await collect(pipeline);
    // [3, 4, 5]

    // Async predicates work too
    const asyncFilter = filter(async (x: number) => {
        const shouldKeep = await someAsyncCheck(x);
        return shouldKeep;
    });
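
    For a runnable sketch (someAsyncCheck above is a placeholder for your own check), an async predicate that keeps even numbers:

    const evens = await collect(
        from([1, 2, 3, 4]).pipe(filter(async (x) => x % 2 === 0))
    );
    // [2, 4]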

    collect(): Collect all values from a stream into an array.

    function collect<T> (stream:PipeableStream<T, any>):Promise<T[]>
    
    import { from, through, collect } from '@substrate-system/stream';

    const pipeline = from([1, 2, 3])
    .pipe(through(x => x * 2));

    const result = await collect(pipeline);
    // [2, 4, 6]

    run(): Execute a stream pipeline without collecting results (for side effects only).

    function run<T> (stream:PipeableStream<T, any>):Promise<void>
    
    import { from, through, run } from '@substrate-system/stream';

    const pipeline = from([1, 2, 3])
    .pipe(through(x => {
        console.log(x);
        return x;
    }));

    await run(pipeline);
    // Logs: 1, 2, 3
    // Returns: void

    For comparison, here is a fetch-and-parse pipeline written directly against the native streams API:

    // Native TransformStream API
    const response = await fetch('data.json');

    const decoder = new TextDecoderStream();
    const parseJson = new TransformStream({
        transform(chunk, controller) {
            try {
                const parsed = JSON.parse(chunk);
                controller.enqueue(parsed);
            } catch (e) {
                controller.error(e);
            }
        }
    });

    const filterTransform = new TransformStream({
        transform(chunk, controller) {
            if (chunk.active) {
                controller.enqueue(chunk);
            }
        }
    });

    // pipe
    const stream1 = response.body.pipeThrough(decoder);
    const stream2 = stream1.pipeThrough(parseJson);
    const stream3 = stream2.pipeThrough(filterTransform);

    const reader = stream3.getReader();
    const results = [];
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        results.push(value);
    }

    And the same pipeline using this package:

    import { Stream, through, filter, collect } from '@substrate-system/stream'

    // pipe
    const response = await fetch('data.json');

    const pipeline = Stream(response.body)
    .pipe(through(chunk => new TextDecoder().decode(chunk))) // buffer to string
    .pipe(through(text => JSON.parse(text))) // string to object
    .pipe(filter(obj => obj.active)); // keep only objects where .active is truthy

    const results = await collect(pipeline);

    If you need more control (e.g., emitting multiple values per input), use the full transform() with a Transformer object:

    import { transform, from, collect } from '@substrate-system/stream';

    // Split each string into individual characters
    const splitter = transform<string, string>({
        transform(chunk, controller) {
            for (const char of chunk) {
                controller.enqueue(char);
            }
        }
    });

    const pipeline = from(['hello', 'world']).pipe(splitter);
    const chars = await collect(pipeline);

    console.log(chars);
    // ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']

    Process large CSV data.

    import { Stream, through, transform } from '@substrate-system/stream';

    const response = await fetch('large-data.csv');

    interface CSVRow {
        id:number;
        name:string;
        value:number;
    }

    // Note: This is a simplified CSV parser for demonstration.
    const pipeline = Stream(response.body)
    .pipe(through(chunk => new TextDecoder().decode(chunk))) // to string
    .pipe(through(text => text.split('\n'))) // split each chunk into lines
    .pipe(transform<string[], CSVRow>({
        transform(lines, controller) {
            // Parse each line and keep only rows with value > 100
            for (const line of lines) {
                const [id, name, value] = line.split(',');
                const row = { id: parseInt(id, 10), name, value: parseFloat(value) };
                if (row.value > 100) {
                    controller.enqueue(row);
                }
            }
        }
    }))
    .pipe(through(row => JSON.stringify(row))); // to string again

    await pipeline.pipeTo(new WritableStream({
        write(json) {
            console.log(json);
            // or send to another API, write to IndexedDB, etc.
        }
    }));

    Backpressure is handled by the underlying native streams, so a slow stage throttles the whole pipeline instead of buffering chunks in memory:

    import { from, through } from '@substrate-system/stream';

    // Slow processor - backpressure will prevent memory buildup
    const slowProcessor = through(async (x:number) => {
        await new Promise(resolve => setTimeout(resolve, 100));
        return x * 2;
    });

    const pipeline = from(Array.from({ length: 1000 }, (_, i) => i))
    .pipe(slowProcessor)
    .pipe(through(x => x + 1));

    // This will process at the rate of the slowest transform
    await pipeline.pipeTo(new WritableStream({
        write(chunk) {
            console.log(chunk);
        }
    }));
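
    Backpressure also works from the consumer side. A minimal sketch, using the same from and through imports as above, where a slow WritableStream sink throttles the pipeline because its async write() must resolve before more chunks are pulled:

    // Slow sink: upstream stages pause while each write is pending
    await from(Array.from({ length: 100 }, (_, i) => i))
    .pipe(through(x => x * 2))
    .pipeTo(new WritableStream({
        async write(chunk) {
            await new Promise(resolve => setTimeout(resolve, 50));
            console.log(chunk);
        }
    }));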

    This package exposes both ESM and CommonJS builds via the exports field in package.json.

    import {
        from,
        through,
        collect,
        Stream,
        transform,
        filter,
        run
    } from '@substrate-system/stream'

    Or with CommonJS:

    require('@substrate-system/stream')
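
    For example (a sketch, assuming the CommonJS build re-exports the same named exports):

    const { from, through, collect } = require('@substrate-system/stream')

    collect(from([1, 2, 3]).pipe(through(x => x * 10)))
        .then(result => console.log(result)) // [10, 20, 30]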
    

    This package exposes minified JS files too. Copy them to a location that is accessible to your web server, then link to them in HTML.

    cp ./node_modules/@substrate-system/stream/dist/index.min.js ./public/stream.min.js
    
    <script type="module" src="./stream.min.js"></script>
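
    Then use it in the page, for example (a sketch, assuming the minified bundle is an ES module exposing the same named exports):

    <script type="module">
        import { from, through, collect } from './stream.min.js'

        const result = await collect(from([1, 2, 3]).pipe(through(x => x + 1)))
        console.log(result) // [2, 3, 4]
    </script>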