Redis Streams client implementation for high-availability usage, including consumer, monitor, and scaler implementations
This package builds on Redis Streams and provides an easy-to-use interface for batch collection and processing. It simplifies the management of consumer groups and consumers. Designed for highly available, scalable, distributed environments, it offers monitoring and scaling capabilities in addition to the core functionality.
The main idea is that Redis Streams supports multiple message producers. Messages are organized into consumer groups, where multiple consumers can collect a batch of items, process them, and acknowledge the successfully processed ones. If processing fails, the unacknowledged message becomes part of the next batch. In case of consumer failure, the monitor component reassigns the related messages to a healthy consumer, so messages are not lost. An optional scaling component monitors the incoming/processed message rate and suggests scaling the consumers if necessary.
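The deliver/acknowledge/redeliver flow described above can be illustrated with a toy in-memory model. This is a deliberate simplification for illustration only (real Redis Streams track pending entries per consumer and reclaim them via the monitor), but it shows why an unacknowledged message reappears in the next batch:

```python
from collections import deque


class ToyStream:
    """Minimal in-memory sketch of deliver/ack/redeliver semantics."""

    def __init__(self):
        self.queue = deque()  # messages waiting to be delivered
        self.pending = {}     # delivered but not yet acknowledged

    def add(self, msg_id, payload):
        self.queue.append((msg_id, payload))

    def get_batch(self, size):
        # Unacknowledged messages are redelivered as part of the next batch
        batch = list(self.pending.items())[:size]
        while len(batch) < size and self.queue:
            msg_id, payload = self.queue.popleft()
            self.pending[msg_id] = payload
            batch.append((msg_id, payload))
        return batch

    def ack(self, msg_id):
        # Acknowledging removes the message permanently
        self.pending.pop(msg_id, None)


stream = ToyStream()
stream.add("1-0", "a")
stream.add("2-0", "b")
batch = stream.get_batch(2)        # delivers both messages
stream.ack("1-0")                  # "2-0" was not acknowledged...
redelivered = stream.get_batch(2)  # ...so it shows up again
```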
Install the latest version:
pip3 install redis-streams
Overview of the components (image source: tgrall.github.io)
As its name suggests, this component is responsible for publishing messages to the stream. Redis Streams supports multiple concurrent producers.
from redis import Redis

redis_conn = Redis()
sample_data = {"message": "stuff goes here"}
redis_conn.xadd(name=STREAM, fields=sample_data)
The consumer registers in the consumer group and starts fetching available messages. Once the preconfigured batch size is reached, it returns the list of items to the caller, which can then acknowledge them and thereby remove them from the stream. The consumer implementation also returns after the preconfigured maximum wait time, even if the batch is not full, so items do not wait long in the stream.
# It is crucial to enable the "decode_responses" feature of Redis
redis_conn = Redis(decode_responses=True)
consumer = Consumer(
    redis_conn=redis_conn,
    stream=STREAM,
    consumer_group=GROUP,
    batch_size=10,
    max_wait_time_ms=30000,
)
while True:
    messages = consumer.get_items()
    total_no_of_messages = len(messages)
    for i, item in enumerate(messages, start=1):
        print(f"Processing {i}/{total_no_of_messages} message: {item}")
        process_message(item=item)
        consumer.remove_item_from_stream(item_id=item.msgid)
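The process_message callback in the loop above is user-supplied. Because unacknowledged messages are redelivered after a consumer failure, processing should be idempotent; a minimal sketch (the Item stand-in and its content attribute are hypothetical, for illustration only):

```python
from collections import namedtuple

# Hypothetical stand-in for the items returned by consumer.get_items()
Item = namedtuple("Item", ["msgid", "content"])

processed_ids = set()


def process_message(item):
    # Unacknowledged messages are redelivered after a failure, so keep
    # processing idempotent: skip message ids we have already handled.
    if item.msgid in processed_ids:
        return
    processed_ids.add(item.msgid)
    # ... business logic on item.content goes here ...


process_message(Item("1-0", {"message": "stuff goes here"}))
process_message(Item("1-0", {"message": "stuff goes here"}))  # redelivery: no-op
```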
The monitor periodically checks the activity of the consumers and warns if they are idle (not fetching messages from the stream for longer than the preconfigured inactivity threshold) or have more assigned messages than the batch size. Automatic and on-demand cleanup are also supported.
monitor = Monitor(
    redis_conn=Redis(),
    stream=STREAM,
    consumer_group=GROUP,
    batch_size=10,  # batch size has to be the same as for the consumers
)
monitor.collect_monitoring_data(auto_cleanup=True)
monitor.print_monitoring_data()
Output
+-------------------------+-------------+-----------------+----------------------------------+
| Consumer id | Idle time | Pending items | Status |
+=========================+=============+=================+==================================+
| b'29102140026848155456' | 923 | 7 | OK |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29104139791624517440' | 294191 | 5 | WARNING - idle for long time |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29144140168467982144' | 361502 | 8 | WARNING - idle for long time |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29304140033034540864' | 8658 | 11 | WARNING - too many pending items |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29312139940580673344' | 11734 | 58 | WARNING - too many pending items |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29314139867734665024' | 14216 | 1 | OK |
+-------------------------+-------------+-----------------+----------------------------------+
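The statuses in the table follow two simple rules; a sketch of the classification logic (the idle_threshold value here is an assumption chosen to match the table, the real threshold is a configuration of the monitor):

```python
def consumer_status(idle_time, pending_items, batch_size, idle_threshold=100_000):
    # A consumer holding more messages than the batch size is falling behind.
    if pending_items > batch_size:
        return "WARNING - too many pending items"
    # A consumer that has not fetched for longer than the threshold looks stuck.
    if idle_time > idle_threshold:
        return "WARNING - idle for long time"
    return "OK"


print(consumer_status(923, 7, batch_size=10))     # OK
print(consumer_status(294191, 5, batch_size=10))  # WARNING - idle for long time
print(consumer_status(8658, 11, batch_size=10))   # WARNING - too many pending items
```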
By comparing the number of messages waiting to be assigned with the number of pending items, a utilization ratio can be calculated. Once this rate crosses the lower (scale in) or upper (scale out) threshold, the code suggests scaling in or out.
scaler = Scaler(
    redis_conn=Redis(decode_responses=True),
    stream=STREAM,
    consumer_group=GROUP,
)
scaler.collect_metrics()
rate, suggestion = scaler.get_scale_decision(
scale_out_rate=60, scale_in_rate=20
)
print(
f"Consumers should be {suggestion} as stream length "
f"({scaler.stream_lenght}) / pending ({scaler.stream_pending}) "
f"rate is {rate}%"
)
Output
Consumers should be IN as stream length (11) / pending (83) rate is 13.253%
Consumers should be NO_SCALE as stream length (18) / pending (79) rate is 22.7848%
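The rate in the output is simply the stream length divided by the pending count, expressed as a percentage. A sketch of the decision logic that reproduces the numbers above (the exact threshold comparisons are inferred from the example output, not taken from the library source):

```python
def scale_decision(stream_length, pending, scale_out_rate=60, scale_in_rate=20):
    # Utilization: messages waiting to be assigned relative to the number
    # already pending with consumers, expressed as a percentage.
    rate = round(stream_length / pending * 100, 4)
    if rate > scale_out_rate:
        return rate, "OUT"       # consumers cannot keep up: add more
    if rate < scale_in_rate:
        return rate, "IN"        # consumers are underutilized: remove some
    return rate, "NO_SCALE"


print(scale_decision(11, 83))  # (13.253, 'IN')
print(scale_decision(18, 79))  # (22.7848, 'NO_SCALE')
```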
This project is licensed under the terms of the GPL-3.0 license.