@venzee/dynamo_streams
Advanced tools
Comparing version 1.0.2 to 1.1.0
'use strict'; | ||
module.exports = { | ||
query: require('./query') | ||
query: require('./query'), | ||
scan: require('./scan') | ||
}; |
{ | ||
"name": "@venzee/dynamo_streams", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"description": "Venzee Streaming tools for DynamoDB access", | ||
@@ -39,7 +39,2 @@ "main": "index.js", | ||
"private": false, | ||
"dependencies": { | ||
"async": "^2.6.0", | ||
"lodash": "^4.17.4", | ||
"object-streaming-tools": "^1.2.0" | ||
}, | ||
"devDependencies": { | ||
@@ -46,0 +41,0 @@ "babel-cli": "^6.26.0", |
73
query.js
'use strict'; | ||
function _toConsumableArray(arr) { if (Array.isArray(arr)) { for (var i = 0, arr2 = Array(arr.length); i < arr.length; i++) { arr2[i] = arr[i]; } return arr2; } else { return Array.from(arr); } } | ||
function createQueryStream(client, params, options) { | ||
var isReading = false; | ||
var hasRead = false; | ||
var ExclusiveStartKey = void 0; | ||
var rest = []; | ||
function read(n) { | ||
var _this = this; | ||
if (isReading) return; | ||
if (rest.length) { | ||
isReading = true; | ||
pushInitialItems.call(this, rest, n); | ||
isReading = false; | ||
this.push(rest.shift()); | ||
return; | ||
} | ||
if (hasRead) { | ||
if (ExclusiveStartKey == null) return this.push(null); | ||
params = Object.assign({}, params, { ExclusiveStartKey: ExclusiveStartKey }); | ||
ExclusiveStartKey = null; | ||
} | ||
hasRead = true; | ||
isReading = true; | ||
client.query(params, function (err, result) { | ||
if (err) return _this.emit('error', err); | ||
if (!(result && result.Items && result.Items.length)) return _this.push(null); | ||
ExclusiveStartKey = result.LastEvaluatedKey; | ||
var items = [].concat(_toConsumableArray(result.Items)); | ||
pushInitialItems.call(_this, items, n); | ||
isReading = false; | ||
var item = items.shift(); | ||
rest.push.apply(rest, _toConsumableArray(items)); | ||
_this.push(item); | ||
}); | ||
} | ||
return require('stream').Readable(Object.assign({}, options, { read: read, objectMode: true })); | ||
} | ||
module.exports = createQueryStream; | ||
function pushInitialItems(items, n) { | ||
while (items.length > 1 && n-- > 1) { | ||
push.call(this, items.shift()); | ||
} | ||
} | ||
function push(item) { | ||
if (item != null) this.push(item); | ||
} | ||
module.exports = require('./lib/createStreamFor')('query'); |
@@ -26,1 +26,5 @@ # @venzee/dynamo_streams | ||
``` | ||
### ScanStream | ||
Same behavior as a QueryStream, except we are streaming `scan` results, not `query` results. |
12548
0
14
178
30
- Removedasync@^2.6.0
- Removedlodash@^4.17.4
- Removedobject-streaming-tools@^1.2.0
- Removedasync@2.6.4(transitive)
- Removedlodash@4.17.21(transitive)
- Removedobject-streaming-tools@1.4.0(transitive)