sonic-boom
Advanced tools
Comparing version 2.2.3 to 2.3.0
17
index.js
@@ -91,3 +91,3 @@ 'use strict' | ||
let { fd, dest, minLength, sync, append = true, mkdir } = opts || {} | ||
let { fd, dest, minLength, sync, append = true, mkdir, retryEAGAIN } = opts || {} | ||
@@ -110,4 +110,4 @@ fd = fd || dest | ||
this.append = append || false | ||
this.retryEAGAIN = retryEAGAIN || (() => true) | ||
this.mkdir = mkdir || false | ||
this._againTimeout = null | ||
@@ -128,3 +128,3 @@ if (typeof fd === 'number') { | ||
if (err) { | ||
if (err.code === 'EAGAIN') { | ||
if (err.code === 'EAGAIN' && this.retryEAGAIN(err, this._writingBuf.length, this._len - this._writingBuf.length)) { | ||
if (this.sync) { | ||
@@ -143,4 +143,3 @@ // This error code should not happen in sync mode, because it is | ||
// Let's give the destination some time to process the chunk. | ||
this._againTimeout = setTimeout(() => { | ||
this._againTimeout = null | ||
setTimeout(() => { | ||
fs.write(this.fd, this._writingBuf, 'utf8', this.release) | ||
@@ -340,4 +339,3 @@ }, BUSY_WRITE_TIMEOUT) | ||
if (this._againTimeout) { | ||
this._againTimeout = null | ||
if (!this._writing && this._writingBuf.length > 0) { | ||
this._bufs.unshift(this._writingBuf) | ||
@@ -348,7 +346,8 @@ this._writingBuf = '' | ||
while (this._bufs.length) { | ||
const buf = this._bufs[0] | ||
try { | ||
this._len -= fs.writeSync(this.fd, this._bufs[0], 'utf8') | ||
this._len -= fs.writeSync(this.fd, buf, 'utf8') | ||
this._bufs.shift() | ||
} catch (err) { | ||
if (err.code !== 'EAGAIN') { | ||
if (err.code !== 'EAGAIN' || !this.retryEAGAIN(err, buf.length, this._len - buf.length)) { | ||
throw err | ||
@@ -355,0 +354,0 @@ } |
{ | ||
"name": "sonic-boom", | ||
"version": "2.2.3", | ||
"version": "2.3.0", | ||
"description": "Extremely fast utf8 only stream implementation", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -66,7 +66,13 @@ # sonic-boom | ||
* `mkdir`: ensure directory for dest file exists when `true` (default `false`). | ||
* `retryEAGAIN(err, writeBufferLen, remainingBufferLen)`: a function that will be called when sonic-boom | ||
write/writeSync/flushSync encounters a EAGAIN error. If the return value is | ||
true sonic-boom will retry the operation, otherwise it will bubble the | ||
error. `err` is the error that caused this function to be called, | ||
`writeBufferLen` is the length of the buffer sonic-boom tried to write, and | ||
`remainingBufferLen` is the length of the remaining buffer sonic-boom didn't try to write. | ||
For `sync:false` a `SonicBoom` instance will emit the `'ready'` event when a file descriptor is available. | ||
For `sync:true` this is not relevant because the `'ready'` event will be fired when the `SonicBoom` instance is created, before it can be subscribed to. | ||
For `sync:false` a `SonicBoom` instance will emit the `'ready'` event when a file descriptor is available. | ||
For `sync:true` this is not relevant because the `'ready'` event will be fired when the `SonicBoom` instance is created, before it can be subscribed to. | ||
### SonicBoom#write(string) | ||
@@ -73,0 +79,0 @@ |
287
test.js
@@ -12,2 +12,3 @@ 'use strict' | ||
const MAX_WRITE = 16 * 1024 * 1024 | ||
const files = [] | ||
@@ -728,2 +729,55 @@ let count = 0 | ||
test('emit error on async EAGAIN', (t) => { | ||
t.plan(11) | ||
const fakeFs = Object.create(fs) | ||
fakeFs.write = function (fd, buf, enc, cb) { | ||
t.pass('fake fs.write called') | ||
fakeFs.write = fs.write | ||
const err = new Error('EAGAIN') | ||
err.code = 'EAGAIN' | ||
process.nextTick(cb, err) | ||
} | ||
const SonicBoom = proxyquire('.', { | ||
fs: fakeFs | ||
}) | ||
const dest = file() | ||
const fd = fs.openSync(dest, 'w') | ||
const stream = new SonicBoom({ | ||
fd, | ||
sync: false, | ||
minLength: 12, | ||
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.equal(writeBufferLen, 12) | ||
t.equal(remainingBufferLen, 0) | ||
return false | ||
} | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
stream.once('error', err => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.ok(stream.write('something else\n')) | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
stream.end() | ||
stream.on('finish', () => { | ||
fs.readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('retry on EAGAIN (sync)', (t) => { | ||
@@ -768,2 +822,55 @@ t.plan(7) | ||
test('emit error on EAGAIN (sync)', (t) => { | ||
t.plan(11) | ||
const fakeFs = Object.create(fs) | ||
fakeFs.writeSync = function (fd, buf, enc, cb) { | ||
t.pass('fake fs.writeSync called') | ||
fakeFs.writeSync = fs.writeSync | ||
const err = new Error('EAGAIN') | ||
err.code = 'EAGAIN' | ||
throw err | ||
} | ||
const SonicBoom = proxyquire('.', { | ||
fs: fakeFs | ||
}) | ||
const dest = file() | ||
const fd = fs.openSync(dest, 'w') | ||
const stream = new SonicBoom({ | ||
fd, | ||
minLength: 0, | ||
sync: true, | ||
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.equal(writeBufferLen, 12) | ||
t.equal(remainingBufferLen, 0) | ||
return false | ||
} | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
stream.once('error', err => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.ok(stream.write('something else\n')) | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
stream.end() | ||
stream.on('finish', () => { | ||
fs.readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('retry in flushSync on EAGAIN', (t) => { | ||
@@ -811,2 +918,180 @@ t.plan(7) | ||
test('throw error in flushSync on EAGAIN', (t) => { | ||
t.plan(11) | ||
const fakeFs = Object.create(fs) | ||
const SonicBoom = proxyquire('.', { | ||
fs: fakeFs | ||
}) | ||
const dest = file() | ||
const fd = fs.openSync(dest, 'w') | ||
const stream = new SonicBoom({ | ||
fd, | ||
sync: false, | ||
minLength: 1000, | ||
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.equal(writeBufferLen, 12) | ||
t.equal(remainingBufferLen, 0) | ||
return false | ||
} | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
const err = new Error('EAGAIN') | ||
err.code = 'EAGAIN' | ||
fakeFs.writeSync = function (fd, buf, enc) { | ||
Error.captureStackTrace(err) | ||
t.pass('fake fs.write called') | ||
fakeFs.writeSync = fs.writeSync | ||
throw err | ||
} | ||
t.ok(stream.write('hello world\n')) | ||
t.throws(stream.flushSync.bind(stream), err, 'EAGAIN') | ||
t.ok(stream.write('something else\n')) | ||
stream.flushSync() | ||
stream.end() | ||
stream.on('finish', () => { | ||
fs.readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('retryEAGAIN receives remaining buffer on async if write fails', (t) => { | ||
t.plan(12) | ||
const fakeFs = Object.create(fs) | ||
const SonicBoom = proxyquire('.', { | ||
fs: fakeFs | ||
}) | ||
const dest = file() | ||
const fd = fs.openSync(dest, 'w') | ||
const stream = new SonicBoom({ | ||
fd, | ||
sync: false, | ||
minLength: 12, | ||
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.equal(writeBufferLen, 12) | ||
t.equal(remainingBufferLen, 11) | ||
return false | ||
} | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
stream.once('error', err => { | ||
t.equal(err.code, 'EAGAIN') | ||
t.ok(stream.write('done')) | ||
}) | ||
fakeFs.write = function (fd, buf, enc, cb) { | ||
t.pass('fake fs.write called') | ||
fakeFs.write = fs.write | ||
const err = new Error('EAGAIN') | ||
err.code = 'EAGAIN' | ||
t.ok(stream.write('sonic boom\n')) | ||
process.nextTick(cb, err) | ||
} | ||
t.ok(stream.write('hello world\n')) | ||
stream.end() | ||
stream.on('finish', () => { | ||
fs.readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsonic boom\ndone') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('retryEAGAIN receives remaining buffer if exceeds MAX_WRITE', (t) => { | ||
t.plan(17) | ||
const fakeFs = Object.create(fs) | ||
const SonicBoom = proxyquire('.', { | ||
fs: fakeFs | ||
}) | ||
const dest = file() | ||
const fd = fs.openSync(dest, 'w') | ||
const buf = Buffer.alloc(MAX_WRITE - 2).fill('x').toString() // 1 MB | ||
const stream = new SonicBoom({ | ||
fd, | ||
sync: false, | ||
minLength: MAX_WRITE - 1, | ||
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => { | ||
t.equal(err.code, 'EAGAIN', 'retryEAGAIN received EAGAIN error') | ||
t.equal(writeBufferLen, buf.length, 'writeBufferLen === buf.length') | ||
t.equal(remainingBufferLen, 23, 'remainingBufferLen === 23') | ||
return false | ||
} | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
fakeFs.write = function (fd, buf, enc, cb) { | ||
t.pass('fake fs.write called') | ||
const err = new Error('EAGAIN') | ||
err.code = 'EAGAIN' | ||
process.nextTick(cb, err) | ||
} | ||
fakeFs.writeSync = function (fd, buf, enc, cb) { | ||
t.pass('fake fs.write called') | ||
const err = new Error('EAGAIN') | ||
err.code = 'EAGAIN' | ||
throw err | ||
} | ||
t.ok(stream.write(buf), 'write buf') | ||
t.notOk(stream.write('hello world\nsonic boom\n'), 'write hello world sonic boom') | ||
stream.once('error', err => { | ||
t.equal(err.code, 'EAGAIN', 'bubbled error should be EAGAIN') | ||
try { | ||
stream.flushSync() | ||
} catch (err) { | ||
t.equal(err.code, 'EAGAIN', 'thrown error should be EAGAIN') | ||
fakeFs.write = fs.write | ||
fakeFs.writeSync = fs.writeSync | ||
stream.end() | ||
} | ||
}) | ||
stream.on('finish', () => { | ||
t.pass('finish emitted') | ||
fs.readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, `${buf}hello world\nsonic boom\n`, 'data on file should match written') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('write buffers that are not totally written', (t) => { | ||
@@ -1071,5 +1356,5 @@ t.plan(9) | ||
fd, | ||
minLength: 16 * 1024 * 1024 | ||
minLength: MAX_WRITE | ||
}) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
50433
1594
115