lifion-kinesis
Lifion's Node.js client for Amazon Kinesis Data Streams.
Getting Started
To install the module:
npm install lifion-kinesis --save
The main module export is a Kinesis class that instantiates as a readable stream.
const Kinesis = require('lifion-kinesis');
const client = new Kinesis({
streamName: 'sample-stream',
consumerName: 'sample-consumer'
});
client.on('data', chunk => {
console.log('Incoming data:', chunk);
});
To take advantage of back-pressure, the client can be piped to a writable stream:
const Kinesis = require('lifion-kinesis');
const { pipeline } = require('stream');
pipeline([
new Kinesis(),
new Writable({
objectMode: true,
write(data, encoding, callback) {
console.log(data);
callback();
}
})
]);
Features
- Standard Node.js stream abstraction of Kinesis streams.
- Node.js implementation of the new enhanced fan-out feature.
- Optional auto-creation, encryption, and tagging of Kinesis streams.
Incoming Features:
- Support for a polling mode, using the
GetRecords
API, with automatic checkpointing. - Support for multiple concurrent consumers through automatic assignment of shards.
- Support for sending messages to streams, with auto-retries.
API Reference
Kinesis ⇐ external:Readable
⏏
A pass-through stream class specialization implementing a
consumer of Kinesis Data Streams using the AWS SDK for JavaScript.
Incoming data can be retrieved through either the data
event or by piping the instance to a
writable stream.
Kind: Exported class
Extends: external:Readable
new Kinesis(options)
Initializes a new instance of the Kinesis client.
Param | Type | Default | Description |
---|
options | Object | | The initialization options. In addition to the below options, it can also contain any of the AWS.Kinesis options. |
[options.compression] | string | | The kind of data compression to use with records. The currently available compression options are either "LZ-UTF8" or none. |
[options.consumerGroup] | string | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
[options.createStreamIfNeeded] | boolean | true | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection |
[options.dynamoDb] | Object | {} | The initialization options for the DynamoDB client used to store the state of the stream consumers. In addition to tableNames and tags , it can also contain any of the AWS.DynamoDB options. |
[options.dynamoDb.tableName] | string | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
[options.dynamoDb.tags] | Object | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
[options.encryption] | Object | | The encryption options to enforce in the stream. |
[options.encryption.type] | string | | The encryption type to use. |
[options.encryption.keyId] | string | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
[options.logger] | Object | | An object with the warn , debug , and error functions that will be used for logging purposes. If not provided, logging will be omitted. |
[options.noRecordsPollDelay] | number | 1000 | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when useEnhancedFanOut is set to false ) |
[options.pollDelay] | number | 250 | When the usePausedPolling option is false , this option defines the delay in milliseconds in between poll requests for more records (only applicable when useEnhancedFanOut is set to false ) |
[options.shardCount] | number | 1 | The number of shards that the newly-created stream will use (if the createStreamIfNeeded option is set) |
options.streamName | string | | The name of the stream to consume data from (required) |
[options.tags] | Object | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
[options.useAutoCheckpoints] | boolean | true | Set to true to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to false consumers can use the setCheckpoint() function to store any sequence number as the checkpoint for the shard. |
[options.useAutoShardAssignment] | boolean | true | Set to true to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to false to make the client read from all shards. |
[options.useEnhancedFanOut] | boolean | false | Set to true to make the client use enhanced fan-out consumers to read from shards. |
[options.usePausedPolling] | boolean | false | Set to true to make the client not to poll for more records until the consumer calls continuePolling() . This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when useEnhancedFanOut is set to false ) |
kinesis.startConsumer() ⇒ Promise
Initializes the client, by ensuring that the stream exists, it's ready, and configured as
requested. The internal managers that deal with heartbeats, state, and consumers will also
be started.
Kind: instance method of Kinesis
Fulfil: Once the client has successfully started.
Reject: Error
- On any unexpected error while trying to start.
kinesis.putRecord(options) ⇒ Promise
Puts a record to a stream.
Kind: instance method of Kinesis
Fulfil: If record is successfully pushed to stream.
Reject: Error
- On any unexpected error while pushing to stream.
Param | Type | Description |
---|
options | Object | The putRecord options. In addition to the params described here, uses AWS.Kinesis.putRecord options from the AWS.Kinesis.putRecord method in camel case. |
options.data | Object | string | The data to be used as the Kinesis message. |
[options.streamName] | string | If provided, overrides the stream name provided on client instantiation. |
kinesis.putRecords(options) ⇒ Promise
Batch puts multiple records to a stream.
Kind: instance method of Kinesis
Fulfil: If records are successfully pushed to stream.
Reject: Error
- On any unexpected error while pushing to stream.
Param | Type | Description |
---|
options | Object | The putRecords options. In addition to the params described here, uses AWS.Kinesis.putRecords options from the AWS.Kinesis.putRecords method in camel case. |
options.records | Array | A list of records to push to a Kinesis stream. |
options.records.data | Object | string | The data to be used as the Kinesis message. |
[options.streamName] | string | If provided, overrides the stream name provided on client instantiation. |
License
MIT