Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pull-write

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pull-write - npm Package Compare versions

Comparing version 0.0.0 to 1.0.1

31

index.js

@@ -7,11 +7,12 @@ //another idea: buffer 2* the max, but only call write with half of that,

function count (length, item) {
return length + 1
function append (array, item) {
(array = array || []).push(item)
return array
}
module.exports = function (write, reduceLength, max, cb) {
reduceLength = reduceLength || count
module.exports = function (write, reduce, max, cb) {
reduce = reduce || append
var ended
return function (read) {
var queue = [], writing = false, length = 0
var queue = null, writing = false, length = 0

@@ -21,7 +22,6 @@ function flush () {

var _queue = queue
queue = []
writing = true
length = 0
queue = null; writing = true; length = 0
write(_queue, function (err) {
writing = false
console.log(ended, length, writing)
if(ended === true && !length) cb(err)

@@ -31,14 +31,17 @@ else if(ended && ended !== true) cb(err || ended)

else if(length) flush()
else read(null, next)
})
}
read(null, function next (end, data) {
function next (end, data) {
if(ended) return
ended = end
if(!ended) {
queue.push(data); length = reduceLength(length, data); flush()
queue = reduce(queue, data)
length = (queue && queue.length) || 0
flush()
if(length < max) read(null, next)
}
else if(!writing) cb(ended === true ? null : ended)
})
}
read(null, next)
}

@@ -48,1 +51,5 @@ }

{
"name": "pull-write",
"description": "",
"version": "0.0.0",
"version": "1.0.1",
"homepage": "https://github.com/dominictarr/pull-write",

@@ -6,0 +6,0 @@ "repository": {

@@ -7,3 +7,3 @@ # pull-write

## Write(asyncWrite, lengthReduce, max, cb)
## Write(asyncWrite, reduce, max, cb)

@@ -18,14 +18,18 @@ ### asyncWrite(ary, cb)

### lengthReduce(length, item)
### reduce (queue, item)
`length` is the current length of the buffer.
`item` is a piece of data about to be added to the internal buffer,
`lengthReduce` must return the length of the buffer plus this item.
`queue` is the current backlog of data the `pull-write` is getting ready to write.
`item` is the next incoming item. `reduce` must add `item` into `queue`
in whatever way is appropiate. If `queue` is empty, then it will be `null`.
Your `reduce` function must handle that case and set an initial value.
if `lengthReduce` is not provided, it will default to count the number of elements in the buffer.
by default, `reduce` will be a function that initializes a buffer,
and then pushes the new items onto that buffer, this means `max` will be
compared to the number of items in that buffer.
### max
A number, when the internal buffer gets this big it will stop reading more,
until asyncWrite calls back.
A number, when the `.length` property of the `queue` returned by `reduce`
gets this big `pull-write` will stop reading more, until asyncWrite
calls back.

@@ -45,17 +49,11 @@ ## example

return Write(function (ary, cb) {
var ts = 0
ary.map(function (e) {
ts = Math.max(ts, e.ts)
e.type = 'put'
})
//assuming that the incoming data always has a timestamp,
//write that out to be queried separately.
ary.push({key: '~meta~ts', value: ts, type: 'put'})
db.batch(ary, cb)
}, function (len, data) {
//since data is json and we havn't serialized it yet,
//just keep a count instead of calculating the exact length.
//if the input was buffers, it would be easy to calculate the length.
return len + 1
}, function (queue, data) {
if(!queue)
queue = [{key: '~meta~ts', value: 0, type: 'put'}]
queue.push({key:data.key, value: data.value, type: 'put'})
//the record of the current sequence is always the first value
//in the batch, so we can update it easily.
queue[0].value = data.ts
return queue
}, max, cb)

@@ -76,1 +74,5 @@ }

@@ -100,1 +100,22 @@ var tape = require('tape')

tape('read to max', function (t) {
var output = []
pull(
pull.count(30),
createWrite(function write(data, cb) {
setImmediate(function () {
output = output.concat(data); cb()
})
}, null, 10, function (err) {
t.notOk(err)
t.equal(output.length, 31)
t.end()
})
)
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc