batch-stream-operation
Advanced tools
Comparing version
14
index.js
@@ -15,2 +15,7 @@ const stream = require('readable-stream') | ||
function performOperation() { | ||
numOngoingOperations++ | ||
operation(batch, () => writeStream.emit('operationComplete')) | ||
} | ||
writeStream.on('operationComplete', () => { | ||
@@ -25,3 +30,3 @@ numOngoingOperations-- | ||
if (finished && numOngoingOperations === 0) { | ||
writeStream.emit('close') | ||
writeStream.emit('complete') | ||
} | ||
@@ -34,4 +39,3 @@ }) | ||
if (batch.length === batchSize) { | ||
numOngoingOperations++ | ||
operation(batch, () => writeStream.emit('operationComplete')) | ||
performOperation() | ||
batch = [] | ||
@@ -53,5 +57,5 @@ | ||
if (batch.length > 0) { | ||
operation(batch, () => writeStream.emit('operationComplete')) | ||
performOperation() | ||
} else if (numOngoingOperations === 0) { | ||
writeStream.emit('close') | ||
writeStream.emit('complete') | ||
} | ||
@@ -58,0 +62,0 @@ }) |
{ | ||
"name": "batch-stream-operation", | ||
"version": "1.0.0", | ||
"version": "1.0.1", | ||
"description": "Group a stream into chunks of a given size, then perform an async operation on each batch before proceeding, with a given concurrency", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -25,3 +25,3 @@ /* eslint-disable id-length */ | ||
.pipe(batchStream) | ||
.on('close', () => t.ok('Close called')) | ||
.on('complete', () => t.ok('Complete called')) | ||
}) | ||
@@ -44,3 +44,27 @@ | ||
.pipe(batchStream) | ||
.on('close', () => t.ok('Close called')) | ||
.on('complete', () => t.ok('Complete called')) | ||
}) | ||
test('it closes only when all operations are done', t => { | ||
let batchNum = 0 | ||
const completed = [false, false] | ||
const batchStream = batcher({ | ||
concurrency: 5, | ||
batchSize: 70, | ||
operation: (batch, cb) => { | ||
const currentBatch = batchNum++ | ||
setTimeout(() => { | ||
completed[currentBatch] = true | ||
cb() | ||
}, currentBatch === 0 ? 500 : 100) | ||
} | ||
}) | ||
fs.createReadStream(path.join(__dirname, 'fixture.txt'), {encoding: 'utf8'}) | ||
.pipe(split2()) | ||
.pipe(batchStream) | ||
.on('complete', () => { | ||
t.ok(completed.every(Boolean), 'all operations should be done on complete') | ||
t.end() | ||
}) | ||
}) |
36528
1.93%7
16.67%108
30.12%