Bryce Baril @brycebaril
Streams are a first-class construct in Node.js for handling data. I like to think of them as lazy evaluation applied to data.
There are essentially three major concepts with Streams:
Source: where the data in the stream comes from -- disk, an HTTP server, or your own custom source. A streaming source lets you process the content lazily: you only buffer what you need, and you don't block if the source is slow.
In-between: often you want to do something to the data between the source and the sink. Streams let you lazily apply your filters or transformations as the data passes through.
Sink: where you're putting your data -- disk, an HTTP client, or your own custom sink. A streaming sink gives you flexibility when writing your data: only buffer what you need, without blocking.
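Putting the three together, a pipeline is just a source piped through zero or more transforms into a sink. A minimal sketch using only core modules (the file names are made up):

var fs = require("fs")
var zlib = require("zlib")

// Source: stream a (hypothetical) log file off disk
fs.createReadStream("access.log")
  // In-between: compress the data as it flows through
  .pipe(zlib.createGzip())
  // Sink: write the compressed data to a new file
  .pipe(fs.createWriteStream("access.log.gz"))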
Streams2 is the second generation of Streams in Node.js core. The interface was rewritten to make it more accessible.
Streams2 was originally developed by @izs as an npm module that can be used to add Streams2 support to Node 0.8: readable-stream
var Transform = require("stream").Transform
|| require("readable-stream/transform")
This works most of the time.
Streams3 is the next version, which is in 0.11.5 now and will be in 0.12. Streams3 is Streams2 with better support for streams1-style streams.
1 | 2 = 3
There are five classes of Streams2: Readable, Writable, Duplex, Transform, and PassThrough.
Use a Readable stream when supplying data as a stream.
Think: Spigot/Faucet.
var Readable = require("stream").Readable
|| require("readable-stream/readable")
var inherits = require("util").inherits
function Source(options) {
Readable.call(this, options)
this.content = "The quick brown fox jumps over the lazy dog."
}
inherits(Source, Readable)
Source.prototype._read = function (size) {
if (!this.content) this.push(null)
else {
this.push(this.content.slice(0, size))
this.content = this.content.slice(size)
}
}
var s = new Source()
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
// The quick
// brown fox
// jumps over
// the lazy
// dog.
Use a Writable stream when collecting data from a stream.
Think: Drain/Collect.
var Writable = require("stream").Writable
|| require("readable-stream/writable")
var inherits = require("util").inherits
function Drain(options) {
Writable.call(this, options)
}
inherits(Drain, Writable)
Drain.prototype._write = function (chunk, encoding, callback) {
console.log(chunk.toString())
callback()
}
var s = new Source()
var d = new Drain()
s.pipe(d)
// The quick brown fox jumps over the lazy dog.
Use a Duplex stream when your stream both accepts input and produces output, but the two are independent streams rather than one feeding the other. It is simply both a Readable and a Writable stream in a single object.
Think: Server
Options: a superset of the Readable and Writable options.
var Duplex = require("stream").Duplex
|| require("readable-stream/duplex")
var inherits = require("util").inherits
function Server(options) {
Duplex.call(this, options)
this.queue = []
}
inherits(Server, Duplex)
Server.prototype._read = function (size) {
this.push(this.queue.shift())
}
Server.prototype._write = function (chunk, encoding, callback) {
this.queue.push(Buffer.concat([new Buffer("REC: "), chunk, new Buffer("\n")]))
callback()
}
var s = new Server()
s.write("HI THERE")
s.write("HOW ARE YOU?")
s.pipe(process.stdout)
// REC: HI THERE
// REC: HOW ARE YOU?
Helpful npm modules for building Duplex streams? I don't actually know of any, nor do I tend to implement Duplex often.
Use a Transform stream when you want to operate on a stream in transit. This is a special kind of Duplex stream where the output is computed from the input, rather than the two sides being independent.
Think: Filter/Map
You can also implement `_flush(callback)`: when the stream is ending, this is your chance to do any cleanup or last-minute `this.push()` calls to flush any buffered work. Call `callback()` when done. (A sketch follows the example below.)
Options: a superset of the Readable and Writable options.
var Transform = require("stream").Transform
|| require("readable-stream/transform")
var inherits = require("util").inherits
function ToUpper (options) {
Transform.call(this, options)
}
inherits(ToUpper, Transform)
ToUpper.prototype._transform = function (chunk, encoding, callback) {
var str = chunk.toString().toUpperCase()
this.push(str)
callback()
}
var s = new Source()
var d = new Drain()
var tx = new ToUpper()
s.pipe(tx).pipe(d)
// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
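As a sketch of the `_flush` hook mentioned above (not from the original slides): a line-splitting transform has to carry a partial line between chunks and push it out when the stream ends.

var Transform = require("stream").Transform
var inherits = require("util").inherits

function Lines(options) {
  Transform.call(this, options)
  this.partial = ""
}
inherits(Lines, Transform)

Lines.prototype._transform = function (chunk, encoding, callback) {
  var pieces = (this.partial + chunk.toString()).split("\n")
  this.partial = pieces.pop() // the last piece may be an incomplete line
  for (var i = 0; i < pieces.length; i++) this.push(pieces[i] + "\n")
  callback()
}

// _flush runs after the writable side ends: push whatever is still buffered
Lines.prototype._flush = function (callback) {
  if (this.partial) this.push(this.partial + "\n")
  callback()
}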
Most commonly PassThrough streams are used for testing. A PassThrough is exactly a Transform stream that does no transformation.
Think: spy
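To make the 'spy' idea concrete, here is a by-hand sketch (not from the slides), reusing the Source and Drain classes from earlier:

var PassThrough = require("stream").PassThrough

var tap = new PassThrough()
// watch the data go by without altering it
tap.on("data", function (chunk) {
  console.log("spied %d bytes", chunk.length)
})

new Source().pipe(tap).pipe(new Drain())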
When would you build this kind of spy yourself? Short answer: don't.
Use through2-spy instead.
var spy = require("through2-spy")
var bytes = 0
// Spy on a pipeline, counting the number of bytes it has passed
var counter = spy(function (chunk) {bytes += chunk.length})
source.pipe(counter).pipe(drain)
Streams2 handle buffering and backpressure for you automatically.
The readable side (Readable/Duplex/Transform) buffers the chunks you `this.push()` internally until they are read off the stream.
The writable side (Writable/Duplex/Transform) buffers what is written to it, draining as the data is read or processed. (A by-hand sketch of dealing with backpressure yourself follows below.)
You can trigger a refresh of the system without consuming any data by calling `.read(0)` on a readable stream. You probably won't need to do this.
Pushing `null` signals end-of-stream (EOF) and will terminate the pipeline. This is true in Object mode as well, which is why `null` can't be used as a data value in an Object-mode stream.
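Backpressure really only becomes visible when you write to a stream by hand instead of using pipe. A minimal sketch (not from the slides; the file name is made up): `write()` returns false once the internal buffer passes its highWaterMark, and the 'drain' event says it is safe to write again.

var fs = require("fs")

var out = fs.createWriteStream("numbers.txt")
var i = 0

function writeSome() {
  var ok = true
  while (i < 1000000 && ok) {
    // write() returns false once the internal buffer is full
    ok = out.write(i++ + "\n")
  }
  if (i < 1000000) out.once("drain", writeSome) // back off until it drains
  else out.end()
}
writeSome()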
Streams are EventEmitters, so they get traditional EventEmitter error handling.
That is: either add an 'error' listener to catch errors, or let them bubble up as thrown exceptions.
To signal an error from inside a stream and abort it, either emit an 'error' event or pass an Error as the first argument to the `callback` in `_write` or `_transform`.
Example: stream-meter
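A minimal sketch of both halves (the SizeLimit class and its 10-byte limit are made up for illustration, but this is roughly what stream-meter does): the transform reports an error through its callback, and the consumer catches it with an 'error' listener. It reuses the Source and Drain classes from earlier.

var Transform = require("stream").Transform
var inherits = require("util").inherits

// A transform that fails once it has seen more than `limit` bytes
function SizeLimit(limit) {
  Transform.call(this)
  this.limit = limit
  this.seen = 0
}
inherits(SizeLimit, Transform)

SizeLimit.prototype._transform = function (chunk, encoding, callback) {
  this.seen += chunk.length
  if (this.seen > this.limit)
    return callback(new Error("stream exceeded " + this.limit + " bytes"))
  callback(null, chunk) // same as this.push(chunk); callback()
}

var limit = new SizeLimit(10)
limit.on("error", function (err) {
  console.error("pipeline failed:", err.message)
})

new Source().pipe(limit).pipe(new Drain())
// pipeline failed: stream exceeded 10 bytes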
These last few slides are some simple examples of using streams in node.
var map = require("through2-map")
var alter = map(function (buf) {
var outBuf = new Buffer(buf.length)
for (var i = 0; i < buf.length; i++) {
if (i % 2 == 0 && buf[i] > 96 && buf[i] < 123)
outBuf[i] = buf[i] - 32
else
outBuf[i] = buf[i]
}
return outBuf
})
process.stdin.pipe(alter).pipe(process.stdout)
$ echo "Hi there how are you?" | node example.js
Hi tHeRe hOw aRe yOu?
var https = require("https")
var fs = require("fs")
https.get("https://ravenwall.com", function (res) {
var contents = ''
res.on("data", function (buffer) {
// WARNING! Not multi-byte character safe!
contents += buffer.toString()
})
res.on("end", function () {
fs.writeFile("file.html", contents)
})
})
var https = require("https")
var fs = require("fs")
https.get("https://ravenwall.com", function (res) {
res.pipe(fs.createWriteStream('file.html'))
})
var hyperquest = require("hyperquest")
var fs = require("fs")
hyperquest.get("https://ravenwall.com")
.pipe(fs.createWriteStream("index.html"))
Let's make a source:
var spigot = require("stream-spigot")
var count = 0
function gen(next) {
if (count++ > 20) return next()
setTimeout(function () {
next(null, {time: Date.now(), id: count})
}, 25)
}
var source = spigot(gen, {objectMode: true})
Do some filtering:
var filter = require("through2-filter")
var oddsOnly = filter({objectMode: true}, function (record) {
if (record.time % 2 != 0) return true
})
Do some modification:
var map = require("through2-map")
var gap = map({objectMode: true}, function (record) {
if (this.last == null)
record.gap = 0
else
record.gap = record.time - this.last.time
this.last = record
return record
})
Collect the results:
var concat = require("concat-stream")
function collect(data) {
console.log(data)
}
Create the pipeline:
source.pipe(oddsOnly)
.pipe(gap)
.pipe(concat(collect))
[ { time: 1376242414241, id: 1, gap: 0 },
{ time: 1376242414269, id: 2, gap: 28 },
{ time: 1376242414345, id: 5, gap: 76 },
{ time: 1376242414421, id: 8, gap: 76 },
{ time: 1376242414523, id: 12, gap: 102 },
{ time: 1376242414599, id: 15, gap: 76 },
{ time: 1376242414675, id: 18, gap: 76 },
{ time: 1376242414751, id: 21, gap: 76 } ]
CascadiaJS is an awesome conference coming up November 14-15 in Vancouver, BC.
The Call for Proposals closes in two days! (August 15th) You still have time...
More at 2013.cascadiajs.com.