pull streams
pull-streams are a very simple streaming primitive,
but they can do everything that streams need to do,
from back pressure to propagating errors (which node streams do not support!)
You can use them to write any application or system you might
use node for, but they are also great for simple realtime data
processing - like Array.map, except over async - realtime - sequences
of data: things that would be cumbersome to do with node streams.
sources
The first pull-stream you’ll meet is values; it creates
a pull-stream from an array.
function values(array) {
  var i = 0
  return function (abort, cb) {
    if(abort) return cb(abort)
    return cb(i >= array.length ? true : null, array[i++])
  }
}
That’s it. values is a source pull-stream. A source stream
is just an async function that you may call repeatedly.
If you call it with the first argument set to true or an error,
that tells the stream to abort, but we’ll come back to that later.
Otherwise, each time you call the stream, it calls back with the
next item in the array. When all the items have been returned,
the first argument will be true.
This is because err and end are both terminal states of
the stream: each means that nothing more is coming.
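To make the contract concrete, here is a sketch of driving a source by hand, using the values stream we just wrote:
var read = values(['a', 'b'])
read(null, function (end, data) {
  //end = null, data = 'a'
  read(null, function (end, data) {
    //end = null, data = 'b'
    read(null, function (end, data) {
      //end = true: the stream is finished, data is undefined.
    })
  })
})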
sinks
data comes out of sources and goes into sinks. Where a source
is a function you call repeatedly, a sink is a function you pass
a source to.
Here is a simple sink that dumps a pull-stream to the console.
function log () {
  return function (read) {
    read(null, function next (end, data) {
      if(end == true) return
      else if(end) throw end //error
      console.log(data)
      read(null, next) //loop again.
    })
  }
}
log creates a function that accepts a source - i.e. a read function.
It then calls that read function, and if it doesn’t get an “end”
or “error” signal, it reads again.
Other stream APIs, such as node.js streams, have the concept of
a “writable” stream. A writable is a passive object that accepts
data, like a couch potato watching TV: to stop or slow down the
show, the couch potato must pick up the remote and send the TV
an explicit signal.
A pull-stream sink is a reader. It is like a bookworm reading a
book - readers are active, and can read faster or slower as they please.
They don’t need to send an explicit signal to slow down; they just act slower.
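To make that concrete, here is a sketch of a deliberately slow sink (slowLog and the 100ms delay are just for illustration). It is shaped exactly like log, except it waits before reading again:
function slowLog () {
  return function (read) {
    read(null, function next (end, data) {
      if(end == true) return
      if(end) throw end //error
      console.log(data)
      //no signal needed: simply waiting before the next read
      //IS the back pressure.
      setTimeout(function () { read(null, next) }, 100)
    })
  }
}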
You could combine these two streams like this:
log()(values([1,2,3]))
output:
1
2
3
Normally we are used to reading code from left to right,
so seeing log before values is a bit weird when the data
is coming out of values - but we’ll come back to that.
Since pull-streams use an async call, we get two-way back pressure.
The source can slow the stream down by calling back later.
And the sink can slow the stream down by waiting longer
before it calls read again.
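For the source side, here is a sketch (slowValues is an invented name): like values, but it calls back 100ms later, and the whole pipeline runs at that pace:
function slowValues (array) {
  var i = 0
  return function (abort, cb) {
    if(abort) return cb(abort)
    //the sink cannot read faster than the source calls back.
    setTimeout(function () {
      cb(i >= array.length ? true : null, array[i++])
    }, 100)
  }
}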
Slowing down is important, because that is how you save resources.
Software performance is like losing weight, not building muscles.
To go faster you must do less. Streams are about doing less,
and about not doing things you don’t need to do yet.
throughs/transforms
Often we want to transform data: we might split a file
into lines, or parse objects out of JSON, or look for matches.
For that we need a stream which is both a source and a sink.
Here is a through stream that applies a function to each item
in the stream.
function map (fn) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if(end) cb(end)
        else cb(null, fn(data))
      })
    }
  }
}
A through stream is just a sink stream that returns a
source stream. You can think of it as transforming that stream.
If we put these streams together, it looks like this:
var vs = values([2,4,6])
var ms = map(function (e) { return e*10 })
var ls = log()
ls(ms(vs))
output:
20
40
60
A through stream is both a source and a sink but also neither.
To the source, the through seems like a sink, but it’s really
just proxying the sink behind it. To the sink, the through is
a source, but it’s really just transforming the values that
the source gives it. This means through streams are very cheap.
They don’t need to add their own back pressure, but they allow
the source/sink back pressure to flow through them.
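For example, here is a sketch of an asynchronous map - call it asyncMap, where fn takes a callback. While fn is working, nothing asks the source for more, so back pressure happens for free. (A real implementation would also handle an abort arriving while fn is in flight.)
function asyncMap (fn) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if(end) return cb(end)
        //the pipeline pauses here until fn calls back.
        fn(data, function (err, result) {
          if(err) return cb(err)
          cb(null, result)
        })
      })
    }
  }
}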
Normally we read code from left to right, but composing streams
this way makes us think from right to left. Let’s fix that.
function pull() {
  var stream = arguments[0]
  //detect a sink (or through)
  if(stream.length === 1) {
    var args = [].slice.call(arguments)
    return function (read) {
      return pull.apply(null, [read].concat(args))
    }
  }
  for(var i = 1; i < arguments.length; i++)
    stream = arguments[i](stream)
  return stream
}
pull wires a source through any number of throughs and into a sink,
left to right. (If the first argument is itself a sink or through -
a function of just read, so arity 1 - pull returns a partial
pipeline instead; more on that below.)
Let’s create one more stream. take passes through only the first
n items, then stops the source:
function take (n) {
  return function (read) {
    return function (abort, cb) {
      if(n-- > 0) read(abort, cb)
      else read(true, cb)
    }
  }
}
take doesn’t change the incoming data; it just decrements n on
each read. Once n reaches zero, the next call aborts the source,
by calling read with true as the first argument. If you check back
at values, you’ll see that when the first argument is true, it
just calls back immediately. map doesn’t need to handle this case
specially; it just passes the abort through.
That means we can do things like:
pull(
  values([1,2,3,4,5,6,7]),
  map(function (e) { return e*e }),
  take(4),
  log()
)
output:
1
4
9
16
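Remember the arity check in pull? Because a through takes just a read function, pull can also build a reusable partial pipeline, with no source or sink attached yet. A sketch (squares is just an illustrative name):
var squares = pull(
  map(function (e) { return e*e }),
  take(4)
)
pull(values([1,2,3,4,5,6,7]), squares, log())
//same output: 1, 4, 9, 16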
When take decides to stop reading, it passes that on
to the source end of the stream, and the source can stop too.
If there was work the source was going to do, it won’t need to do it now.
This property is known as laziness, and pull-streams are very lazy.
Because laziness means we don’t do any work until we know we’ll
need to, we can commit to work that we know we’ll never do!
For example, we can have an infinite stream.
function random () {
  return function (abort, cb) {
    cb(abort, abort || Math.random())
  }
}
random creates an infinite stream of random values.
pull(random(), take(5), log())
The output will look like:
0.16838359273970127
0.10774739156477153
0.6401788892690092
0.24491786980070174
0.7873906309250742
Aborting an infinite stream is fun, but in real life
there are lots of reasons to abort a stream.
Maybe you are uploading a file and the wifi goes down
(now there is nowhere to send that file).
Maybe you are running a server with many clients downloading
files; if some of those clients break, you need to clean up
after them, otherwise you will get a resource leak.
Sometimes a stream needs to stop in the middle, maybe because
a parser has found an error.
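As a sketch of that last case, here is a hypothetical sink (logWhileValid is an invented name) that aborts the source as soon as it meets data it cannot handle:
function logWhileValid (isValid) {
  return function (read) {
    read(null, function next (end, data) {
      if(end) return //finished, or already errored
      if(!isValid(data))
        return read(true, function () {}) //abort the source early
      console.log(data)
      read(null, next)
    })
  }
}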
expanding or shrinking streams
Sometimes we have a stream of big chunks that we want to break into
small chunks (say, pages into lines) - here a few items become many.
On the other hand, we may want to filter out lines that don’t match
some pattern or property - here many items become few.
flatmap takes a function that returns an array,
and outputs a stream of the items in the returned arrays.
(Note: the array could have zero or more items!)
function flatmap (fn) {
  var queue = []
  return function (read) {
    return function again (abort, cb) {
      if(abort) return read(abort, cb)
      if(queue.length) return cb(null, queue.shift())
      read(null, function (err, data) {
        if(err) return cb(err)
        queue = fn(data)
        again(null, cb) //cb or read again if queue is empty.
      })
    }
  }
}
This gives us expanding or contracting streams!
The trick is to maintain a queue of the current state,
and on the next read, read from that queue instead of asking the source.
If fn returns an empty array, the opposite happens:
the source may be consulted many times without passing an
answer through to the sink.
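For example, filtering falls straight out of flatmap - a sketch (filter is not defined elsewhere in this post):
function filter (test) {
  return flatmap(function (e) {
    return test(e) ? [e] : []
  })
}
pull(values([1,2,3,4,5]), filter(function (e) { return e % 2 }), log())
//output: 1, 3, 5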
Notice that we have not installed any dependencies, or even
imported any libraries. We have accomplished a lot without
writing a single function longer than 20 lines!
I’m not pulling your leg: the code we have written here is
completely valid pull-streams, without bugs.
One small lie: log() will get a stack overflow if you use it
on a long synchronous source stream, but it will be fine with an
async stream. Use drain from the pull-stream module to get around this.
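A minimal sketch of that, assuming the pull-stream module is installed from npm:
var pull = require('pull-stream')
pull(
  pull.values([1,2,3]),
  //drain reads as fast as possible; the second callback
  //fires once, on end or error.
  pull.drain(console.log, function (err) {
    if(err) throw err
  })
)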
pull-streams are just a pattern. There are many modules
that implement particular pull streams,
but you can write a useful pull-stream without using them.
many pull-stream modules don’t depend on other pull-streams,
except maybe for testing.
That is how pull-streams work (except we haven’t gone into any
detail about duplex streams - which are distinct from transform streams).
But the best thing about pull-streams is that they are very
easy to compose, and very easy to write modules for and to reuse
those modules - and so there are many pull-stream modules.
The most useful pull-streams are being curated into the
pull-stream github org.
source for this post as git repo