Streams2

Node.js Streams2 Demystified

Bryce Baril @brycebaril

The first time I tried to understand streams.

wtf.pipe(wth)?

Ok, I got this.

Streams

Streams are a first-class construct in Node.js for handling data. I like to think of them as lazy evaluation applied to data.

Stream Components

There are essentially three major components to streams:

  1. The Source
  2. The Pipeline
  3. The Sink

Source

The data in the stream comes from somewhere -- disk, an HTTP server, or your own custom source. A streaming source lets you process the content lazily: you buffer only what you need, and you don't block when the source is slow.

Pipeline

Often you want to do something to the data between the Source and the Sink. Streams allow you to lazily apply your filters or transformations.

Sink

Where you're putting your data -- disk, an HTTP client, or your own custom sink. A streaming sink gives you flexibility when writing your data: buffer only what you need, without blocking.

Stream Benefits

  • Lazily produce or consume data in buffered chunks.
  • Evented and non-blocking
  • Low memory footprint
  • Automatically handle back-pressure
  • Buffers live outside the V8 heap, letting you work around the V8 heap memory limit
  • Most core Node.js content sources/sinks are streams already!
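
For example, copying a file with nothing but core streams gets all of these benefits for free -- a minimal sketch (file names here are placeholders):

var fs = require("fs")

// Copy a potentially huge file with a small, constant amount of memory.
// pipe() handles chunking and back-pressure for us.
fs.createReadStream("in.txt").pipe(fs.createWriteStream("out.txt"))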

Streams2

Streams2 are the second generation of Streams in Node.js core. The interface was rewritten to make it more accessible.

Streams2 Node 0.8 Compatibility

Streams2 were originally developed by @izs as an npm module that can be used to add Streams2 support to Node 0.8: readable-stream


var Transform = require("stream").Transform
  || require("readable-stream/transform")

This works most of the time.

Streams3

Streams3 is the next version; it's in 0.11.5 now and will be in 0.12. Streams3 is Streams2 with better support for streams1-style streams.
1 | 2 = 3

Streams2 Streams3

Node.js Streams2 Streams3 Demystified

Bryce Baril @brycebaril

Streams2 Classes

There are five Classes of Streams2:

  • Readable -- Data Sources
  • Writable -- Data Sinks
  • Duplex -- Both a Source and a Sink
  • Transform -- In-flight stream operations
  • PassThrough -- Stream spy

Readable

Use a Readable stream when supplying data as a stream.
Think: Spigot/Faucet.

How to implement Readable

  1. Subclass stream.Readable
  2. Implement a `_read(size)` method.

The `_read(size)` method:

  • `size` is in bytes, but can be ignored (especially for objectMode streams)
  • `_read(size)` must call `this.push(chunk)` to send a chunk to the consumer

Readable options

  • `highWaterMark` Number: The maximum number of bytes to store in the internal buffer before ceasing to read. Default: 16KB
  • `encoding` String: If set, buffers will be decoded to strings instead of passing buffers. Default: null
  • `objectMode` Boolean: Instead of using buffers/strings, use JavaScript objects. Default: false
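
A quick sketch of the `encoding` option in action (patching `_read` onto an instance here purely for brevity):

var Readable = require("stream").Readable

var rs = new Readable({encoding: "utf8"})
rs._read = function () {
  this.push(new Buffer("hello"))
  this.push(null)
}
rs.on("data", function (chunk) {
  console.log(typeof chunk) // "string" -- the encoding option decoded the Buffer
})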

How to use a Readable stream

  • `readable.pipe(target)`
  • `readable.read(size)`
  • `readable.on("data", /* ... */)`

A simple Readable stream


var Readable = require("stream").Readable
  || require("readable-stream/readable")
var inherits = require("util").inherits

function Source(options) {
  Readable.call(this, options)
  this.content = "The quick brown fox jumps over the lazy dog."
}
inherits(Source, Readable)
Source.prototype._read = function (size) {
  // No content left: signal the end of the stream.
  if (!this.content) this.push(null)
  else {
    // Supply up to `size` bytes, keeping the remainder for the next _read.
    this.push(this.content.slice(0, size))
    this.content = this.content.slice(size)
  }
}
            

Using our example:


var s = new Source()
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())

// The quick 
// brown fox 
// jumps over
//  the lazy 
// dog.
            
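The same Source also works with "data" events (or pipe()) -- a minimal sketch:

var s2 = new Source()
s2.on("data", function (chunk) {
  process.stdout.write(chunk)
})

// The quick brown fox jumps over the lazy dog.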

Handy abstractions/modules:

  • stream-spigot Creates readable streams from Arrays or simple functions.

Writable

Use a Writable stream when collecting data from a stream.
Think: Drain/Collect.

Writable options

  • `highWaterMark` Number: The number of bytes the internal buffer can hold before `write()` starts returning false. Default: 16KB
  • `decodeStrings` Boolean: Whether to decode strings into Buffers before passing them to `_write()`. Default: true
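
A small sketch of `decodeStrings: false` (again patching `_write` onto an instance for brevity):

var Writable = require("stream").Writable

var ws = new Writable({decodeStrings: false})
ws._write = function (chunk, encoding, callback) {
  console.log(typeof chunk, encoding) // "string" "utf8" -- no Buffer conversion
  callback()
}
ws.write("hello")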

How to implement Writable

  1. Subclass stream.Writable
  2. Implement a `_write(chunk, encoding, callback)` method.

The `_write(chunk, encoding, callback)` method:

  • `chunk` is the content to write
  • Call `callback()` when you're done with this chunk

How to use a Writable stream

  1. source.pipe(writable)
  2. writable.write(chunk [,encoding] [,callback])

A simple Writable stream


var Writable = require("stream").Writable
  || require("readable-stream/writable")
var inherits = require("util").inherits

function Drain(options) {
  Writable.call(this, options)
}
inherits(Drain, Writable)
Drain.prototype._write = function (chunk, encoding, callback) {
  // "Write" by logging, then signal we're ready for the next chunk.
  console.log(chunk.toString())
  callback()
}
          
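A variant worth sketching: defer `callback()` and you have a slow sink -- `pipe()` will pause the source until each chunk is acknowledged. That's back-pressure in action. (This reuses the requires from the example above.)

function SlowDrain(options) {
  Writable.call(this, options)
}
inherits(SlowDrain, Writable)
SlowDrain.prototype._write = function (chunk, encoding, callback) {
  console.log(chunk.toString())
  // Acknowledge the chunk 100ms later; upstream waits in the meantime.
  setTimeout(callback, 100)
}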

Using our examples so far:


var s = new Source()
var d = new Drain()
s.pipe(d)

// The quick brown fox jumps over the lazy dog.
            

Handy abstractions/modules:

  • concat-stream Collects all of a stream's output for a single callback (used later in this talk).

Duplex

Use a Duplex stream when you accept input and produce output, but as independent streams. It is simply both a Readable and a Writable stream.
Think: Server

How to implement Duplex

  1. Subclass stream.Duplex
  2. Implement a `_read(size)` method.
  3. Implement a `_write(chunk, encoding, callback)` method.

Duplex options

Superset of Readable and Writable options.

How to use a Duplex stream

  • input.pipe(duplex)
  • duplex.pipe(output)
  • duplex.on("data", /* ... */)
  • duplex.write()
  • duplex.read()

A simple Duplex stream


var Duplex = require("stream").Duplex
  || require("readable-stream/duplex")
var inherits = require("util").inherits

function Server(options) {
  Duplex.call(this, options)
  this.queue = []
}
inherits(Server, Duplex)
Server.prototype._read = function (size) {
  // Reply with the oldest queued response.
  this.push(this.queue.shift())
}
Server.prototype._write = function (chunk, encoding, callback) {
  // Tag each incoming message and queue it for the readable side.
  this.queue.push(Buffer.concat([new Buffer("REC: "), chunk, new Buffer("\n")]))
  callback()
}
            

Using our example:


var s = new Server()
s.write("HI THERE")
s.write("HOW ARE YOU?")
s.pipe(process.stdout)

// REC: HI THERE
// REC: HOW ARE YOU?
            

Handy abstractions/modules

I don't actually know of any, nor do I tend to implement Duplex often.

Transform

Use a Transform stream when you want to operate on a stream in transit. This is a special kind of Duplex stream where the output is computed from the input.
Think: Filter/Map

How to implement Transform

  1. Subclass stream.Transform
  2. Implement a `_transform(chunk, encoding, callback)` method.
  3. Optionally implement a `_flush(callback)` method.

The `_transform(chunk, encoding, callback)` method:

  • Call `this.push(something)` to forward it to the next consumer.
  • You don't have to push anything; that skips (filters out) the chunk.
  • You *must* call `callback` exactly once per `_transform` call.

The `_flush(callback)` method:

When the stream ends, this is your chance to do any cleanup or last-minute `this.push()` calls to clear any buffers or work. Call `callback()` when done.
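
A minimal sketch of `_flush` in action -- a made-up Transform that passes data through and appends a byte count at the end:

var Transform = require("stream").Transform
var inherits = require("util").inherits

function Counter(options) {
  Transform.call(this, options)
  this.bytes = 0
}
inherits(Counter, Transform)
Counter.prototype._transform = function (chunk, encoding, callback) {
  this.bytes += chunk.length // tally as chunks pass through unchanged
  this.push(chunk)
  callback()
}
Counter.prototype._flush = function (callback) {
  // Last chance to push before the stream ends.
  this.push("(" + this.bytes + " bytes)\n")
  callback()
}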

Transform options

Superset of Readable and Writable options.

How to use a Transform stream

  • source.pipe(transform).pipe(drain)
  • transform.on("data", /* ... */)

A simple Transform stream


var Transform = require("stream").Transform
  || require("readable-stream/transform")
var inherits = require("util").inherits

function ToUpper (options) {
  Transform.call(this, options)
}
inherits(ToUpper, Transform)
ToUpper.prototype._transform = function (chunk, encoding, callback) {
  // Upper-case each chunk and forward it downstream.
  var str = chunk.toString().toUpperCase()
  this.push(str)
  callback()
}
          

Using our example:


var s = new Source()
var d = new Drain()
var tx = new ToUpper()

s.pipe(tx).pipe(d)

// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
            

Handy abstractions/modules

  • through2 makes it easy to generate Transforms without all the subclassing boilerplate.
  • through2-filter Create simple stream filters.
  • through2-map Create simple stream map operations.
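
For example, the ToUpper transform above shrinks to a few lines with through2 -- a sketch, reusing Source and Drain from earlier:

var through2 = require("through2")

var toUpper = through2(function (chunk, encoding, callback) {
  this.push(chunk.toString().toUpperCase())
  callback()
})

new Source().pipe(toUpper).pipe(new Drain())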

PassThrough

Most commonly PassThrough streams are used for testing. They are exactly a Transform stream that performs no transformation.
Think: spy

How to use a PassThrough stream

Short answer: Don't

Use through2-spy instead.

A through2-spy


var spy = require("through2-spy")

var bytes = 0
// Spy on a pipeline, counting the number of bytes it has passed
var counter = spy(function (chunk) {bytes += chunk.length})

source.pipe(counter).pipe(drain)
            

Buffering

Streams2 handle buffering and backpressure automatically.

Readable Buffering

Readable streams (Readable/Duplex/Transform) buffer whatever you `this.push(chunk)` internally until the stream is read.
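
A sketch of a well-behaved `_read`: `this.push()` returns false once the internal buffer reaches `highWaterMark`, so stop pushing and wait to be called again. (ChunkSource here is hypothetical.)

var Readable = require("stream").Readable
var inherits = require("util").inherits

function ChunkSource(chunks, options) {
  Readable.call(this, options)
  this.chunks = chunks
}
inherits(ChunkSource, Readable)
ChunkSource.prototype._read = function (size) {
  var keepPushing = true
  while (keepPushing && this.chunks.length) {
    // push() returns false when the buffer fills; resume on the next _read.
    keepPushing = this.push(this.chunks.shift())
  }
  if (!this.chunks.length) this.push(null)
}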

Writable Buffering

Writable streams (Writable/Duplex/Transform) buffer when written to, draining as they are read or processed.
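
Respecting that buffer by hand looks like this (`pipe()` does the equivalent for you automatically):

var fs = require("fs")

var out = fs.createWriteStream("out.txt")
var n = 100000

function writeSome() {
  var ok = true
  while (n > 0 && ok) {
    n--
    ok = out.write("line " + n + "\n") // false once the buffer passes highWaterMark
  }
  if (n > 0) out.once("drain", writeSome) // resume when the buffer drains
  else out.end()
}
writeSome()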

stream.read(0)

You can trigger a refresh of the system without consuming any data by calling `.read(0)` on a readable stream. You probably won't need to do this.

stream.push('') or stream.push(null)

Pushing `null` signals the end of the stream and will terminate the pipeline. Pushing a zero-byte string or Buffer (when not in object mode) ends the current read cycle without adding any data.

Errors

Streams are EventEmitters, so they get traditional EventEmitter error handling.
That is, either add an 'error' listener to catch errors, or let them bubble up as exceptions.
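
For example, with a file that may not exist:

var fs = require("fs")

var rs = fs.createReadStream("no-such-file.txt")
rs.on("error", function (err) {
  // Without this listener, the error would be thrown as an exception.
  console.error("read failed:", err.message)
})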

Passing Errors

Either emit an 'error' event, or pass an Error as the first argument of the `callback` in `_write` or `_transform` to signal an error and abort the stream.
Example: stream-meter
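
A minimal sketch with through2 (the size limit is made up):

var through2 = require("through2")

var limiter = through2(function (chunk, encoding, callback) {
  // Signal an error and abort the pipeline for oversized chunks.
  if (chunk.length > 1024) return callback(new Error("chunk too large"))
  this.push(chunk)
  callback()
})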

Using Streams

These last few slides show some simple examples of using streams in Node.js.

through2-map


var map = require("through2-map")

var alter = map(function (buf) {
  var outBuf = new Buffer(buf.length)
  for (var i = 0; i < buf.length; i++) {
    // Upper-case lowercase ASCII letters (a-z) at even offsets in the chunk.
    if (i % 2 == 0 && buf[i] > 96 && buf[i] < 123)
      outBuf[i] = buf[i] - 32
    else
      outBuf[i] = buf[i]
  }
  return outBuf
})

process.stdin.pipe(alter).pipe(process.stdout)
            

$ echo "Hi there how are you?" | node example.js
Hi tHeRe hOw aRe yOu?
            

Fetch a web page, non-streaming:


var https = require("https")
var fs = require("fs")

https.get("https://ravenwall.com", function (res) {
  var contents = ''
  res.on("data", function (buffer) {
    // WARNING! Not multi-byte character safe!
    contents += buffer.toString()
  })
  res.on("end", function () {
    fs.writeFile("file.html", contents)
  })
})
            

Vs. Streaming:


var https = require("https")
var fs = require("fs")

https.get("https://ravenwall.com", function (res) {
  res.pipe(fs.createWriteStream('file.html'))
})
            

hyperquest


var hyperquest = require("hyperquest")
var fs = require("fs")

hyperquest.get("https://ravenwall.com")
  .pipe(fs.createWriteStream("index.html"))
            

objectMode

Let's make a source:


var spigot = require("stream-spigot")

var count = 0
function gen(next) {
  if (count++ > 20) return next()
  setTimeout(function () {
    next(null, {time: Date.now(), id: count})
  }, 25)
}
var source = spigot(gen, {objectMode: true})
            

objectMode (cont.)

Do some filtering:


var filter = require("through2-filter")

var oddsOnly = filter({objectMode: true}, function (record) {
  if (record.time % 2 != 0) return true
})
            

objectMode (cont.)

Do some modification:


var map = require("through2-map")

var gap = map({objectMode: true}, function (record) {
  if (this.last == null)
    record.gap = 0
  else
    record.gap = record.time - this.last.time
  this.last = record
  return record
})
            

objectMode (cont.)

Collect the results:


var concat = require("concat-stream")

function collect(data) {
  console.log(data)
}
            

objectMode (cont.)

Create the pipeline:


source.pipe(oddsOnly)
  .pipe(gap)
  .pipe(concat(collect))
            

objectMode results


[ { time: 1376242414241, id: 1, gap: 0 },
  { time: 1376242414269, id: 2, gap: 28 },
  { time: 1376242414345, id: 5, gap: 76 },
  { time: 1376242414421, id: 8, gap: 76 },
  { time: 1376242414523, id: 12, gap: 102 },
  { time: 1376242414599, id: 15, gap: 76 },
  { time: 1376242414675, id: 18, gap: 76 },
  { time: 1376242414751, id: 21, gap: 76 } ]
            

THE END

By Bryce Baril -- Ravenwall.com

CascadiaJS Bonus Slide!

CascadiaJS is an awesome conference coming up November 14-15 in Vancouver, BC.
The Call for Proposals closes in two days! (August 15th) You still have time...
More at 2013.cascadiajs.com.