Comparing version 0.0.0 to 1.0.1
31
index.js
@@ -7,11 +7,12 @@ //another idea: buffer 2* the max, but only call write with half of that, | ||
function count (length, item) { | ||
return length + 1 | ||
function append (array, item) { | ||
(array = array || []).push(item) | ||
return array | ||
} | ||
module.exports = function (write, reduceLength, max, cb) { | ||
reduceLength = reduceLength || count | ||
module.exports = function (write, reduce, max, cb) { | ||
reduce = reduce || append | ||
var ended | ||
return function (read) { | ||
var queue = [], writing = false, length = 0 | ||
var queue = null, writing = false, length = 0 | ||
@@ -21,7 +22,6 @@ function flush () { | ||
var _queue = queue | ||
queue = [] | ||
writing = true | ||
length = 0 | ||
queue = null; writing = true; length = 0 | ||
write(_queue, function (err) { | ||
writing = false | ||
console.log(ended, length, writing) | ||
if(ended === true && !length) cb(err) | ||
@@ -31,14 +31,17 @@ else if(ended && ended !== true) cb(err || ended) | ||
else if(length) flush() | ||
else read(null, next) | ||
}) | ||
} | ||
read(null, function next (end, data) { | ||
function next (end, data) { | ||
if(ended) return | ||
ended = end | ||
if(!ended) { | ||
queue.push(data); length = reduceLength(length, data); flush() | ||
queue = reduce(queue, data) | ||
length = (queue && queue.length) || 0 | ||
flush() | ||
if(length < max) read(null, next) | ||
} | ||
else if(!writing) cb(ended === true ? null : ended) | ||
}) | ||
} | ||
read(null, next) | ||
} | ||
@@ -48,1 +51,5 @@ } | ||
{ | ||
"name": "pull-write", | ||
"description": "", | ||
"version": "0.0.0", | ||
"version": "1.0.1", | ||
"homepage": "https://github.com/dominictarr/pull-write", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -7,3 +7,3 @@ # pull-write | ||
## Write(asyncWrite, lengthReduce, max, cb) | ||
## Write(asyncWrite, reduce, max, cb) | ||
@@ -18,14 +18,18 @@ ### asyncWrite(ary, cb) | ||
### lengthReduce(length, item) | ||
### reduce (queue, item) | ||
`length` is the current length of the buffer. | ||
`item` is a piece of data about to be added to the internal buffer, | ||
`lengthReduce` must return the length of the buffer plus this item. | ||
`queue` is the current backlog of data the `pull-write` is getting ready to write. | ||
`item` is the next incoming item. `reduce` must add `item` into `queue` | ||
in whatever way is appropiate. If `queue` is empty, then it will be `null`. | ||
Your `reduce` function must handle that case and set an initial value. | ||
if `lengthReduce` is not provided, it will default to count the number of elements in the buffer. | ||
by default, `reduce` will be a function that initializes a buffer, | ||
and then pushes the new items onto that buffer, this means `max` will be | ||
compared to the number of items in that buffer. | ||
### max | ||
A number, when the internal buffer gets this big it will stop reading more, | ||
until asyncWrite calls back. | ||
A number, when the `.length` property of the `queue` returned by `reduce` | ||
gets this big `pull-write` will stop reading more, until asyncWrite | ||
calls back. | ||
@@ -45,17 +49,11 @@ ## example | ||
return Write(function (ary, cb) { | ||
var ts = 0 | ||
ary.map(function (e) { | ||
ts = Math.max(ts, e.ts) | ||
e.type = 'put' | ||
}) | ||
//assuming that the incoming data always has a timestamp, | ||
//write that out to be queried separately. | ||
ary.push({key: '~meta~ts', value: ts, type: 'put'}) | ||
db.batch(ary, cb) | ||
}, function (len, data) { | ||
//since data is json and we havn't serialized it yet, | ||
//just keep a count instead of calculating the exact length. | ||
//if the input was buffers, it would be easy to calculate the length. | ||
return len + 1 | ||
}, function (queue, data) { | ||
if(!queue) | ||
queue = [{key: '~meta~ts', value: 0, type: 'put'}] | ||
queue.push({key:data.key, value: data.value, type: 'put'}) | ||
//the record of the current sequence is always the first value | ||
//in the batch, so we can update it easily. | ||
queue[0].value = data.ts | ||
return queue | ||
}, max, cb) | ||
@@ -76,1 +74,5 @@ } | ||
@@ -100,1 +100,22 @@ var tape = require('tape') | ||
tape('read to max', function (t) { | ||
var output = [] | ||
pull( | ||
pull.count(30), | ||
createWrite(function write(data, cb) { | ||
setImmediate(function () { | ||
output = output.concat(data); cb() | ||
}) | ||
}, null, 10, function (err) { | ||
t.notOk(err) | ||
t.equal(output.length, 31) | ||
t.end() | ||
}) | ||
) | ||
}) | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
7344
144
0
75