fs-write-stream-atomic
Advanced tools
Comparing version 1.0.7 to 1.0.8
125
index.js
var fs = require('graceful-fs') | ||
var PassThrough = require('readable-stream').PassThrough | ||
var Writable = require('readable-stream').Writable | ||
var util = require('util') | ||
@@ -20,81 +20,106 @@ var MurmurHash3 = require('imurmurhash') | ||
var setImmediate = global.setImmediate || setTimeout | ||
module.exports = WriteStreamAtomic | ||
util.inherits(WriteStreamAtomic, PassThrough) | ||
// Requirements: | ||
// 1. Write everything written to the stream to a temp file. | ||
// 2. If there are no errors: | ||
// a. moves the temp file into its final destination | ||
// b. emits `finish` & `closed` ONLY after the file is | ||
// fully flushed and renamed. | ||
// 3. If there's an error, removes the temp file. | ||
util.inherits(WriteStreamAtomic, Writable) | ||
function WriteStreamAtomic (path, options) { | ||
if (!options) options = {} | ||
if (!(this instanceof WriteStreamAtomic)) { | ||
return new WriteStreamAtomic(path, options) | ||
} | ||
Writable.call(this, options) | ||
this.__atomicTarget = path | ||
this.__atomicChown = options.chown | ||
this.__atomicTmp = getTmpname(path) | ||
this.__atomicFinished = false | ||
this.__atomicMoved = false | ||
this.__atomicChown = options && options.chown | ||
this.__atomicClosed = false | ||
this.__atomicStream = fs.WriteStream(this.__atomicTmp, options) | ||
this.__atomicStream.on('error', handleError.bind(this)) | ||
PassThrough.call(this, options) | ||
this.pipe(this.__atomicStream) | ||
this.__atomicStream.once('open', handleOpen(this)) | ||
this.__atomicStream.once('close', handleClose(this)) | ||
this.__atomicStream.once('error', handleError(this)) | ||
} | ||
function cleanupSync () { | ||
try { | ||
fs.unlinkSync(this.__atomicTmp) | ||
} finally { | ||
return | ||
} | ||
// We have to suppress default finish emitting, because ordinarily it | ||
// would happen as soon as `end` is called on us and all of the | ||
// data has been written to our target stream. So we suppress | ||
// finish from being emitted here, and only emit it after our | ||
// target stream is closed and we've moved everything around. | ||
WriteStreamAtomic.prototype.emit = function (event) { | ||
if (event === 'finish') return this.__atomicStream.end() | ||
return Writable.prototype.emit.apply(this, arguments) | ||
} | ||
function handleError (er) { | ||
cleanupSync() | ||
this.emit('error', er) | ||
WriteStreamAtomic.prototype._write = function (buffer, encoding, cb) { | ||
var flushed = this.__atomicStream.write(buffer, encoding) | ||
if (flushed) return cb() | ||
this.__atomicStream.once('drain', cb) | ||
} | ||
function finish () { | ||
if (!this.__atomicFinished) return | ||
if (!this.__atomicMoved) return | ||
PassThrough.prototype.emit.call(this, 'finish') | ||
process.nextTick(function () { | ||
this.emit('close') | ||
}.bind(this)) | ||
} | ||
WriteStreamAtomic.prototype.emit = function (event) { | ||
// We'll emit this ourselves, as we need to hold off on emitting it | ||
// until after we've completed putting the final file into place. | ||
// To do otherwise creats a race between finish and close ;_; | ||
if (event === 'finish') { | ||
this.__atomicFinished = true | ||
return finish.call(this) | ||
function handleOpen (writeStream) { | ||
return function (fd) { | ||
writeStream.emit('open', fd) | ||
} | ||
return PassThrough.prototype.emit.apply(this, arguments) | ||
} | ||
WriteStreamAtomic.prototype._flush = function (cb) { | ||
var writeStream = this | ||
if (writeStream.__atomicChown) { | ||
var uid = writeStream.__atomicChown.uid | ||
var gid = writeStream.__atomicChown.gid | ||
return fs.chown(writeStream.__atomicTmp, uid, gid, iferr(cleanup, moveIntoPlace)) | ||
} else { | ||
moveIntoPlace() | ||
function handleClose (writeStream) { | ||
return function () { | ||
if (writeStream.__atomicClosed) return | ||
writeStream.__atomicClosed = true | ||
if (writeStream.__atomicChown) { | ||
var uid = writeStream.__atomicChown.uid | ||
var gid = writeStream.__atomicChown.gid | ||
return fs.chown(writeStream.__atomicTmp, uid, gid, iferr(cleanup, moveIntoPlace)) | ||
} else { | ||
moveIntoPlace() | ||
} | ||
} | ||
function cleanup (err) { | ||
if (!err) return cb() | ||
fs.unlink(writeStream.__atomicTmp, function () { | ||
writeStream.emit('error', err) | ||
cb() | ||
writeStream.emit('close') | ||
}) | ||
} | ||
function moveIntoPlace () { | ||
fs.rename(writeStream.__atomicTmp, writeStream.__atomicTarget, function (err) { | ||
cleanup(err) | ||
writeStream.__atomicMoved = true | ||
finish.call(writeStream) | ||
fs.rename(writeStream.__atomicTmp, writeStream.__atomicTarget, iferr(cleanup, end)) | ||
} | ||
function end () { | ||
// We have to use our parent class directly because we suppress `finish` | ||
// events fired via our own emit method. | ||
Writable.prototype.emit.call(writeStream, 'finish') | ||
// Delay the close to provide the same temporal separation a physical | ||
// file operation would have– that is, the close event is emitted only | ||
// after the async close operation completes. | ||
setImmediate(function () { | ||
writeStream.emit('close') | ||
}) | ||
} | ||
} | ||
function handleError (writeStream) { | ||
return function (er) { | ||
cleanupSync() | ||
writeStream.emit('error', er) | ||
writeStream.__atomicClosed = true | ||
writeStream.emit('close') | ||
} | ||
function cleanupSync () { | ||
try { | ||
fs.unlinkSync(writeStream.__atomicTmp) | ||
} finally { | ||
return | ||
} | ||
} | ||
} |
{ | ||
"name": "fs-write-stream-atomic", | ||
"version": "1.0.7", | ||
"version": "1.0.8", | ||
"description": "Like `fs.createWriteStream(...)`, but atomic.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
'use strict' | ||
var fs = require('graceful-fs') | ||
var path = require('path') | ||
@@ -15,3 +16,3 @@ var test = require('tap').test | ||
hadError = true | ||
console.error('#', er) | ||
console.log('#', er) | ||
}) | ||
@@ -24,2 +25,19 @@ stream.on('close', function () { | ||
test('chown fails', function (t) { | ||
t.plan(1) | ||
fs.chown = function (file, uid, gid, cb) { | ||
cb(new Error('TEST BREAK')) | ||
} | ||
var stream = writeStream(target, {chown: {uid: process.getuid(), gid: process.getgid()}}) | ||
var hadError = false | ||
stream.on('error', function (er) { | ||
hadError = true | ||
console.log('#', er) | ||
}) | ||
stream.on('close', function () { | ||
t.is(hadError, true, 'error before close') | ||
}) | ||
stream.end() | ||
}) | ||
test('cleanup', function (t) { | ||
@@ -26,0 +44,0 @@ rimraf.sync(target) |
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
320
0
13115
11