Introducing Socket Firewall: Free, Proactive Protection for Your Software Supply Chain.Learn More
Socket
Book a DemoInstallSign in
Socket

@coorpacademy/kinesis

Package Overview
Dependencies
Maintainers
14
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@coorpacademy/kinesis

A stream implementation of Amazon's Kinesis forked from mhart/heroku/kinesis

latest
Source
npmnpm
Version
2.2.4
Version published
Maintainers
14
Created
Source

kinesis

npm Build Status codecov

A Node.js stream implementation of Amazon's Kinesis.

Allows the consumer to pump data directly into (and out of) a Kinesis stream.

This makes it trivial to setup Kinesis as a logging sink with Bunyan, or any other logging library.

For setting up a local Kinesis instance (eg for testing), check out Kinesalite.

Installation

npm install --save @coorpacademy/kinesis

Note, this is a fork from @heroku kinesis which was a fork of @mhart kinesis who is the author of Kinesalite the local implementation of kinesis. Original kinesis library can be found there

Example

const fs = require('fs');
const {Transform} = require('stream');
const kinesis = require('@coorpacademy/kinesis');
const {KinesisStream} = kinesis;

// Uses credentials from process.env by default

kinesis.listStreams({region: 'us-west-1'}, function(err, streams) {
  if (err) throw err;

  console.log(streams); // ["http-logs", "click-logs"]
});

const kinesisSink = kinesis.stream('http-logs');
// OR new KinesisStream('http-logs')

fs.createReadStream('http.log').pipe(kinesisSink);

const kinesisSource = kinesis.stream({name: 'click-logs', oldest: true});

// Data is retrieved as Record objects, so let's transform into Buffers
const bufferify = new Transform({objectMode: true});
bufferify._transform = function(record, encoding, cb) {
  cb(null, record.Data);
};

kinesisSource.pipe(bufferify).pipe(fs.createWriteStream('click.log'));

// Create a new Kinesis stream using the raw API
kinesis.request('CreateStream', {StreamName: 'test', ShardCount: 2}, function(err) {
  if (err) throw err;

  kinesis.request('DescribeStream', {StreamName: 'test'}, function(err, data) {
    if (err) throw err;

    console.dir(data);
  });
});

API

kinesis.stream(options)

new KinesisStream(options)

Returns a readable and writable Node.js stream for the given Kinesis stream

options include:

  • region: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default)
  • credentials: an object with accessKeyId/secretAccessKey properties (resolved from env, file or IAM by default)
  • shards: an array of shard IDs, or shard objects. If not provided, these will be fetched and cached.
  • oldest: if truthy, then will start at the oldest records (using TRIM_HORIZON) instead of the latest
  • writeConcurrency: how many parallel writes to allow (1 by default)
  • cacheSize: number of PartitionKey-to-SequenceNumber mappings to cache (1000 by default)
  • agent: HTTP agent used (uses Node.js defaults otherwise)
  • timeout: HTTP request timeout (uses Node.js defaults otherwise)
  • initialRetryMs: first pause before retrying under the default policy (50 by default)
  • maxRetries: max number of retries under the default policy (10 by default)
  • errorCodes: array of Node.js error codes to retry on (['EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE'] by default)
  • errorNames: array of Kinesis exceptions to retry on (['ProvisionedThroughputExceededException', 'ThrottlingException'] by default)
  • retryPolicy: a function to implement a retry policy different from the default one
  • logger: an object which implements a log method, e.g. console.
  • endpoint: new configuration to specify host, port and protocol (https or not) a once

kinesis.listStreams([options], callback)

Calls the callback with an array of all stream names for the AWS account

kinesis.request(action, [data], [options], callback)

Makes a generic Kinesis request with the given action (eg, ListStreams) and data as the body.

options include:

  • region: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default)
  • credentials: an object with accessKeyId/secretAccessKey properties (resolved from env, file or IAM by default)
  • agent: HTTP agent used (uses Node.js defaults otherwise)
  • timeout: HTTP request timeout (uses Node.js defaults otherwise)
  • initialRetryMs: first pause before retrying under the default policy (50 by default)
  • maxRetries: max number of retries under the default policy (10 by default)
  • errorCodes: array of Node.js error codes to retry on (['EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE'] by default)
  • errorNames: array of Kinesis exceptions to retry on (['ProvisionedThroughputExceededException', 'ThrottlingException'] by default)
  • retryPolicy: a function to implement a retry policy different from the default one
  • endpoint: new configuration to specify host, port and protocol (https or not) a once

Note, if targeting localhost/127.0.0.1, or having specified http as protocol, you won't get any error for self-signed certificates. (which is what you need in a docker testing context)

Keywords

kinesis

FAQs

Package last updated on 17 Jan 2020

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts