async-kinesis

Code style: black. Supports Python 3.7 and 3.8.

pip install async-kinesis

Features

  • Uses queues for both the producer and consumer
    • the producer flushes with put_records() once it has enough records for a batch, or after "buffer_time" has elapsed
    • the consumer iterates over its message queue independently of the shard readers
  • Configurable to respect shard limits, throttling/retrying as required
    • e.g. when multiple independent clients are saturating the shards
  • Checkpointing with heartbeats
    • shards are reallocated (avoiding deadlock) if a checkpointer fails to heartbeat within "session_timeout"
  • Processors (aggregator + serializer)
    • e.g. newline-delimited JSON, msgpack

See docs/design for more details, and docs/yetanother for why we reinvented the wheel.

Environment Variables

As required by boto3:

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
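
As an alternative to environment variables, a named profile can be supplied via the session option on Producer/Consumer (see the options tables below). A minimal sketch, assuming aiobotocore's AioSession accepts botocore's profile kwarg and that the import path matches your aiobotocore version:

import asyncio

from aiobotocore.session import AioSession  # import path may vary by aiobotocore version
from kinesis import Producer

async def main():
    # "dev" is a hypothetical profile name from your ~/.aws/credentials
    async with Producer(stream_name="test", session=AioSession(profile="dev")) as producer:
        await producer.put({"my": "data"})

asyncio.run(main())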

Producer

import asyncio

from kinesis import Producer

async def main():
    async with Producer(stream_name="test") as producer:
        # put() enqueues the item; it is flushed later via put_records()
        await producer.put({'my': 'data'})

asyncio.run(main())

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

| Arg | Default | Description |
| --- | --- | --- |
| session | None | AioSession (to use a non-default profile etc.) |
| region_name | None | AWS region |
| buffer_time | 0.5 | Buffer time in seconds before auto-flushing records |
| put_rate_limit_per_shard | 1000 | "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes" |
| put_bandwidth_limit_per_shard | 1024 | KB per sec. Max is 1024 per shard (i.e. 1 MiB). Keep below this to minimize "ProvisionedThroughputExceeded" errors * |
| batch_size | 500 | "Each PutRecords request can support up to 500 records" |
| max_queue_size | 10000 | put() will block when the queue is at max |
| after_flush_fun | None | Async function to call after each flush (i.e. put_records()) call |
| processor | JsonProcessor() | Record aggregator/serializer. Default is JSON without aggregation. Note this is highly inefficient, as each record can be up to 1 MiB |
| retry_limit | None | How many connection attempts should be made before raising an exception |
| expo_backoff | None | Exponential backoff when a connection attempt fails |
| expo_backoff_limit | 120 | Max number of seconds the exponential backoff can grow to |
| create_stream | False | Create the Kinesis stream named by the stream_name keyword argument. Ignored if the stream already exists |
| create_stream_shards | 1 | Number of shards for the newly created stream. Ignored if the stream already exists |

You can lower these limits to reduce spam from excessive retries. However, the best practice is for each producer to retry aggressively for maximum throughput, and to handle any throttling deemed excessive by expanding the capacity of the stream and implementing an appropriate partition key strategy.

* Even though our default here limits at this threshold (1024 KB), in reality the threshold seems lower (~80%). If you wish to avoid excessive throttling, or have multiple producers on a stream, you will want to set this quite a bit lower.
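
As a sketch of how these options compose (values are illustrative only; the JsonLineProcessor import path is an assumption, see Processors below):

import asyncio

from kinesis import JsonLineProcessor, Producer

async def main():
    async with Producer(
        stream_name="test",
        region_name="ap-southeast-2",
        buffer_time=1.0,                    # flush at least once a second
        put_bandwidth_limit_per_shard=600,  # stay well under 1024 KB/s (see * above)
        processor=JsonLineProcessor(),      # aggregate many messages per Kinesis record
        retry_limit=5,
        expo_backoff=2,
        expo_backoff_limit=60,
        create_stream=True,                 # create the stream if it doesn't exist
        create_stream_shards=1,
    ) as producer:
        for i in range(1000):
            await producer.put({"n": i})

asyncio.run(main())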

Consumer

import asyncio

from kinesis import Consumer

async def main():
    async with Consumer(stream_name="test") as consumer:
        while True:
            async for item in consumer:
                print(item)
            # caught up.. take a breather~

asyncio.run(main())

Options:

(comments in quotes are Kinesis Limits as per AWS Docs)

| Arg | Default | Description |
| --- | --- | --- |
| session | None | AioSession (to use a non-default profile etc.) |
| region_name | None | AWS region |
| max_queue_size | 10000 | The fetch() task per shard will block when the queue is at max |
| max_shard_consumers | None | Max number of shards to use. None = all |
| record_limit | 10000 | Number of records to fetch with get_records() |
| sleep_time_no_records | 2 | Number of seconds to sleep when caught up |
| iterator_type | TRIM_HORIZON | Default shard iterator type for new/unknown shards (i.e. start from the start of the stream). Alternatives are "LATEST" (start from the end of the stream) and "AT_TIMESTAMP" (start from a particular point in time; requires the timestamp arg) |
| shard_fetch_rate | 1 | Number of fetches per second (max = 5). 1 is recommended, as it allows multiple consumers without hitting the limit |
| checkpointer | MemoryCheckPointer() | Checkpointer to use |
| processor | JsonProcessor() | Record aggregator/serializer. Must match the processor used by Producer() |
| retry_limit | None | How many connection attempts should be made before raising an exception |
| expo_backoff | None | Exponential backoff when a connection attempt fails |
| expo_backoff_limit | 120 | Max number of seconds the exponential backoff can grow to |
| create_stream | False | Create the Kinesis stream named by the stream_name keyword argument. Ignored if the stream already exists |
| create_stream_shards | 1 | Number of shards for the newly created stream. Ignored if the stream already exists |
| timestamp | None | Timestamp to start reading the stream from. Used with iterator type "AT_TIMESTAMP" |
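
For example, a consumer replaying the last hour of the stream might look like this (a sketch; it assumes the timestamp arg accepts a datetime, mirroring boto3's GetShardIterator):

import asyncio
from datetime import datetime, timedelta

from kinesis import Consumer

async def main():
    async with Consumer(
        stream_name="test",
        iterator_type="AT_TIMESTAMP",
        timestamp=datetime.utcnow() - timedelta(hours=1),  # assumed to accept a datetime
        record_limit=5000,
        sleep_time_no_records=5,
    ) as consumer:
        async for item in consumer:
            print(item)

asyncio.run(main())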

Checkpointers

  • memory (the default, but kinda pointless)

    MemoryCheckPointer()

  • redis

    RedisCheckPointer(name, session_timeout=60, heartbeat_frequency=15, is_cluster=False)

The redis checkpointer requires the REDIS_HOST env variable, and pip install aredis.
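
With REDIS_HOST set, wiring the checkpointer into a consumer might look like this (a sketch; it assumes RedisCheckPointer is importable from the top-level kinesis package):

import asyncio

from kinesis import Consumer, RedisCheckPointer

async def main():
    async with Consumer(
        stream_name="test",
        # "my-app" is a hypothetical name; it namespaces the checkpoint state in Redis
        checkpointer=RedisCheckPointer(name="my-app", heartbeat_frequency=15),
    ) as consumer:
        async for item in consumer:
            print(item)

asyncio.run(main())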

Processors (Aggregator + Serializer)

Aggregation enables batching multiple records into a single Kinesis record to use the stream more efficiently. Refer to https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

| Class | Aggregator | Serializer | Description |
| --- | --- | --- | --- |
| StringProcessor | SimpleAggregator | StringSerializer | Single string record |
| JsonProcessor | SimpleAggregator | JsonSerializer | Single JSON record |
| JsonLineProcessor | NewlineAggregator | JsonSerializer | Multiple JSON records separated by a newline char |
| JsonListProcessor | ListAggregator | JsonSerializer | Multiple JSON records returned as a list |
| MsgpackProcessor | NetstringAggregator | MsgpackSerializer | Multiple msgpack records framed with the Netstring protocol (https://en.wikipedia.org/wiki/Netstring) |
| KPLJsonProcessor | KPLAggregator | JsonSerializer | Multiple JSON records in a KPL aggregated record (https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) |
| KPLStringProcessor | KPLAggregator | StringSerializer | Multiple string records in a KPL aggregated record (https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) |

Note you can easily define your own processor, as it's simply a class inheriting from an Aggregator + Serializer.

# netstring framing + msgpack serialization
class MsgpackProcessor(Processor, NetstringAggregator, MsgpackSerializer):
    pass

Just define a new Serializer class with serialize() and deserialize() methods.
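
For example, a hypothetical pickle-based processor might look like this (a sketch only; the base classes are those from the example above, and their import paths may differ in your installed version):

import pickle

class PickleSerializer:
    def serialize(self, msg):
        return pickle.dumps(msg)    # object -> bytes for the record body

    def deserialize(self, data):
        return pickle.loads(data)   # bytes -> original object

class PickleProcessor(Processor, NetstringAggregator, PickleSerializer):
    pass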

Note:

  • JSON will use ujson if it's installed (pip install ujson)
  • Msgpack requires pip install msgpack
  • KPL requires pip install aws-kinesis-agg
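
Since the consumer's processor must match the producer's, a paired setup might look like this (a sketch; it assumes MsgpackProcessor is importable from the top-level kinesis package and that msgpack is installed):

import asyncio

from kinesis import Consumer, MsgpackProcessor, Producer

async def main():
    async with Producer(stream_name="test", processor=MsgpackProcessor()) as producer:
        await producer.put({"my": "data"})

    # must use the same processor as the producer
    async with Consumer(stream_name="test", processor=MsgpackProcessor()) as consumer:
        async for item in consumer:
            print(item)

asyncio.run(main())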

Benchmark/Example

See benchmark.py for code

50k items of approx 1 KB (Python) in size, using a single shard.

[Benchmark results chart]

Unit Testing

Uses https://github.com/mhart/kinesalite for local testing.

Run tests via docker:

docker-compose up --abort-on-container-exit --exit-code-from test

For local testing, use

docker-compose up kinesis redis

then, within your virtualenv:

nosetests

# or run individual test
nosetests tests.py:KinesisTests.test_create_stream_shard_limit_exceeded

Note there are a few test cases that use actual AWS Kinesis (AWSKinesisTests). These require setting an env var in order to run.

Create an ".env" file with:

TESTING_USE_AWS_KINESIS=1

Note you can ignore these tests when submitting a PR, unless core batching/processing behaviour is being changed.
