Comparing version 1.0.0-beta.1 to 1.0.0-beta.2
16
API.md
@@ -201,5 +201,17 @@ # Dyno | ||
Send all the requests in a set, optionally specifying concurrency. The | ||
provided callback function is passed an array of individual results | ||
emphasis is on making it transparent to the caller the exact outcome of each | ||
request in the set. | ||
The callback function will be passed arguments in this order: | ||
- error: set to null if no errors occurred, otherwise an array of errors with | ||
indexes that correspond to the indexes of the original request set | ||
- responses: always an array of responses equal with indexes corresponding | ||
to the original request set. If a particular request encountered an error, | ||
that index in the `responses` array will be set to `null`. | ||
- unprocessed: set to null if no unprocessed results were detected, otherwise | ||
a new set of requests with its own .sendAll function bound to it. Again, | ||
indexes correspond to those in the original request set. | ||
**Parameters** | ||
@@ -210,3 +222,3 @@ | ||
- `callback` **function** a function to handle the response array. | ||
- `callback` **function** a function to handle the response. | ||
@@ -213,0 +225,0 @@ |
15
index.js
@@ -193,4 +193,15 @@ var AWS = require('aws-sdk'); | ||
* Send all the requests in a set, optionally specifying concurrency. The | ||
* provided callback function is passed an array of individual results | ||
* emphasis is on making it transparent to the caller the exact outcome of each | ||
* request in the set. | ||
* | ||
* The callback function will be passed arguments in this order: | ||
* - error: set to null if no errors occurred, otherwise an array of errors with | ||
* indexes that correspond to the indexes of the original request set | ||
* - responses: always an array of responses equal with indexes corresponding | ||
* to the original request set. If a particular request encountered an error, | ||
* that index in the `responses` array will be set to `null`. | ||
* - unprocessed: set to null if no unprocessed results were detected, otherwise | ||
* a new set of requests with its own .sendAll function bound to it. Again, | ||
* indexes correspond to those in the original request set. | ||
* | ||
* @name sendAll | ||
@@ -201,3 +212,3 @@ * @instance | ||
* Default value is `1`. | ||
* @param {function} callback - a function to handle the response array. | ||
* @param {function} callback - a function to handle the response. | ||
*/ | ||
@@ -204,0 +215,0 @@ |
@@ -5,18 +5,76 @@ var big = require('big.js'); | ||
function sendAll(requests, concurrency, callback) { | ||
if (typeof concurrency === 'function') { | ||
callback = concurrency; | ||
concurrency = 1; | ||
} | ||
var q = queue(concurrency); | ||
requests.forEach(function(req) { | ||
q.defer(req.send.bind(req)); | ||
}); | ||
q.awaitAll(callback); | ||
} | ||
module.exports = function(client) { | ||
var requests = {}; | ||
/** | ||
* Given a set of requests, this function sends them all at specified concurrency. | ||
* Generally, this function is not called directly, but is bound to an array of | ||
* requests with the first two parameters wired to specific values. | ||
* | ||
* @private | ||
* @param {RequestSet} requests - an array of AWS.Request objects | ||
* @param {string} fnName - the name of the aws-sdk client function that will | ||
* be used to construct new requests should an unprocessed items be detected | ||
* @param {number} concurrency - the concurrency with which batch requests | ||
* will be made | ||
* @param {function} callback - a function to handle the response | ||
*/ | ||
function sendAll(requests, fnName, concurrency, callback) { | ||
if (typeof concurrency === 'function') { | ||
callback = concurrency; | ||
concurrency = 1; | ||
} | ||
var q = queue(concurrency); | ||
requests.forEach(function(req) { | ||
q.defer(function(next) { | ||
if (!req) return next(); | ||
req.on('complete', function(response) { | ||
next(null, response); | ||
}).send(); | ||
}); | ||
}); | ||
q.awaitAll(function(err, responses) { | ||
if (err) return callback(err); | ||
var errors = []; | ||
var data = []; | ||
var unprocessed = []; | ||
responses.forEach(function(response) { | ||
errors.push(response.error); | ||
data.push(response.data); | ||
if (!response.data) return unprocessed.push(null); | ||
var newParams = { | ||
RequestItems: response.data.UnprocessedItems || response.data.UnprocessedKeys | ||
}; | ||
if (newParams.RequestItems && !Object.keys(newParams.RequestItems).length) | ||
return unprocessed.push(null); | ||
unprocessed.push(newParams.RequestItems ? newParams : null); | ||
}); | ||
unprocessed = unprocessed.map(function(params) { | ||
if (params) return client[fnName].bind(client)(params); | ||
else return null; | ||
}); | ||
var unprocessedCount = unprocessed.filter(function(req) { return !!req; }).length; | ||
if (unprocessedCount) unprocessed.sendAll = sendAll.bind(unprocessed, fnName); | ||
var errorCount = errors.filter(function(err) { return !!err; }).length; | ||
callback( | ||
errorCount ? errors : null, | ||
data, | ||
unprocessedCount ? unprocessed : null | ||
); | ||
}); | ||
} | ||
requests.batchWriteItemRequests = function(params) { | ||
@@ -73,3 +131,3 @@ var maxSize = 16 * 1024 * 1024; | ||
results.sendAll = sendAll.bind(null, results); | ||
results.sendAll = sendAll.bind(null, results, 'batchWrite'); | ||
return results; | ||
@@ -113,3 +171,3 @@ }; | ||
results.sendAll = sendAll.bind(null, results); | ||
results.sendAll = sendAll.bind(null, results, 'batchGet'); | ||
return results; | ||
@@ -116,0 +174,0 @@ }; |
{ | ||
"name": "dyno", | ||
"version": "1.0.0-beta.1", | ||
"version": "1.0.0-beta.2", | ||
"description": "Simple DynamoDB client", | ||
@@ -9,3 +9,4 @@ "main": "index.js", | ||
"test": "nyc tape test/*.test.js", | ||
"docs": "documentation index.js -f md > API.md" | ||
"docs": "documentation index.js -f md > API.md", | ||
"coverage": "nyc --reporter=html tape test/*.test.js && open coverage/index.html" | ||
}, | ||
@@ -12,0 +13,0 @@ "bin": { |
@@ -0,4 +1,6 @@ | ||
var AWS = require('aws-sdk'); | ||
var test = require('tape'); | ||
var testTables = require('./test-tables'); | ||
var dynamodb = require('dynamodb-test')(test, 'dyno', testTables.idhash); | ||
var second = require('dynamodb-test')(test, 'dyno', testTables.idhash); | ||
var Requests = require('../lib/requests'); | ||
@@ -25,2 +27,3 @@ var _ = require('underscore'); | ||
dynamodb.start(); | ||
second.start(); | ||
@@ -54,30 +57,2 @@ dynamodb.test('[requests] batchGetItemRequests (single table)', fixtures, function(assert) { | ||
dynamodb.test('[requests] batchGetItemRequests.sendAll (single table)', fixtures, function(assert) { | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = { | ||
Keys: _.range(150).map(function(i) { | ||
return { id: i.toString() }; | ||
}) | ||
}; | ||
var found = dyno.batchGetItemRequests(params); | ||
assert.equal(found.length, 2, 'split 150 keys into two requests'); | ||
found.sendAll(function(err, results) { | ||
assert.ifError(err, 'requests were sent successfully'); | ||
results = results[0].Responses[dynamodb.tableName].concat(results[1].Responses[dynamodb.tableName]); | ||
assert.equal(results.length, 150, 'all responses were recieved'); | ||
found.sendAll(4, function(err) { | ||
assert.ifError(err, 'can set concurrency'); | ||
assert.end(); | ||
}); | ||
}); | ||
}); | ||
dynamodb.test('[requests] batchWriteItemRequests (single table, small writes)', fixtures, function(assert) { | ||
@@ -159,2 +134,386 @@ var dyno = Dyno({ | ||
dynamodb.test('[requests] sendAll (single table)', fixtures, function(assert) { | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = { | ||
Keys: _.range(150).map(function(i) { | ||
return { id: i.toString() }; | ||
}) | ||
}; | ||
var found = dyno.batchGetItemRequests(params); | ||
assert.equal(found.length, 2, 'split 150 keys into two requests'); | ||
found.sendAll(function(err, results) { | ||
assert.ifError(err, 'requests were sent successfully'); | ||
results = results[0].Responses[dynamodb.tableName].concat(results[1].Responses[dynamodb.tableName]); | ||
assert.equal(results.length, 150, 'all responses were recieved'); | ||
found.sendAll(4, function(err) { | ||
assert.ifError(err, 'can set concurrency'); | ||
assert.end(); | ||
}); | ||
}); | ||
}); | ||
second.load(fixtures); | ||
dynamodb.test('[requests] sendAll (two tables)', fixtures, function(assert) { | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = { | ||
Keys: _.range(150).map(function(i) { | ||
return { id: i.toString() }; | ||
}) | ||
}; | ||
params.RequestItems[second.tableName] = { | ||
Keys: _.range(150).map(function(i) { | ||
return { id: i.toString() }; | ||
}) | ||
}; | ||
var found = dyno.batchGetItemRequests(params); | ||
assert.equal(found.length, 3, 'split 300 keys into three requests'); | ||
found.sendAll(function(err, results, unprocessed) { | ||
assert.ifError(err, 'requests were sent successfully'); | ||
assert.ifError(unprocessed, 'no unprocessed items reported'); | ||
results = results.reduce(function(results, result) { | ||
if (result.Responses[dynamodb.tableName]) results = results.concat(result.Responses[dynamodb.tableName]); | ||
if (result.Responses[second.tableName]) results = results.concat(result.Responses[second.tableName]); | ||
return results; | ||
}, []); | ||
assert.equal(results.length, 300, 'all responses were recieved'); | ||
assert.end(); | ||
}); | ||
}); | ||
second.empty(); | ||
test('[requests] batchGet sendAll: no errors, unprocessed items present', function(assert) { | ||
var original = AWS.Request.prototype.send; | ||
AWS.Request.prototype.send = function() { | ||
var params = this.params.RequestItems[dynamodb.tableName].Keys; | ||
var data = { Responses: {} }; | ||
data.Responses[dynamodb.tableName] = []; | ||
params.forEach(function(key) { | ||
if (key.id === '143') { | ||
data.UnprocessedKeys = {}; | ||
data.UnprocessedKeys[dynamodb.tableName] = { Keys: [key] }; | ||
} | ||
else data.Responses[dynamodb.tableName].push({ | ||
Item: fixtures[key.id] | ||
}); | ||
}); | ||
this.removeListener('extractError', AWS.EventListeners.Core.EXTRACT_ERROR); | ||
this.on('extractError', function(response) { response.error = null; }); | ||
this.removeListener('extractData', AWS.EventListeners.Core.EXTRACT_DATA); | ||
this.on('extractData', function(response) { response.data = data; }); | ||
this.removeListener('send', AWS.EventListeners.Core.SEND); | ||
this.on('send', function(response) { | ||
response.httpResponse.body = '{"mocked":"response"}'; | ||
response.httpResponse.statusCode = 200; | ||
}); | ||
this.runTo(); | ||
return this.response; | ||
}; | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = { | ||
Keys: _.range(150).map(function(i) { | ||
return { id: i.toString() }; | ||
}) | ||
}; | ||
var requests = dyno.batchGetItemRequests(params); | ||
requests.sendAll(function(err, responses, unprocessed) { | ||
assert.ifError(err, 'success'); | ||
assert.equal(responses.length, requests.length, 'when present, responses array has as many entries as there were requests'); | ||
assert.equal(unprocessed.length, requests.length, 'when present, unprocessed array has as many entries as there were requests'); | ||
var successCount = responses[0].Responses[dynamodb.tableName].length + responses[1].Responses[dynamodb.tableName].length; | ||
assert.equal(successCount, 149, '149 items requested successfully'); | ||
assert.equal(unprocessed[0], null, 'first request contained no unprocessed items'); | ||
var expected = { RequestItems: {} }; | ||
expected.RequestItems[dynamodb.tableName] = { Keys: [{ id: '143' }] }; | ||
assert.deepEqual(unprocessed[1].params, expected, 'unprocessed request for expected params'); | ||
assert.equal(typeof unprocessed.sendAll, 'function', 'unprocessed response has bound .sendAll'); | ||
AWS.Request.prototype.send = original; | ||
assert.end(); | ||
}); | ||
}); | ||
test('[requests] batchGet sendAll: with errors, unprocessed items present', function(assert) { | ||
var original = AWS.Request.prototype.send; | ||
AWS.Request.prototype.send = function() { | ||
var params = this.params.RequestItems[dynamodb.tableName].Keys; | ||
var data = { Responses: {} }; | ||
data.Responses[dynamodb.tableName] = []; | ||
var error; | ||
params.forEach(function(key) { | ||
if (key.id === '2') { | ||
console.log('getting an error'); | ||
error = new Error('omg! mock error!'); | ||
error.statusCode = 404; | ||
} | ||
else if (key.id === '143') { | ||
data.UnprocessedKeys = {}; | ||
data.UnprocessedKeys[dynamodb.tableName] = { Keys: [key] }; | ||
} | ||
else data.Responses[dynamodb.tableName].push({ | ||
Item: fixtures[key.id] | ||
}); | ||
}); | ||
this.removeListener('extractError', AWS.EventListeners.Core.EXTRACT_ERROR); | ||
this.on('extractError', function(response) { response.error = error || null; }); | ||
this.removeListener('extractData', AWS.EventListeners.Core.EXTRACT_DATA); | ||
this.on('extractData', function(response) { response.data = data; }); | ||
this.removeListener('send', AWS.EventListeners.Core.SEND); | ||
this.on('send', function(response) { | ||
response.httpResponse.body = '{"mocked":"response"}'; | ||
response.httpResponse.statusCode = error ? 404 : 200; | ||
}); | ||
this.runTo(); | ||
return this.response; | ||
}; | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = { | ||
Keys: _.range(150).map(function(i) { | ||
return { id: i.toString() }; | ||
}) | ||
}; | ||
var requests = dyno.batchGetItemRequests(params); | ||
requests.sendAll(function(err, responses, unprocessed) { | ||
assert.equal(err.length, requests.length, 'when present, error array has as many entries as there were requests'); | ||
assert.equal(responses.length, requests.length, 'when present, responses array has as many entries as there were requests'); | ||
assert.equal(unprocessed.length, requests.length, 'when present, unprocessed array has as many entries as there were requests'); | ||
assert.equal(err[0].message, 'omg! mock error!', 'first request errored'); | ||
assert.equal(responses[0], null, 'response set to null when error occurred'); | ||
assert.equal(unprocessed[0], null, 'first request contained no unprocessed items'); | ||
var expected = { RequestItems: {} }; | ||
expected.RequestItems[dynamodb.tableName] = { Keys: [{ id: '143' }] }; | ||
assert.equal(err[1], null, 'no error on second request'); | ||
assert.equal(responses[1].Responses[dynamodb.tableName].length, 49, '49 successful requests'); | ||
assert.deepEqual(unprocessed[1].params, expected, 'unprocessed request for expected params'); | ||
assert.equal(typeof unprocessed.sendAll, 'function', 'unprocessed response has bound .sendAll'); | ||
AWS.Request.prototype.send = original; | ||
assert.end(); | ||
}); | ||
}); | ||
test('[requests] batchWrite sendAll: no errors, unprocessed items present', function(assert) { | ||
var original = AWS.Request.prototype.send; | ||
AWS.Request.prototype.send = function() { | ||
var params = this.params.RequestItems[dynamodb.tableName]; | ||
var data = { Responses: {} }; | ||
data.Responses[dynamodb.tableName] = []; | ||
params.forEach(function(req) { | ||
if (req.PutRequest.Item.id === '143') { | ||
data.UnprocessedItems = {}; | ||
data.UnprocessedItems[dynamodb.tableName] = [{ PutRequest: { Item: fixtures['143'] } }]; | ||
} | ||
else data.Responses[dynamodb.tableName].push({}); | ||
}); | ||
this.removeListener('extractError', AWS.EventListeners.Core.EXTRACT_ERROR); | ||
this.on('extractError', function(response) { response.error = null; }); | ||
this.removeListener('extractData', AWS.EventListeners.Core.EXTRACT_DATA); | ||
this.on('extractData', function(response) { response.data = data; }); | ||
this.removeListener('send', AWS.EventListeners.Core.SEND); | ||
this.on('send', function(response) { | ||
response.httpResponse.body = '{"mocked":"response"}'; | ||
response.httpResponse.statusCode = 200; | ||
}); | ||
this.runTo(); | ||
return this.response; | ||
}; | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = fixtures.map(function(item) { | ||
return { PutRequest: { Item: item } }; | ||
}); | ||
var requests = dyno.batchWriteItemRequests(params); | ||
requests.sendAll(function(err, responses, unprocessed) { | ||
assert.ifError(err, 'success'); | ||
assert.equal(responses.length, requests.length, 'when present, responses array has as many entries as there were requests'); | ||
assert.equal(unprocessed.length, requests.length, 'when present, unprocessed array has as many entries as there were requests'); | ||
var successCount = | ||
responses[0].Responses[dynamodb.tableName].length + | ||
responses[1].Responses[dynamodb.tableName].length + | ||
responses[2].Responses[dynamodb.tableName].length + | ||
responses[3].Responses[dynamodb.tableName].length + | ||
responses[4].Responses[dynamodb.tableName].length + | ||
responses[5].Responses[dynamodb.tableName].length; | ||
assert.equal(successCount, 149, '149 items requested successfully'); | ||
assert.equal(unprocessed[0], null, 'first request contained no unprocessed items'); | ||
assert.equal(unprocessed[1], null, 'second request contained no unprocessed items'); | ||
assert.equal(unprocessed[2], null, 'third request contained no unprocessed items'); | ||
assert.equal(unprocessed[3], null, 'fourth request contained no unprocessed items'); | ||
assert.equal(unprocessed[4], null, 'fifth request contained no unprocessed items'); | ||
var expected = { RequestItems: {} }; | ||
expected.RequestItems[dynamodb.tableName] = [{ PutRequest: { Item: fixtures['143'] } }]; | ||
assert.deepEqual(unprocessed[5].params, expected, 'unprocessed request for expected params'); | ||
assert.equal(typeof unprocessed.sendAll, 'function', 'unprocessed response has bound .sendAll'); | ||
AWS.Request.prototype.send = original; | ||
assert.end(); | ||
}); | ||
}); | ||
test('[requests] batchWrite sendAll: with errors, unprocessed items present', function(assert) { | ||
var original = AWS.Request.prototype.send; | ||
AWS.Request.prototype.send = function() { | ||
var params = this.params.RequestItems[dynamodb.tableName]; | ||
var data = { Responses: {} }; | ||
data.Responses[dynamodb.tableName] = []; | ||
var error; | ||
params.forEach(function(req) { | ||
if (req.PutRequest.Item.id === '2') { | ||
error = new Error('omg! mock error!'); | ||
error.statusCode = 404; | ||
} | ||
else if (req.PutRequest.Item.id === '143') { | ||
data.UnprocessedItems = {}; | ||
data.UnprocessedItems[dynamodb.tableName] = [{ PutRequest: { Item: fixtures['143'] } }]; | ||
} | ||
else data.Responses[dynamodb.tableName].push({}); | ||
}); | ||
this.removeListener('extractError', AWS.EventListeners.Core.EXTRACT_ERROR); | ||
this.on('extractError', function(response) { response.error = error || null; }); | ||
this.removeListener('extractData', AWS.EventListeners.Core.EXTRACT_DATA); | ||
this.on('extractData', function(response) { response.data = data; }); | ||
this.removeListener('send', AWS.EventListeners.Core.SEND); | ||
this.on('send', function(response) { | ||
response.httpResponse.body = '{"mocked":"response"}'; | ||
response.httpResponse.statusCode = error ? 404 : 200; | ||
}); | ||
this.runTo(); | ||
return this.response; | ||
}; | ||
var dyno = Dyno({ | ||
table: dynamodb.tableName, | ||
region: 'local', | ||
endpoint: 'http://localhost:4567' | ||
}); | ||
var params = { RequestItems: {} }; | ||
params.RequestItems[dynamodb.tableName] = fixtures.map(function(item) { | ||
return { PutRequest: { Item: item } }; | ||
}); | ||
var requests = dyno.batchWriteItemRequests(params); | ||
requests.sendAll(function(err, responses, unprocessed) { | ||
assert.equal(err.length, requests.length, 'when present, error array has as many entries as there were requests'); | ||
assert.equal(responses.length, requests.length, 'when present, responses array has as many entries as there were requests'); | ||
assert.equal(unprocessed.length, requests.length, 'when present, unprocessed array has as many entries as there were requests'); | ||
assert.equal(err[0].message, 'omg! mock error!', 'first response errored'); | ||
assert.equal(responses[0], null, 'responses set to null when error occurred'); | ||
assert.equal(unprocessed[0], null, 'no unprocessed results'); | ||
assert.equal(err[1], null, 'second response did not error'); | ||
assert.equal(responses[1].Responses[dynamodb.tableName].length, 25, '25 successful responses'); | ||
assert.equal(unprocessed[1], null, 'no unprocessed results'); | ||
assert.equal(err[2], null, 'third response did not error'); | ||
assert.equal(responses[2].Responses[dynamodb.tableName].length, 25, '25 successful responses'); | ||
assert.equal(unprocessed[2], null, 'no unprocessed results'); | ||
assert.equal(err[3], null, 'fourth response did not error'); | ||
assert.equal(responses[3].Responses[dynamodb.tableName].length, 25, '25 successful responses'); | ||
assert.equal(unprocessed[3], null, 'no unprocessed results'); | ||
assert.equal(err[4], null, 'fifth response did not error'); | ||
assert.equal(responses[4].Responses[dynamodb.tableName].length, 25, '25 successful responses'); | ||
assert.equal(unprocessed[4], null, 'no unprocessed results'); | ||
assert.equal(err[5], null, 'sixth response did not error'); | ||
assert.equal(responses[5].Responses[dynamodb.tableName].length, 24, '24 successful responses'); | ||
var expected = { RequestItems: {} }; | ||
expected.RequestItems[dynamodb.tableName] = [{ PutRequest: { Item: fixtures['143'] } }]; | ||
assert.deepEqual(unprocessed[5].params, expected, 'unprocessed request for expected params'); | ||
assert.equal(typeof unprocessed.sendAll, 'function', 'unprocessed response has bound .sendAll'); | ||
AWS.Request.prototype.send = original; | ||
assert.end(); | ||
}); | ||
}); | ||
second.delete(); | ||
dynamodb.delete(); | ||
dynamodb.close(); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
122349
2241
1