pull-stream
Advanced tools
Comparing version 3.1.0 to 3.2.0
{ | ||
"name": "pull-stream", | ||
"description": "minimal pull stream", | ||
"version": "3.1.0", | ||
"version": "3.2.0", | ||
"homepage": "https://github.com/dominictarr/pull-stream", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -53,5 +53,7 @@ 'use strict' | ||
sink.abort = function (err) { | ||
sink.abort = function (err, cb) { | ||
if('function' == typeof err) | ||
cb = err, err = true | ||
abort = err || true | ||
if(read) return read(abort, function () {}) | ||
if(read) return read(abort, cb || function () {}) | ||
} | ||
@@ -58,0 +60,0 @@ |
var pull = require('../') | ||
var tape = require('tape') | ||
tape('async-map', function (t) { | ||
require('tape')('async-map', function (t) { | ||
pull( | ||
@@ -17,6 +17,48 @@ pull.count(), | ||
) | ||
}) | ||
tape('abort async map', function (t) { | ||
var err = new Error('abort') | ||
t.plan(2) | ||
var read = pull( | ||
pull.infinite(), | ||
pull.asyncMap(function (data, cb) { | ||
setImmediate(function () { | ||
cb(null, data) | ||
}) | ||
}) | ||
) | ||
read(null, function (end) { | ||
if(!end) throw new Error('expected read to end') | ||
t.ok(end, "read's callback") | ||
}) | ||
read(err, function (end) { | ||
if(!end) throw new Error('expected abort to end') | ||
t.ok(end, "Abort's callback") | ||
t.end() | ||
}) | ||
}) | ||
tape('asyncMap aborts when map errors', function (t) { | ||
t.plan(2) | ||
var ERR = new Error('abort') | ||
pull( | ||
pull.values([1,2,3], function (err) { | ||
console.log('on abort') | ||
t.equal(err, ERR, 'abort gets error') | ||
t.end() | ||
}), | ||
pull.asyncMap(function (data, cb) { | ||
cb(ERR) | ||
}), | ||
pull.collect(function (err) { | ||
t.equal(err, ERR, 'collect gets error') | ||
}) | ||
) | ||
}) | ||
@@ -27,2 +27,3 @@ 'use strict'; | ||
var map = exports.map = | ||
function (map) { | ||
@@ -48,3 +49,43 @@ if(!map) return id | ||
var asyncMap = exports.asyncMap = | ||
function (map) { | ||
function async (map) { | ||
if(!map) return id | ||
map = prop(map) | ||
var busy = false, abortCb, aborted | ||
return function (read) { | ||
return function next (abort, cb) { | ||
if(aborted) return cb(aborted) | ||
if(abort) { | ||
aborted = abort | ||
if(!busy) read(abort, cb) | ||
else read(abort, function () { | ||
//if we are still busy, wait for the mapper to complete. | ||
if(busy) abortCb = cb | ||
else cb(abort) | ||
}) | ||
} | ||
else | ||
read(null, function (end, data) { | ||
if(end) { | ||
cb(end) | ||
if(abortCb) cb(end, data) | ||
} | ||
else { | ||
busy = true | ||
map(data, function (err, data) { | ||
busy = false | ||
if(aborted) { | ||
cb(aborted) | ||
abortCb(aborted) | ||
} | ||
else if(err) next (err, cb) | ||
else cb(null, data) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
} | ||
function asyncMap (map) { | ||
if(!map) return id //when read is passed, pass it on. | ||
@@ -212,1 +253,6 @@ return function (read) { | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
53732
1225