Tim Popham edited this page Feb 20, 2015 · 4 revisions

Rationale

NodeJS streams have a notion of back pressure: sometimes an input emits its content faster than that content can be written to an output. Individual streams may throw away the excess content or queue it. Back pressure allows a NodeJS output to throttle its input.

In general, an application may want to impose additional throttling strategies, e.g. negotiating a throughput rate. Implementing such strategies (beyond back pressure) on top of streams tends to result in coupled code. This document explains how reducers handle back pressure and additional throttling strategies without coupling.

Direct input / output data flow

There is one significant difference between node streams and reducers. Node streams can represent input (so-called "read" streams), output (so-called "write" streams), and a combination of both ("read write" / "duplex" streams).

The reducers abstraction only has a notion of an input, which can be transformed into yet another input of a different form. You don't pipe inputs, nor do you have special targets for writing them. What you have instead is reduce, the canonical mechanism for consuming the contents of an input. Another interesting fact is that all transformations are described as compositions of transforming functions. As a consequence, the input can call the consumer on the output side directly and also receive its results. Therefore, in controlled environments, the two can easily exchange messages to handle things like "back pressure".

Let's take a more in-depth look at how that actually works, but first let's define simplified core functions of the reducers library.

reduce

Anything that can be reduced via the reduce function is a reducible. Let's define a simple reduce function:

function reduce(source, f, initial) {
  return source.reduce(f, initial)
}

It takes a reducible source, a reducing function, and an initial value to accumulate reductions upon. Reducing functions that perform the accumulation have the following shape:

f(result, value) // => new result

A reducing function is simply a binary function, akin to the one you might pass to reduce. While the two arguments might be treated symmetrically by the function, there is an implied semantic that distinguishes the arguments: the first argument is a result or accumulator that is being built up by the reduction, while the second is some new input value from the source being reduced.
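As a minimal sketch of the shape described above, the snippet below reduces a plain array, which works because arrays already expose a compatible reduce method. The names sum and append are illustrative only and are not part of the reducers library.

```javascript
// `reduce` as defined above, repeated so the snippet runs on its own.
function reduce(source, f, initial) {
  return source.reduce(f, initial)
}

// A reducing function: takes the accumulated result and one new value,
// and returns the new result.
function sum(result, value) {
  return result + value
}

// The two arguments are not interchangeable here: `result` is the array
// being built up, `value` is the next item from the source.
function append(result, value) {
  return result.concat([value])
}

var total = reduce([1, 2, 3, 4], sum, 0)    // 10
var copy = reduce([1, 2, 3, 4], append, []) // [1, 2, 3, 4]
console.log(total, copy)
```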

reducible

In order to define transformations we'll need a primitive API for producing reducibles. Let's define one in the form of a reducible function that takes an accumulator function and returns something that can be reduced via the reduce function defined above:

function reducible(accumulator) {
  return {
    reduce: function(next, initial) {
      return accumulator(next, initial)
    }
  }
}
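To see how reducible might be used on its own, here is a small sketch that builds a source from scratch. The range helper is hypothetical, introduced only for illustration. Its accumulator pushes each number into next and threads the returned result through to the following call.

```javascript
// Core helpers repeated from the text so the snippet runs on its own.
function reduce(source, f, initial) {
  return source.reduce(f, initial)
}

function reducible(accumulator) {
  return {
    reduce: function(next, initial) {
      return accumulator(next, initial)
    }
  }
}

// Hypothetical helper: a reducible that yields the integers
// from `from` up to and including `to`.
function range(from, to) {
  return reducible(function accumulator(next, initial) {
    var result = initial
    for (var n = from; n <= to; n++) {
      // Hand each value to the consumer and keep what it returns.
      result = next(result, n)
    }
    return result
  })
}

var rangeTotal = reduce(range(1, 4), function(result, n) {
  return result + n
}, 0)
console.log(rangeTotal) // 10
```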

Transformations

All operations can be expressed in terms of transformations. By definition, every transformation produces a reducible that can be reduced via the reduce function defined earlier. Let's define two common transformation functions:

function map(source, f) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return next(result, f(input))
    }, initial)
  })
}

function filter(source, predicate) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return predicate(input) ? next(result, input) : result
    }, initial)
  })
}
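The sketch below applies these two functions to a plain array (an assumption made for brevity, relying on the array's built-in reduce method). The calls counter and the double and isLarge helpers are illustrative names. The counter shows that composing transformations does no work until the result is reduced.

```javascript
// Core helpers repeated from the text so the snippet runs on its own.
function reduce(source, f, initial) {
  return source.reduce(f, initial)
}

function reducible(accumulator) {
  return {
    reduce: function(next, initial) {
      return accumulator(next, initial)
    }
  }
}

function map(source, f) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return next(result, f(input))
    }, initial)
  })
}

function filter(source, predicate) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return predicate(input) ? next(result, input) : result
    }, initial)
  })
}

// Count how often the mapping function actually runs.
var calls = 0
function double(n) { calls = calls + 1; return n * 2 }
function isLarge(n) { return n > 4 }

var large = filter(map([1, 2, 3, 4], double), isLarge)
var callsBeforeReduce = calls // still 0: nothing has been consumed yet

var collected = reduce(large, function(result, n) {
  return result.concat([n])
}, [])
// Now `calls` is 4 and `collected` is [6, 8].
console.log(callsBeforeReduce, calls, collected)
```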

Now take a moment and try to form a mental picture of the individual transformations. Here is one example:

var t1 = map(source, JSON.parse)
var t2 = filter(t1, function isFoo(data) { return data.id > 2 })

Both t1 and t2 are just objects with a reduce method. Each holds a reference to the source it transforms, to its transformation function (JSON.parse, isFoo), and to the transformation logic defined in map and filter. No actual work is done at this point. Data flow through the transformations is completely controlled by the input source. Let's define a very primitive one:

var source = {
  reduce: function(next, initial) {
    var result = initial
    result = next(result, '{ "id": 1, "message": "foo" }')
    console.log("<<<", result)
    result = next(result, '{ "id": 2, "message": "bar" }')
    console.log("<<<", result)
    result = next(result, '{ "id": 3, "message": "beep" }')
    console.log("<<<", result)
    result = next(result, '{ "id": 4, "message": "quux" }')
    console.log("<<<", result)
    // Return the final result so that reduce yields the accumulated value.
    return result
  }
}

Now let's look at the code flow of reduction over t2:

reduce(t2, function consumer(result, data) {
  console.log(data.message)
  return result + 1
}, 0)

The output of the latter call will look as follows:

<<< 0
<<< 0
beep
<<< 1
quux
<<< 2

The most interesting thing here is that the consumer doing the reduction passes its return values all the way back to the input source, no matter how long and complex the transformation chain is. This feature enables the output to signal back to the input and can be used for handling things like "back pressure". Here is one simple example of how this can be done:

var source = {
  messages: [
    '{ "id": 1, "message": "foo" }',
    '{ "id": 2, "message": "bar" }',
    '{ "id": 3, "message": "beep" }',
    '{ "id": 4, "message": "quux" }',
  ],
  reduce: function(next, initial) {
    var messages = this.messages
    var index = 0
    var count = messages.length
    function send(result) {
      // If input content is ended then return
      if (index >= count) return
      
      // yield new content
      var value = next(result, messages[index])
      index = index + 1

      // If value returned by consumer is a function it
      // has not finished writing, in that case pass continuation
      // to it so it can signal us when to continue
      if (typeof(value) === "function") {
        console.log("zzzz...")
        // continue when consumer is ready
        value(send)
      }
      // Otherwise continue right away.
      else {
        send(value)
      }
    }
    send(initial)
  }
}

var t1 = map(source, JSON.parse)
var t2 = filter(t1, function isFoo(data) { return data.id > 2 })

reduce(t2, function consumer(result, data) {
  console.log(new Date, data.message)
  if (data.id === 3) {
    console.log("Nap time!!")
    return function when(drained) {
      setTimeout(drained, 2000, result + 1)
    }
  } else {
    return result + 1
  }
}, 0)

This will produce the following output:

--
Wed Nov 21 2012 17:13:44 GMT-0800 (PST) beep
Nap time!!
zzzz...
Wed Nov 21 2012 17:13:46 GMT-0800 (PST) quux

Of course this code is not ideal, but that's because it was optimized for illustration purposes. The main takeaway is that input and output can trivially coordinate, and no code in between needs to care about this.

This is very powerful, since none of the code in the library needs to know or care about things like "back pressure" or other coordination between input and output. It perfectly suits closed environments like node, where all the inputs and outputs are provided by bindings written in another language, since all the coordination can be managed outside without concerning user-space code.

A real example of "back pressure" handling via reducers can be found in the fs-reduce library.
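The same continuation convention can carry strategies other than a fixed delay. What follows is a minimal sketch under stated assumptions, not code from the reducers library: fromArray, times10, pacedConsumer and pending are hypothetical names. The consumer parks the source after every item, and an outside party decides when to release the next one. The map in the middle stays unaware of any of this.

```javascript
// Core helpers repeated from the text so the snippet runs on its own.
function reduce(source, f, initial) {
  return source.reduce(f, initial)
}

function reducible(accumulator) {
  return {
    reduce: function(next, initial) {
      return accumulator(next, initial)
    }
  }
}

function map(source, f) {
  return reducible(function accumulator(next, initial) {
    return reduce(source, function reducer(result, input) {
      return next(result, f(input))
    }, initial)
  })
}

// Hypothetical source using the same convention as the example above:
// if the consumer returns a function, hand it the continuation and wait.
function fromArray(items) {
  return reducible(function accumulator(next, initial) {
    var index = 0
    function send(result) {
      if (index >= items.length) return
      var value = next(result, items[index])
      index = index + 1
      if (typeof value === "function") value(send)
      else send(value)
    }
    send(initial)
  })
}

var pending = [] // continuations waiting for permission to proceed
var seen = []    // values the consumer has received so far

// Hypothetical consumer: accepts one value, then parks the source until
// somebody calls the queued continuation.
function pacedConsumer(result, value) {
  seen.push(value)
  return function when(resume) {
    pending.push(function() { resume(result + 1) })
  }
}

function times10(n) { return n * 10 }

reduce(map(fromArray([1, 2, 3]), times10), pacedConsumer, 0)
var afterStart = seen.slice()      // [10]: the source is parked after one item

pending.shift()()                  // release exactly one more item
var afterOneRelease = seen.slice() // [10, 20]

while (pending.length > 0) pending.shift()() // release the rest
console.log(afterStart, afterOneRelease, seen)
```

Whoever drains pending could be a timer, a token bucket, or a message from a remote peer. Only the source and the consumer take part in the protocol.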
