🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Book a DemoInstallSign in
Socket

batch-stream-operation

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

batch-stream-operation - npm Package Compare versions

Comparing version

to
1.0.1

.npmignore

14

index.js

@@ -15,2 +15,7 @@ const stream = require('readable-stream')

function performOperation() {
numOngoingOperations++
operation(batch, () => writeStream.emit('operationComplete'))
}
writeStream.on('operationComplete', () => {

@@ -25,3 +30,3 @@ numOngoingOperations--

if (finished && numOngoingOperations === 0) {
writeStream.emit('close')
writeStream.emit('complete')
}

@@ -34,4 +39,3 @@ })

if (batch.length === batchSize) {
numOngoingOperations++
operation(batch, () => writeStream.emit('operationComplete'))
performOperation()
batch = []

@@ -53,5 +57,5 @@

if (batch.length > 0) {
operation(batch, () => writeStream.emit('operationComplete'))
performOperation()
} else if (numOngoingOperations === 0) {
writeStream.emit('close')
writeStream.emit('complete')
}

@@ -58,0 +62,0 @@ })

{
"name": "batch-stream-operation",
"version": "1.0.0",
"version": "1.0.1",
"description": "Group a stream into chunks of a given size, then perform an async operation on each batch before proceeding, with a given concurrency",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -25,3 +25,3 @@ /* eslint-disable id-length */

.pipe(batchStream)
.on('close', () => t.ok('Close called'))
.on('complete', () => t.ok('Complete called'))
})

@@ -44,3 +44,27 @@

.pipe(batchStream)
.on('close', () => t.ok('Close called'))
.on('complete', () => t.ok('Complete called'))
})
test('it closes only when all operations are done', t => {
let batchNum = 0
const completed = [false, false]
const batchStream = batcher({
concurrency: 5,
batchSize: 70,
operation: (batch, cb) => {
const currentBatch = batchNum++
setTimeout(() => {
completed[currentBatch] = true
cb()
}, currentBatch === 0 ? 500 : 100)
}
})
fs.createReadStream(path.join(__dirname, 'fixture.txt'), {encoding: 'utf8'})
.pipe(split2())
.pipe(batchStream)
.on('complete', () => {
t.ok(completed.every(Boolean), 'all operations should be done on complete')
t.end()
})
})