lifion-kinesis - npm Package Compare versions

Comparing version 1.0.7 to 1.0.8

lib/constants.js

CHANGELOG.md

@@ -10,2 +10,28 @@ ## Changelog

### v1.0.8 (2019-06-05)
- [`#127`](https://github.com/lifion/lifion-kinesis/pull/127): Feature/unit tests and updated docs
- [`#147`](https://github.com/lifion/lifion-kinesis/pull/147): Update dependency husky to ^2.4.0
- [`#146`](https://github.com/lifion/lifion-kinesis/pull/146): Update dependency aws-sdk to ^2.468.0
- [`#145`](https://github.com/lifion/lifion-kinesis/pull/145): Update dependency aws-sdk to ^2.464.0
- [`#144`](https://github.com/lifion/lifion-kinesis/pull/144): Update dependency semver to ^6.1.1
- [`#143`](https://github.com/lifion/lifion-kinesis/pull/143): Update dependency aws-sdk to ^2.463.0
- [`#142`](https://github.com/lifion/lifion-kinesis/pull/142): Update dependency aws-sdk to ^2.462.0
- [`#141`](https://github.com/lifion/lifion-kinesis/pull/141): Update dependency semver to ^6.1.0
- [`#140`](https://github.com/lifion/lifion-kinesis/pull/140): Update dependency aws-sdk to ^2.461.0
- [`54662a1`](https://github.com/lifion/lifion-kinesis/commit/54662a1bcc6af9735e37e4acb89bf72f938fb3a4): Add tests for lib/state-store
- [`8af3763`](https://github.com/lifion/lifion-kinesis/commit/8af3763dd21a784eb062ccdf929d6d52b0bfd744): Add tests for lib/fan-out-consumer
- [`cb9c34f`](https://github.com/lifion/lifion-kinesis/commit/cb9c34f39db6a75bef7afda301a365d5d9b8a5d9): Add tests for lib/index
- [`254b525`](https://github.com/lifion/lifion-kinesis/commit/254b525a4856bca6d0767569f9da71ecd06b17bf): Add tests for lib/kinesis-client
- [`5d10b17`](https://github.com/lifion/lifion-kinesis/commit/5d10b1780ba0352da2b6a37e0e62a41519283dc1): Document the rest of the modules
- [`c86066b`](https://github.com/lifion/lifion-kinesis/commit/c86066b044a48e31ad58660102fdcedaa37b3741): Refactor put record(s) results
- [`efcf145`](https://github.com/lifion/lifion-kinesis/commit/efcf145ea511973a224022bcbf91df15073e40e1): Add tests for lib/stream
- [`2cc0e0d`](https://github.com/lifion/lifion-kinesis/commit/2cc0e0d651f1525119a75ddfc688c3e041ea9275): Upgrade dependencies
- [`e9759ad`](https://github.com/lifion/lifion-kinesis/commit/e9759ada8858091ffaf5c1a1b21f4e6fb92fb72d): Recover test-coverage in lib/comsumers-manager
- [`bc09da7`](https://github.com/lifion/lifion-kinesis/commit/bc09da7cc0b7747b70cbfa68719eb1b520178a32): Integrate branch with latest release
- [`eebdb34`](https://github.com/lifion/lifion-kinesis/commit/eebdb3400796ab186f4caf67b8e2258edc46ec8d): Upgrade aws-sdk
- [`182d6f5`](https://github.com/lifion/lifion-kinesis/commit/182d6f51cac27da72029114a2bc86da4de7a0115): Upgrade aws-sdk
- [`ef4e970`](https://github.com/lifion/lifion-kinesis/commit/ef4e9707581c96abb0b3528ecfa5d413324d0bc5): Upgrade aws-sdk
- [`c3f30a8`](https://github.com/lifion/lifion-kinesis/commit/c3f30a85319e544df7f7e258ca472b15141676e2): Upgrade aws-sdk
### v1.0.7 (2019-05-22)

@@ -33,4 +59,21 @@

- [`#119`](https://github.com/lifion/lifion-kinesis/pull/119): Update dependency short-uuid to ^3.1.1
- [`4a9e7b4`](https://github.com/lifion/lifion-kinesis/commit/4a9e7b457297def09ace2853bc0e501b3b80dfb8): Add tests for lib/dynamodb-client
- [`0669488`](https://github.com/lifion/lifion-kinesis/commit/066948819aece6d4c7a27279b73a029cfccff0b2): Add tests for lib/lease-manager
- [`ce8a742`](https://github.com/lifion/lifion-kinesis/commit/ce8a74288118aa7574c420b2b0eb902ce74aa32e): Add tests for lib/polling-consumer
- [`b12d83b`](https://github.com/lifion/lifion-kinesis/commit/b12d83b09137789d84f1270511c15a8ed915e22f): Upgrade dependencies
- [`dd845d6`](https://github.com/lifion/lifion-kinesis/commit/dd845d657dc224d6c3546cc0a1afc454e7c21904): Partial tests for lib/stream
- [`4cb343e`](https://github.com/lifion/lifion-kinesis/commit/4cb343ed085754006124f9a7a45b25041d3c45d9): Refactor the tests for lib/dynamodb-client
- [`3671145`](https://github.com/lifion/lifion-kinesis/commit/36711459161cff9aef5bccb13f9e4286fe6e52d1): Add tests for lib/records
- [`7efc497`](https://github.com/lifion/lifion-kinesis/commit/7efc497507e5582ae15361e28c5097481271c408): Add tests for lib/utils
- [`edaf114`](https://github.com/lifion/lifion-kinesis/commit/edaf1145a5f2b7a3cfac94f6c6e759fb4414a77b): Add tests for lib/table
- [`126e455`](https://github.com/lifion/lifion-kinesis/commit/126e45513e1521ec6078dddf78450faf6a8e7c29): Add tests for lib/stats
- [`3316550`](https://github.com/lifion/lifion-kinesis/commit/331655005561d436880cbe3364bb0bc5b8ba5891): Adopt the latest eslint-config-lifion
- [`2d05ec4`](https://github.com/lifion/lifion-kinesis/commit/2d05ec4cb692d7463327bdd463ebeb0c4a867879): Improve resilience of the fan-out consumer
- [`462916c`](https://github.com/lifion/lifion-kinesis/commit/462916c0fd3a8b855112045adf2869fb96a77806): Add tests for lib/heartbeat-manager
- [`32a89e1`](https://github.com/lifion/lifion-kinesis/commit/32a89e14b9a8ee9586dfc54362bd3925c7217934): Upgrade aws-sdk and codecov
- [`6ccc463`](https://github.com/lifion/lifion-kinesis/commit/6ccc46302fe345d8fb400f2267d19d344d45de75): Remove extra linting rules
- [`6fc42b3`](https://github.com/lifion/lifion-kinesis/commit/6fc42b3f8d19e49d9a5796b100e1a0558847dc4e): Recover 100% test coverage in lib/utils
- [`8304786`](https://github.com/lifion/lifion-kinesis/commit/8304786aec8cb4a7183b98e957791b9f4f4b8dcc): Include network errors
- [`ae4ba4b`](https://github.com/lifion/lifion-kinesis/commit/ae4ba4b331de88465e2d4a3eaba8047dc72ace34): Adjust test thresholds
- [`eaf7c2b`](https://github.com/lifion/lifion-kinesis/commit/eaf7c2bcceafca43c3761e2e0d53206833083621): Tweak intervals

@@ -37,0 +80,0 @@ ### v1.0.6 (2019-04-29)

lib/compression.js

@@ -0,1 +1,8 @@

/**
* Module with a suite of compress and decompress functions for different compression algorithms.
*
* @module compression
* @private
*/
'use strict';

@@ -6,3 +13,16 @@

module.exports = {
/**
* Compress and decompress methods for the LZ-UTF8 algorithm. LZ-UTF8 is an extension to the
* [UTF-8]{@link external:UTF8} character encoding, augmenting the UTF-8 bytestream with
* optional compression based on the [LZ77]{@link external:LZ77} algorithm.
*/
'LZ-UTF8': {
/**
* Compresses the given input with LZ-UTF8, returning the result in the specified encoding.
*
* @param {Buffer} input - The buffer to compress.
* @param {string} outputEncoding - The encoding of the result.
* @fulfil {Buffer} - The compressed input.
* @returns {Promise}
*/
compress: (input, outputEncoding) =>

@@ -16,2 +36,10 @@ new Promise((resolve, reject) => {

}),
/**
* Decompresses the given LZ-UTF8 input, interpreting it with the specified encoding.
*
* @param {Buffer} input - The buffer to decompress.
* @param {string} inputEncoding - The encoding of the input buffer to decompress.
* @fulfil {String} - A decompressed UTF-8 string.
* @returns {Promise}
*/
decompress: (input, inputEncoding) =>

@@ -27,1 +55,11 @@ new Promise((resolve, reject) => {

};
/**
* @external UTF8
* @see https://en.wikipedia.org/wiki/UTF-8
*/
/**
* @external LZ77
* @see https://en.wikipedia.org/wiki/LZ77_and_LZ78
*/

lib/consumers-manager.js

@@ -0,1 +1,8 @@

/**
* Module that ensures there are active consumers for the shards with an active lease.
*
* @module consumers-manager
* @private
*/
'use strict';

@@ -8,2 +15,9 @@

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -14,3 +28,31 @@ if (!privateData.has(instance)) privateData.set(instance, {});

/**
* Class that implements the consumers manager module.
*
* @alias module:consumers-manager
*/
class ConsumersManager {
/**
* Initializes an instance of the consumers manager.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The initialization options for AWS.Kinesis.
* @param {Object} options.client - An instance of the Kinesis client.
* @param {string} options.compression - The kind of data compression to use with records.
* @param {number} options.limit - The limit of records per get records call.
* @param {Object} options.logger - An instance of a logger.
* @param {number} options.noRecordsPollDelay - The delay in milliseconds before attempting to
* get more records when there were none in the previous attempt.
* @param {number} options.pollDelay - When the `usePausedPolling` option is `false`, this
* option defines the delay in milliseconds in between poll requests for more records.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {Object} options.stateStore - An instance of the state store.
* @param {string} options.streamName - The name of the Kinesis stream.
* @param {boolean} options.useAutoCheckpoints - Whether to automatically store shard checkpoints
* using the sequence number of the most-recently received record.
* @param {boolean} options.useEnhancedFanOut - Whether the consumer is using enhanced
* fan-out shard consumers.
* @param {boolean} options.usePausedPolling - Whether the client waits for
* user intervention before polling for more records.
*/
constructor(options) {

@@ -21,2 +63,3 @@ const {

compression,
limit,
logger,

@@ -38,2 +81,3 @@ noRecordsPollDelay,

consumers: {},
limit,
logger,

@@ -51,2 +95,10 @@ noRecordsPollDelay,

/**
* Triggers the reconciliation of shard consumers where new instances of either the fan-out or
* polling consumers will be initialized for newly acquired shard leases, or where running
* consumers will be stopped for lost or expired shard leases.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async reconcile() {

@@ -58,2 +110,3 @@ const {

consumers,
limit,
logger,

@@ -104,2 +157,3 @@ noRecordsPollDelay,

compression,
limit,
logger,

@@ -142,4 +196,14 @@ noRecordsPollDelay,

}
/**
* Stops all the running shard consumers.
*
* @returns {undefined}
*/
stop() {
const { consumers } = internal(this);
Object.keys(consumers).forEach(shardId => consumers[shardId].stop());
}
}
module.exports = ConsumersManager;
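The `reconcile()` contract described above (start consumers for newly acquired shard leases, stop them for lost or expired ones) can be distilled into a hypothetical standalone helper; `ownedShardIds` and `startConsumer` are illustrative names, not the package's API:

```javascript
'use strict';

// Hypothetical sketch of the reconciliation pattern: given a map of running
// consumers keyed by shard ID and the list of currently-owned leases, start
// consumers for new leases and stop the ones whose lease was lost.
function reconcile(consumers, ownedShardIds, startConsumer) {
  // Start a consumer for every owned shard that has none running.
  ownedShardIds.forEach(shardId => {
    if (!consumers[shardId]) consumers[shardId] = startConsumer(shardId);
  });
  // Stop and drop consumers for shards whose lease is no longer owned.
  Object.keys(consumers).forEach(shardId => {
    if (!ownedShardIds.includes(shardId)) {
      consumers[shardId].stop();
      delete consumers[shardId];
    }
  });
  return consumers;
}
```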

380

lib/dynamodb-client.js

@@ -0,1 +1,11 @@

/**
* Module that wraps the calls to the AWS.DynamoDB library. Calls are wrapped so they can be
* retried with custom logic instead of the one provided by the AWS-SDK. In addition to retries,
* calls are also promisified, and the call stacks are preserved even in async/await calls by
* using the `CAPTURE_STACK_TRACE` environment variable.
*
* @module dynamodb-client
* @private
*/
'use strict';

@@ -5,7 +15,16 @@

const { DynamoDB } = require('aws-sdk');
const { reportAwsResponse, reportException } = require('./stats');
const { reportError, reportResponse } = require('./stats');
const { getStackObj, shouldBailRetry, transformErrorStack } = require('./utils');
const privateData = new WeakMap();
const reportError = reportException.bind(null, 'dynamoDb');
const statsSource = 'dynamoDb';
/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -16,21 +35,93 @@ if (!privateData.has(instance)) privateData.set(instance, {});

function shouldBailRetry(err) {
const { code } = err;
return (
code === 'MissingParameter' ||
code === 'MissingRequiredParameter' ||
code === 'MultipleValidationErrors' ||
code === 'UnexpectedParameter' ||
code === 'ValidationException'
);
/**
* Calls a method on the given instance of AWS.DynamoDB. The call is promisified, the call stack
* is preserved, and the results of the call are aggregated in the stats. Retries in this function
* are the original ones provided by the AWS-SDK.
*
* @param {Object} client - An instance of AWS.DynamoDB.
* @param {string} methodName - The name of the method to call.
* @param {...*} args - The arguments of the method call.
* @fulfil {*} - The original response from the AWS.DynamoDB call.
* @reject {Error} - The error details from AWS.DynamoDB with a corrected error stack.
* @returns {Promise}
* @private
*/
async function sdkCall(client, methodName, ...args) {
const stackObj = getStackObj(sdkCall);
try {
return client[methodName](...args)
.promise()
.then(response => {
reportResponse(statsSource);
return response;
})
.catch(err => {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error);
throw error;
});
} catch (err) {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error);
throw error;
}
}
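`sdkCall` captures a stack object synchronously, before the promise chain, and reattaches it to any error via `transformErrorStack`. A plausible sketch of how the two `lib/utils` helpers could work, built on `Error.captureStackTrace` (the real implementations may differ):

```javascript
'use strict';

// Hypothetical sketch: capture a stack trace at the call site, omitting the
// frames for `fn` itself and anything above it.
function getStackObj(fn) {
  const stackObj = {};
  Error.captureStackTrace(stackObj, fn);
  return stackObj;
}

// Hypothetical sketch: keep the error's message line but replace its frames
// with the ones captured before crossing the promise boundary.
function transformErrorStack(err, stackObj) {
  const [firstLine] = err.stack.split('\n');
  err.stack = [firstLine, ...stackObj.stack.split('\n').slice(1)].join('\n');
  return err;
}
```

The payoff is that errors rejected deep inside an async call still point at the original caller rather than at SDK internals.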
const tapAwsResponse = result => {
reportAwsResponse('dynamoDb');
return result;
};
/**
* Calls a method on the given instance of AWS.DynamoDB. The call is promisified, the call stack
* is preserved, and the results of the call are aggregated in the stats. Retries in this function
* are based on custom logic replacing the one provided by the AWS-SDK.
*
* @param {Object} client - An instance of AWS.DynamoDB.
* @param {string} methodName - The name of the method to call.
* @param {Object} retryOpts - The [retry options as in async-retry]{@link external:AsyncRetry}.
* @param {...*} args - The arguments of the method call.
* @fulfil {*} - The original response from the AWS.DynamoDB call.
* @reject {Error} - The error details from AWS.DynamoDB with a corrected error stack.
* @returns {Promise}
* @private
*/
function retriableSdkCall(client, methodName, retryOpts, ...args) {
const stackObj = getStackObj(retriableSdkCall);
return retry(bail => {
try {
return client[methodName](...args)
.promise()
.then(response => {
reportResponse(statsSource);
return response;
})
.catch(err => {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error);
if (!shouldBailRetry(err)) throw error;
else bail(error);
});
} catch (err) {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error);
bail(error);
return undefined;
}
}, retryOpts);
}
/**
* A class that wraps AWS.DynamoDB.
*
* @alias module:dynamodb-client
*/
class DynamoDbClient {
/**
* Initializes the AWS.DynamoDB internal instance and prepares the retry logic.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The initialization options for AWS.DynamoDB.
* @param {Object} options.logger - An instance of a logger.
* @param {string} options.tableName - The name of the DynamoDB table.
*/
constructor({ awsOptions, logger, tableName }) {
const client = new DynamoDB(awsOptions);
const docClient = new DynamoDB.DocumentClient({

@@ -40,2 +131,3 @@ params: { TableName: tableName },

});
const retryOpts = {

@@ -50,3 +142,4 @@ maxTimeout: 5 * 60 * 1000,

`\t- Request ID: ${requestId}`,
`\t- Code: ${code} (${statusCode})`
`\t- Code: ${code} (${statusCode})`,
`\t- Table: ${tableName}`
].join('\n')}`

@@ -58,194 +151,115 @@ );

};
Object.assign(internal(this), { client, docClient, retryOpts });
}
/**
* The CreateTable operation adds a new table to your account.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
createTable(...args) {
return internal(this)
.client.createTable(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportError(err);
throw error;
});
const { client } = internal(this);
return sdkCall(client, 'createTable', ...args);
}
/**
* Returns information about the table, including the current status of the table, when it was
* created, the primary key schema, and any indexes on the table.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
describeTable(...args) {
const { client, retryOpts } = internal(this);
return retry(
bail =>
client
.describeTable(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
const { code } = err;
if (code === 'ResourceNotFoundException' || shouldBailRetry(err)) bail(err);
else {
reportError(err);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'describeTable', retryOpts, ...args);
}
get(...args) {
const { docClient, retryOpts } = internal(this);
return retry(
bail =>
docClient
.get(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
/**
* List all tags on an Amazon DynamoDB resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
listTagsOfResource(...args) {
const { client, retryOpts } = internal(this);
return retriableSdkCall(client, 'listTagsOfResource', retryOpts, ...args);
}
listTagsOfResource(...args) {
/**
* Associate a set of tags with an Amazon DynamoDB resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
tagResource(...args) {
const { client } = internal(this);
return sdkCall(client, 'tagResource', ...args);
}
/**
* Waits for a given DynamoDB resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
waitFor(...args) {
const { client, retryOpts } = internal(this);
return retry(
bail =>
client
.listTagsOfResource(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'waitFor', retryOpts, ...args);
}
put(...args) {
/**
* Deletes a single item in a table by primary key by delegating to `AWS.DynamoDB.deleteItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
delete(...args) {
const { docClient } = internal(this);
return sdkCall(docClient, 'delete', ...args);
}
/**
* Returns a set of attributes for the item with the given primary key by delegating to
* `AWS.DynamoDB.getItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
get(...args) {
const { docClient, retryOpts } = internal(this);
return retry(
bail =>
docClient
.put(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
const { code } = err;
if (code === 'ConditionalCheckFailedException' || shouldBailRetry(err)) bail(err);
else {
reportError(err);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(docClient, 'get', retryOpts, ...args);
}
tagResource(...args) {
return internal(this)
.client.tagResource(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportError(err);
throw error;
});
/**
* Creates a new item, or replaces an old item with a new item by delegating to
* `AWS.DynamoDB.putItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
put(...args) {
const { docClient, retryOpts } = internal(this);
return retriableSdkCall(docClient, 'put', retryOpts, ...args);
}
/**
* Edits an existing item's attributes, or adds a new item to the table if it does not already
* exist by delegating to `AWS.DynamoDB.updateItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
update(...args) {
const { docClient, retryOpts } = internal(this);
return retry(
bail =>
docClient
.update(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
const { code } = err;
if (code === 'ConditionalCheckFailedException' || shouldBailRetry(err)) bail(err);
else {
reportError(err);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(docClient, 'update', retryOpts, ...args);
}
}
waitFor(...args) {
const { client, retryOpts } = internal(this);
return retry(
bail =>
client
.waitFor(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
}
/**
* @external AsyncRetry
* @see https://github.com/zeit/async-retry#api
*/
delete(...args) {
return internal(this)
.docClient.delete(...args)
.promise()
.then(tapAwsResponse)
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportError(err);
throw error;
});
}
}
module.exports = DynamoDbClient;
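The custom retry logic in `retriableSdkCall` leans on `async-retry`'s `bail` mechanism: retry on throttling-style failures, bail immediately on validation-style ones. A hypothetical hand-rolled equivalent, shown only to illustrate the control flow rather than the package's actual dependency:

```javascript
'use strict';

// Hypothetical minimal retry-with-bail loop: retry `fn` up to `retries`
// times, but rethrow immediately when `shouldBail` deems the error fatal.
async function retryCall(fn, shouldBail, retries = 3) {
  let lastError;
  for (let attempt = 0; attempt < retries; attempt += 1) {
    try {
      return await fn();
    } catch (err) {
      if (shouldBail(err)) throw err; // non-retryable, bail out now
      lastError = err;
    }
  }
  throw lastError; // retries exhausted
}
```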

lib/fan-out-consumer.js

@@ -0,1 +1,8 @@

/**
* Module that implements an AWS enhanced fan-out consumer.
*
* @module fan-out-consumer
* @private
*/
'use strict';

@@ -12,3 +19,4 @@

const { getStreamShards } = require('./stream');
const { reportAwsResponse, reportException } = require('./stats');
const { reportError, reportResponse } = require('./stats');
const { shouldBailRetry } = require('./utils');

@@ -25,2 +33,9 @@ const AWS_API_TARGET = 'Kinesis_20131202.SubscribeToShard';

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -31,14 +46,22 @@ if (!privateData.has(instance)) privateData.set(instance, {});

function shouldBailRetry(err) {
const { code } = err;
return (
code === 'MissingParameter' ||
code === 'MissingRequiredParameter' ||
code === 'MultipleValidationErrors' ||
code === 'UnexpectedParameter' ||
code === 'ValidationException'
);
}
/**
* Class that implements a pre-processing stream used as a filter to a stream request to the
* shard subscription API. If the request is successful and the response is an event stream,
* chunks are passed to subsequent streams in the pipeline. If the server responds with an error,
* the error details are parsed and then thrown as an error, which breaks the entire pipeline.
*
* @extends external:Transform
* @memberof module:fan-out-consumer
* @private
*/
class PreProcess extends Transform {
/**
* Initializes an instance of the pre-processing stream.
*
* @param {Object} options - The initialization options.
* @param {Object} options.requestFlags - The object where the flags for the request are stored.
* @param {boolean} options.requestFlags.isEventStream - If the request is successful and the
* headers in the initial response point to an event stream, this flag is set to `true`.
* @param {number} options.requestFlags.statusCode - The status code of the last request response.
*/
constructor({ requestFlags }) {

@@ -49,10 +72,21 @@ super({ objectMode: true });

/**
* The stream transformation logic.
*
* @param {Buffer} chunk - A chunk of data coming from the event stream.
* @param {string} encoding - The stream encoding mode (ignored).
* @param {Function} callback - The callback for more data.
*/
_transform(chunk, encoding, callback) {
const { requestFlags } = internal(this);
if (!requestFlags.isEventStream) {
const { statusCode } = requestFlags;
const { __type, message } = JSON.parse(chunk.toString('utf8'));
const err = new Error(message || 'Failed to subscribe to shard.');
if (__type) err.code = __type;
err.isRetryable = true;
this.emit('error', err);
const error = Object.assign(
new Error(message || 'Failed to subscribe to shard.'),
{ isRetryable: true },
__type && { code: __type },
statusCode && { statusCode }
);
this.emit('error', error);
} else {

@@ -65,7 +99,29 @@ this.push(chunk);

/**
* Class that implements a post-processing stream used to push records outside the internal
* stream pipeline. It also stores checkpoints as records arrive, and looks for shard depletion.
*
* @extends external:Writable
* @memberof module:fan-out-consumer
* @private
*/
class PostProcess extends Writable {
constructor({ abort, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
/**
* Initializes an instance of the post-processing stream.
*
* @param {Object} options - The initialization options.
* @param {Function} options.abort - A function that will close the entire pipeline, called
* when no data has been pushed through the event stream on a given time window.
* @param {Object} options.logger - An instance of a logger.
* @param {Function} options.markShardAsDepleted - A function that will mark a given shard as
* depleted. Called when a shard depletion event has been detected.
* @param {Function} options.pushToStream - A function that pushes records out of the pipeline.
* @param {Function} options.setCheckpoint - A function that stores the checkpoint for the shard.
* @param {string} options.shardId - The ID of the shard.
*/
constructor({ abort, logger, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
super({ objectMode: true });
Object.assign(internal(this), {
abort,
logger,
markShardAsDepleted,

@@ -79,5 +135,13 @@ pushToStream,

/**
* The stream writable logic.
*
* @param {Object} chunk - A chunk of data coming from the pipeline.
* @param {string} encoding - The stream encoding mode (ignored).
* @param {Function} callback - The callback for more data.
*/
async _write(chunk, encoding, callback) {
const {
abort,
logger,
markShardAsDepleted,

@@ -91,6 +155,11 @@ pushToStream,

internal(this).timeoutId = setTimeout(abort, 10000);
const { continuationSequenceNumber } = chunk;
const { continuationSequenceNumber, records } = chunk;
if (continuationSequenceNumber !== undefined) {
await setCheckpoint(continuationSequenceNumber);
pushToStream(null, { ...chunk, shardId });
const recordsCount = records.length;
const msBehind = chunk.millisBehindLatest;
if (recordsCount > 0) {
logger.debug(`Got ${recordsCount} record(s) from "${shardId}" (${msBehind}ms behind)`);
pushToStream(null, { ...chunk, shardId });
}
callback();

@@ -103,3 +172,25 @@ } else {

/**
* Class that implements an AWS enhanced fan-out consumer.
*
* @alias module:fan-out-consumer
*/
class FanOutConsumer {
/**
* Initializes an instance of an enhanced fan-out consumer.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The AWS.Kinesis options to use in the HTTP request.
* @param {string} options.checkpoint - The last-known checkpoint for the stream shard.
* @param {Object} options.client - An instance of the Kinesis client.
* @param {string} options.compression - The kind of data compression to use with records.
* @param {string} options.consumerArn - The ARN of the enhanced consumer as registered in AWS.
* @param {string} options.leaseExpiration - The timestamp of the shard lease expiration.
* @param {Object} options.logger - An instance of a logger.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {string} options.shardId - The ID of the stream shard to subscribe for records.
* @param {Object} options.stateStore - An instance of the state store.
* @param {Function} options.stopConsumer - A function that stops this consumer from the manager.
* @param {string} options.streamName - The name of the Kinesis stream.
*/
constructor(options) {

@@ -159,2 +250,8 @@ const {

/**
* Starts the enhanced fan-out consumer by initializing the internal stream pipeline.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async start() {

@@ -176,3 +273,3 @@ const privateProps = internal(this);

logger.debug(`Starting an enhanded fan-out subscriber for shard "${shardId}"…`);
logger.debug(`Starting an enhanced fan-out subscriber for shard "${shardId}"…`);

@@ -192,10 +289,11 @@ this.updateLeaseExpiration(leaseExpiration);

const { headers, statusCode } = res;
requestFlags.statusCode = statusCode;
if (headers['content-type'] !== AWS_EVENT_STREAM || statusCode !== 200) {
logger.error(`Subscription unsuccessful: ${statusCode}`);
requestFlags.isEventStream = false;
reportException('kinesis', { statusCode }, streamName);
reportError('kinesis', { statusCode }, streamName);
} else {
logger.debug('Subscription to shard is successful.');
requestFlags.isEventStream = true;
reportAwsResponse('kinesis', streamName);
reportResponse('kinesis', streamName);
}

@@ -252,15 +350,22 @@ };

new RecordsDecoder({ compression }),
new PostProcess({ abort, markShardAsDepleted, pushToStream, setCheckpoint, shardId })
new PostProcess({
abort,
logger,
markShardAsDepleted,
pushToStream,
setCheckpoint,
shardId
})
]);
} catch (err) {
const { code, message, requestId, statusCode } = err;
console.warn('pipeline error', code);
if (!shouldBailRetry(err)) {
logger.warn(
`Trying to recover from AWS.Kinesis error…\n${[
`\t- Message: ${message}`,
`\t- Request ID: ${requestId}`,
`\t- Code: ${code} (${statusCode})`,
`\t- Stream: ${streamName}`
].join('\n')}`
[
'Trying to recover from AWS.Kinesis error…',
`- Message: ${message}`,
`- Request ID: ${requestId}`,
`- Code: ${code} (${statusCode})`,
`- Stream: ${streamName}`
].join('\n\t')
);

@@ -278,2 +383,5 @@ } else {

/**
* Stops the internal stream pipeline.
*/
stop() {

@@ -291,2 +399,7 @@ const privateProps = internal(this);

/**
* Updates the shard lease expiration timestamp.
*
* @param {string} leaseExpiration - The updated timestamp when the shard lease expires.
*/
updateLeaseExpiration(leaseExpiration) {

@@ -311,2 +424,12 @@ const privateProps = internal(this);

/**
* @external Transform
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_transform
*/
/**
* @external Writable
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_writable
*/
module.exports = FanOutConsumer;

lib/heartbeat-manager.js

@@ -0,1 +1,12 @@

/**
* Module that makes sure this client is registered as a known consumer of a stream. The module
* also keeps storing a heartbeat in the shared stream state and detects old clients. Clients
* are considered old when they miss a given number of heartbeats (currently 3). Old clients are
* removed from the state, and any shard leases or enhanced fan-out consumers in use by those
* clients are released.
*
* @module heartbeat-manager
* @private
*/
'use strict';

@@ -8,2 +19,9 @@

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -14,11 +32,32 @@ if (!privateData.has(instance)) privateData.set(instance, {});

/**
* Class that implements the heartbeat manager.
*
* @alias module:heartbeat-manager
*/
class HeartbeatManager {
constructor({ consumerId, logger, stateStore }) {
Object.assign(internal(this), { consumerId, logger, stateStore });
/**
* Initializes an instance of the heartbeat manager.
*
* @param {Object} options - The initialization options.
* @param {Object} options.logger - An instance of a logger.
* @param {Object} options.stateStore - An instance of the state store.
*/
constructor({ logger, stateStore }) {
Object.assign(internal(this), { logger, stateStore });
}
/**
* Starts the heartbeat interval, during which this client is registered, its heartbeat is
* updated, and old clients are detected and handled.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async start() {
const privateProps = internal(this);
const { logger, stateStore } = privateProps;
const { logger, stateStore, timeoutId } = privateProps;
if (timeoutId) return;
const heartbeat = async () => {

@@ -34,5 +73,10 @@ await stateStore.registerConsumer();

/**
* Stops the heartbeat interval.
*/
stop() {
const { timeoutId } = internal(this);
const privateProps = internal(this);
const { timeoutId } = privateProps;
clearTimeout(timeoutId);
privateProps.timeoutId = null;
}

@@ -39,0 +83,0 @@ }
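The loop described by the module comment (re-scheduling itself with `setTimeout`, treating clients as old after three missed beats) might look like this hypothetical sketch; `Heartbeat` and `isOldClient` are illustrative names, not the package's API:

```javascript
'use strict';

// Hypothetical self-rescheduling heartbeat loop with an idempotent start.
class Heartbeat {
  constructor({ beat, intervalMs }) {
    this.beat = beat;
    this.intervalMs = intervalMs;
    this.timeoutId = null;
  }

  start() {
    if (this.timeoutId) return; // already running
    const tick = () => {
      this.beat(Date.now());
      this.timeoutId = setTimeout(tick, this.intervalMs);
    };
    tick();
  }

  stop() {
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
  }
}

// A client counts as old once it has missed three consecutive heartbeats.
const isOldClient = (lastHeartbeat, intervalMs, now) =>
  now - lastHeartbeat > 3 * intervalMs;
```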

lib/index.js

@@ -34,2 +34,9 @@ /**

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -40,18 +47,25 @@ if (!privateData.has(instance)) privateData.set(instance, {});

async function ensureStreamInitialized(instance, streamNameParam) {
/**
* Ensures that a Kinesis stream exists and that it is encrypted and tagged as required.
*
* @param {Object} instance - The instance of the Kinesis class where the call originated from.
* @param {string} [streamName] - The name of the stream to check initialization for.
* @fulfil {undefined}
* @returns {Promise}
* @private
*/
async function ensureStreamInitialized(instance, streamName) {
const privateProps = internal(instance);
const { encryption, streamArn, streamCreatedOn, streamName, tags } = privateProps;
let streamProps;
if (!streamNameParam || streamNameParam === streamName) {
if (streamArn && streamCreatedOn) return;
streamProps = privateProps;
Object.assign(privateProps, await ensureStreamExists(streamProps));
let params;
if (!streamName || streamName === privateProps.streamName) {
const { streamArn, streamCreatedOn } = await ensureStreamExists(privateProps);
Object.assign(privateProps, { streamArn, streamCreatedOn });
params = privateProps;
} else {
streamProps = Object.assign({}, privateProps, { streamName: streamNameParam });
await ensureStreamExists(streamProps);
params = { ...privateProps, streamName };
await ensureStreamExists(params);
}
if (encryption) await ensureStreamEncription(streamProps);
if (tags) await confirmStreamTags(streamProps);
const { encryption, tags } = params;
if (encryption) await ensureStreamEncription(params);
if (tags) await confirmStreamTags(params);
}

@@ -113,9 +127,8 @@

/**
* A [pass-through stream]{@link external:NodeJsPassThrough} class specialization implementing a
* consumer of Kinesis Data Streams using the [AWS SDK for JavaScript]{@link external:AwsJsSdk}.
* Incoming data can be retrieved through either the `data` event or by piping the instance to a
* writable stream.
* A [pass-through stream]{@link external:PassThrough} class specialization implementing a consumer
* of Kinesis Data Streams using the [AWS SDK for JavaScript]{@link external:AwsJsSdk}. Incoming
* data can be retrieved through either the `data` event or by piping the instance to other streams.
*
* @alias module:lifion-kinesis
* @extends external:Readable
* @extends external:PassThrough
*/

@@ -136,4 +149,4 @@ class Kinesis extends PassThrough {

* @param {Object} [options.dynamoDb={}] - The initialization options for the DynamoDB client
* used to store the state of the stream consumers. In addition to `tableNames` and
* `tags`, it can also contain any of the [`AWS.DynamoDB` options]{@link AwsJsSdkDynamoDb}.
* used to store the state of the consumers. In addition to `tableNames` and `tags`, it
* can also contain any of the [`AWS.DynamoDB` options]{@link external:AwsJsSdkDynamoDb}.
* @param {string} [options.dynamoDb.tableName] - The name of the table in which to store the

@@ -149,2 +162,4 @@ * state of consumers. If not provided, it defaults to "lifion-kinesis-state".

* specified ARN to either an alias or a key, or an alias name prefixed by "alias/".
* @param {number} [options.limit=10000] - The limit of records per get records call (only
* applicable when `useEnhancedFanOut` is set to `false`).
* @param {Object} [options.logger] - An object with the `warn`, `debug`, and `error` functions

@@ -189,6 +204,7 @@ * that will be used for logging purposes. If not provided, logging will be omitted.

encryption,
limit = 10000,
logger = {},
noRecordsPollDelay = 1000,
pollDelay = 250,
shardCount,
shardCount = 1,
statsInterval = 30000,

@@ -216,6 +232,7 @@ streamName,

const normNoRecordsPollDelay = Number(noRecordsPollDelay);
const normPollDelay = Number(pollDelay);
const normShardCount = Number(shardCount);
const normStatsInterval = Number(statsInterval);
const limitNumber = Number(limit);
const noRecordsPollDelayNumber = Number(noRecordsPollDelay);
const pollDelayNumber = Number(pollDelay);
const shardCountNumber = Number(shardCount);
const statsIntervalNumber = Number(statsInterval);

@@ -232,8 +249,9 @@ Object.assign(internal(this), {

getStatsIntervalId: null,
limit: limitNumber > 0 && limitNumber <= 10000 ? limitNumber : 10000,
logger: normLogger,
noRecordsPollDelay: normNoRecordsPollDelay >= 250 ? normNoRecordsPollDelay : 250,
pollDelay: normPollDelay >= 0 ? normPollDelay : 250,
noRecordsPollDelay: noRecordsPollDelayNumber >= 250 ? noRecordsPollDelayNumber : 250,
pollDelay: pollDelayNumber >= 0 ? pollDelayNumber : 250,
recordsEncoder: getRecordsEncoder(compression, 'Buffer'),
shardCount: normShardCount >= 1 ? normShardCount : 1,
statsInterval: normStatsInterval >= 1000 ? normStatsInterval : 30000,
shardCount: shardCountNumber >= 1 ? shardCountNumber : 1,
statsInterval: statsIntervalNumber >= 1000 ? statsIntervalNumber : 30000,
streamName,

@@ -249,7 +267,7 @@ tags,

/**
* Initializes the client, by ensuring that the stream exists, it's ready, and configured as
* requested. The internal managers that deal with heartbeats, state, and consumers will also
* be started.
* Starts the stream consumer by ensuring that the stream exists, that it's ready, and that
* it's configured as requested. The internal managers that deal with heartbeats, state, and
* consumers will also be started.
*
* @fulfil Once the client has successfully started.
* @fulfil {undefined} - Once the consumer has successfully started.
* @reject {Error} - On any unexpected error while trying to start.

@@ -301,5 +319,11 @@ * @returns {Promise}

/**
* Stops the stream consumer. The internal managers will also be stopped.
*/
stopConsumer() {
const privateProps = internal(this);
const { getStatsIntervalId } = privateProps;
const { consumersManager, getStatsIntervalId, heartbeatManager, leaseManager } = privateProps;
heartbeatManager.stop();
consumersManager.stop();
leaseManager.stop();
clearTimeout(getStatsIntervalId);

@@ -310,28 +334,45 @@ privateProps.getStatsIntervalId = null;

/**
* Puts a record to a stream.
* Writes a single data record into a stream.
*
* @param {Object} params - The putRecord parameters. In addition to the params described here,
* uses [`AWS.Kinesis.putRecord` parameters]{@link external:AwsJsSdkKinesisPutRecord}
* from the `AWS.Kinesis.putRecord` method in camel case.
* @param {(Object|string)} params.data - The data to be used as the Kinesis message.
* @param {string} [params.streamName] - If provided, overrides the stream name provided on
* client instantiation.
* @fulfil If record is successfully pushed to stream.
* @reject {Error} - On any unexpected error while pushing to stream.
* @param {Object} params - The parameters.
* @param {*} params.data - The data to put into the record.
* @param {string} [params.explicitHashKey] - The hash value used to explicitly determine the
* shard the data record is assigned to by overriding the partition key hash.
* @param {string} [params.partitionKey] - Determines which shard in the stream the data record
* is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data.
* @param {string} [params.sequenceNumberForOrdering] - Set this to the sequence number obtained
* from the last put record operation to guarantee strictly increasing sequence numbers
* for puts from the same client and to the same partition key. If omitted, records are
* coarsely ordered based on arrival time.
* @param {string} [params.streamName] - If provided, the record will be put into the specified
* stream instead of the one whose name was provided during the consumer instantiation.
* @fulfil {Object} - The de-serialized data returned from the request.
* @reject {Error} - On any unexpected error while writing to the stream.
* @returns {Promise}
*/
async putRecord({ streamName, ...record }) {
async putRecord(params = {}) {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
const params = {
const { client, createStreamIfNeeded, recordsEncoder } = privateProps;
const { streamName, ...record } = params;
const awsParams = {
...(await recordsEncoder(record)),
StreamName: streamName || privateProps.streamName
};
const parseResult = ({ EncryptionType, SequenceNumber, ShardId }) => ({
encryptionType: EncryptionType,
sequenceNumber: SequenceNumber,
shardId: ShardId
});
try {
const result = await client.putRecord(params);
return result;
return parseResult(await client.putRecord(awsParams));
} catch (err) {
if (err.code !== 'ResourceNotFoundException') throw err;
await ensureStreamInitialized(this, streamName);
return client.putRecord(params);
const { code } = err;
const streamDoesNotExist =
code === 'ResourceNotFoundException' ||
(code === 'UnknownError' && client.isEndpointLocal());
if (createStreamIfNeeded && streamDoesNotExist) {
await ensureStreamInitialized(this, streamName);
return parseResult(await client.putRecord(awsParams));
}
throw err;
}
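The `parseResult` helper above is a plain PascalCase-to-camelCase mapping of the AWS response; in isolation (the sample values below are made up for illustration):

```javascript
// Sketch of the response mapping done by putRecord: the AWS-style PascalCase
// fields are renamed to camelCase before being returned to the caller.
const parseResult = ({ EncryptionType, SequenceNumber, ShardId }) => ({
  encryptionType: EncryptionType,
  sequenceNumber: SequenceNumber,
  shardId: ShardId
});

// Illustrative response shape; not a real AWS response.
const sample = parseResult({
  EncryptionType: 'KMS',
  SequenceNumber: '49590338271490256608',
  ShardId: 'shardId-000000000000'
});
```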

@@ -341,32 +382,44 @@ }

/**
* Batch puts multiple records to a stream.
* Writes multiple data records into a stream in a single call.
*
* @param {Object} params - The putRecords parameters. In addition to the params described here,
* uses [`AWS.Kinesis.putRecords` parameters]{@link external:AwsJsSdkKinesisPutRecords}
* from the `AWS.Kinesis.putRecords` method in camel case.
* @param {Array} params.records - A list of records to push to a Kinesis stream.
* @param {(Object|string)} params.records.data - The data to be used as the Kinesis message.
* @param {string} [params.streamName] - If provided, overrides the stream name provided
* on client instantiation.
* @fulfil If records are successfully pushed to stream.
* @reject {Error} - On any unexpected error while pushing to stream.
* @param {Object} params - The parameters.
* @param {Array<Object>} params.records - The records associated with the request.
* @param {*} params.records[].data - The record data.
* @param {string} [params.records[].explicitHashKey] - The hash value used to explicitly
* determine the shard the data record is assigned to by overriding the partition key hash.
* @param {string} [params.records[].partitionKey] - Determines which shard in the stream the
* data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash
* of the data.
* @param {string} [params.streamName] - If provided, the records will be put into the specified
* stream instead of the one whose name was provided during the consumer instantiation.
* @fulfil {Object} - The de-serialized data returned from the request.
* @reject {Error} - On any unexpected error while writing to the stream.
* @returns {Promise}
*/
async putRecords({ records, streamName }) {
async putRecords(params = {}) {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
const params = {
const { client, createStreamIfNeeded, recordsEncoder } = privateProps;
const { records, streamName } = params;
if (!Array.isArray(records)) throw new TypeError('The "records" property is required.');
const awsParams = {
Records: await Promise.all(records.map(recordsEncoder)),
StreamName: streamName || privateProps.streamName
};
const parseResult = ({ EncryptionType, Records }) => ({
encryptionType: EncryptionType,
records: Records.map(({ SequenceNumber, ShardId }) => ({
sequenceNumber: SequenceNumber,
shardId: ShardId
}))
});
try {
const result = await client.putRecords(params);
return result;
return parseResult(await client.putRecords(awsParams));
} catch (err) {
if (
err.code === 'ResourceNotFoundException' ||
(err.code === 'UnknownError' && client.isEndpointLocal())
) {
const { code } = err;
const streamDoesNotExist =
code === 'ResourceNotFoundException' ||
(code === 'UnknownError' && client.isEndpointLocal());
if (createStreamIfNeeded && streamDoesNotExist) {
await ensureStreamInitialized(this, streamName);
return client.putRecords(params);
return parseResult(await client.putRecords(awsParams));
}

@@ -377,2 +430,7 @@ throw err;

/**
* Returns statistics for the instance of the client.
*
* @returns {Object} An object with the statistics.
*/
getStats() {

@@ -383,2 +441,7 @@ const { streamName } = internal(this);

/**
* Returns the aggregated statistics of all the instances of the client.
*
* @returns {Object} An object with the statistics.
*/
static getStats() {

@@ -400,12 +463,2 @@ return getStats();

/**
* @external AwsJsSdkKinesisPutRecord
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecord-property
*/
/**
* @external AwsJsSdkKinesisPutRecords
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property
*/
/**
* @external AwsJsSdkDynamoDb

@@ -416,3 +469,3 @@ * @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property

/**
* @external NodeJsPassThrough
* @external PassThrough
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough

@@ -419,0 +472,0 @@ */

@@ -0,1 +1,11 @@

/**
* Module that wraps the calls to the AWS.Kinesis library. Calls are wrapped so they can be
* retried with custom logic instead of the one provided by the AWS-SDK. In addition to retries,
* calls are also promisified and the call stacks are preserved even in async/await calls by using
* the `CAPTURE_STACK_TRACE` environment variable.
*
* @module kinesis-client
* @private
*/
'use strict';
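The wrapping described above can be sketched as follows, assuming a client whose methods return objects exposing a `.promise()` method (the AWS SDK v2 calling convention); the client and callbacks below are stand-ins, not the module's real dependencies:

```javascript
// Sketch of promisifying an AWS-SDK-v2-style call while reporting its outcome.
// `client[methodName](...args).promise()` mirrors the SDK v2 request shape.
async function sdkCall(client, methodName, onSuccess, onError, ...args) {
  try {
    const response = await client[methodName](...args).promise();
    onSuccess(response); // e.g. aggregate the response into the stats
    return response;
  } catch (err) {
    onError(err); // e.g. report the error with a corrected stack
    throw err;
  }
}

// A fake client mimicking the SDK's request-object shape, for illustration.
const fakeClient = {
  describeStream: params => ({
    promise: () => Promise.resolve({ name: params.StreamName })
  })
};
```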

@@ -5,7 +15,27 @@

const { Kinesis } = require('aws-sdk');
const { reportAwsResponse, reportException, reportRecordSent } = require('./stats');
const { getStackObj, shouldBailRetry, transformErrorStack } = require('./utils');
const { reportError, reportRecordSent, reportResponse } = require('./stats');
const RETRIABLE_PUT_ERRORS = [
'EADDRINUSE',
'ECONNREFUSED',
'ECONNRESET',
'EPIPE',
'ESOCKETTIMEDOUT',
'ETIMEDOUT',
'NetworkingError',
'ProvisionedThroughputExceededException'
];
const privateData = new WeakMap();
const reportError = reportException.bind(null, 'kinesis');
const statsSource = 'kinesis';
/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -16,23 +46,94 @@ if (!privateData.has(instance)) privateData.set(instance, {});

function tapAwsReponse(streamName) {
return result => {
reportAwsResponse('kinesis', streamName);
return result;
};
/**
* Calls a method on the given instance of AWS.Kinesis. The call is promisified, the call stack
* is preserved, and the results of the call are aggregated in the stats. Retries in this function
* are the original ones provided by the AWS-SDK.
*
* @param {Object} client - An instance of AWS.Kinesis.
* @param {string} methodName - The name of the method to call.
* @param {string} streamName - The name of the Kinesis stream to which the call relates.
* @param {...*} args - The arguments of the method call.
* @fulfil {*} - The original response from the AWS.Kinesis call.
* @reject {Error} - The error details from AWS.Kinesis with a corrected error stack.
* @returns {Promise}
* @private
*/
async function sdkCall(client, methodName, streamName, ...args) {
const stackObj = getStackObj(sdkCall);
try {
return client[methodName](...args)
.promise()
.then(response => {
reportResponse(statsSource, streamName);
return response;
})
.catch(err => {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
throw error;
});
} catch (err) {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
throw error;
}
}
function shouldBailRetry(err) {
const { code } = err;
return (
code === 'MissingParameter' ||
code === 'MissingRequiredParameter' ||
code === 'MultipleValidationErrors' ||
code === 'UnexpectedParameter' ||
code === 'ValidationException'
);
/**
* Calls a method on the given instance of AWS.Kinesis. The call is promisified, the call stack
* is preserved, and the results of the call are aggregated in the stats. Retries in this function
* are based on a custom logic replacing the one provided by the AWS-SDK.
*
* @param {Object} client - An instance of AWS.Kinesis.
* @param {string} methodName - The name of the method to call.
* @param {string} streamName - The name of the Kinesis stream to which the call relates.
* @param {Object} retryOpts - The [retry options as in async-retry]{@link external:AsyncRetry}.
* @param {...*} args - The arguments of the method call.
* @fulfil {*} - The original response from the AWS.Kinesis call.
* @reject {Error} - The error details from AWS.Kinesis with a corrected error stack.
* @returns {Promise}
* @private
*/
function retriableSdkCall(client, methodName, streamName, retryOpts, ...args) {
const stackObj = getStackObj(retriableSdkCall);
return retry(bail => {
try {
return client[methodName](...args)
.promise()
.then(response => {
reportResponse(statsSource, streamName);
return response;
})
.catch(err => {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
if (!shouldBailRetry(err)) throw error;
else bail(error);
});
} catch (err) {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
bail(error);
return undefined;
}
}, retryOpts);
}
/**
* A class that wraps AWS.Kinesis.
*
* @alias module:kinesis-client
*/
class KinesisClient {
/**
* Initializes the AWS.Kinesis internal instance and prepares the retry logic.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The initialization options for AWS.Kinesis.
* @param {Object} options.logger - An instance of a logger.
* @param {string} options.streamName - The name of the Kinesis stream to which calls relate.
*/
constructor({ awsOptions, logger, streamName }) {
const client = new Kinesis(awsOptions);
const retryOpts = {

@@ -47,3 +148,4 @@ maxTimeout: 5 * 60 * 1000,

`\t- Request ID: ${requestId}`,
`\t- Code: ${code} (${statusCode})`
`\t- Code: ${code} (${statusCode})`,
`\t- Stream: ${streamName}`
].join('\n')}`

@@ -55,250 +157,135 @@ );

};
Object.assign(internal(this), { client, retryOpts, streamName });
}
/**
* Adds or updates tags for the specified Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
addTagsToStream(...args) {
const { client, streamName } = internal(this);
return sdkCall(client, 'addTagsToStream', streamName, ...args);
}
/**
* Creates a Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
createStream(...args) {
const { client, streamName } = internal(this);
return client
.createStream(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
const { code, message } = err;
if (code !== 'ResourceInUseException') {
const error = new Error(message);
error.code = code;
reportError(err, streamName);
throw error;
}
});
return sdkCall(client, 'createStream', streamName, ...args).catch(err => {
if (err.code !== 'ResourceInUseException') throw err;
});
}
addTagsToStream(...args) {
/**
* Deregisters a stream consumer. To deregister a consumer, provide its ARN.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
deregisterStreamConsumer(...args) {
const { client, streamName } = internal(this);
return client
.addTagsToStream(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportError(err, streamName);
throw error;
});
return sdkCall(client, 'deregisterStreamConsumer', streamName, ...args);
}
/**
* Describes the specified Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
describeStream(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
.describeStream(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
const { code } = err;
if (code === 'ResourceNotFoundException' || shouldBailRetry(err)) bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'describeStream', streamName, retryOpts, ...args);
}
listShards(...args) {
/**
* Gets data records from a Kinesis data stream's shard.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
getRecords(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
.listShards(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'getRecords', streamName, retryOpts, ...args);
}
listStreamConsumers(...args) {
const { client, retryOpts } = internal(this);
return retry(
bail =>
client
.listStreamConsumers(...args)
.promise()
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else throw err;
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
}
listTagsForStream(...args) {
/**
* Gets an Amazon Kinesis shard iterator.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
getShardIterator(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
.listTagsForStream(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'getShardIterator', streamName, retryOpts, ...args);
}
startStreamEncryption(...args) {
const { client, streamName } = internal(this);
return client
.startStreamEncryption(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
const { code, message } = err;
if (code !== 'UnknownOperationException' && code !== 'ResourceInUseException') {
const error = new Error(message);
error.code = code;
reportError(err, streamName);
throw error;
}
});
/**
* Tells whether the endpoint of the client is local or not.
*
* @returns {boolean} `true` if the endpoint is local, `false` otherwise.
*/
isEndpointLocal() {
const { client } = internal(this);
const { host } = client.endpoint;
return host.includes('localhost') || host.includes('localstack');
}
waitFor(...args) {
/**
* Lists the shards in a stream and provides information about each shard.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
listShards(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
.waitFor(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'listShards', streamName, retryOpts, ...args);
}
getShardIterator(...args) {
/**
* Lists the consumers registered to receive data from a stream using enhanced fan-out, and
* provides information about each consumer.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
listStreamConsumers(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
.getShardIterator(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'listStreamConsumers', streamName, retryOpts, ...args);
}
getRecords(...args) {
/**
* Lists the tags for the specified Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
listTagsForStream(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
.getRecords(...args)
.promise()
.then(tapAwsReponse(streamName))
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
throw error;
});
return retriableSdkCall(client, 'listTagsForStream', streamName, retryOpts, ...args);
}
deregisterStreamConsumer(...args) {
return internal(this)
.client.deregisterStreamConsumer(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}
registerStreamConsumer(...args) {
return internal(this)
.client.registerStreamConsumer(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}
/**
* Writes a single data record into an Amazon Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
putRecord(...args) {
const { client, retryOpts, streamName } = internal(this);
return retry(
bail =>
client
const stackObj = getStackObj(retriableSdkCall);
return retry(bail => {
try {
return client
.putRecord(...args)
.promise()
.then(result => {
reportAwsResponse('kinesis', streamName);
reportResponse(statsSource, streamName);
reportRecordSent(streamName);

@@ -308,80 +295,112 @@ return result;

.catch(err => {
if (err.code !== 'ProvisionedThroughputExceededException') bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
if (RETRIABLE_PUT_ERRORS.includes(err.code)) throw error;
else bail(error);
});
} catch (err) {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
bail(error);
return undefined;
}
}, retryOpts);
}
async putRecords(params) {
/**
* Writes multiple data records into a Kinesis data stream in a single call (also referred to as
* a PutRecords request).
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
putRecords(...args) {
const { client, retryOpts, streamName } = internal(this);
const { Records, ...opts } = params;
let records = Records;
let results = [];
let failedRecordCount = 0;
return retry(
bail =>
client
.putRecords({ ...opts, Records: records })
const stackObj = getStackObj(retriableSdkCall);
const [firstArg, ...restOfArgs] = args;
let records = firstArg.Records;
const results = [];
return retry(bail => {
try {
return client
.putRecords({ ...firstArg, Records: records }, ...restOfArgs)
.promise()
.then(payload => {
({ FailedRecordCount: failedRecordCount, Records: results } = payload);
if (failedRecordCount !== records.length) {
reportAwsResponse('kinesis', streamName);
const { EncryptionType, Records } = payload;
const failedCount = payload.FailedRecordCount;
const recordsCount = Records.length;
const nextRecords = [];
for (let i = 0; i < recordsCount; i += 1) {
if (Records[i].ErrorCode) nextRecords.push(records[i]);
else results.push(Records[i]);
}
reportResponse(statsSource, streamName);
if (failedCount < records.length) {
reportRecordSent(streamName);
}
if (failedRecordCount === 0) return;
if (failedCount === 0) {
return { EncryptionType, Records: results };
}
records = nextRecords;
const error = new Error(`Failed to write ${failedCount} of ${recordsCount} record(s).`);
error.code = 'ProvisionedThroughputExceededException';
throw error;
})
.catch(err => {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
if (RETRIABLE_PUT_ERRORS.includes(err.code)) throw error;
else bail(error);
});
} catch (err) {
const error = transformErrorStack(err, stackObj);
reportError(statsSource, error, streamName);
bail(error);
return undefined;
}
}, retryOpts);
}
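The core of the partial-failure handling above is the per-record split: responses carrying an `ErrorCode` become the input of the next retry while the rest are accumulated. That split can be isolated as a pure function (the name `splitPutRecordsResponse` is hypothetical, for illustration):

```javascript
// Sketch of the per-record split performed in putRecords. Records whose
// response entry has an ErrorCode are collected for the next retry attempt;
// the successful response entries are accumulated. No AWS calls involved.
function splitPutRecordsResponse(sentRecords, responseRecords) {
  const failed = [];
  const succeeded = [];
  for (let i = 0; i < responseRecords.length; i += 1) {
    if (responseRecords[i].ErrorCode) failed.push(sentRecords[i]);
    else succeeded.push(responseRecords[i]);
  }
  return { failed, succeeded };
}
```

Retrying only the failed subset avoids re-writing records that already landed, which would otherwise produce duplicates downstream.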
let code;
let message;
/**
* Registers a consumer with a Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
registerStreamConsumer(...args) {
const { client, streamName } = internal(this);
return sdkCall(client, 'registerStreamConsumer', streamName, ...args);
}
records = records.filter((record, i) => {
const { ErrorCode, ErrorMessage } = results[i];
if (ErrorCode && ErrorCode !== 'ProvisionedThroughputExceededException') {
code = ErrorCode;
message = ErrorMessage;
return false;
}
if (ErrorCode && !code) code = code || ErrorCode;
if (ErrorMessage && !message) message = ErrorMessage;
return ErrorCode;
});
const err = new Error(message);
err.code = code;
err.requestId = null;
err.statusCode = null;
throw err;
})
.catch(err => {
if (err.code !== 'ProvisionedThroughputExceededException') bail(err);
else {
reportError(err, streamName);
throw err;
}
}),
retryOpts
).catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
/**
* Enables or updates server-side encryption using an AWS KMS key for a specified stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
startStreamEncryption(...args) {
const { client, streamName } = internal(this);
return sdkCall(client, 'startStreamEncryption', streamName, ...args).catch(err => {
const { code } = err;
if (code !== 'UnknownOperationException' && code !== 'ResourceInUseException') throw err;
});
}
isEndpointLocal() {
const { client } = internal(this);
const { host } = client.endpoint;
return host.includes('localhost') || host.includes('localstack');
/**
* Waits for a given Kinesis resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
waitFor(...args) {
const { client, retryOpts, streamName } = internal(this);
return retriableSdkCall(client, 'waitFor', streamName, retryOpts, ...args);
}
}
/**
* @external AsyncRetry
* @see https://github.com/zeit/async-retry#api
*/
module.exports = KinesisClient;

@@ -0,11 +1,31 @@

/**
* Module in charge of acquiring and renewing shard leases. The module exports a class whose
* instances will periodically try to acquire a lease for all the stream shards. The lease manager
* won't try to renew or acquire more leases than the maximum allowed. The maximum allowed number
* of active leases is calculated by dividing the number of stream shards by the number of known
* stream consumers. Another restriction that the manager handles is that, for split shards,
* child shards won't be leased until the parent shard is reported as depleted. If the manager
* detects changes in the leases, an instance of the consumers manager is signaled so it can
* start or stop shard consumers for the active leases as needed.
*
* @module lease-manager
* @private
*/
'use strict';
const ACQUIRE_LEASES_INTERVAL = 30 * 1000;
const { checkIfStreamExists, getStreamShards } = require('./stream');
const ACQUIRE_LEASES_INTERVAL = 20 * 1000;
const LEASE_TERM_TIMEOUT = 5 * 60 * 1000;
const LEASE_RENEWAL_OFFSET = ACQUIRE_LEASES_INTERVAL * 3;
const LEASE_RENEWAL_OFFSET = Math.round(LEASE_TERM_TIMEOUT * 0.25);
const { checkIfStreamExists, getStreamShards } = require('./stream');
const privateData = new WeakMap();
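The maximum-active-leases rule described in the module doc is a simple division of shards by known consumers; a hedged sketch (the function name and the use of `Math.ceil` are assumptions for illustration; the module may round differently):

```javascript
// Sketch of the maximum-active-leases rule: the number of stream shards
// divided by the number of known stream consumers. Rounding up is an
// assumption here, chosen so that every shard is covered by some consumer.
function maxActiveLeases(shardCount, consumerCount) {
  if (consumerCount <= 0) return 0;
  return Math.ceil(shardCount / consumerCount);
}
```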
/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
*/
function internal(instance) {

@@ -16,9 +36,21 @@ if (!privateData.has(instance)) privateData.set(instance, {});

function stopManager(instance) {
const { consumersManager, logger, timeoutId } = internal(instance);
clearTimeout(timeoutId);
consumersManager.stop();
logger.debug('The lease manager has stopped.');
}
/**
* Tries to acquire the lease for a specific shard. The lease won't be acquired or renewed if:
*
* - The shard is a parent of a split shard and it has been marked as depleted.
* - The shard is currently leased by this consumer and the lease is active.
* - The shard is currently leased by another consumer and the lease is active.
* - The shard is a child of a split shard and the parent shard hasn't been depleted.
* - Acquiring or renewing the lease would exceed the maximum count of allowed active leases.
*
* The lease will be renewed if the owner is the current consumer and if the lease period is
* about to expire. The lease will be released the lease owner is gone of if the lease expired.
* Which would make the shard available for leasing for the next leasing attempt.
*
* @param {Object} instance - The instance of Lease Manager from which the attempt originated.
* @param {string} shardId - The ID of the shard to acquire or renew a lease for.
* @param {Object} shardsDescription - The AWS-provided data describing the shards.
* @fulfil {boolean} - `true` if a change in the leases was detected, `false` otherwise.
* @returns {Promise}
*/
async function acquireLease(instance, shardId, shardsDescription) {

@@ -67,4 +99,5 @@ const privateProps = internal(instance);

logger.debug(
  `Lease for shard "${shardId}" released. ${
    theLeaseExpired ? 'The lease expired.' : 'The owner is gone.'
  }`
);

@@ -116,3 +149,23 @@ leaseExpiration = null;

/**
* Class that implements the lease manager logic.
*
* @alias module:lease-manager
*/
class LeaseManager {
/**
* Initializes an instance of the lease manager.
*
* @param {Object} options - Initialization options.
* @param {Object} options.client - An instance of AWS.Kinesis.
* @param {string} options.consumerId - The unique ID of the current Kinesis consumer.
* @param {Object} options.consumersManager - An instance of the ConsumersManager module.
* @param {Object} options.logger - An instance of a logger.
* @param {Object} options.stateStore - An instance of the StateStore module.
* @param {string} options.streamName - The name of the Kinesis stream.
* @param {boolean} options.useAutoShardAssignment - Whether the consumer should automatically
* distribute the shards between the known consumers or just consume from all shards.
* @param {boolean} options.useEnhancedFanOut - Whether the consumer uses enhanced fan-out
* consumers or simple polling consumers.
*/
constructor(options) {

@@ -142,6 +195,14 @@ const {

/**
* Tries to acquire leases for all the shards in the stream and continues to do so periodically
* until the lease manager instance is stopped.
*
* @returns {Promise}
*/
async start() {
const privateProps = internal(this);
const { consumersManager, logger, stateStore, timeoutId, useEnhancedFanOut } = privateProps;
if (timeoutId) return;
const acquireLeases = async () => {

@@ -151,5 +212,6 @@ logger.debug('Trying to acquire leases…');

const { streamArn } = await checkIfStreamExists(privateProps);
if (!streamArn) {
  logger.debug("Can't acquire leases as the stream is gone.");
  consumersManager.stop();
  this.stop();
return;

@@ -180,4 +242,11 @@ }

/**
* Stops the lease manager attempts to acquire leases for the shards.
*/
stop() {
const privateProps = internal(this);
const { logger, timeoutId } = privateProps;
clearTimeout(timeoutId);
privateProps.timeoutId = null;
logger.debug('The lease manager has stopped.');
}

@@ -184,0 +253,0 @@ }
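The lease-manager module comment above describes the lease cap as the number of stream shards divided by the number of known consumers. A minimal sketch of that calculation (the `maxActiveLeases` helper name and the rounding mode are assumptions for illustration, not the library's actual code):

```javascript
// Hypothetical helper illustrating the lease cap described in the module
// comment: shards are divided evenly between the known consumers, rounding
// up so every shard stays covered when the division isn't exact.
function maxActiveLeases(shardCount, consumerCount) {
  if (consumerCount <= 0) return 0;
  return Math.ceil(shardCount / consumerCount);
}

// With 8 shards and 3 consumers, each consumer may hold at most 3 leases.
console.log(maxActiveLeases(8, 3));
```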

@@ -0,1 +1,8 @@

/**
* Module that implements a shard polling consumer.
*
* @module polling-consumer
* @private
*/
'use strict';

@@ -8,2 +15,9 @@

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -26,2 +40,3 @@ if (!privateData.has(instance)) privateData.set(instance, {});

* @returns {Promise} A promise for a new shard iterator.
* @private
*/

@@ -55,2 +70,3 @@ async function getShardIterator(client, logger, streamName, shardId, sequenceNumber) {

* @returns {Promise}
* @private
*/

@@ -63,3 +79,5 @@ async function pollForRecords(instance) {

client,
continuePolling,
leaseExpiration,
limit,
logger,

@@ -80,71 +98,106 @@ noRecordsPollDelay,

try {
  if (Date.now() > leaseExpiration) {
    logger.debug(`Unable to read from shard "${shardId}" anymore, the lease expired.`);
    stopConsumer(shardId);
    return;
  }
  if (seqNumToCheckpoint) {
    await setCheckpoint(seqNumToCheckpoint);
    privateProps.seqNumToCheckpoint = null;
  }
  let { iterator } = privateProps;
  if (!iterator && checkpoint) {
    logger.debug(`Starting to read shard "${shardId}" from a known checkpoint.`);
    iterator = await getShardIterator(client, logger, streamName, shardId, checkpoint);
  }
  if (!iterator) {
    logger.debug(`Starting to read shard "${shardId}" from the latest record.`);
    iterator = await getShardIterator(client, logger, streamName, shardId);
  }
  const data = await client.getRecords({ Limit: limit, ShardIterator: iterator });
  const { NextShardIterator, Records } = data;
  const msBehind = data.MillisBehindLatest;
  privateProps.iterator = NextShardIterator;
  const recordsCount = Records.length;
  if (recordsCount === 0) {
    if (NextShardIterator === undefined) {
      const shards = await getStreamShards(privateProps);
      logger.debug(`The parent shard "${shardId}" has been depleted.`);
      await stateStore.markShardAsDepleted(shards, shardId);
      stopConsumer(shardId);
      return;
    }
    const delay = msBehind <= 0 ? noRecordsPollDelay : 0;
    if (delay === 0) logger.debug(`Fast-forwarding "${shardId}"… (${msBehind}ms behind)`);
    privateProps.timeoutId = setTimeout(pollForRecords, delay, instance);
    return;
  }
  const records = await Promise.all(Records.map(recordsDecoder));
  logger.debug(`Got ${recordsCount} record(s) from "${shardId}" (${msBehind}ms behind)`);
  if (useAutoCheckpoints) {
    const { sequenceNumber } = records[recordsCount - 1];
    if (!usePausedPolling) {
      await setCheckpoint(sequenceNumber);
    } else {
      privateProps.seqNumToCheckpoint = sequenceNumber;
    }
  }
  const propsToPush = Object.assign(
    { millisBehindLatest: msBehind, records, shardId, streamName },
    !useAutoCheckpoints && { setCheckpoint },
    usePausedPolling && { continuePolling }
  );
  pushToStream(null, propsToPush);
  if (!usePausedPolling) {
    privateProps.timeoutId = setTimeout(pollForRecords, pollDelay, instance);
  }
} catch (err) {
  logger.error(err);
  pushToStream(err);
}
}
/**
* Class that implements a polling consumer.
*
* @alias module:polling-consumer
*/
class PollingConsumer {
/**
* Initializes an instance of the polling consumer.
*
* @param {Object} options - The initialization options.
* @param {string} options.checkpoint - The last-known checkpoint for the stream shard.
* @param {Object} options.client - An instance of the Kinesis client.
* @param {string} options.compression - The kind of data compression to use with records.
* @param {string} options.leaseExpiration - The timestamp of the shard lease expiration.
* @param {number} options.limit - The limit of records per get records call.
* @param {Object} options.logger - An instance of a logger.
* @param {number} options.noRecordsPollDelay - The delay in milliseconds before attempting to
* get more records when there were none in the previous attempt.
* @param {number} options.pollDelay - When the `usePausedPolling` option is `false`, this
* option defines the delay in milliseconds in between poll requests for more records.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {string} options.shardId - The ID of the stream shard to retrieve records for.
* @param {Object} options.stateStore - An instance of the state store.
* @param {Function} options.stopConsumer - A function that stops this consumer from the manager.
* @param {string} options.streamName - The name of the Kinesis stream.
* @param {boolean} options.useAutoCheckpoints - Whether to automatically store shard checkpoints
* using the sequence number of the most-recently received record.
* @param {boolean} options.usePausedPolling - Whether the client should wait for
* user intervention before polling for more records.
*/
constructor(options) {

@@ -156,2 +209,3 @@ const {

leaseExpiration,
limit,
logger,

@@ -172,4 +226,6 @@ noRecordsPollDelay,

client,
continuePolling: null,
iterator: null,
leaseExpiration: new Date(leaseExpiration).getTime(),
limit,
logger,

@@ -192,2 +248,8 @@ noRecordsPollDelay,

/**
* Starts the timers to poll for records.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async start() {

@@ -201,5 +263,10 @@ const privateProps = internal(this);

privateProps.continuePolling = () => pollForRecords(this);
pollForRecords(this);
}
/**
* Stops the timers that poll for records.
*/
stop() {

@@ -211,2 +278,7 @@ const privateProps = internal(this);

/**
* Updates the shard lease expiration timestamp.
*
* @param {string} leaseExpiration - The updated timestamp when the shard lease expires.
*/
updateLeaseExpiration(leaseExpiration) {

@@ -213,0 +285,0 @@ internal(this).leaseExpiration = new Date(leaseExpiration).getTime();
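The polling consumer above decides its next delay from `MillisBehindLatest`: when a `getRecords` call returns no records and the shard reports no lag, it waits `noRecordsPollDelay` before polling again; when it is behind, it fast-forwards with no delay. That scheduling rule can be sketched as a pure function (an illustration of the rule, not the module's actual code):

```javascript
// Hypothetical helper mirroring the polling delay rule used when a poll
// returns zero records: wait when fully caught up, poll again immediately
// when the consumer is still behind the tip of the shard.
function nextPollDelay(msBehindLatest, noRecordsPollDelay) {
  return msBehindLatest <= 0 ? noRecordsPollDelay : 0;
}

console.log(nextPollDelay(0, 1000)); // caught up: wait the configured delay
console.log(nextPollDelay(2500, 1000)); // behind: fast-forward with no delay
```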

@@ -0,1 +1,8 @@

/**
* Module that contains a collection of classes and functions to encode and decode Kinesis records.
*
* @module records
* @private
*/
'use strict';

@@ -12,2 +19,9 @@

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {

@@ -18,2 +32,9 @@ if (!privateData.has(instance)) privateData.set(instance, {});

/**
* Hashes the given buffer into a SHA1-Base64 string.
*
* @param {Buffer} buffer - The buffer of bytes to hash.
* @returns {string} A string with the hash of the buffer.
* @private
*/
function hash(buffer) {

@@ -25,2 +46,11 @@ return createHash('sha1')

/**
* Returns a function that decodes Kinesis records as they are retrieved from AWS.Kinesis into
* native objects. The decoder will also decompress the record data as instructed.
*
* @param {string} compression - The kind of compression used in the Kinesis record data.
* @param {string} inputEncoding - The encoding of the `Data` property in the AWS.Kinesis record.
* @returns {Function} A function that decodes `record` objects from AWS.Kinesis.
* @memberof module:records
*/
function getRecordsDecoder(compression, inputEncoding) {

@@ -59,2 +89,11 @@ const compressionLib = compression && compressionLibs[compression];

/**
* Returns a function that encodes native objects into records that can be sent to AWS.Kinesis.
* The encoder will also compress the record data using the specified compression library.
*
* @param {string} compression - The kind of compression used for the Kinesis record data.
* @param {string} outputEncoding - The encoding for the resulting `Data` property.
* @returns {Function} A function that encodes objects into the format expected by AWS.Kinesis.
* @memberof module:records
*/
function getRecordsEncoder(compression, outputEncoding) {

@@ -76,3 +115,16 @@ const compressionLib = compression && compressionLibs[compression];

/**
* Implements a transform stream that decodes and decompresses records in AWS.Kinesis format as
* they arrive. The records are transformed into native objects.
*
* @extends external:Transform
* @memberof module:records
*/
class RecordsDecoder extends Transform {
/**
* Initializes the decoder stream.
*
* @param {Object} options - The initialization options.
* @param {string} options.compression - The kind of compression to use in records data.
*/
constructor({ compression }) {

@@ -85,2 +137,12 @@ super({ objectMode: true });

/**
* Transforms data as it passes through the stream.
*
* @param {Object} chunk - The data to transform.
* @param {Object} chunk.headers - The headers from an AWS event stream chunk.
* @param {Object} chunk.payload - The payload from an AWS event stream chunk.
* @param {string} encoding - The encoding used in the stream (ignored).
* @param {Function} callback - The callback to signal for completion.
* @returns {undefined}
*/
async _transform({ headers, payload }, encoding, callback) {

@@ -121,2 +183,7 @@ const { recordsDecoder } = internal(this);

/**
* @external Transform
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_transform
*/
module.exports = {

@@ -123,0 +190,0 @@ RecordsDecoder,
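The encoder/decoder pair documented above is symmetric: what `getRecordsEncoder` produces, `getRecordsDecoder` should turn back into a native object. A simplified, self-contained sketch of that round trip (JSON plus Base64 only, no compression; these helper names are illustrative, not the module's real implementation):

```javascript
// Minimal stand-ins for the documented encoder/decoder behavior:
// encode a native object into a Base64 `Data` string, then decode the
// string back into an equivalent object.
function encodeRecord(obj) {
  return Buffer.from(JSON.stringify(obj), 'utf8').toString('base64');
}

function decodeRecord(data) {
  return JSON.parse(Buffer.from(data, 'base64').toString('utf8'));
}

const original = { foo: 'bar' };
const decoded = decodeRecord(encodeRecord(original));
console.log(decoded); // an object with the same shape as the original
```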

@@ -0,1 +1,8 @@

/**
* Module that maintains the state of the consumer in a DynamoDB table.
*
* @module state-store
* @private
*/
'use strict';

@@ -21,2 +28,3 @@

* @returns {Object} The private data.
* @private
*/

@@ -28,2 +36,9 @@ function internal(instance) {

/**
* Retrieves the stream state.
*
* @param {Object} instance - The instance of the state store to get the private data from.
* @returns {Promise}
* @private
*/
async function getStreamState(instance) {

@@ -39,2 +54,9 @@ const privateProps = internal(instance);

/**
* Ensures there's an entry for the current stream in the state database.
*
* @param {Object} instance - The instance of the state store to get the private data from.
* @returns {Promise}
* @private
*/
async function initStreamState(instance) {

@@ -74,3 +96,89 @@ const privateProps = internal(instance);

/**
* Updates the "is active" flag of a consumer. This is useful when a consumer is using
* enhanced fan-out but is unable to use one of the registered enhanced consumers. When not
* active, a consumer isn't counted when calculating the maximum number of active leases.
*
* @param {Object} instance - The instance of the state store to get the private data from.
* @param {boolean} isActive - The value to set the "is active" flag to.
* @fulfil {undefined}
* @returns {Promise}
*/
async function updateConsumerIsActive(instance, isActive) {
const { client, consumerGroup, consumerId, logger, streamName } = internal(instance);
try {
await client.update({
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId,
'#c': 'isActive'
},
ExpressionAttributeValues: {
':z': isActive
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :z'
});
} catch (err) {
logger.debug("Can't update the consumer's active flag.");
}
}
/**
* Tries to lock an enhanced fan-out consumer to this consumer.
*
* @param {Object} instance - The instance of the state store to get the private data from.
* @param {string} consumerName - The name of the enhanced fan-out consumer to lock.
* @param {string} version - The known version number of the enhanced consumer state entry.
* @fulfil {boolean} - `true` if the enhanced consumer was locked, `false` otherwise.
* @returns {Promise}
*/
async function lockEnhancedConsumer(instance, consumerName, version) {
const {
client,
consumerGroup,
consumerId,
logger,
streamName,
useAutoShardAssignment
} = internal(instance);
try {
await client.update({
ConditionExpression: `#a.#b.#d = :z`,
ExpressionAttributeNames: Object.assign(
{
'#a': 'enhancedConsumers',
'#b': consumerName,
'#c': 'isUsedBy',
'#d': 'version'
},
!useAutoShardAssignment && { '#e': 'shards' }
),
ExpressionAttributeValues: Object.assign(
{
':x': consumerId,
':y': generate(),
':z': version
},
!useAutoShardAssignment && { ':w': {} }
),
Key: { consumerGroup, streamName },
UpdateExpression: `SET #a.#b.#c = :x, #a.#b.#d = :y${
!useAutoShardAssignment ? ', #a.#b.#e = if_not_exists(#a.#b.#e, :w)' : ''
}`
});
return true;
} catch (err) {
if (err.code !== 'ConditionalCheckFailedException') {
logger.error(err);
throw err;
}
return false;
}
}
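The version check in `lockEnhancedConsumer` above is an optimistic-locking pattern: the conditional update only succeeds when the stored version still matches the one the caller last read, so concurrent lockers with stale state are rejected. An in-memory sketch of the same idea (a plain `Map` standing in for the DynamoDB table; names are illustrative):

```javascript
// Hypothetical in-memory version of the conditional update: the lock is
// granted only if the entry's version still matches the version the caller
// last observed, which rejects stale concurrent lock attempts.
function tryLock(store, name, expectedVersion, ownerId, newVersion) {
  const entry = store.get(name);
  if (!entry || entry.version !== expectedVersion) return false; // condition failed
  store.set(name, { ...entry, isUsedBy: ownerId, version: newVersion });
  return true;
}

const store = new Map([['consumer-1', { isUsedBy: null, version: 'v1' }]]);
console.log(tryLock(store, 'consumer-1', 'v1', 'me', 'v2')); // true: lock acquired
console.log(tryLock(store, 'consumer-1', 'v1', 'other', 'v3')); // false: stale version
```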
/**
* Class that encapsulates the DynamoDB table where the shared state for the stream is stored.
*
* @alias module:state-store
*/

@@ -82,5 +190,10 @@ class StateStore {

* @param {Object} options - The initialization options.
* @param {string} options.consumerGroup - The name of the group of consumers in which shards
* will be distributed and checkpoints will be shared.
* @param {string} options.consumerId - A unique ID representing the instance of this consumer.
* @param {Object} options.dynamoDb - The initialization options passed to the Kinesis
* client module, specific for the DynamoDB state data table. This object can also
contain any of the [`AWS.DynamoDB` options]{@link external:AwsJsSdkDynamoDb}.
* @param {Object} [options.dynamoDb.provisionedThroughput] - The provisioned throughput for the
* state table. If not provided, pay-per-request is used.
* @param {string} [options.dynamoDb.tableName=lifion-kinesis-state] - The name of the

@@ -90,4 +203,10 @@ * table where the shared state is stored.

* the table has these tags during start.
* @param {Object} options.logger - An instance of a logger.
* @param {string} options.streamCreatedOn - The creation timestamp for the stream. It's used
* to confirm the stored state corresponds to the same stream with the given name.
* @param {string} options.streamName - The name of the stream to keep state for.
* @param {boolean} options.useAutoShardAssignment - Whether the stream shards should be
* automatically assigned to the active consumers in the same group.
* @param {boolean} options.useEnhancedFanOut - Whether the consumer uses enhanced
* fan-out consumers.
*/

@@ -122,18 +241,12 @@ constructor(options) {

/**
* Clears out consumers that are considered to be gone as they have failed to record a
* heartbeat in a given timeout period. In addition to clearing out the consumers, any shard
* with an active lease for consumers that are gone will be released. Any enhanced fan-out
* consumers in use by gone consumers will also be released.
*
* @param {number} heartbeatFailureTimeout - The number of milliseconds after a heartbeat when
* a consumer should be considered as gone.
* @fulfil {undefined}
* @returns {Promise}
*/
async clearOldConsumers(heartbeatFailureTimeout) {

@@ -151,26 +264,26 @@ const privateProps = internal(this);

if (oldConsumers.length > 0) {
  try {
    await client.update({
      ConditionExpression: `#b = :y`,
      ExpressionAttributeNames: {
        '#a': 'consumers',
        '#b': 'version',
        ...oldConsumers.reduce((obj, id, index) => ({ ...obj, [`#${index}`]: id }), {})
      },
      ExpressionAttributeValues: {
        ':x': generate(),
        ':y': version
      },
      Key: { consumerGroup, streamName },
      UpdateExpression: `REMOVE ${oldConsumers
        .map((id, index) => `#a.#${index}`)
        .join(', ')} SET #b = :x`
    });
    logger.debug(`Cleared ${oldConsumers.length} old consumer(s).`);
  } catch (err) {
    if (err.code !== 'ConditionalCheckFailedException') {
      logger.error(err);
      throw err;
    }
  }
}

@@ -184,24 +297,22 @@ }

if (usagesToClear.length > 0) {
  try {
    await client.update({
      ExpressionAttributeNames: {
        '#a': 'enhancedConsumers',
        '#b': 'isUsedBy',
        '#c': 'version',
        ...usagesToClear.reduce((obj, id, index) => ({ ...obj, [`#${index}`]: id }), {})
      },
      ExpressionAttributeValues: {
        ':z': null,
        ...usagesToClear.reduce((obj, id, index) => ({ ...obj, [`:${index}`]: generate() }), {})
      },
      Key: { consumerGroup, streamName },
      UpdateExpression: `SET ${usagesToClear
        .map((id, index) => `#a.#${index}.#b = :z, #a.#${index}.#c = :${index}`)
        .join(', ')}`
    });
    logger.debug(`Released usage of ${usagesToClear.length} enhanced consumer(s).`);
  } catch (err) {
    logger.error(err);

@@ -213,113 +324,63 @@ throw err;

async getEnhancedConsumers() {
const { enhancedConsumers } = await getStreamState(this);
return enhancedConsumers;
}
/**
* Removes an enhanced fan-out consumer from the stream state.
*
* @param {string} name - The name of the enhanced consumer to remove.
* @fulfil {undefined}
* @returns {Promise}
*/
async deregisterEnhancedConsumer(name) {
const { client, consumerGroup, logger, streamName } = internal(this);
async registerConsumer() {
const {
client,
consumerGroup,
consumerId,
logger,
streamName,
useAutoShardAssignment,
useEnhancedFanOut
} = internal(this);
try {
await client.update({
ConditionExpression: 'attribute_not_exists(#a.#b)',
ConditionExpression: 'attribute_exists(#a.#b)',
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId
'#a': 'enhancedConsumers',
'#b': name
},
ExpressionAttributeValues: {
':x': Object.assign(
{
appName,
heartbeat: new Date().toISOString(),
host,
isActive: true,
isStandalone: !useAutoShardAssignment,
pid,
startedOn: new Date(Date.now() - uptime() * 1000).toISOString()
},
!useAutoShardAssignment && !useEnhancedFanOut && { shards: {} }
)
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b = :x'
UpdateExpression: 'REMOVE #a.#b'
});
logger.debug(`The consumer "${consumerId}" is now registered.`);
logger.debug(`The enhanced consumer "${name}" is now de-registered.`);
} catch (err) {
if (err.code === 'ConditionalCheckFailedException') {
await client
.update({
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId,
'#c': 'heartbeat'
},
ExpressionAttributeValues: {
':x': new Date().toISOString()
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :x'
})
.catch(() => {
logger.debug(`Missed heartbeat for "${consumerId}".`);
});
return;
if (err.code !== 'ConditionalCheckFailedException') {
logger.error(err);
throw err;
}
logger.error(err);
throw err;
}
}
async updateConsumerIsActive(isActive) {
const { client, consumerGroup, consumerId, logger, streamName } = internal(this);
try {
await client.update({
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId,
'#c': 'isActive'
},
ExpressionAttributeValues: {
':z': isActive
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :z'
});
} catch (err) {
logger.debug("Can't update the consumer's active flag.");
}
}
/**
* Ensures there's an entry for the given shard ID and data in the stream state.
*
* @param {string} shardId - The ID of the stream shard.
* @param {Object} shardData - The data describing the shard as returned by the AWS Kinesis API.
* @param {Object} [streamState] - The current stream state, if known.
* @fulfil {undefined}
* @returns {Promise}
*/
async ensureShardStateExists(shardId, shardData, streamState) {
const privateProps = internal(this);
const { shardsPath, shardsPathNames } = await this.getShardsData(streamState);
const { client, consumerGroup, logger, streamName } = privateProps;
const { parent } = shardData;
async registerEnhancedConsumer(name, arn) {
const { client, consumerGroup, logger, streamName, useAutoShardAssignment } = internal(this);
try {
await client.update({
ConditionExpression: 'attribute_not_exists(#a.#b)',
ExpressionAttributeNames: {
'#a': 'enhancedConsumers',
'#b': name
},
ConditionExpression: `attribute_not_exists(${shardsPath}.#b)`,
ExpressionAttributeNames: { ...shardsPathNames, '#b': shardId },
ExpressionAttributeValues: {
':x': Object.assign(
{
arn,
isStandalone: !useAutoShardAssignment,
isUsedBy: null,
version: generate()
},
!useAutoShardAssignment && { shards: {} }
)
':x': {
checkpoint: null,
depleted: false,
leaseExpiration: null,
leaseOwner: null,
parent,
version: generate()
}
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b = :x'
UpdateExpression: `SET ${shardsPath}.#b = :x`
});
logger.debug(`The enhanced consumer "${name}" is now registered.`);
} catch (err) {

@@ -333,27 +394,131 @@ if (err.code !== 'ConditionalCheckFailedException') {

async deregisterEnhancedConsumer(name) {
const { client, consumerGroup, logger, streamName } = internal(this);
/**
* Returns the ARN of the enhanced fan-out consumer assigned to this consumer. It will try to
* lock one if the consumer is using enhanced fan-out but hasn't been assigned one before.
*
* @fulfil {string} - The ARN of the assigned enhanced fan-out consumer, `null` otherwise.
* @returns {Promise}
*/
async getAssignedEnhancedConsumer() {
const { consumerId, logger } = internal(this);
try {
await client.update({
ConditionExpression: 'attribute_exists(#a.#b)',
ExpressionAttributeNames: {
'#a': 'enhancedConsumers',
'#b': name
},
Key: { consumerGroup, streamName },
UpdateExpression: 'REMOVE #a.#b'
});
logger.debug(`The enhanced consumer "${name}" is now de-registered.`);
} catch (err) {
if (err.code !== 'ConditionalCheckFailedException') {
logger.error(err);
throw err;
let consumerArn;
let consumerName;
const enhancedConsumers = await this.getEnhancedConsumers();
const consumerNames = Object.keys(enhancedConsumers);
// Find out if an enhanced consumer was already assigned.
consumerNames.find(name => {
const { arn, isUsedBy } = enhancedConsumers[name];
if (isUsedBy === consumerId) {
consumerName = name;
consumerArn = arn;
return true;
}
return false;
});
// Try to assign an enhanced consumer from the available ones.
if (!consumerArn) {
const availableConsumers = consumerNames.filter(name => !enhancedConsumers[name].isUsedBy);
for (let i = 0; i < availableConsumers.length; i += 1) {
const name = availableConsumers[i];
const { arn, version } = enhancedConsumers[name];
if (await lockEnhancedConsumer(this, name, version)) {
consumerArn = arn;
consumerName = name;
break;
}
}
}
if (!consumerArn) {
logger.warn(`Couldn't lock an enhanced fan-out consumer.`);
await updateConsumerIsActive(this, false);
return null;
}
await updateConsumerIsActive(this, true);
logger.debug(`Using the "${consumerName}" enhanced fan-out consumer.`);
return consumerArn;
}
async getShardsData(streamState = null) {
/**
* Returns the data for the enhanced fan-out consumers stored in the state.
*
* @fulfil {Object} - The enhanced consumers.
* @returns {Promise}
*/
async getEnhancedConsumers() {
const { enhancedConsumers } = await getStreamState(this);
return enhancedConsumers;
}
/**
* Returns an object with the state of the shards for which this consumer has an active lease.
*
* @fulfil {Object} - An object with the state of the owned shards.
* @returns {Promise}
*/
async getOwnedShards() {
const { consumerId } = internal(this);
const streamState = await getStreamState(this);
const { shards } = await this.getShardsData(streamState);
return Object.keys(shards)
.filter(shardId => shards[shardId].leaseOwner === consumerId)
.reduce((obj, shardId) => {
const { checkpoint, depleted, leaseExpiration, version } = shards[shardId];
if (new Date(leaseExpiration).getTime() - Date.now() > 0 && !depleted)
return { ...obj, [shardId]: { checkpoint, leaseExpiration, version } };
return obj;
}, {});
}
/**
* Returns an object with the current states for a given shard and the entire stream.
*
* @param {string} shardId - The ID of the stream shard to get the state for.
* @param {Object} shardData - The data describing the shard as provided by the AWS Kinesis API.
* @fulfil {Object} - An object containing `shardState` (the state of the shard) and
* `streamState` (the stream state).
* @returns {Promise}
*/
async getShardAndStreamState(shardId, shardData) {
const getState = async () => {
const streamState = await getStreamState(this);
const { shards } = await this.getShardsData(streamState);
const shardState = shards[shardId];
return { shardState, streamState };
};
const states = await getState();
if (states.shardState !== undefined) return states;
await this.ensureShardStateExists(shardId, shardData, states.streamState);
return getState();
}
/**
* Returns the current state of the stream shards and pointers that can be used to update the
* stream shard state in subsequent calls. This is useful as the shards state is stored in
* different locations depending on the consumer usage scenario.
*
* - When using automatic shard distribution, the shards state is stored in `.shards`.
* - When reading from all shards, the shards state is stored in `.consumers[].shards`.
* - When reading from all the shards and using enhanced fan-out consumers, the shards state
* is stored in `.enhancedConsumers[].shards`.
*
* @param {Object} [streamState] - The current state for the entire stream. If not provided,
* the stream state is fetched. This parameter is useful to avoid repeated calls for
* the stream state retrieval if that information is already present.
* @fulfil {Object} - An object containing `shards` (the current shards state), `shardsPath`
* (a string pointing to the path to where the shards state is stored), and
* `shardsPathNames` (an object with the value for the path attributes that point to the
* place where the shards state is stored). Both `shardsPath` and `shardsPathNames` are
* to be used in `update` expressions.
* @returns {Promise}
*/
async getShardsData(streamState) {
const { consumerId, useAutoShardAssignment, useEnhancedFanOut } = internal(this);
const normStreamState = streamState === null ? await getStreamState(this) : streamState;
const normStreamState = !streamState ? await getStreamState(this) : streamState;
const { consumers, enhancedConsumers } = normStreamState;

@@ -397,46 +562,12 @@

async ensureShardStateExists(shardId, shardData, streamState) {
const privateProps = internal(this);
const { shardsPath, shardsPathNames } = await this.getShardsData(streamState);
const { client, consumerGroup, logger, streamName } = privateProps;
const { parent } = shardData;
try {
await client.update({
ConditionExpression: `attribute_not_exists(${shardsPath}.#b)`,
ExpressionAttributeNames: { ...shardsPathNames, '#b': shardId },
ExpressionAttributeValues: {
':x': {
checkpoint: null,
depleted: false,
leaseExpiration: null,
leaseOwner: null,
parent,
version: generate()
}
},
Key: { consumerGroup, streamName },
UpdateExpression: `SET ${shardsPath}.#b = :x`
});
} catch (err) {
if (err.code !== 'ConditionalCheckFailedException') {
logger.error(err);
throw err;
}
}
}
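The method above uses DynamoDB's conditional updates to insert a shard state entry only when it is missing: the `attribute_not_exists` condition makes the `SET` fail with a `ConditionalCheckFailedException` when the entry already exists, and that error is deliberately swallowed. A simplified sketch of how such a request could be assembled (the function name and sample values are hypothetical, not part of the library):

```javascript
// Builds a DynamoDB update request that inserts a shard entry only if absent.
// `shardsPath`/`shardsPathNames` mirror the values returned by getShardsData.
function buildEnsureShardUpdate(shardsPath, shardsPathNames, shardId, entry) {
  return {
    ConditionExpression: `attribute_not_exists(${shardsPath}.#b)`,
    ExpressionAttributeNames: { ...shardsPathNames, '#b': shardId },
    ExpressionAttributeValues: { ':x': entry },
    UpdateExpression: `SET ${shardsPath}.#b = :x`
  };
}

const req = buildEnsureShardUpdate('#a', { '#a': 'shards' }, 'shardId-0000', {
  checkpoint: null,
  depleted: false
});
console.log(req.UpdateExpression); // SET #a.#b = :x
```

Passing these parameters to a DynamoDB document-client `update` call makes the insert idempotent: concurrent consumers racing to create the same entry all succeed logically, with the losers hitting the condition failure.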
async getShardAndStreamState(shardId, shardData) {
const getState = async () => {
const streamState = await getStreamState(this);
const { shards } = await this.getShardsData(streamState);
const shardState = shards[shardId];
return { shardState, streamState };
};
const states = await getState();
if (states.shardState !== undefined) return states;
await this.ensureShardStateExists(shardId, shardData, states.streamState);
return getState();
}
/**
* Tries to lock the lease of a given shard.
*
* @param {string} shardId - The ID of the shard to lock a lease for.
* @param {number} leaseTermTimeout - The duration of the lease in milliseconds.
* @param {string} version - The known version number of the shard state entry.
* @param {Object} streamState - The known stream state.
 * @fulfil {boolean} - `true` if the lease was successfully locked, `false` otherwise.
* @returns {Promise}
*/
async lockShardLease(shardId, leaseTermTimeout, version, streamState) {

@@ -478,3 +609,84 @@ const { client, consumerGroup, consumerId, logger, streamName } = internal(this);

async lockStreamConsumer(consumerName, version) {
/**
 * Marks a shard as depleted in the stream state so its child shards can be leased.
*
* @param {Object} shardsData - The current shards state.
* @param {string} parentShardId - The ID of the shard to mark as depleted.
* @fulfil {undefined}
* @returns {Promise}
*/
async markShardAsDepleted(shardsData, parentShardId) {
const { client, consumerGroup, streamName } = internal(this);
const streamState = await getStreamState(this);
const { shards, shardsPath, shardsPathNames } = await this.getShardsData(streamState);
const parentShard = shards[parentShardId];
const childrenShards = parentShard.checkpoint
? Object.keys(shardsData)
.filter(shardId => shardsData[shardId].parent === parentShardId)
.map(shardId => {
const { startingSequenceNumber } = shardsData[shardId];
return { shardId, startingSequenceNumber };
})
: [];
await Promise.all(
childrenShards.map(childrenShard => {
return this.ensureShardStateExists(
childrenShard.shardId,
shardsData[childrenShard.shardId],
streamState
);
})
);
await client.update({
ExpressionAttributeNames: Object.assign(
{
...shardsPathNames,
'#b': parentShardId,
'#c': 'depleted',
'#d': 'version'
},
childrenShards.length > 0 && { '#e': 'checkpoint' },
childrenShards.reduce(
(obj, childShard, index) => ({ ...obj, [`#${index}`]: childShard.shardId }),
{}
)
),
ExpressionAttributeValues: {
':x': true,
':y': generate(),
...childrenShards.reduce(
(obj, childShard, index) => ({
...obj,
[`:${index * 2}`]: childShard.startingSequenceNumber,
[`:${index * 2 + 1}`]: generate()
}),
{}
)
},
Key: { consumerGroup, streamName },
UpdateExpression: `SET ${[
`${shardsPath}.#b.#c = :x`,
`${shardsPath}.#b.#d = :y`,
...childrenShards.map((childShard, index) =>
[
`${shardsPath}.#${index}.#e = :${index * 2}`,
`${shardsPath}.#${index}.#d = :${index * 2 + 1}`
].join(', ')
)
].join(', ')}`
});
}
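The `UpdateExpression` above is assembled dynamically: two clauses mark the parent shard as depleted and bump its version, then each child shard contributes two more clauses setting its checkpoint and version. A simplified sketch of just the expression-string assembly (the helper name is hypothetical):

```javascript
// Builds the SET expression used when marking a parent shard as depleted.
// Each child contributes a checkpoint clause (:<2i>) and a version clause (:<2i+1>).
function buildDepletedExpression(shardsPath, childCount) {
  const clauses = [
    `${shardsPath}.#b.#c = :x`, // parent.depleted = true
    `${shardsPath}.#b.#d = :y`  // parent.version = <new version>
  ];
  for (let i = 0; i < childCount; i += 1) {
    clauses.push(`${shardsPath}.#${i}.#e = :${i * 2}`);     // child.checkpoint
    clauses.push(`${shardsPath}.#${i}.#d = :${i * 2 + 1}`); // child.version
  }
  return `SET ${clauses.join(', ')}`;
}

console.log(buildDepletedExpression('#a', 1));
// SET #a.#b.#c = :x, #a.#b.#d = :y, #a.#0.#e = :0, #a.#0.#d = :1
```

Numbering the child placeholders by index keeps the attribute names and values aligned with the `reduce` calls that populate `ExpressionAttributeNames` and `ExpressionAttributeValues` above.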
/**
 * Registers the current consumer in the state if it's not present there yet. If present,
 * it updates the consumer heartbeat.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async registerConsumer() {
const {

@@ -486,3 +698,4 @@ client,

streamName,
useAutoShardAssignment
useAutoShardAssignment,
useEnhancedFanOut
} = internal(this);

@@ -492,27 +705,84 @@

await client.update({
ConditionExpression: `#a.#b.#d = :z`,
ExpressionAttributeNames: Object.assign(
{
'#a': 'enhancedConsumers',
'#b': consumerName,
'#c': 'isUsedBy',
'#d': 'version'
},
!useAutoShardAssignment && { '#e': 'shards' }
),
ExpressionAttributeValues: Object.assign(
{
':x': consumerId,
':y': generate(),
':z': version
},
!useAutoShardAssignment && { ':e': {} }
),
ConditionExpression: 'attribute_not_exists(#a.#b)',
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId
},
ExpressionAttributeValues: {
':x': Object.assign(
{
appName,
heartbeat: new Date().toISOString(),
host,
isActive: true,
isStandalone: !useAutoShardAssignment,
pid,
startedOn: new Date(Date.now() - uptime() * 1000).toISOString()
},
!useAutoShardAssignment && !useEnhancedFanOut && { shards: {} }
)
},
Key: { consumerGroup, streamName },
UpdateExpression: `SET #a.#b.#c = :x, #a.#b.#d = :y${
!useAutoShardAssignment ? ', #a.#b.#e = if_not_exists(#a.#b.#e, :e)' : ''
}`
UpdateExpression: 'SET #a.#b = :x'
});
return true;
logger.debug(`The consumer "${consumerId}" is now registered.`);
} catch (err) {
if (err.code === 'ConditionalCheckFailedException') {
await client
.update({
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId,
'#c': 'heartbeat'
},
ExpressionAttributeValues: {
':x': new Date().toISOString()
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :x'
})
.catch(() => {
logger.debug(`Missed heartbeat for "${consumerId}".`);
});
return;
}
logger.error(err);
throw err;
}
}
/**
* Makes sure that an enhanced fan-out consumer is present in the stream state.
*
* @param {string} name - The name of the enhanced fan-out consumer.
* @param {string} arn - The ARN of the enhanced fan-out consumer as given by AWS.
* @fulfil {undefined}
* @returns {Promise}
*/
async registerEnhancedConsumer(name, arn) {
const { client, consumerGroup, logger, streamName, useAutoShardAssignment } = internal(this);
try {
await client.update({
ConditionExpression: 'attribute_not_exists(#a.#b)',
ExpressionAttributeNames: {
'#a': 'enhancedConsumers',
'#b': name
},
ExpressionAttributeValues: {
':x': Object.assign(
{
arn,
isStandalone: !useAutoShardAssignment,
isUsedBy: null,
version: generate()
},
!useAutoShardAssignment && { shards: {} }
)
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b = :x'
});
logger.debug(`The enhanced consumer "${name}" is now registered.`);
} catch (err) {
if (err.code !== 'ConditionalCheckFailedException') {

@@ -522,6 +792,14 @@ logger.error(err);

}
return false;
}
}
/**
* Tries to release the lease of a shard.
*
* @param {string} shardId - The ID of the shard to release a lease for.
* @param {string} version - The known version number of the shard state entry.
* @param {Object} streamState - The known stream state.
* @fulfil {string} - The new version number if the lease is released, `null` otherwise.
* @returns {Promise}
*/
async releaseShardLease(shardId, version, streamState) {

@@ -566,2 +844,32 @@ const privateProps = internal(this);

/**
 * Starts the state store by initializing a DynamoDB client and a document client. Then,
 * it ensures the table exists, that it's tagged as required, and that there's an entry for
* the stream state.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async start() {
const privateProps = internal(this);
const { tags } = privateProps;
const client = new DynamoDbClient(privateProps);
privateProps.client = client;
privateProps.tableArn = await ensureTableExists(privateProps);
if (tags) await confirmTableTags(privateProps);
await initStreamState(this);
}
/**
* Store a shard checkpoint.
*
* @param {string} shardId - The ID of the shard to store a checkpoint for.
* @param {string} checkpoint - The sequence number to store as the recovery point.
* @param {string} shardsPath - The path pointing to where the shards state is stored.
* @param {Object} shardsPathNames - The values of the attribute names in the path.
* @fulfil {undefined}
* @returns {Promise}
*/
async storeShardCheckpoint(shardId, checkpoint, shardsPath, shardsPathNames) {

@@ -586,132 +894,6 @@ if (typeof checkpoint !== 'string') throw new TypeError('The sequence number is required.');

}
async markShardAsDepleted(shardsData, parentShardId) {
const { client, consumerGroup, streamName } = internal(this);
const streamState = await getStreamState(this);
const { shards, shardsPath, shardsPathNames } = await this.getShardsData(streamState);
const parentShard = shards[parentShardId];
const childrenShards = parentShard.checkpoint
? Object.keys(shardsData)
.filter(shardId => shardsData[shardId].parent === parentShardId)
.map(shardId => {
const { startingSequenceNumber } = shardsData[shardId];
return { shardId, startingSequenceNumber };
})
: [];
await Promise.all(
childrenShards.map(childrenShard => {
return this.ensureShardStateExists(
childrenShard.shardId,
shardsData[childrenShard.shardId],
streamState
);
})
);
await client.update({
ExpressionAttributeNames: Object.assign(
{
...shardsPathNames,
'#b': parentShardId,
'#c': 'depleted',
'#d': 'version'
},
childrenShards.length > 0 && { '#e': 'checkpoint' },
childrenShards.reduce(
(obj, childShard, index) => ({ ...obj, [`#${index}`]: childShard.shardId }),
{}
)
),
ExpressionAttributeValues: {
':x': true,
':y': generate(),
...childrenShards.reduce(
(obj, childShard, index) => ({
...obj,
[`:${index * 2}`]: childShard.startingSequenceNumber,
[`:${index * 2 + 1}`]: generate()
}),
{}
)
},
Key: { consumerGroup, streamName },
UpdateExpression: `SET ${[
`${shardsPath}.#b.#c = :x`,
`${shardsPath}.#b.#d = :y`,
...childrenShards.map((childShard, index) =>
[
`${shardsPath}.#${index}.#e = :${index * 2}`,
`${shardsPath}.#${index}.#d = :${index * 2 + 1}`
].join(', ')
)
].join(', ')}`
});
}
async getOwnedShards() {
const { consumerId } = internal(this);
const streamState = await getStreamState(this);
const { shards } = await this.getShardsData(streamState);
return Object.keys(shards)
.filter(shardId => shards[shardId].leaseOwner === consumerId)
.reduce((obj, shardId) => {
const { checkpoint, depleted, leaseExpiration, version } = shards[shardId];
if (new Date(leaseExpiration).getTime() - Date.now() > 0 && !depleted)
return { ...obj, [shardId]: { checkpoint, leaseExpiration, version } };
return obj;
}, {});
}
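`getOwnedShards` above keeps only the shards whose lease this consumer owns, whose lease hasn't expired, and that aren't depleted. The filter can be sketched as a pure function (the function name and sample shard shapes are illustrative, not the library's API):

```javascript
// Returns the subset of shards currently leased by `consumerId`, with an
// unexpired lease and not yet marked as depleted.
function ownedShards(shards, consumerId, now = Date.now()) {
  return Object.keys(shards)
    .filter(shardId => shards[shardId].leaseOwner === consumerId)
    .reduce((obj, shardId) => {
      const { checkpoint, depleted, leaseExpiration, version } = shards[shardId];
      if (new Date(leaseExpiration).getTime() > now && !depleted)
        return { ...obj, [shardId]: { checkpoint, leaseExpiration, version } };
      return obj;
    }, {});
}
```

Expired leases are dropped here rather than released eagerly, so another consumer can acquire them once the lease term lapses.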
async getAssignedEnhancedConsumer() {
const { consumerId, logger } = internal(this);
let consumerArn;
let consumerName;
const enhancedConsumers = await this.getEnhancedConsumers();
const consumerNames = Object.keys(enhancedConsumers);
// Find out if an enhanced consumer was already assigned.
consumerNames.find(name => {
const { arn, isUsedBy } = enhancedConsumers[name];
if (isUsedBy === consumerId) {
consumerName = name;
consumerArn = arn;
return true;
}
return false;
});
// Try to assign an enhanced consumer from the available ones.
if (!consumerArn) {
const availableConsumers = consumerNames.filter(name => !enhancedConsumers[name].isUsedBy);
for (let i = 0; i < availableConsumers.length; i += 1) {
const name = availableConsumers[i];
const { arn, version } = enhancedConsumers[name];
if (await this.lockStreamConsumer(name, version)) {
consumerArn = arn;
consumerName = name;
break;
}
}
}
if (!consumerArn) {
logger.warn(`Couldn't lock an enhanced fan-out consumer.`);
await this.updateConsumerIsActive(false);
return null;
}
await this.updateConsumerIsActive(true);
logger.debug(`Using the "${consumerName}" enhanced fan-out consumer.`);
return consumerArn;
}
}
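The assignment logic in `getAssignedEnhancedConsumer` boils down to: prefer an enhanced consumer already assigned to this instance, otherwise try to lock the first available one. A synchronous sketch of just the selection step, where the `canLock` callback stands in for the asynchronous `lockStreamConsumer` call (names hypothetical):

```javascript
// Picks an enhanced fan-out consumer ARN: the one already assigned to
// `consumerId` if any, otherwise the first unassigned one that can be locked.
function pickEnhancedConsumer(enhancedConsumers, consumerId, canLock) {
  const names = Object.keys(enhancedConsumers);
  const mine = names.find(name => enhancedConsumers[name].isUsedBy === consumerId);
  if (mine) return enhancedConsumers[mine].arn;
  for (const name of names) {
    const { arn, isUsedBy, version } = enhancedConsumers[name];
    if (!isUsedBy && canLock(name, version)) return arn;
  }
  return null; // no consumer could be assigned
}
```

Returning `null` corresponds to the branch above that logs a warning and marks the consumer as inactive.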
/**
* @external dynamoDbConstructor
* @external AwsJsSdkDynamoDb
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property

@@ -718,0 +900,0 @@ */

@@ -0,1 +1,8 @@

/**
* Module that manages statistics about the requests to AWS.
*
* @module stats
* @private
*/
'use strict';

@@ -11,2 +18,11 @@

/**
* Returns an object where the stats should be stored. If a stream name is not provided,
* the object that stores the module stats is returned instead.
*
* @param {string} [streamName] - The name of a Kinesis stream.
* @param {string} [key] - A key for a store object.
* @returns {Object} The store for the stats.
* @private
*/
function getStore(streamName, key) {

@@ -25,2 +41,10 @@ let store = moduleStats;

/**
* Stores an exception in the history of recent exceptions of the given store.
*
* @param {Object} params - The parameters.
* @param {Error} params.exception - The exception to store.
 * @param {Object} params.store - The object in which to store the exception.
* @private
*/
function cacheException(params) {

@@ -34,2 +58,9 @@ const { exception, store } = params;

/**
 * Returns an object with a list of exceptions and their count.
*
* @param {Object} cache - The cache instance where the exceptions are stored.
* @returns {Object}
* @private
*/
function formatExceptions(cache) {

@@ -41,3 +72,10 @@ cache.prune();

const { code, message, requestId, statusCode } = err;
exceptions.push({ code, message, requestId, statusCode, timestamp });
exceptions.push(
Object.assign(
{ message, timestamp },
code && { code },
requestId && { requestId },
statusCode && { statusCode }
)
);
});
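The change above switches from always emitting all four error fields to including only the ones that are set. It relies on `Object.assign` ignoring falsy arguments, so `code && { code }` contributes the key only when `code` is truthy. A minimal illustration of the idiom (values hypothetical):

```javascript
// Builds an exception entry, including optional fields only when present.
function exceptionEntry({ message, timestamp, code, requestId, statusCode }) {
  return Object.assign(
    { message, timestamp },
    code && { code },
    requestId && { requestId },
    statusCode && { statusCode }
  );
}

console.log(exceptionEntry({ message: 'boom', timestamp: 't0' }));
// { message: 'boom', timestamp: 't0' }
```

When `code` is `undefined`, the expression `code && { code }` evaluates to `undefined`, and `Object.assign` skips non-object sources, so the key never appears in the result.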

@@ -47,2 +85,9 @@ return { count, exceptions };

/**
* Retrieves the stats for the module, or for a given stream (if the stream name is provided).
*
* @param {string} [streamName] - The name of the stream.
* @returns {Object}
* @memberof module:stats
*/
function getStats(streamName) {

@@ -79,13 +124,14 @@ const formatValues = (results, [key, value]) => {

function reportAwsResponse(source, streamName) {
const now = new Date();
if (streamName) getStore(streamName).lastAwsResponse = now;
getStore(null, source).lastAwsResponse = now;
moduleStats.lastAwsResponse = now;
}
function reportException(source, exception, streamName) {
if (streamName) {
cacheException({ exception, store: getStore(streamName) });
}
/**
* Reports an error into the stats.
*
* @param {string} source - Either `kinesis` or `dynamoDb`.
* @param {Error} exception - The error to store.
* @param {string} [streamName] - If source is `kinesis`, the name of the stream.
* @memberof module:stats
*/
function reportError(source, exception, streamName) {
if (!source) throw new TypeError('The "source" argument is required.');
if (!exception) throw new TypeError('The "exception" argument is required.');
if (streamName) cacheException({ exception, store: getStore(streamName) });
cacheException({ exception, store: getStore(null, source) });

@@ -95,3 +141,10 @@ cacheException({ exception, store: moduleStats });

/**
* Reports the consumption of a Kinesis record by a consumer into the stats.
*
* @param {string} streamName - The name of the Kinesis stream from where the record originated.
* @memberof module:stats
*/
function reportRecordConsumed(streamName) {
if (!streamName) throw new TypeError('The "streamName" argument is required.');
const now = new Date();

@@ -102,3 +155,10 @@ getStore(streamName).lastRecordConsumed = now;

/**
* Reports the submission of a Kinesis record into the stats.
*
* @param {string} streamName - The name of the Kinesis stream where the record was sent.
* @memberof module:stats
*/
function reportRecordSent(streamName) {
if (!streamName) throw new TypeError('The "streamName" argument is required.');
const now = new Date();

@@ -109,8 +169,23 @@ getStore(streamName).lastRecordSent = now;

/**
* Reports a successful AWS request response into the stats.
*
* @param {string} source - Either `kinesis` or `dynamoDb`.
* @param {string} [streamName] - The name of the Kinesis stream.
* @memberof module:stats
*/
function reportResponse(source, streamName) {
if (!source) throw new TypeError('The "source" argument is required.');
const now = new Date();
if (streamName) getStore(streamName).lastAwsResponse = now;
getStore(null, source).lastAwsResponse = now;
moduleStats.lastAwsResponse = now;
}
module.exports = {
getStats,
reportAwsResponse,
reportException,
reportError,
reportRecordConsumed,
reportRecordSent
reportRecordSent,
reportResponse
};
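The stats functions above all follow the same fan-out pattern: each report updates the module-level store, the per-source store, and, when a stream name is given, the per-stream store. A reduced in-memory sketch of that pattern (store shapes and the factory name are illustrative, not the module's actual internals):

```javascript
// Minimal model of the stats fan-out: every reported response updates the
// module store, the per-source store, and (optionally) the per-stream store.
function makeStats() {
  const moduleStats = { sources: {}, streams: {} };
  function getStore(streamName, source) {
    if (streamName)
      return moduleStats.streams[streamName] || (moduleStats.streams[streamName] = {});
    if (source) return moduleStats.sources[source] || (moduleStats.sources[source] = {});
    return moduleStats;
  }
  function reportResponse(source, streamName, now = new Date()) {
    if (streamName) getStore(streamName).lastAwsResponse = now;
    getStore(null, source).lastAwsResponse = now;
    moduleStats.lastAwsResponse = now;
  }
  return { moduleStats, reportResponse };
}
```

The same triple-write shape applies to `reportError`, `reportRecordConsumed`, and `reportRecordSent`, which is why each of them takes an optional `streamName`.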

@@ -0,1 +1,8 @@

/**
* Module with statics to handle stream management.
*
* @module stream
* @private
*/
'use strict';

@@ -10,2 +17,16 @@

/**
 * Checks if the given stream exists. If the stream is being deleted or is in the middle of an
 * update, it waits for the status change to complete. If the stream exists, the stream ARN and
 * created-on timestamp are returned. If the stream doesn't exist, the ARN is set to `null`.
*
* @param {Object} params - The parameters.
* @param {Object} params.client - An instance of the Kinesis client.
* @param {Object} params.logger - An instance of a logger.
* @param {string} params.streamName - The name of the stream to check for.
* @fulfil {Object} - An object with `streamArn` (the stream ARN) and `streamCreatedOn` (the
* stream creation timestamp). If the stream doesn't exist, `streamArn` is set to `null`.
* @returns {Promise}
* @memberof module:stream
*/
async function checkIfStreamExists({ client, logger, streamName }) {

@@ -43,2 +64,15 @@ try {

/**
* Checks if the given stream is tagged as specified. If the stream is currently tagged with
* different tags, the tags are merged and the stream is tagged with them. If the stream has no
* tags, it will get tagged with the specified ones.
*
* @param {Object} params - The parameters.
* @param {Object} params.client - An instance of the Kinesis client.
* @param {Object} params.logger - An instance of a logger.
* @param {string} params.streamName - The stream to check the tags for.
* @fulfil {undefined}
* @returns {Promise}
* @memberof module:stream
*/
async function confirmStreamTags({ client, logger, streamName, tags }) {

@@ -58,10 +92,21 @@ const params = { StreamName: streamName };

async function describeEnhancedConsumer(props) {
const { client, consumerName, streamArn } = props;
const { Consumers } = await client.listStreamConsumers({ StreamARN: streamArn });
return Consumers.find(i => i.ConsumerName === consumerName) || {};
}
async function ensureStreamEncription(props) {
const { client, encryption, logger, streamName: StreamName } = props;
/**
* Ensures that the stream is encrypted as specified. If not encrypted, the stream will get
* encrypted and the call won't resolve until the stream update process has completed.
*
* @param {Object} params - The parameters.
* @param {Object} params.client - An instance of the Kinesis client.
* @param {Object} params.encryption - The encryption options to enforce.
* @param {string} params.encryption.keyId - The GUID for the customer-managed AWS KMS key
* to use for encryption. This value can be a globally unique identifier, a fully
* specified ARN to either an alias or a key, or an alias name prefixed by "alias/".
* @param {string} params.encryption.type - The encryption type to use.
* @param {Object} params.logger - An instance of the logger.
* @param {string} params.streamName - The stream to check for encryption.
* @fulfil {undefined}
* @returns {Promise}
* @memberof module:stream
*/
async function ensureStreamEncription(params) {
const { client, encryption, logger, streamName: StreamName } = params;
const { keyId: KeyId, type: EncryptionType } = encryption;

@@ -82,14 +127,31 @@

async function ensureStreamExists(props) {
const { client, createStreamIfNeeded, logger, shardCount, streamName } = props;
/**
 * Ensures that the specified stream exists. If it doesn't exist, it proceeds to create it and
 * waits until the new stream is active and ready to go.
*
* @param {Object} params - The parameters.
 * @param {Object} params.client - An instance of the Kinesis client.
 * @param {boolean} params.createStreamIfNeeded - Whether the Kinesis stream should
 *     be automatically created if it doesn't exist upon connection.
* @param {Object} params.logger - An instance of a logger.
* @param {number} params.shardCount - The number of shards that the newly-created stream
 *     will use (if the `createStreamIfNeeded` option is set).
* @param {string} params.streamName - The name of the stream to check/create.
* @fulfil {Object} - An object with `streamArn` (the stream ARN) and `streamCreatedOn` (the
* stream creation timestamp). If the stream doesn't exist, `streamArn` is set to `null`.
* @returns {Promise}
* @memberof module:stream
*/
async function ensureStreamExists(params) {
const { client, createStreamIfNeeded, logger, shardCount, streamName } = params;
logger.debug(`Verifying the "${streamName}" stream exists and it's active…`);
const { streamArn, streamCreatedOn } = await checkIfStreamExists(props);
const { streamArn, streamCreatedOn } = await checkIfStreamExists(params);
if (createStreamIfNeeded && streamArn === null) {
logger.debug('Trying to create the stream…');
const params = { StreamName: streamName };
await client.createStream({ ...params, ShardCount: shardCount });
const awsParams = { StreamName: streamName };
await client.createStream({ ...awsParams, ShardCount: shardCount });
logger.debug('Waiting for the new stream to be active…');
const { StreamDescription } = await client.waitFor('streamExists', params);
const { StreamDescription } = await client.waitFor('streamExists', awsParams);
logger.debug('The new stream is now active.');

@@ -107,4 +169,17 @@ const { StreamARN, StreamCreationTimestamp } = StreamDescription;

async function getEnhancedConsumers(props) {
const { client, logger, streamArn } = props;
/**
* Retrieves a list of the enhanced fan-out consumers registered for the stream. If any of the
 * enhanced consumers is changing status, it waits until they are all active.
*
* @param {Object} params - The parameters.
* @param {Object} params.client - An instance of the Kinesis client.
* @param {Object} params.logger - An instance of a logger.
* @param {string} params.streamArn - The ARN of the stream to retrieve enhanced consumers from.
* @fulfil {Array<Object>} - An array of objects with the properties `arn` (the ARN of the
* enhanced consumer), and `status`.
* @returns {Promise}
* @memberof module:stream
*/
async function getEnhancedConsumers(params) {
const { client, logger, streamArn } = params;
const { Consumers } = await client.listStreamConsumers({ StreamARN: streamArn });

@@ -127,3 +202,3 @@ const consumers = Consumers.reduce(

await wait(CONSUMER_STATE_CHECK_DELAY);
return getEnhancedConsumers(props);
return getEnhancedConsumers(params);
}

@@ -133,8 +208,18 @@ return consumers;

async function getStreamShards(props) {
const { client, logger, streamName } = props;
/**
* Returns an object with the information of the stream shards. For each pair, the key corresponds
* to the shard ID, while the value stores the details for the shard.
*
* @param {Object} params - The parameters.
* @param {Object} params.client - An instance of the Kinesis client.
* @param {Object} params.logger - An instance of a logger.
* @param {string} params.streamName - The name of the stream to get shards for.
* @fulfil {Object} - The shard information as an object hashed by shard ID.
* @returns {Promise}
* @memberof module:stream
*/
async function getStreamShards({ client, logger, streamName }) {
logger.debug(`Retrieving shards for the "${streamName}" stream…`);
const params = { StreamName: streamName };
const { Shards } = await client.listShards(params);
const { Shards } = await client.listShards({ StreamName: streamName });

@@ -163,4 +248,16 @@ const shards = Shards.reduce((obj, item) => {

async function registerEnhancedConsumer(props) {
const { client, consumerName, logger, streamArn } = props;
/**
* Registers a new enhanced fan-out consumer for the given stream. The call won't resolve until
* the new enhanced consumer has become active.
*
* @param {Object} params - The parameters.
* @param {Object} params.client - An instance of the Kinesis client.
* @param {string} params.consumerName - The name for the new enhanced fan-out consumer.
* @param {Object} params.logger - An instance of a logger.
* @param {string} params.streamArn - The ARN of the stream to register the consumer on.
* @fulfil {undefined}
* @returns {Promise}
* @memberof module:stream
*/
async function registerEnhancedConsumer({ client, consumerName, logger, streamArn }) {
logger.debug(`Registering enhanced consumer "${consumerName}"…`);

@@ -174,3 +271,6 @@ let { ConsumerStatus } = await client.registerStreamConsumer({

await wait(CONSUMER_STATE_CHECK_DELAY);
({ ConsumerStatus } = await describeEnhancedConsumer({ client, consumerName, streamArn }));
const { Consumers } = await client.listStreamConsumers({ StreamARN: streamArn });
const consumer = Consumers.find(i => i.ConsumerName === consumerName);
if (consumer) ({ ConsumerStatus } = consumer);
else ConsumerStatus = null;
} while (ConsumerStatus !== 'ACTIVE');

@@ -177,0 +277,0 @@ logger.debug(`The enhanced consumer "${consumerName}" is now active.`);

@@ -0,1 +1,8 @@

/**
* A module with statics to handle DynamoDB tables.
*
* @module table
* @private
*/
'use strict';

@@ -14,2 +21,3 @@

* @returns {string} If the table exists, the ARN of the table, `null` otherwise.
* @private
*/

@@ -54,2 +62,3 @@ async function checkIfTableExists({ client, logger, tableName }) {

* @returns {undefined}
* @memberof module:table
*/

@@ -82,2 +91,3 @@ async function confirmTableTags({ client, logger, tableArn, tags }) {

* @returns {string} The ARN of the new table.
* @memberof module:table
*/

@@ -90,3 +100,3 @@ async function ensureTableExists(params) {

if (tableArn === null) {
if (!tableArn) {
logger.debug('Trying to create the table…');

@@ -93,0 +103,0 @@ let billingMode = { BillingMode: 'PAY_PER_REQUEST' };

{
"name": "lifion-kinesis",
"version": "1.0.7",
"version": "1.0.8",
"description": "Lifion client for Amazon Kinesis Data streams",

@@ -17,2 +17,5 @@ "keywords": [

"author": "Edgardo Avilés <Edgardo.Aviles@ADP.com>",
"maintainers": [
"Mackenzie Turner <turner.mackenzie.m@gmail.com>"
],
"license": "MIT",

@@ -37,6 +40,7 @@ "main": "lib/index.js",

"async-retry": "^1.2.3",
"aws-sdk": "^2.460.0",
"aws-sdk": "^2.468.0",
"aws4": "^1.8.0",
"fast-deep-equal": "^2.0.1",
"got": "^9.6.0",
"is-retry-allowed": "^1.1.0",
"lifion-aws-event-stream": "^1.0.4",

@@ -55,4 +59,3 @@ "lru-cache": "^5.1.1",

"eslint-config-lifion": "^1.2.2",
"eslint-plugin-sort-destructure-keys": "^1.3.0",
"husky": "^2.3.0",
"husky": "^2.4.0",
"jest": "^24.8.0",

@@ -64,3 +67,3 @@ "jest-junit": "^6.4.0",

"prettier": "^1.17.1",
"semver": "^6.0.0"
"semver": "^6.1.1"
},

@@ -87,6 +90,6 @@ "auto-changelog": {

"global": {
"statements": 0,
"branches": 0,
"functions": 0,
"lines": 0
"statements": 100,
"branches": 100,
"functions": 100,
"lines": 100
}

@@ -93,0 +96,0 @@ },

# lifion-kinesis
[![npm version](https://badge.fury.io/js/lifion-kinesis.svg)](http://badge.fury.io/js/lifion-kinesis)
Lifion's Node.js client for [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/).

@@ -22,4 +24,4 @@

});
kinesis.on('data', chunk => {
console.log('Incoming data:', chunk);
kinesis.on('data', data => {
console.log('Incoming data:', data);
});

@@ -52,5 +54,2 @@ kinesis.startConsumer();

- Optional auto-creation, encryption, and tagging of Kinesis streams.
**Incoming Features:**
- Support for a polling mode, using the [`GetRecords` API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), with automatic checkpointing.

@@ -63,19 +62,23 @@ - Support for multiple concurrent consumers through automatic assignment of shards.

- [lifion-kinesis](#module_lifion-kinesis)
- [Kinesis](#exp_module_lifion-kinesis--Kinesis) ⇐ <code>external:Readable</code> ⏏
- [Kinesis](#exp_module_lifion-kinesis--Kinesis) ⇐ [<code>PassThrough</code>](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough) ⏏
- [new Kinesis(options)](#new_module_lifion-kinesis--Kinesis_new)
- [.startConsumer()](#module_lifion-kinesis--Kinesis+startConsumer) ⇒ <code>Promise</code>
- [.putRecord(params)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
- [.putRecords(params)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>
- _instance_
- [.startConsumer()](#module_lifion-kinesis--Kinesis+startConsumer) ⇒ <code>Promise</code>
- [.stopConsumer()](#module_lifion-kinesis--Kinesis+stopConsumer)
- [.putRecord(params)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
- [.putRecords(params)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>
- [.getStats()](#module_lifion-kinesis--Kinesis+getStats) ⇒ <code>Object</code>
- _static_
- [.getStats()](#module_lifion-kinesis--Kinesis.getStats) ⇒ <code>Object</code>
<a name="exp_module_lifion-kinesis--Kinesis"></a>
### Kinesis ⇐ <code>external:Readable</code> ⏏
### Kinesis ⇐ [<code>PassThrough</code>](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough) ⏏
A [pass-through stream](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough) class specialization implementing a
consumer of Kinesis Data Streams using the [AWS SDK for JavaScript](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest).
Incoming data can be retrieved through either the `data` event or by piping the instance to a
writable stream.
A [pass-through stream](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough) class specialization implementing a consumer
of Kinesis Data Streams using the [AWS SDK for JavaScript](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest). Incoming
data can be retrieved through either the `data` event or by piping the instance to other streams.
**Kind**: Exported class
**Extends**: <code>external:Readable</code>
**Extends**: [<code>PassThrough</code>](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough)
<a name="new_module_lifion-kinesis--Kinesis_new"></a>

@@ -87,25 +90,26 @@

| Param | Type | Default | Description |
| -------------------------------- | -------------------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>true</code> | Whether the Kinesis stream should be automatically created if it doesn't exist upon connection |
| [options.dynamoDb] | <code>Object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the stream consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](AwsJsSdkDynamoDb). |
| [options.dynamoDb.tableName] | <code>string</code> | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
| [options.dynamoDb.tags] | <code>Object</code> | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
| [options.encryption] | <code>Object</code> | | The encryption options to enforce in the stream. |
| [options.encryption.type] | <code>string</code> | | The encryption type to use. |
| [options.encryption.keyId] | <code>string</code> | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
| [options.logger] | <code>Object</code> | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.noRecordsPollDelay] | <code>number</code> | <code>1000</code> | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.pollDelay] | <code>number</code> | <code>250</code> | When the `usePausedPolling` option is `false`, this option defines the delay in milliseconds in between poll requests for more records (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.shardCount] | <code>number</code> | <code>1</code> | The number of shards that the newly-created stream will use (if the `createStreamIfNeeded` option is set) |
| [options.statsInterval] | <code>number</code> | <code>30000</code> | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. |
| options.streamName | <code>string</code> | | The name of the stream to consume data from (required) |
| [options.tags] | <code>Object</code> | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
| [options.useAutoCheckpoints] | <code>boolean</code> | <code>true</code> | Set to `true` to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to `false` consumers can use the `setCheckpoint()` function to store any sequence number as the checkpoint for the shard. |
| [options.useAutoShardAssignment] | <code>boolean</code> | <code>true</code> | Set to `true` to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to `false` to make the client read from all shards. |
| [options.useEnhancedFanOut] | <code>boolean</code> | <code>false</code> | Set to `true` to make the client use enhanced fan-out consumers to read from shards. |
| [options.usePausedPolling] | <code>boolean</code> | <code>false</code> | Set to `true` to make the client not to poll for more records until the consumer calls `continuePolling()`. This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when `useEnhancedFanOut` is set to `false`) |
| Param | Type | Default | Description |
| -------------------------------- | -------------------- | ------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>true</code> | Whether the Kinesis stream should be automatically created, if it doesn't exist, upon connection. |
| [options.dynamoDb] | <code>Object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to `tableName` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property). |
| [options.dynamoDb.tableName] | <code>string</code> | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
| [options.dynamoDb.tags] | <code>Object</code> | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
| [options.encryption] | <code>Object</code> | | The encryption options to enforce in the stream. |
| [options.encryption.type] | <code>string</code> | | The encryption type to use. |
| [options.encryption.keyId] | <code>string</code> | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
| [options.limit] | <code>number</code> | <code>10000</code> | The maximum number of records to retrieve per get-records call (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.logger] | <code>Object</code> | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.noRecordsPollDelay] | <code>number</code> | <code>1000</code> | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.pollDelay] | <code>number</code> | <code>250</code> | When the `usePausedPolling` option is `false`, this option defines the delay in milliseconds in between poll requests for more records (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.shardCount] | <code>number</code> | <code>1</code> | The number of shards that the newly-created stream will use (if the `createStreamIfNeeded` option is set) |
| [options.statsInterval] | <code>number</code> | <code>30000</code> | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. |
| options.streamName | <code>string</code> | | The name of the stream to consume data from (required) |
| [options.tags] | <code>Object</code> | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
| [options.useAutoCheckpoints] | <code>boolean</code> | <code>true</code> | Set to `true` to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to `false`, consumers can use the `setCheckpoint()` function to store any sequence number as the checkpoint for the shard. |
| [options.useAutoShardAssignment] | <code>boolean</code> | <code>true</code> | Set to `true` to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to `false` to make the client read from all shards. |
| [options.useEnhancedFanOut] | <code>boolean</code> | <code>false</code> | Set to `true` to make the client use enhanced fan-out consumers to read from shards. |
| [options.usePausedPolling] | <code>boolean</code> | <code>false</code> | Set to `true` to prevent the client from polling for more records until the consumer calls `continuePolling()`. This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when `useEnhancedFanOut` is set to `false`) |

<a name="module_lifion-kinesis--Kinesis+startConsumer"></a>

#### kinesis.startConsumer() ⇒ <code>Promise</code>
Starts the stream consumer by ensuring that the stream exists, is ready, and is configured as
requested. The internal managers that deal with heartbeats, state, and consumers will also
be started.
**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Fulfil**: <code>undefined</code> - Once the consumer has successfully started.
**Reject**: <code>Error</code> - On any unexpected error while trying to start.
<a name="module_lifion-kinesis--Kinesis+stopConsumer"></a>
#### kinesis.stopConsumer()
Stops the stream consumer. The internal managers will also be stopped.
**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
<a name="module_lifion-kinesis--Kinesis+putRecord"></a>

#### kinesis.putRecord(params) ⇒ <code>Promise</code>

Writes a single data record into a stream.
**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Fulfil**: <code>Object</code> - The de-serialized data returned from the request.
**Reject**: <code>Error</code> - On any unexpected error while writing to the stream.
| Param | Type | Description |
| ---------------------------------- | ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| params | <code>Object</code> | The parameters. |
| params.data | <code>\*</code> | The data to put into the record. |
| [params.explicitHashKey] | <code>string</code> | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
| [params.partitionKey] | <code>string</code> | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
| [params.sequenceNumberForOrdering] | <code>string</code> | Set this to the sequence number obtained from the last put record operation to guarantee strictly increasing sequence numbers, for puts from the same client and to the same partition key. If omitted, records are coarsely ordered based on arrival time. |
| [params.streamName] | <code>string</code> | If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation. |

<a name="module_lifion-kinesis--Kinesis+putRecords"></a>

#### kinesis.putRecords(params) ⇒ <code>Promise</code>
Writes multiple data records into a stream in a single call.
**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Fulfil**: <code>Object</code> - The de-serialized data returned from the request.
**Reject**: <code>Error</code> - On any unexpected error while writing to the stream.
| Param | Type | Description |
| ---------------------------------- | --------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| params | <code>Object</code> | The parameters. |
| params.records | <code>Array.&lt;Object&gt;</code> | The records associated with the request. |
| params.records[].data | <code>\*</code> | The record data. |
| [params.records[].explicitHashKey] | <code>string</code> | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
| [params.records[].partitionKey] | <code>string</code> | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
| [params.streamName] | <code>string</code> | If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation. |
<a name="module_lifion-kinesis--Kinesis+getStats"></a>
#### kinesis.getStats() ⇒ <code>Object</code>
Returns statistics for the instance of the client.
**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Returns**: <code>Object</code> - An object with the statistics.
<a name="module_lifion-kinesis--Kinesis.getStats"></a>
#### Kinesis.getStats() ⇒ <code>Object</code>
Returns the aggregated statistics of all the instances of the client.
**Kind**: static method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Returns**: <code>Object</code> - An object with the statistics.
## License
[MIT](./LICENSE)