Streams2

Node.js Streams2 Demystified

Bryce Baril @brycebaril

The first time I tried to understand streams.

wtf.pipe(wth)?

Using Streams

These last few slides show some simple examples of using streams in Node.

through2-map


var map = require("through2-map")

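// Uppercase every ASCII lowercase letter (a-z is 97-122) at an even byte offset
var alter = map(function (buf) {
  var outBuf = Buffer.alloc(buf.length) // new Buffer(size) is deprecated
  for (var i = 0; i < buf.length; i++) {
    if (i % 2 === 0 && buf[i] > 96 && buf[i] < 123)
      outBuf[i] = buf[i] - 32
    else
      outBuf[i] = buf[i]
  }
  return outBuf
})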

process.stdin.pipe(alter).pipe(process.stdout)
            

$ echo "Hi there how are you?" | node example.js
Hi tHeRe hOw aRe yOu?
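
For comparison, here is a rough sketch of the same transform written against the core stream module instead of through2-map. The names alternateCaps and createAlter are made up for illustration, and this is only one way to write it, not a description of how through2-map works internally.

```javascript
var Transform = require("stream").Transform

// Uppercase ASCII lowercase letters (97-122) at even byte offsets in a chunk.
function alternateCaps(buf) {
  var out = Buffer.alloc(buf.length)
  for (var i = 0; i < buf.length; i++) {
    var isLower = buf[i] > 96 && buf[i] < 123
    out[i] = (i % 2 === 0 && isLower) ? buf[i] - 32 : buf[i]
  }
  return out
}

// Hypothetical helper: wrap the function in a core Transform stream.
function createAlter() {
  return new Transform({
    transform: function (chunk, encoding, callback) {
      // Pass the transformed chunk downstream.
      callback(null, alternateCaps(chunk))
    }
  })
}

// Usage: process.stdin.pipe(createAlter()).pipe(process.stdout)
```

Like the through2-map version, offsets are counted per chunk, so the pattern restarts whenever a new chunk arrives.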
            

Fetch a web page, non-streaming:


var https = require("https")
var fs = require("fs")

https.get("https://ravenwall.com", function (res) {
  var contents = ''
  res.on("data", function (buffer) {
    // WARNING! Not multi-byte character safe!
    contents += buffer.toString()
  })
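  res.on("end", function () {
    // fs.writeFile requires a callback in current Node versions
    fs.writeFile("file.html", contents, function (err) {
      if (err) throw err
    })
  })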
})
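
The multi-byte warning above is easy to reproduce. A small sketch, assuming UTF-8 input where one character happens to be split across two chunks (the chunks array is made-up test data):

```javascript
var StringDecoder = require("string_decoder").StringDecoder

// "é" is two bytes in UTF-8 (0xC3 0xA9); split it across two chunks.
var chunks = [Buffer.from([0x63, 0x61, 0x66, 0xc3]), Buffer.from([0xa9])]

// Unsafe: decoding each chunk on its own corrupts the split character.
var unsafe = chunks.map(function (c) { return c.toString() }).join("")

// Safe option 1: collect the raw buffers and decode once at the end.
var joined = Buffer.concat(chunks).toString("utf8")

// Safe option 2: StringDecoder holds back incomplete byte sequences.
var decoder = new StringDecoder("utf8")
var decoded = chunks.map(function (c) { return decoder.write(c) }).join("") + decoder.end()

console.log(unsafe === "café", joined === "café", decoded === "café")
// prints: false true true
```

On an HTTP response, calling res.setEncoding("utf8") before attaching the "data" handler should have the same effect as the StringDecoder option, since the stream then decodes chunks for you.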
            

Vs. Streaming:


var https = require("https")
var fs = require("fs")

https.get("https://ravenwall.com", function (res) {
  res.pipe(fs.createWriteStream('file.html'))
})
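
One thing .pipe() does not do is forward errors. A possible variation, assuming a Node version that ships stream.pipeline (the download helper and its once guard are hypothetical names, not part of any library):

```javascript
var https = require("https")
var fs = require("fs")
var pipeline = require("stream").pipeline

// Hypothetical helper: stream a URL's body into a file and report errors.
function download(url, dest, done) {
  var called = false
  // Guard so done runs at most once, even if several errors surface.
  function once(err) {
    if (called) return
    called = true
    done(err)
  }
  https.get(url, function (res) {
    // pipeline reports an error from either stream and destroys both.
    pipeline(res, fs.createWriteStream(dest), once)
  }).on("error", once)
}

// Usage (performs a real network request):
// download("https://ravenwall.com", "file.html", function (err) {
//   if (err) console.error("download failed:", err.message)
// })
```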
            

hyperquest


var hyperquest = require("hyperquest")
var fs = require("fs")

hyperquest.get("https://ravenwall.com")
  .pipe(fs.createWriteStream("index.html"))
            

objectMode

Let's make a source:


var spigot = require("stream-spigot")

var count = 0
function gen(next) {
  if (count++ > 20) return next()
  setTimeout(function () {
    next(null, {time: Date.now(), id: count})
  }, 25)
}
var source = spigot(gen, {objectMode: true})
            

objectMode (cont.)

Do some filtering:


var filter = require("through2-filter")

// Keep only records whose timestamp is odd
var oddsOnly = filter({objectMode: true}, function (record) {
  return record.time % 2 !== 0
})
            

objectMode (cont.)

Do some modification:


var map = require("through2-map")

var gap = map({objectMode: true}, function (record) {
  if (this.last == null)
    record.gap = 0
  else
    record.gap = record.time - this.last.time
  this.last = record
  return record
})
            

objectMode (cont.)

Collect the results:


var concat = require("concat-stream")

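// concat-stream buffers the whole stream, then calls collect once;
// for object chunks, data is an array of all the records
function collect(data) {
  console.log(data)
}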
            

objectMode (cont.)

Create the pipeline:


source.pipe(oddsOnly)
  .pipe(gap)
  .pipe(concat(collect))
            

objectMode results


[ { time: 1376242414241, id: 1, gap: 0 },
  { time: 1376242414269, id: 2, gap: 28 },
  { time: 1376242414345, id: 5, gap: 76 },
  { time: 1376242414421, id: 8, gap: 76 },
  { time: 1376242414523, id: 12, gap: 102 },
  { time: 1376242414599, id: 15, gap: 76 },
  { time: 1376242414675, id: 18, gap: 76 },
  { time: 1376242414751, id: 21, gap: 76 } ]
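
The same filter, map, and collect shape can be sketched with only the core stream module. This is an approximation under assumptions: it needs a Node version with stream.pipeline and Readable.from, and the helper names (isOddTime, makeGapper, objectTransform, run) are invented for this example.

```javascript
var stream = require("stream")

// Same test as oddsOnly: keep records with an odd timestamp.
function isOddTime(record) {
  return record.time % 2 !== 0
}

// Returns a function that adds the gap since the previous record it saw.
function makeGapper() {
  var last = null
  return function (record) {
    record.gap = last === null ? 0 : record.time - last.time
    last = record
    return record
  }
}

// Hypothetical helper: objectMode Transform built from a per-record function.
// Returning undefined drops the record.
function objectTransform(fn) {
  return new stream.Transform({
    objectMode: true,
    transform: function (record, encoding, callback) {
      var result = fn(record)
      if (result !== undefined) this.push(result)
      callback()
    }
  })
}

// Run records through filter -> gap -> collect, then call done(err, results).
function run(records, done) {
  var results = []
  stream.pipeline(
    stream.Readable.from(records),
    objectTransform(function (r) { return isOddTime(r) ? r : undefined }),
    objectTransform(makeGapper()),
    new stream.Writable({
      objectMode: true,
      write: function (record, encoding, callback) {
        results.push(record)
        callback()
      }
    }),
    function (err) { done(err, results) }
  )
}

run([{time: 1, id: 1}, {time: 4, id: 2}, {time: 7, id: 3}], function (err, results) {
  if (err) throw err
  console.log(results)
})
```

With this sample input, the record with time 4 is dropped, and the two remaining records get gaps of 0 and 6.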
            

THE END

By Bryce Baril, Ravenwall.com

CascadiaJS Bonus Slide!

CascadiaJS is an awesome conference coming up November 14-15 in Vancouver, BC.
The Call for Proposals closes in two days! (August 15th) You still have time...
More at 2013.cascadiajs.com.