This package provides an interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon for the Node.js framework.
Developers can use the KCL to build distributed applications that process streaming data reliably at scale. The KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume.
This package wraps and manages the interaction with the MultiLangDaemon, which is provided as part of the Amazon KCL for Java, so that developers can focus on implementing their record processing logic.
A record processor in Node.js typically looks like the following:
```javascript
var kcl = require('aws-kcl');

/**
 * The record processor must provide four functions:
 *
 * * `initialize` - called once
 * * `processRecords` - called zero or more times
 * * `leaseLost` - called if this KCL instance loses the lease to this shard
 * * `shardEnded` - called once all records in this shard have been processed
 *
 * Notes:
 * * All of the above functions take an additional callback argument. When a
 *   function is done initializing, processing records, or shutting down, the
 *   callback must be called (i.e., `completeCallback()`) to let the KCL know
 *   that the associated operation is complete. Until the callback is invoked,
 *   the KCL will not proceed further.
 * * The application will terminate if any error is thrown from any of the
 *   record processor functions. Hence, if you want processing to continue
 *   when exceptions occur, handle them inside the record processor functions
 *   rather than passing them to the KCL library. The callback must also be
 *   invoked in this case to let the KCL know that it can proceed further.
 */
var recordProcessor = {
  /**
   * Called once by the KCL before any calls to processRecords. Any initialization
   * logic for record processing can go here.
   *
   * @param {object} initializeInput - Initialization-related information.
   *     Looks like - {"shardId":"<shard_id>"}
   * @param {callback} completeCallback - The callback that must be invoked
   *     once the initialization operation is complete.
   */
  initialize: function(initializeInput, completeCallback) {
    // Initialization logic ...
    completeCallback();
  },

  /**
   * Called by the KCL with a list of records to be processed and checkpointed.
   * A record looks like:
   *     {"data":"<base64 encoded string>","partitionKey":"someKey","sequenceNumber":"1234567890"}
   *
   * The checkpointer can optionally be used to checkpoint a particular sequence
   * number (from a record). If checkpointing, the checkpoint must always be
   * invoked before calling `completeCallback` for processRecords. Moreover,
   * `completeCallback` should only be invoked once the checkpoint operation
   * callback is received.
   *
   * @param {object} processRecordsInput - Process records information with
   *     an array of records that are to be processed. Looks like -
   *     {"records":[<record>, <record>], "checkpointer":<Checkpointer>}
   *     where <record> format is specified above.
   * @param {callback} completeCallback - The callback that must be invoked
   *     once all records are processed and the (optional) checkpoint is
   *     complete.
   */
  processRecords: function(processRecordsInput, completeCallback) {
    if (!processRecordsInput || !processRecordsInput.records) {
      // Must call completeCallback to proceed further.
      completeCallback();
      return;
    }

    var records = processRecordsInput.records;
    var record, sequenceNumber, partitionKey, data;
    for (var i = 0; i < records.length; ++i) {
      record = records[i];
      sequenceNumber = record.sequenceNumber;
      partitionKey = record.partitionKey;
      // Note that "data" is a base64-encoded string. Buffer.from can be used
      // to decode the data into a string.
      data = Buffer.from(record.data, 'base64').toString();
      // Custom record processing logic ...
    }

    if (!sequenceNumber) {
      // Must call completeCallback to proceed further.
      completeCallback();
      return;
    }

    // If checkpointing, only call completeCallback once the checkpoint
    // operation is complete.
    processRecordsInput.checkpointer.checkpoint(sequenceNumber,
      function(err, checkpointedSequenceNumber) {
        // In this example, regardless of error, we mark processRecords
        // complete to proceed further with more records.
        completeCallback();
      }
    );
  },

  /**
   * Called by the KCL to indicate that this record processor has lost its lease.
   * After the lease lost operation is complete, there will not be any more calls to
   * any other functions of this record processor. Clients should not attempt to
   * checkpoint because the lease has been lost by this Worker.
   *
   * @param {object} leaseLostInput - Lease lost information.
   * @param {callback} completeCallback - The callback that must be invoked once
   *     lease lost operations are completed.
   */
  leaseLost: function(leaseLostInput, completeCallback) {
    // Lease lost logic ...
    completeCallback();
  },

  /**
   * Called by the KCL to indicate that this record processor should shut down
   * because the shard has ended. After the shard ended operation is complete,
   * there will not be any more calls to any other functions of this record
   * processor. Clients are required to checkpoint at this time. This indicates
   * that the current record processor has finished processing and new record
   * processors for the children will be created.
   *
   * @param {object} shardEndedInput - ShardEnded information. Looks like -
   *     {"checkpointer": <Checkpointer>}
   * @param {callback} completeCallback - The callback that must be invoked once
   *     shard ended operations are completed.
   */
  shardEnded: function(shardEndedInput, completeCallback) {
    // Shard end logic ...

    // Since you are checkpointing, only call completeCallback once the
    // checkpoint operation is complete.
    shardEndedInput.checkpointer.checkpoint(function(err) {
      // In this example, regardless of the error, we mark the shard ended
      // operation complete. completeCallback is invoked only here, exactly once.
      completeCallback();
    });
  }
};

kcl(recordProcessor).run();
```
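To exercise a record processor locally without a running MultiLangDaemon, you can call its methods directly with a stand-in checkpointer. The sketch below is not part of the aws-kcl API: `makeStubCheckpointer` and `withErrorHandling` are hypothetical helpers. `withErrorHandling` illustrates the advice in the notes above. It catches exceptions thrown synchronously from a handler and still invokes `completeCallback` exactly once. Errors thrown later inside asynchronous callbacks are not caught by it.

```javascript
// Hypothetical stand-in for the KCL-provided checkpointer: it records the
// sequence numbers it receives and calls back immediately with no error.
function makeStubCheckpointer() {
  const checkpointed = [];
  return {
    checkpointed: checkpointed,
    checkpoint: function(sequenceNumber, callback) {
      checkpointed.push(sequenceNumber);
      callback(null, sequenceNumber);
    }
  };
}

// Hypothetical wrapper: a synchronous exception is logged instead of reaching
// the KCL, and completeCallback is still invoked exactly once.
function withErrorHandling(handler) {
  return function(input, completeCallback) {
    let completed = false;
    const completeOnce = function() {
      if (!completed) {
        completed = true;
        completeCallback();
      }
    };
    try {
      handler(input, completeOnce);
    } catch (err) {
      console.error('record processing failed:', err.message);
      completeOnce();
    }
  };
}

// A minimal processRecords handler in the style of the example above.
const decoded = [];
const processRecords = withErrorHandling(function(input, completeCallback) {
  let lastSequenceNumber;
  input.records.forEach(function(record) {
    decoded.push(Buffer.from(record.data, 'base64').toString());
    lastSequenceNumber = record.sequenceNumber;
  });
  if (!lastSequenceNumber) {
    completeCallback();
    return;
  }
  // Checkpoint first; complete only after the checkpoint callback fires.
  input.checkpointer.checkpoint(lastSequenceNumber, function() {
    completeCallback();
  });
});
```

In a real application, you could wrap each function of `recordProcessor` the same way before passing it to `kcl(...)`.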
Before you begin, Node.js and NPM must be installed on your system. For download instructions for your platform, see http://nodejs.org/download/.
To get the sample KCL application and bootstrap script, you need git.
Amazon KCL for Node.js uses the MultiLangDaemon provided by Amazon KCL for Java, so you also need Java version 1.8 or higher installed.
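If you script your environment setup, a check along the following lines can confirm the Java requirement. This is a minimal sketch that assumes the common `version "<number>"` format printed by `java -version`. Java 8 and earlier report versions such as `1.8.0_292`, while Java 9 and later put the major version first. The function names are hypothetical.

```javascript
// Hypothetical helper: extracts the major Java version from `java -version`
// output. Assumes the usual `version "<number>"` format; other formats return null.
function javaMajorVersion(versionOutput) {
  const match = /version "([^"]+)"/.exec(versionOutput);
  if (!match) {
    return null; // unrecognized format
  }
  const parts = match[1].split(/[._-]/);
  const first = parseInt(parts[0], 10);
  // Java 8 and earlier report "1.x"; Java 9+ report the major version first.
  return first === 1 ? parseInt(parts[1], 10) : first;
}

// True when the reported version satisfies the "1.8 or higher" requirement.
function meetsKclJavaRequirement(versionOutput) {
  const major = javaMajorVersion(versionOutput);
  return major !== null && major >= 8;
}
```

`java -version` writes to stderr. When `java` is on your `PATH`, you could therefore pass `require('child_process').spawnSync('java', ['-version']).stderr.toString()` to `meetsKclJavaRequirement`.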
Before running the samples, make sure that your environment is configured to allow the samples to use your AWS Security Credentials, which are used by MultiLangDaemon to interact with AWS services.
By default, the MultiLangDaemon uses the DefaultCredentialsProvider, so make your credentials available to one of the credentials providers in that provider chain. There are several ways to do this. You can provide credentials through a ~/.aws/credentials file or through environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY). If you're running on Amazon EC2, you can associate an IAM role that has the appropriate access with your instance.
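Before starting the MultiLangDaemon, a script can run a quick pre-flight check to report which of the two credential sources named above appears to be present. This hypothetical helper only looks for the two environment variables and the `~/.aws/credentials` file. It does not reproduce the full DefaultCredentialsProvider chain, so it will not report sources such as an IAM role attached to an EC2 instance.

```javascript
const fs = require('fs');
const path = require('path');

// Hypothetical pre-flight check covering only the two sources named above.
// It does NOT emulate the DefaultCredentialsProvider chain.
function describeCredentialSources(env, homeDir) {
  const sources = [];
  // Both variables must be set for environment credentials to be usable.
  if (env.AWS_ACCESS_KEY_ID && env.AWS_SECRET_ACCESS_KEY) {
    sources.push('environment');
  }
  // Only checks that the shared credentials file exists, not its contents.
  if (fs.existsSync(path.join(homeDir, '.aws', 'credentials'))) {
    sources.push('credentials-file');
  }
  return sources;
}
```

For example, `describeCredentialSources(process.env, require('os').homedir())` returns an empty array when neither source is found.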
For more information about Amazon Kinesis and the client libraries, see the Amazon Kinesis documentation as well as the Amazon Kinesis forums.
The Amazon KCL for Node.js repository contains source code for the KCL, a sample data producer and data consumer (processor) application, and the bootstrap script.
To run the sample applications, you need all the required npm modules. From the root of the repository, execute the following command:
```sh
npm install
```
This downloads all dependencies for running the bootstrap script as well as the sample application.
The sample application consists of two components:

* A data producer (`samples/basic_sample/producer/sample_kinesis_producer_app.js`): this script creates an Amazon Kinesis stream and starts putting 10 random records into it.
* A data processor (`samples/basic_sample/consumer/sample_kcl_app.js`): this script is invoked by the MultiLangDaemon, consumes the data from the Amazon Kinesis stream, and stores received data into files (1 file per shard).

The following defaults are used in the sample application:

* Stream name: `kclnodejssample`
* Amazon KCL application name: `kclnodejssample`
* Amazon DynamoDB table for the Amazon KCL application: `kclnodejssample`
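Each record processor handles exactly one shard, and the consumer's "1 file per shard" behavior follows from that. A processor can remember its shard ID from `initialize` and append everything it receives to a file named after that shard. The following is a hypothetical sketch of the idea, not the code of `sample_kcl_app.js`. The output directory and the `<shardId>.txt` naming are assumptions. For brevity, the sketch omits `leaseLost`, `shardEnded`, and checkpointing.

```javascript
const fs = require('fs');
const path = require('path');

// Hypothetical sketch: one processor per shard, one output file per processor.
function createFileWritingProcessor(outputDir) {
  let filePath = null;
  return {
    initialize: function(initializeInput, completeCallback) {
      // Remember which shard this processor serves.
      filePath = path.join(outputDir, initializeInput.shardId + '.txt');
      completeCallback();
    },
    processRecords: function(processRecordsInput, completeCallback) {
      // Decode each base64 payload and write one line per record.
      const lines = processRecordsInput.records.map(function(record) {
        return Buffer.from(record.data, 'base64').toString() + '\n';
      });
      fs.appendFileSync(filePath, lines.join(''));
      completeCallback();
    }
  };
}
```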
To run the data producer, execute the following commands from the root of the repository:
```sh
cd samples/basic_sample/producer
node sample_kinesis_producer_app.js
```
The producer script `samples/basic_sample/producer/sample_kinesis_producer_app.js` takes several parameters that you can use to customize its behavior. To change the default parameters, change the values in `samples/basic_sample/producer/config.js`.

To start the data processor, run the following commands from the root of the repository:
```sh
cd samples/basic_sample/consumer
../../../bin/kcl-bootstrap --java /usr/bin/java -e -p ./sample.properties
```
* `samples/basic_sample/consumer/sample.properties` controls which Amazon KCL for Node.js application is run. You can specify your own properties file with the `-p` or `--properties` argument.
* The bootstrap script uses `JAVA_HOME` to locate the java binary. To specify your own java path, use the `-j` or `--java` argument when invoking the bootstrap script.
* The command above passes the `-e` or `--execute` argument to the bootstrap script so that it executes the KCL application.
* To see all the options that the bootstrap script accepts, run `kcl-bootstrap --help`.
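Properties files such as `sample.properties` are Java-style `key = value` files. You might read one from Node.js to check which settings an application will run with. The following is one minimal way to do that. It is a sketch that handles only `key = value`, `key: value`, and comment lines. It does not handle the escapes or line continuations allowed by the full Java properties format. `parseProperties` is a hypothetical name.

```javascript
// Minimal sketch of a Java-style .properties reader. Handles "key = value",
// "key: value", and "#" or "!" comment lines only; no escapes or continuations.
function parseProperties(text) {
  const result = {};
  text.split(/\r?\n/).forEach(function(rawLine) {
    const line = rawLine.trim();
    if (line === '' || line[0] === '#' || line[0] === '!') {
      return; // skip blank lines and comments
    }
    // Key runs up to the first whitespace, "=" or ":"; the rest is the value.
    const match = /^([^=:\s]+)\s*[=:]?\s*(.*)$/.exec(line);
    if (match) {
      result[match[1]] = match[2];
    }
  });
  return result;
}
```

For example, `parseProperties('streamName = kclnodejssample')` returns `{ streamName: 'kclnodejssample' }`. Any other key names you try with it are illustrative, not taken from the sample file.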
This sample application creates an Amazon Kinesis stream, sends data to it, and creates a DynamoDB table to track the KCL application state. This will incur nominal costs to your AWS account, and the costs continue even after the sample app has finished. To stop being charged, delete these resources. Specifically, the sample application creates the following AWS resources:

* Amazon Kinesis stream: `kclnodejssample`
* Amazon DynamoDB table: `kclnodejssample`
You can delete these using the AWS Management Console.
Log in to an Amazon EC2 instance running Amazon Linux, then perform the following steps to prepare your environment for running the sample application. Note that the version of Java that ships with Amazon Linux can be found at /usr/bin/java and should be 1.8 or greater.
```sh
# install node.js, npm and git
sudo yum install nodejs npm --enablerepo=epel
sudo yum install git
# clone the git repository to work with the samples
git clone https://github.com/awslabs/amazon-kinesis-client-nodejs.git kclnodejs
cd kclnodejs/samples/basic_sample/producer/
# download dependencies
npm install
# run the sample producer
node sample_kinesis_producer_app.js &

# ...and in another terminal, starting from the directory that contains kclnodejs,
# run the sample consumer (an absolute PATH entry keeps working after cd)
export PATH=$PATH:$(pwd)/kclnodejs/bin
cd kclnodejs/samples/basic_sample/consumer/
kcl-bootstrap --java /usr/bin/java -e -p ./sample.properties > consumer.out 2>&1 &
```
To get the Amazon KCL for Node.js module from NPM, use the following command:
```sh
npm install aws-kcl
```
Amazon KCL for Node.js uses Amazon KCL for Java internally. We have implemented a Java-based daemon, called the MultiLangDaemon, that does all the heavy lifting. The daemon launches the user-defined record processor script/program as a sub-process, and then communicates with this sub-process over standard input/output using a simple protocol. This approach enables the Amazon KCL to be language-agnostic while providing identical features and a similar parallel processing model across all languages.
At runtime, there will always be a one-to-one correspondence between a record processor, a child process, and an Amazon Kinesis shard. The MultiLangDaemon ensures this without any developer intervention.
In this release, we have abstracted these implementation details away and exposed an interface that enables you to focus on writing record processing logic in Node.js.
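The following simplified sketch illustrates the sub-process model described above. It shows the child-process side of a line-oriented JSON protocol. The process reads one JSON message per line from standard input, dispatches on its `action` field, and acknowledges each message with one line on standard output. The message shapes (`action`, `responseFor`) are assumptions loosely modeled on the MultiLangDaemon protocol, not a complete or authoritative description of it. Unlike the real library, the handlers here are synchronous. aws-kcl already implements this for you.

```javascript
const readline = require('readline');

// Simplified sketch: parse one JSON message, run its handler, and return a
// one-line acknowledgement. Message shapes are illustrative assumptions.
function handleMessage(line, handlers) {
  const message = JSON.parse(line);
  const handler = handlers[message.action];
  if (!handler) {
    throw new Error('unsupported action: ' + message.action);
  }
  handler(message); // synchronous here; the real library uses callbacks
  return JSON.stringify({ action: 'status', responseFor: message.action });
}

// Wires handleMessage to a pair of streams, one JSON message per line.
function runLoop(handlers, input, output) {
  const rl = readline.createInterface({ input: input });
  rl.on('line', function(line) {
    if (line.trim() !== '') {
      output.write(handleMessage(line, handlers) + '\n');
    }
  });
}
```

`runLoop(handlers, process.stdin, process.stdout)` would connect the loop to the standard streams, as a sub-process launched by a parent daemon would use them.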
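Graceful lease handoff: checkpointing logic for the handoff should be implemented in the shutdownRequested() method. The feature is controlled by the configuration property isGracefulLeaseHandoffEnabled.
idleTimeBetweenReadsInMillis (PollingConfig) now has a minimum default value of 200.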
LeaseAssignmentManager
WorkerMetricStatsReporter
LeaseDiscovery
KCLManager
While the shutdown method will continue to work, it's recommended to upgrade to the newer interface: shutdown has been replaced by leaseLost and shardEnded.
The leaseLost method is invoked when a lease is lost. leaseLost replaces shutdown where shutdownInput.reason was ZOMBIE.
The shardEnded method is invoked when all records from a split or merge have been processed. shardEnded replaces shutdown where shutdownInput.reason was TERMINATE.
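If you have an existing processor written against the single shutdown method, one way to move to the newer interface is to adapt it with the reason mapping described above. `adaptLegacyProcessor` is a hypothetical helper. The shape of the legacy shutdownInput object (`{reason, checkpointer}`) is an assumption based on the shutdownInput.reason values mentioned above. In the long run, it is cleaner to implement leaseLost and shardEnded directly.

```javascript
// Hypothetical migration helper: maps the newer leaseLost/shardEnded calls onto
// a legacy shutdown(shutdownInput, callback) method.
// Assumed legacy input shape: { reason: 'ZOMBIE' | 'TERMINATE', checkpointer }.
function adaptLegacyProcessor(legacy) {
  return {
    initialize: legacy.initialize.bind(legacy),
    processRecords: legacy.processRecords.bind(legacy),
    leaseLost: function(leaseLostInput, completeCallback) {
      // The lease is gone, so no checkpointer is passed along.
      legacy.shutdown({ reason: 'ZOMBIE' }, completeCallback);
    },
    shardEnded: function(shardEndedInput, completeCallback) {
      // The shard is finished; the legacy code is expected to checkpoint.
      legacy.shutdown(
        { reason: 'TERMINATE', checkpointer: shardEndedInput.checkpointer },
        completeCallback
      );
    }
  };
}
```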
The kcl-bootstrap program has been updated to accept either -l or --log-configuration to provide a Logback XML configuration file.
ListShards is a new API, and may require updating any explicit IAM policies.
Set timeoutInSeconds = <timeout value> to configure a timeout. The default for this is no timeout.
The aws-kcl npm module allows implementation of record processors in Node.js using the Amazon KCL MultiLangDaemon.
The samples directory contains a sample producer and processing applications using the Amazon KCL for Node.js.
This library is licensed under the Apache 2.0 License.