In the last article of this series, we learned about generator functions, which can produce values to be iterated over by a for…of loop. We used this feature to create a sequence of natural numbers without storing the whole set of numbers in memory, generating each value only when the consumer needs one.
Similarly, Streams enable us to read a large amount of data (or files!) incrementally. Instead of requiring us to allocate memory for the data, the decoded content, and any other results, they let us process data as a series of operations on individual chunks.
A Gentle Introduction to Streams
Streams come in three variations:
- ReadableStream: a stream that can be read from
- WritableStream: a stream that can be written to
- TransformStream: a stream that can take the output of a readable or transform stream and output a transformed result. It is both readable and writable. (See the sketch below for how the three fit together.)
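To make the relationships concrete, here is a minimal sketch (the names and values are made up, and the top-level await assumes a module context) that pipes a readable source through a transform into a writable sink:

// A readable source that emits a single chunk and closes.
const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue("hello");
    controller.close();
  },
});

// A transform that upper-cases each chunk passing through it.
const upperCase = new TransformStream<string, string>({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

// A writable sink that logs whatever it receives.
const sink = new WritableStream<string>({
  write(chunk) {
    console.log(chunk); // logs "HELLO"
  },
});

await source.pipeThrough(upperCase).pipeTo(sink);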
The one that will be useful most of the time is ReadableStream. This is also the stream type provided by the File and Response objects as an alternative to the more common text() and json() methods, which return the decoded content asynchronously.
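For instance, both objects expose their raw bytes directly (a hypothetical snippet; the file contents and URL are made up):

// File exposes its bytes via the stream() method:
const file = new File(["line one\nline two\n"], "notes.txt");
const fileBytes: ReadableStream<Uint8Array> = file.stream();

// Response exposes its body the same way via the body getter:
const response = await fetch("/some/resource"); // hypothetical URL
const bodyBytes: ReadableStream<Uint8Array> | null = response.body;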
As an example, we'll build an application that can count the number of lines of text in a file dropped onto the web page. First, let's look at the core of the problem: how do we read a file using streams?
Consuming a ReadableStream
Let's start with a function that takes a File object and reads it from start to end, counting the occurrences of line feeds (empty lines also count as a line), and returns the final count:
const countLines = async (file: File): Promise<number> => {
  // A file with no line feeds still counts as one line.
  let count = 1;
  // Decode the raw byte stream into a stream of strings.
  const stream = file.stream().pipeThrough(new TextDecoderStream());
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Count the line feeds in this chunk.
      const lineBreaks = value.match(/\n/g)?.length ?? 0;
      count += lineBreaks;
    }
  } finally {
    reader.releaseLock();
  }
  return count;
};
The first thing to note is that files are 8-bit binary streams (Uint8Array) which must first be decoded into the UTF-16 strings that JavaScript uses for string representation. To do this, we pipe the file's ReadableStream<Uint8Array> through a transform stream called TextDecoderStream, which converts the byte stream into a stream of strings that we can consume using the reader (an instance of ReadableStreamDefaultReader).
A reader provides the read() method, which returns a Promise that resolves to the next chunk in the stream. We can use an infinite loop to read the next chunk and check if the stream has been exhausted by testing the done property on the resolved value. If true, we simply break the loop.
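Here's a hedged example of wiring countLines up to a file input (the selector and page markup are assumptions):

// Hypothetical usage: count lines in files picked via an <input type="file">.
const input = document.querySelector<HTMLInputElement>('input[type="file"]');
input?.addEventListener("change", async () => {
  for (const file of input?.files ?? []) {
    console.log(`${file.name}: ${await countLines(file)} lines`);
  }
});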
Now that we have a function that can count lines in files, let's build a quick web application to demonstrate its functionality. I've decided to use Vite, Preact and @krofdrakula/drop to build the web application shell that uses the above function to display line counts for any file (or multiple files) dropped onto the page.
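A bare-bones version of the drop handling, using plain DOM drag-and-drop events instead of @krofdrakula/drop's API, might look roughly like this:

// Prevent the browser's default navigation when a file is dropped.
document.body.addEventListener("dragover", (event) => event.preventDefault());
document.body.addEventListener("drop", async (event) => {
  event.preventDefault();
  for (const file of event.dataTransfer?.files ?? []) {
    console.log(`${file.name}: ${await countLines(file)} lines`);
  }
});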
Since the files are read locally and not sent over the wire, you can drop files of any size (even hundreds of MB!) and the browser will handle them just fine. The files don't even have to be text, but the line count will not make much sense in that case.
Creating Your Own ReadableStream
At this point you may be wondering how one can create a ReadableStream in the first place. A ReadableStream is created using the ReadableStream constructor, which takes a configuration object with the following optional methods:
- pull(controller): whenever the stream is read, this method is called by the reader in order to produce the next chunk of data. The controller object is used to send values to the consumer of the stream.
- start(controller): whenever the stream is constructed, this method is called and can optionally send values via the controller object.
- cancel(reason?): called when the stream's cancel() method is called with an optional reason. Used to clean up any open handles to resources or similar.
Let's take the example of natural numbers from the previous article and write a ReadableStream
that emits all natural numbers up to the given maximum:
const createNaturalNumberStream = (max: number = Infinity) => {
  let i = 1;
  return new ReadableStream<number>({
    // Called each time the consumer asks for the next chunk.
    pull(controller) {
      if (i <= max) {
        controller.enqueue(i);
        i++;
      } else {
        // No more values to produce; signal the end of the stream.
        controller.close();
      }
    },
  });
};
Reading this stream will keep emitting numbers in sequence up to, and including, max.
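For example, reading it manually with a reader (results shown as comments; the top-level await assumes a module context):

const reader = createNaturalNumberStream(3).getReader();
console.log(await reader.read()); // { done: false, value: 1 }
console.log(await reader.read()); // { done: false, value: 2 }
console.log(await reader.read()); // { done: false, value: 3 }
console.log(await reader.read()); // { done: true, value: undefined }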
Transforming Streams Using TransformStream
As previously mentioned, TextDecoderStream
takes a stream of bytes and converts them to a stream of UTF-16 strings
suitable for JavaScript. Streams do not have to provide data in byte
arrays or strings, however; they can emit any kind of JavaScript value.
Constructing a TransformStream is achieved similarly to ReadableStream, but with a different set of optional methods:
- transform(chunk, controller): called whenever a chunk is read from the transform stream. Use controller to emit values to the reader.
- start(controller): called when the transform stream is constructed.
- flush(controller): called when the source has been consumed and is about to be closed. (A sketch using flush follows the mapping example below.)
To demonstrate the API, let's create a mapping stream that will take a value from the input stream and apply a mapping function to the emitted value. The only method we need to implement is the transform function, which takes a chunk and enqueues the mapped value:
const createMapStream = <I, O>(
  mapper: (value: I) => O
): TransformStream<I, O> =>
  new TransformStream({
    // Apply the mapping function to each chunk and pass the result along.
    transform(value, controller) {
      controller.enqueue(mapper(value));
    },
  });
Using this transformer is simply a matter of piping any compatible readable stream through it:
const evenNumberStream = createNaturalNumberStream().pipeThrough(
  createMapStream((n: number) => n * 2)
);
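The mapping transform above only needed transform. The flush method becomes useful when a transform buffers data between chunks; here is a hedged sketch of a hypothetical line splitter (createLineSplitter is a made-up name) that emits whole lines and flushes any trailing partial line when the source closes:

const createLineSplitter = (): TransformStream<string, string> => {
  let buffer = "";
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      buffer += chunk;
      const lines = buffer.split("\n");
      // Keep the trailing partial line in the buffer for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) controller.enqueue(line);
    },
    flush(controller) {
      // The source is done; emit the final unterminated line, if any.
      if (buffer) controller.enqueue(buffer);
    },
  });
};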
Other Streams
- CompressionStream: compresses the content of a Uint8Array stream into a new Uint8Array stream (see the sketch after this list)
- DecompressionStream: does the opposite of the above
- FileSystemWritableFileStream: allows writing to an open file system file handle
- WebSocketStream: an open standard proposal to allow writing to a web socket as a WritableStream
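As a quick sketch of the first two, assuming a browser that supports CompressionStream, a string can be gzipped by piping its byte stream through the compressor (the input text is made up):

// Gzip a string by piping its byte stream through a CompressionStream.
const bytes = new Blob(["hello streams"]).stream();
const gzipped = bytes.pipeThrough(new CompressionStream("gzip"));
// Response is a convenient way to collect a byte stream into a buffer.
const compressed = await new Response(gzipped).arrayBuffer();
console.log(compressed.byteLength);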
Converting ReadableStreams into Asynchronous Iterables
Admittedly, streams are not as straightforward to consume as a for...of
loop, but there is good and bad news.
The good news is that the ReadableStream specification includes Symbol.asyncIterator, which allows you to read a ReadableStream using a for await...of loop, e.g.:
for await (const number of evenNumberStream) {
  console.log(number);
}
The bad news is that it currently isn't supported in all JavaScript engines. However, what we can
do is leverage what we learned in the previous article regarding
generator functions and create a utility function that takes a stream
and creates an AsyncGenerator
yielding values from the input stream:
const streamToIterable = async function* <T>(
  stream: ReadableStream<T>
): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    // Release the lock so the stream can be read or cancelled elsewhere.
    reader.releaseLock();
  }
};
This makes it possible to consume any stream as an async iterable:
for await (const number of streamToIterable(evenNumberStream))
  console.log(number);
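To tie things back to the earlier example, countLines could be rewritten on top of this helper (countLinesIterable is a made-up name for this sketch):

const countLinesIterable = async (file: File): Promise<number> => {
  let count = 1;
  const text = file.stream().pipeThrough(new TextDecoderStream());
  // The helper's finally block releases the reader lock when iteration ends.
  for await (const chunk of streamToIterable(text)) {
    count += chunk.match(/\n/g)?.length ?? 0;
  }
  return count;
};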
Applied Streamology
In the next installment, we'll have a look at applying our knowledge of streams and generators to read large files in the browser while keeping memory consumption low and the UI responsive while data is being processed.