IO Coordination
Node.js streams have a notion of "back pressure". What that means is that sometimes an input may emit its content faster than it can be written to the output; in such cases the speeds need to be balanced, either by occasionally pausing the input, by queuing content, or maybe even by negotiating a throughput. There is no single right or wrong answer: it all depends on the protocols the input and output represent, the hardware, and of course the application itself.
Some solutions in the Node ecosystem have had problems with handling "back pressure", and it is generally considered a hard problem. This document explains why "back pressure" is not a problem for reducers and how it can be managed.
There is one significant difference between Node streams and reducers. Node streams can represent an input (so-called "read" streams), an output (so-called "write" streams), or a combination of both ("read write" / "duplex" streams).
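For reference, here is a minimal sketch (not part of this library) of how such coordination is typically done by hand with plain Node streams; the file paths are just placeholders:
var fs = require("fs")

var input = fs.createReadStream("./input.txt")
var output = fs.createWriteStream("./output.txt")

input.on("data", function(chunk) {
  // write() returns false once the writable buffer is full
  if (!output.write(chunk)) {
    // pause the producer until the consumer has drained
    input.pause()
    output.once("drain", function() {
      input.resume()
    })
  }
})
input.on("end", function() {
  output.end()
})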
The reducers abstraction has only a notion of an input, which can be transformed into yet another input of a different form; you don't pipe them, nor do you have special targets to write them to. What you have instead is reduce, which is the canonical mechanism for consuming the contents of an input. Another interesting fact is that all transformations are described in the form of compositions over transforming functions. As a consequence an input can directly call the consumer on the output and also get results back; therefore, in controlled environments they can easily exchange messages to handle things like "back pressure".
Let's take a more in-depth look at how that actually works, but first let's define simplified core functions of a reducers library.
Anything that can be reduced via a reduce function represents a reducible. Let's define a simple reduce function:
function reduce(source, f, initial) {
  return source.reduce(f, initial)
}
It takes a reducible source, a reducing function, and an initial value to accumulate reductions upon. Reducing functions performing accumulation have the following shape:
f(result, value) // => new result
A reducing function is simply a binary function, akin to the one you might pass to reduce. While the two arguments might be treated symmetrically by the function, there is an implied semantic that distinguishes the arguments: the first argument is a result or accumulator that is being built up by the reduction, while the second is some new input value from the source
being reduced.
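For example, since JavaScript arrays already come with a compatible reduce method, a plain array can act as a reducible source for the reduce function above:
var sum = reduce([1, 2, 3, 4], function(result, value) {
  return result + value
}, 0)
// => 10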
In order to define transformations we'll need a primitive API for producing reducibles. Let's define one in the form of a reducible function that takes an accumulator function and returns something that can be reduced via the reduce function defined above:
function reducible(accumulator) {
  return {
    reduce: function(next, initial) {
      return accumulator(next, initial)
    }
  }
}
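For instance, here is a tiny, purely illustrative reducible that yields a few fixed values, and a reduction over it:
var numbers = reducible(function(next, initial) {
  var result = initial
  result = next(result, 1)
  result = next(result, 2)
  return next(result, 3)
})

reduce(numbers, function(result, value) {
  return result + value
}, 0)
// => 6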
All of the operations can be expressed in terms of transformations. By definition, all transformations produce a reducible that can be reduced via the reduce function defined earlier. Let's define two common transformation functions:
function map(source, f) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return next(result, f(input))
    }, initial)
  })
}

function filter(source, predicate) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return predicate(input) ? next(result, input) : result
    }, initial)
  })
}
Now take a moment and try to make a mental picture of the individual transformations. Here is one example:
var t1 = map(source, JSON.parse)
var t2 = filter(t1, function isFoo(data) { return data.id > 2 })
Both t1 and t2 are just objects with a reduce method that captures a reference to the source they transform, the transformation functions JSON.parse and isFoo, and the transformation logic defined in map and filter. No actual work is done at this point. Data flow through the transformation is completely controlled by the input source. Let's define a very primitive one:
var source = {
  reduce: function(next, initial) {
    var result = initial
    result = next(result, '{ "id": 1, "message": "foo" }')
    console.log("<<<", result)
    result = next(result, '{ "id": 2, "message": "bar" }')
    console.log("<<<", result)
    result = next(result, '{ "id": 3, "message": "beep" }')
    console.log("<<<", result)
    result = next(result, '{ "id": 4, "message": "quux" }')
    console.log("<<<", result)
  }
}
Now let's look at the code flow of a reduction over t2:
reduce(t2, function consumer(result, data) {
  console.log(data.message)
  return result + 1
}, 0)
The output of the above will look as follows:
<<< 0
<<< 0
beep
<<< 1
quux
<<< 2
The most interesting thing here is that the consumer performing the reductions passes its return values all the way back to the input source, no matter how long and complex the transformation chain is. This feature enables the output to signal back to the input and can be used for handling things like "back pressure". Here is one simple example of how this can be done:
var source = {
  messages: [
    '{ "id": 1, "message": "foo" }',
    '{ "id": 2, "message": "bar" }',
    '{ "id": 3, "message": "beep" }',
    '{ "id": 4, "message": "quux" }',
  ],
  reduce: function(next, initial) {
    var messages = this.messages
    var index = 0
    var count = messages.length
    function send(result) {
      // If the input content has ended then return
      if (index >= count) return
      // Yield new content
      var value = next(result, messages[index])
      index = index + 1
      // If the value returned by the consumer is a function, it has not
      // finished writing; in that case pass a continuation to it so it
      // can signal us when to continue
      if (typeof(value) === "function") {
        console.log("zzzz...")
        // Continue when the consumer is ready
        value(send)
      }
      // Otherwise continue right away.
      else {
        send(value)
      }
    }
    send(initial)
  }
}
var t1 = map(source, JSON.parse)
var t2 = filter(t1, function isFoo(data) { return data.id > 2 })

reduce(t2, function consumer(result, data) {
  console.log(new Date, data.message)
  if (data.id === 3) {
    console.log("Nap time!!")
    return function when(drained) {
      setTimeout(drained, 2000, result + 1)
    }
  } else {
    return result + 1
  }
}, 0)
This will produce the following output:
Wed Nov 21 2012 17:13:44 GMT-0800 (PST) beep
Nap time!!
zzzz...
Wed Nov 21 2012 17:13:46 GMT-0800 (PST) quux
Of course this code is not ideal, but that's because it was optimized for illustration purposes. The main takeaway is that the input and output can trivially coordinate, and no code in between needs to care about it. This is very powerful in closed environments like Node, where all the inputs and outputs are provided by bindings, so all the coordination can be managed without concerning user-space code.
A real example of "back pressure" handling via reducers can be found in the fs-reduce library.
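To close, here is a sketch of how the continuation protocol shown above could be wrapped into a small helper, so that consumers don't have to construct the signalling function by hand. This is only an illustration of the idea, not fs-reduce's actual API; writeAsync is a hypothetical callback-based write operation:
function backpressured(writeAsync) {
  // Turns a callback-style write into a reducing function that signals
  // back pressure by returning a continuation, as in the example above.
  return function reducer(result, value) {
    return function when(resume) {
      writeAsync(value, function onDone() {
        // Let the input continue once the write has completed
        resume(result + 1)
      })
    }
  }
}

// Used with the transformations defined earlier:
reduce(t2, backpressured(function write(data, done) {
  console.log(data.message)
  setTimeout(done, 100) // pretend the write takes 100ms
}), 0)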