lifion-kinesis
Lifion's Node.js client for Amazon Kinesis Data Streams.
Getting Started
To install the module:
npm install lifion-kinesis --save
The main module export is a Kinesis class that instantiates as a readable stream.
const Kinesis = require('lifion-kinesis');

const client = new Kinesis({
  streamName: 'sample-stream',
  consumerName: 'sample-consumer'
});
client.on('data', chunk => {
  console.log('Incoming data:', chunk);
});
To take advantage of back-pressure, the client can be piped to a writable stream:
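const { pipeline, Writable } = require('stream');
const Kinesis = require('lifion-kinesis');

pipeline(
  // streamName and consumerName are required options.
  new Kinesis({
    streamName: 'sample-stream',
    consumerName: 'sample-consumer'
  }),
  new Writable({
    objectMode: true,
    write(data, encoding, callback) {
      console.log(data);
      callback();
    }
  }),
  // Called once the pipeline ends or fails.
  err => {
    if (err) console.error('Pipeline failed:', err);
  }
);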
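To see what back-pressure buys you without touching AWS, the following minimal sketch swaps the Kinesis client for a hypothetical stand-in: `createFakeSource` is an object-mode readable invented for this illustration, and `slowSink` is a deliberately slow writable. Both names are assumptions and not part of lifion-kinesis.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical stand-in for a Kinesis client: an object-mode readable that
// emits a fixed number of fake records and then ends.
function createFakeSource(total) {
  let sent = 0;
  return new Readable({
    objectMode: true,
    read() {
      sent += 1;
      this.push(sent <= total ? { sequence: sent } : null);
    }
  });
}

// Sequence numbers in the order the slow consumer handled them.
const processed = [];

// A slow consumer with a small buffer. Once the buffer is full, the piped
// source is paused until the sink catches up.
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(record, encoding, callback) {
    processed.push(record.sequence);
    setTimeout(callback, 5);
  }
});

// Resolves with the processed sequence numbers once the sink has finished.
const done = new Promise((resolve, reject) => {
  pipeline(createFakeSource(10), slowSink, err => (err ? reject(err) : resolve(processed)));
});
```

Because the sink only acknowledges one record every few milliseconds, the source is read in small batches rather than all at once, which is the behavior you would want from a real consumer that writes to a slower downstream system.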
Features
- Standard Node.js stream abstraction of Kinesis streams.
- Node.js implementation of the new enhanced fan-out feature.
- Optional auto-creation, encryption, and tagging of Kinesis streams.
Upcoming Features:
- Support for a polling mode, using the GetRecords API, with automatic checkpointing.
- Support for multiple concurrent consumers through automatic assignment of shards.
- Support for sending messages to streams, with auto-retries.
API Reference
A specialization of the readable stream class implementing a consumer of Kinesis Data Streams using the enhanced fan-out feature. Upon connection, instances of this class will subscribe to receive data from all the shards of the given stream. Incoming data can be retrieved either through the data event or by piping the instance to a writable stream.
Kind: Exported class
Extends: Readable
new Kinesis(options)
Initializes a new instance of the Kinesis client.
Param | Type | Default | Description |
--- | --- | --- | --- |
options | Object | | The initialization options. In addition to the below options, this object can also contain the AWS.Kinesis options. |
[options.compression] | string | | The kind of data compression to use with records. The currently available compression options are either "LZ-UTF8" or none. |
options.consumerName | string | | The unique name of the consumer for the given stream. This option is required. |
[options.createStreamIfNeeded] | boolean | false | Whether the Kinesis stream should be created if it doesn't exist upon connection. |
[options.encryption] | Object | | The kind of encryption to enforce in the stream. |
[options.encryption.type] | string | | The encryption type to use. |
[options.encryption.keyId] | string | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
[options.logger] | Object | | An object with the warn, debug, and error functions that will be used for logging purposes. If not provided, logging will be omitted. |
[options.shardCount] | number | 1 | The number of shards that the newly-created stream will use (if the createStreamIfNeeded option is set). |
options.streamName | string | | The name of the stream to consume data from. This option is required. |
[options.tags] | Object | | If provided, the client will ensure that the stream is tagged with this hash of tags upon connection. If the stream is already tagged with the same tag keys, they won't be overridden. If the stream is already tagged with different tag keys, they won't be removed. |
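As an illustration of how these options fit together, here is a minimal sketch that assembles an options object before handing it to the constructor. The `buildKinesisOptions` helper is hypothetical (it is not part of lifion-kinesis) and only mirrors the required fields and defaults listed in the table; the stream, consumer, and tag values are placeholders.

```javascript
// Hypothetical helper (not part of lifion-kinesis): checks the two required
// options and fills in the defaults listed in the table above.
function buildKinesisOptions(options = {}) {
  const { consumerName, streamName } = options;
  if (!consumerName) throw new TypeError('The "consumerName" option is required.');
  if (!streamName) throw new TypeError('The "streamName" option is required.');
  return { createStreamIfNeeded: false, shardCount: 1, ...options };
}

// A console-backed logger exposing the three functions the client expects.
const logger = {
  debug: (...args) => console.debug(...args),
  warn: (...args) => console.warn(...args),
  error: (...args) => console.error(...args)
};

const options = buildKinesisOptions({
  streamName: 'sample-stream',
  consumerName: 'sample-consumer',
  compression: 'LZ-UTF8',
  createStreamIfNeeded: true,
  shardCount: 2,
  logger,
  tags: { team: 'sample-team' } // placeholder tag
});

// With the package installed, the object would be passed along as:
// const client = new Kinesis(options);
```

Validating up front like this is optional. It simply surfaces a missing `streamName` or `consumerName` before any AWS call is attempted.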
kinesis.connect() ⇒ Promise
Initializes the Kinesis client, then proceeds to:
- Create the stream if asked for.
- Ensure that the stream is active.
- Ensure that the stream is encrypted as indicated.
- Ensure that the stream is tagged as requested.
- Ensure an enhanced fan-out consumer with the given name exists.
- Ensure that the enhanced fan-out consumer is active.
- Issue a subscription for data to all the shards in the stream.
- Make data available in both stream read modes.
Kind: instance method of Kinesis
Fulfil: Once the stream is active, encrypted, tagged, the enhanced fan-out consumer is active,
and the client is subscribed to the data in all the stream shards.
Reject: Error
- If at least one of the above steps fails.
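Since connect() returns a promise that rejects when any step fails, callers will usually want to handle that rejection. The sketch below shows one way to do so. `StubClient` is a hypothetical stand-in for a Kinesis instance so the example runs without AWS access, and `connectSafely` is an illustrative helper, not part of the library.

```javascript
const { Readable } = require('stream');

// Hypothetical stub standing in for a Kinesis instance: a readable stream
// with a promise-returning connect(), as described above.
class StubClient extends Readable {
  constructor({ failOnConnect = false } = {}) {
    super({ objectMode: true });
    this.failOnConnect = failOnConnect;
  }

  _read() {}

  async connect() {
    if (this.failOnConnect) throw new Error('Stream is not active');
    this.push({ data: 'sample-record' });
  }
}

// Waits for connect() to settle and reports whether the client is ready.
async function connectSafely(client) {
  try {
    await client.connect();
    return { connected: true };
  } catch (err) {
    // Any failed step (creation, encryption, tagging, subscription) lands here.
    console.error('Unable to connect:', err.message);
    return { connected: false, error: err };
  }
}
```

With a real client, the same helper would be called as `connectSafely(new Kinesis({ streamName, consumerName }))`, attaching a `data` listener or a pipe to start receiving records.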
License
MIT