Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

aws-dynamodb-batch-iterator

Package Overview
Dependencies
Maintainers
1
Versions
2
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aws-dynamodb-batch-iterator - npm Package Compare versions

Comparing version 0.7.3-pull-request-167-3 to 0.7.3-pull-request-167-4

144

build/BatchGet.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var tslib_1 = require("tslib");
var BatchOperation_1 = require("./BatchOperation");
const BatchOperation_1 = require("./BatchOperation");
exports.MAX_READ_BATCH_SIZE = 100;

@@ -14,4 +13,3 @@ /**

*/
var BatchGet = /** @class */ (function (_super) {
tslib_1.__extends(BatchGet, _super);
class BatchGet extends BatchOperation_1.BatchOperation {
/**

@@ -26,93 +24,53 @@ * @param client The AWS SDK client with which to communicate with

*/
function BatchGet(client, items, _a) {
var _b = _a === void 0 ? {} : _a, ConsistentRead = _b.ConsistentRead, _c = _b.PerTableOptions, PerTableOptions = _c === void 0 ? {} : _c;
var _this = _super.call(this, client, items) || this;
_this.batchSize = exports.MAX_READ_BATCH_SIZE;
_this.consistentRead = ConsistentRead;
_this.options = PerTableOptions;
return _this;
constructor(client, items, { ConsistentRead, PerTableOptions = {}, } = {}) {
super(client, items);
this.batchSize = exports.MAX_READ_BATCH_SIZE;
this.consistentRead = ConsistentRead;
this.options = PerTableOptions;
}
BatchGet.prototype.doBatchRequest = function () {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var operationInput, batchSize, _a, tableName, item, _b, projection, consistentRead, attributeNames, _c, _d, Responses, _e, UnprocessedKeys, unprocessedTables, _f, _g, table, _h, _j, table, tableData, _k, _l, item;
var e_1, _m, e_2, _o, e_3, _p;
return tslib_1.__generator(this, function (_q) {
switch (_q.label) {
case 0:
operationInput = { RequestItems: {} };
batchSize = 0;
while (this.toSend.length > 0) {
_a = tslib_1.__read(this.toSend.shift(), 2), tableName = _a[0], item = _a[1];
if (operationInput.RequestItems[tableName] === undefined) {
_b = this.state[tableName], projection = _b.projection, consistentRead = _b.consistentRead, attributeNames = _b.attributeNames;
operationInput.RequestItems[tableName] = {
Keys: [],
ConsistentRead: consistentRead,
ProjectionExpression: projection,
ExpressionAttributeNames: attributeNames,
};
}
operationInput.RequestItems[tableName].Keys.push(item);
if (++batchSize === this.batchSize) {
break;
}
}
return [4 /*yield*/, this.client.batchGetItem(operationInput).promise()];
case 1:
_c = _q.sent(), _d = _c.Responses, Responses = _d === void 0 ? {} : _d, _e = _c.UnprocessedKeys, UnprocessedKeys = _e === void 0 ? {} : _e;
unprocessedTables = new Set();
try {
for (_f = tslib_1.__values(Object.keys(UnprocessedKeys)), _g = _f.next(); !_g.done; _g = _f.next()) {
table = _g.value;
unprocessedTables.add(table);
this.handleThrottled(table, UnprocessedKeys[table].Keys);
}
}
catch (e_1_1) { e_1 = { error: e_1_1 }; }
finally {
try {
if (_g && !_g.done && (_m = _f.return)) _m.call(_f);
}
finally { if (e_1) throw e_1.error; }
}
this.movePendingToThrottled(unprocessedTables);
try {
for (_h = tslib_1.__values(Object.keys(Responses)), _j = _h.next(); !_j.done; _j = _h.next()) {
table = _j.value;
tableData = this.state[table];
tableData.backoffFactor = Math.max(0, tableData.backoffFactor - 1);
try {
for (_k = (e_3 = void 0, tslib_1.__values(Responses[table])), _l = _k.next(); !_l.done; _l = _k.next()) {
item = _l.value;
this.pending.push([table, item]);
}
}
catch (e_3_1) { e_3 = { error: e_3_1 }; }
finally {
try {
if (_l && !_l.done && (_p = _k.return)) _p.call(_k);
}
finally { if (e_3) throw e_3.error; }
}
}
}
catch (e_2_1) { e_2 = { error: e_2_1 }; }
finally {
try {
if (_j && !_j.done && (_o = _h.return)) _o.call(_h);
}
finally { if (e_2) throw e_2.error; }
}
return [2 /*return*/];
}
});
});
};
BatchGet.prototype.getInitialTableState = function (tableName) {
var _a = this.options[tableName] || {}, ExpressionAttributeNames = _a.ExpressionAttributeNames, ProjectionExpression = _a.ProjectionExpression, _b = _a.ConsistentRead, ConsistentRead = _b === void 0 ? this.consistentRead : _b;
return tslib_1.__assign(tslib_1.__assign({}, _super.prototype.getInitialTableState.call(this, tableName)), { attributeNames: ExpressionAttributeNames, projection: ProjectionExpression, consistentRead: ConsistentRead });
};
return BatchGet;
}(BatchOperation_1.BatchOperation));
async doBatchRequest() {
const operationInput = { RequestItems: {} };
let batchSize = 0;
while (this.toSend.length > 0) {
const [tableName, item] = this.toSend.shift();
if (operationInput.RequestItems[tableName] === undefined) {
const { projection, consistentRead, attributeNames, } = this.state[tableName];
operationInput.RequestItems[tableName] = {
Keys: [],
ConsistentRead: consistentRead,
ProjectionExpression: projection,
ExpressionAttributeNames: attributeNames,
};
}
operationInput.RequestItems[tableName].Keys.push(item);
if (++batchSize === this.batchSize) {
break;
}
}
const { Responses = {}, UnprocessedKeys = {}, } = await this.client.batchGetItem(operationInput).promise();
const unprocessedTables = new Set();
for (const table of Object.keys(UnprocessedKeys)) {
unprocessedTables.add(table);
this.handleThrottled(table, UnprocessedKeys[table].Keys);
}
this.movePendingToThrottled(unprocessedTables);
for (const table of Object.keys(Responses)) {
const tableData = this.state[table];
tableData.backoffFactor = Math.max(0, tableData.backoffFactor - 1);
for (const item of Responses[table]) {
this.pending.push([table, item]);
}
}
}
getInitialTableState(tableName) {
const { ExpressionAttributeNames, ProjectionExpression, ConsistentRead = this.consistentRead, } = this.options[tableName] || {};
return {
...super.getInitialTableState(tableName),
attributeNames: ExpressionAttributeNames,
projection: ProjectionExpression,
consistentRead: ConsistentRead
};
}
}
exports.BatchGet = BatchGet;
//# sourceMappingURL=BatchGet.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var tslib_1 = require("tslib");
if (Symbol && !Symbol.asyncIterator) {
Symbol.asyncIterator = Symbol.for("__@@asyncIterator__");
}
var BatchOperation = /** @class */ (function () {
class BatchOperation {
/**

@@ -16,3 +15,3 @@ * @param client The AWS SDK client with which to communicate with

*/
function BatchOperation(client, items) {
constructor(client, items) {
this.client = client;

@@ -42,6 +41,5 @@ /**

}
BatchOperation.prototype.next = function () {
var _this = this;
next() {
if (this.lastResolved) {
this.lastResolved = this.lastResolved.then(function () { return _this.getNext(); });
this.lastResolved = this.lastResolved.then(() => this.getNext());
}

@@ -52,6 +50,6 @@ else {

return this.lastResolved;
};
BatchOperation.prototype[Symbol.asyncIterator] = function () {
}
[Symbol.asyncIterator]() {
return this;
};
}
/**

@@ -63,3 +61,3 @@ * Create and return the initial state object for a given DynamoDB table.

*/
BatchOperation.prototype.getInitialTableState = function (tableName) {
getInitialTableState(tableName) {
return {

@@ -69,3 +67,3 @@ backoffFactor: 0,

};
};
}
/**

@@ -82,12 +80,12 @@ * Accept an array of unprocessed items belonging to a single table and

*/
BatchOperation.prototype.handleThrottled = function (tableName, unprocessed) {
var tableState = this.state[tableName];
handleThrottled(tableName, unprocessed) {
const tableState = this.state[tableName];
tableState.backoffFactor++;
if (tableState.tableThrottling) {
this.throttled.delete(tableState.tableThrottling.backoffWaiter);
unprocessed.unshift.apply(unprocessed, tslib_1.__spread(tableState.tableThrottling.unprocessed));
unprocessed.unshift(...tableState.tableThrottling.unprocessed);
}
tableState.tableThrottling = {
unprocessed: unprocessed,
backoffWaiter: new Promise(function (resolve) {
unprocessed,
backoffWaiter: new Promise(resolve => {
setTimeout(resolve, exponentialBackoff(tableState.backoffFactor), tableState);

@@ -97,3 +95,3 @@ })

this.throttled.add(tableState.tableThrottling.backoffWaiter);
};
}
/**

@@ -106,5 +104,5 @@ * Iterate over all pending writes and move those targeting throttled tables

*/
BatchOperation.prototype.movePendingToThrottled = function (unprocessedTables) {
for (var i = this.toSend.length - 1; i > -1; i--) {
var _a = tslib_1.__read(this.toSend[i], 2), table = _a[0], attributes = _a[1];
movePendingToThrottled(unprocessedTables) {
for (let i = this.toSend.length - 1; i > -1; i--) {
const [table, attributes] = this.toSend[i];
if (unprocessedTables.has(table)) {

@@ -116,9 +114,8 @@ this.state[table]

}
};
BatchOperation.prototype.addToSendQueue = function (_a) {
var _b = tslib_1.__read(_a, 2), tableName = _b[0], attributes = _b[1];
}
addToSendQueue([tableName, attributes]) {
if (!this.state[tableName]) {
this.state[tableName] = this.getInitialTableState(tableName);
}
var tableState = this.state[tableName];
const tableState = this.state[tableName];
if (tableState.tableThrottling) {

@@ -130,88 +127,55 @@ tableState.tableThrottling.unprocessed.push(attributes);

}
};
BatchOperation.prototype.enqueueThrottled = function (table) {
var _a;
var _b = table.tableThrottling, backoffWaiter = _b.backoffWaiter, unprocessed = _b.unprocessed;
}
enqueueThrottled(table) {
const { tableThrottling: { backoffWaiter, unprocessed } } = table;
if (unprocessed.length > 0) {
(_a = this.toSend).push.apply(_a, tslib_1.__spread(unprocessed.map(function (attr) { return [table.name, attr]; })));
this.toSend.push(...unprocessed.map(attr => [table.name, attr]));
}
this.throttled.delete(backoffWaiter);
delete table.tableThrottling;
};
BatchOperation.prototype.getNext = function () {
return tslib_1.__awaiter(this, void 0, void 0, function () {
return tslib_1.__generator(this, function (_a) {
switch (_a.label) {
case 0:
if (this.sourceDone &&
this.pending.length === 0 &&
this.toSend.length === 0 &&
this.throttled.size === 0) {
return [2 /*return*/, { done: true }];
}
if (this.pending.length > 0) {
return [2 /*return*/, {
done: false,
value: this.pending.shift()
}];
}
return [4 /*yield*/, this.refillPending()];
case 1:
_a.sent();
return [2 /*return*/, this.getNext()];
}
async getNext() {
if (this.sourceDone &&
this.pending.length === 0 &&
this.toSend.length === 0 &&
this.throttled.size === 0) {
return { done: true };
}
if (this.pending.length > 0) {
return {
done: false,
value: this.pending.shift()
};
}
await this.refillPending();
return this.getNext();
}
async refillPending() {
while (!this.sourceDone &&
this.toSend.length < this.batchSize) {
const toProcess = isIteratorResult(this.sourceNext)
? this.sourceNext
: await Promise.race([
this.sourceNext,
Promise.race(this.throttled)
]);
if (isIteratorResult(toProcess)) {
this.sourceDone = Boolean(toProcess.done);
if (!this.sourceDone) {
this.addToSendQueue(toProcess.value);
this.sourceNext = this.iterator.next();
}
});
});
};
BatchOperation.prototype.refillPending = function () {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var toProcess, _a, _b;
return tslib_1.__generator(this, function (_c) {
switch (_c.label) {
case 0:
if (!(!this.sourceDone &&
this.toSend.length < this.batchSize)) return [3 /*break*/, 4];
if (!isIteratorResult(this.sourceNext)) return [3 /*break*/, 1];
_a = this.sourceNext;
return [3 /*break*/, 3];
case 1: return [4 /*yield*/, Promise.race([
this.sourceNext,
Promise.race(this.throttled)
])];
case 2:
_a = _c.sent();
_c.label = 3;
case 3:
toProcess = _a;
if (isIteratorResult(toProcess)) {
this.sourceDone = Boolean(toProcess.done);
if (!this.sourceDone) {
this.addToSendQueue(toProcess.value);
this.sourceNext = this.iterator.next();
}
}
else {
this.enqueueThrottled(toProcess);
}
return [3 /*break*/, 0];
case 4:
if (!(this.toSend.length < this.batchSize && this.throttled.size > 0)) return [3 /*break*/, 6];
_b = this.enqueueThrottled;
return [4 /*yield*/, Promise.race(this.throttled)];
case 5:
_b.apply(this, [_c.sent()]);
return [3 /*break*/, 4];
case 6:
if (!(this.toSend.length > 0)) return [3 /*break*/, 8];
return [4 /*yield*/, this.doBatchRequest()];
case 7:
_c.sent();
_c.label = 8;
case 8: return [2 /*return*/];
}
});
});
};
return BatchOperation;
}());
}
else {
this.enqueueThrottled(toProcess);
}
}
while (this.toSend.length < this.batchSize && this.throttled.size > 0) {
this.enqueueThrottled(await Promise.race(this.throttled));
}
if (this.toSend.length > 0) {
await this.doBatchRequest();
}
}
}
exports.BatchOperation = BatchOperation;

@@ -218,0 +182,0 @@ function exponentialBackoff(attempts) {

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var tslib_1 = require("tslib");
var BatchOperation_1 = require("./BatchOperation");
var itemIdentifier_1 = require("./itemIdentifier");
const BatchOperation_1 = require("./BatchOperation");
const itemIdentifier_1 = require("./itemIdentifier");
exports.MAX_WRITE_BATCH_SIZE = 25;

@@ -20,110 +19,55 @@ /**

*/
var BatchWrite = /** @class */ (function (_super) {
tslib_1.__extends(BatchWrite, _super);
function BatchWrite() {
var _this = _super !== null && _super.apply(this, arguments) || this;
_this.batchSize = exports.MAX_WRITE_BATCH_SIZE;
return _this;
class BatchWrite extends BatchOperation_1.BatchOperation {
constructor() {
super(...arguments);
this.batchSize = exports.MAX_WRITE_BATCH_SIZE;
}
BatchWrite.prototype.doBatchRequest = function () {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var inFlight, operationInput, batchSize, _a, tableName, marshalled, _b, UnprocessedItems, unprocessedTables, _c, _d, table, unprocessed, _e, _f, item, identifier, i, _g, tableName, attributes, processedTables, inFlight_1, inFlight_1_1, _h, tableName, marshalled, processedTables_1, processedTables_1_1, tableName;
var e_1, _j, e_2, _k, e_3, _l, e_4, _m;
return tslib_1.__generator(this, function (_o) {
switch (_o.label) {
case 0:
inFlight = [];
operationInput = { RequestItems: {} };
batchSize = 0;
while (this.toSend.length > 0) {
_a = tslib_1.__read(this.toSend.shift(), 2), tableName = _a[0], marshalled = _a[1];
inFlight.push([tableName, marshalled]);
if (operationInput.RequestItems[tableName] === undefined) {
operationInput.RequestItems[tableName] = [];
}
operationInput.RequestItems[tableName].push(marshalled);
if (++batchSize === this.batchSize) {
break;
}
async doBatchRequest() {
const inFlight = [];
const operationInput = { RequestItems: {} };
let batchSize = 0;
while (this.toSend.length > 0) {
const [tableName, marshalled] = this.toSend.shift();
inFlight.push([tableName, marshalled]);
if (operationInput.RequestItems[tableName] === undefined) {
operationInput.RequestItems[tableName] = [];
}
operationInput.RequestItems[tableName].push(marshalled);
if (++batchSize === this.batchSize) {
break;
}
}
const { UnprocessedItems = {} } = await this.client.batchWriteItem(operationInput).promise();
const unprocessedTables = new Set();
for (const table of Object.keys(UnprocessedItems)) {
unprocessedTables.add(table);
const unprocessed = [];
for (const item of UnprocessedItems[table]) {
if (item.DeleteRequest || item.PutRequest) {
unprocessed.push(item);
const identifier = itemIdentifier_1.itemIdentifier(table, item);
for (let i = inFlight.length - 1; i >= 0; i--) {
const [tableName, attributes] = inFlight[i];
if (tableName === table &&
itemIdentifier_1.itemIdentifier(tableName, attributes) === identifier) {
inFlight.splice(i, 1);
}
return [4 /*yield*/, this.client.batchWriteItem(operationInput).promise()];
case 1:
_b = (_o.sent()).UnprocessedItems, UnprocessedItems = _b === void 0 ? {} : _b;
unprocessedTables = new Set();
try {
for (_c = tslib_1.__values(Object.keys(UnprocessedItems)), _d = _c.next(); !_d.done; _d = _c.next()) {
table = _d.value;
unprocessedTables.add(table);
unprocessed = [];
try {
for (_e = (e_2 = void 0, tslib_1.__values(UnprocessedItems[table])), _f = _e.next(); !_f.done; _f = _e.next()) {
item = _f.value;
if (item.DeleteRequest || item.PutRequest) {
unprocessed.push(item);
identifier = itemIdentifier_1.itemIdentifier(table, item);
for (i = inFlight.length - 1; i >= 0; i--) {
_g = tslib_1.__read(inFlight[i], 2), tableName = _g[0], attributes = _g[1];
if (tableName === table &&
itemIdentifier_1.itemIdentifier(tableName, attributes) === identifier) {
inFlight.splice(i, 1);
}
}
}
}
}
catch (e_2_1) { e_2 = { error: e_2_1 }; }
finally {
try {
if (_f && !_f.done && (_k = _e.return)) _k.call(_e);
}
finally { if (e_2) throw e_2.error; }
}
this.handleThrottled(table, unprocessed);
}
}
catch (e_1_1) { e_1 = { error: e_1_1 }; }
finally {
try {
if (_d && !_d.done && (_j = _c.return)) _j.call(_c);
}
finally { if (e_1) throw e_1.error; }
}
this.movePendingToThrottled(unprocessedTables);
processedTables = new Set();
try {
for (inFlight_1 = tslib_1.__values(inFlight), inFlight_1_1 = inFlight_1.next(); !inFlight_1_1.done; inFlight_1_1 = inFlight_1.next()) {
_h = tslib_1.__read(inFlight_1_1.value, 2), tableName = _h[0], marshalled = _h[1];
processedTables.add(tableName);
this.pending.push([tableName, marshalled]);
}
}
catch (e_3_1) { e_3 = { error: e_3_1 }; }
finally {
try {
if (inFlight_1_1 && !inFlight_1_1.done && (_l = inFlight_1.return)) _l.call(inFlight_1);
}
finally { if (e_3) throw e_3.error; }
}
try {
for (processedTables_1 = tslib_1.__values(processedTables), processedTables_1_1 = processedTables_1.next(); !processedTables_1_1.done; processedTables_1_1 = processedTables_1.next()) {
tableName = processedTables_1_1.value;
this.state[tableName].backoffFactor =
Math.max(0, this.state[tableName].backoffFactor - 1);
}
}
catch (e_4_1) { e_4 = { error: e_4_1 }; }
finally {
try {
if (processedTables_1_1 && !processedTables_1_1.done && (_m = processedTables_1.return)) _m.call(processedTables_1);
}
finally { if (e_4) throw e_4.error; }
}
return [2 /*return*/];
}
}
});
});
};
return BatchWrite;
}(BatchOperation_1.BatchOperation));
}
this.handleThrottled(table, unprocessed);
}
this.movePendingToThrottled(unprocessedTables);
const processedTables = new Set();
for (const [tableName, marshalled] of inFlight) {
processedTables.add(tableName);
this.pending.push([tableName, marshalled]);
}
for (const tableName of processedTables) {
this.state[tableName].backoffFactor =
Math.max(0, this.state[tableName].backoffFactor - 1);
}
}
}
exports.BatchWrite = BatchWrite;
//# sourceMappingURL=BatchWrite.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var tslib_1 = require("tslib");
const tslib_1 = require("tslib");
tslib_1.__exportStar(require("./BatchGet"), exports);
tslib_1.__exportStar(require("./BatchWrite"), exports);
//# sourceMappingURL=index.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var tslib_1 = require("tslib");
var bytes = require('utf8-bytes');
const bytes = require('utf8-bytes');
/**
* @internal
*/
function itemIdentifier(tableName, _a) {
var DeleteRequest = _a.DeleteRequest, PutRequest = _a.PutRequest;
function itemIdentifier(tableName, { DeleteRequest, PutRequest }) {
if (DeleteRequest) {
return tableName + "::delete::" + serializeKeyTypeAttributes(DeleteRequest.Key);
return `${tableName}::delete::${serializeKeyTypeAttributes(DeleteRequest.Key)}`;
}
else if (PutRequest) {
return tableName + "::put::" + serializeKeyTypeAttributes(PutRequest.Item);
return `${tableName}::put::${serializeKeyTypeAttributes(PutRequest.Item)}`;
}
throw new Error("Invalid write request provided");
throw new Error(`Invalid write request provided`);
}
exports.itemIdentifier = itemIdentifier;
function serializeKeyTypeAttributes(attributes) {
var e_1, _a;
var keyTypeProperties = [];
try {
for (var _b = tslib_1.__values(Object.keys(attributes).sort()), _c = _b.next(); !_c.done; _c = _b.next()) {
var property = _c.value;
var attribute = attributes[property];
if (attribute.B) {
keyTypeProperties.push(property + "=" + toByteArray(attribute.B));
}
else if (attribute.N) {
keyTypeProperties.push(property + "=" + attribute.N);
}
else if (attribute.S) {
keyTypeProperties.push(property + "=" + attribute.S);
}
const keyTypeProperties = [];
for (const property of Object.keys(attributes).sort()) {
const attribute = attributes[property];
if (attribute.B) {
keyTypeProperties.push(`${property}=${toByteArray(attribute.B)}`);
}
}
catch (e_1_1) { e_1 = { error: e_1_1 }; }
finally {
try {
if (_c && !_c.done && (_a = _b.return)) _a.call(_b);
else if (attribute.N) {
keyTypeProperties.push(`${property}=${attribute.N}`);
}
finally { if (e_1) throw e_1.error; }
else if (attribute.S) {
keyTypeProperties.push(`${property}=${attribute.S}`);
}
}

@@ -44,0 +31,0 @@ return keyTypeProperties.join('&');

{
"name": "aws-dynamodb-batch-iterator",
"version": "0.7.3-pull-request-167-3",
"version": "0.7.3-pull-request-167-4",
"description": "Abstraction for DynamoDB batch reads and writes for that handles batch splitting and partial retries with exponential backoff",

@@ -45,3 +45,3 @@ "keywords": [

},
"gitHead": "7cfb733e819ee0a5a3cfc7ee8f1f4222f0ac7954"
"gitHead": "78e82a3c68facefa4d2daebc382340b99ac72954"
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc