Security News
Research
Data Theft Repackaged: A Case Study in Malicious Wrapper Packages on npm
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
Afkak is a Twisted-native Apache Kafka client library. It provides support for:
Learn more in the documentation, download from PyPI, or review the contribution guidelines. Please report any issues on GitHub.
Afkak supports these Pythons:
We aim to support Kafka 1.1.x and later. Integration tests are run against these Kafka broker versions:
Testing against 2.0.0 is planned (see #45).
Newer broker releases will generally function, but not all Afkak features will work on older brokers. In particular, the coordinated consumer won’t work before Kafka 0.9.0.1. We don’t recommend deploying such old releases of Kafka anyway, as they have serious bugs.
Note: This code is not meant to be runnable. See producer_example and consumer_example for runnable example code.
from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
PRODUCER_ACK_LOCAL_WRITE)
kClient = KafkaClient("localhost:9092")
# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)
# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
# by all in sync replicas before sending a response
producer = Producer(kClient,
req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
ack_timeout=2000)
responseD = producer.send_messages("my-topic", msgs=[b"message"])
# Using twisted's @inlineCallbacks:
responses = yield responseD
if response:
print(response[0].error)
print(response[0].offset)
# To send messages in batch: You can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
# Notes:
# * If the producer dies before the messages are sent, the caller would
# * not have had the callbacks called on the send_messages() returned
# * deferreds, and so can retry.
# * Calling producer.stop() before the messages are sent will
# errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])
# To consume messages
# define a function which takes a list of messages to process and
# possibly returns a deferred which fires when the processing is
# complete.
def processor_func(consumer, messages):
# Store_Messages_In_Database may return a deferred
result = store_messages_in_database(messages)
# record last processed message
consumer.commit()
return result
the_partition = 3 # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST) # Start reading at earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't handled by the consumer, or when consumer.stop()
# is called
yield d
consumer.stop()
kClient.close()
from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost:9092")
# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
from afkak.client import KafkaClient
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message(b"some message")])
resps = afkak.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
resps[0].topic # b"my-topic"
resps[0].partition # 1
resps[0].error # 0 (hopefully)
resps[0].offset # offset of the first message sent in this request
Afkak releases are available on PyPI.
Because the Afkak dependencies Twisted and python-snappy have binary extension modules you will need to install the Python development headers for the interpreter you wish to use:
Debian/Ubuntu: | sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev
|
OS X | brew install python pypy snappy pip install virtualenv |
Then Afkak can be installed with pip as usual:
Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE
Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE
Copyright 2015–2021 Ciena Corporation under Apache License, v2.0. See LICENSE
This project began as a port of the kafka-python library to Twisted.
See AUTHORS.md for the full contributor list.
FAQs
Twisted Python client for Apache Kafka
We found that afkak demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 4 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Research
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
Research
Security News
Attackers used a malicious npm package typosquatting a popular ESLint plugin to steal sensitive data, execute commands, and exploit developer systems.
Security News
The Ultralytics' PyPI Package was compromised four times in one weekend through GitHub Actions cache poisoning and failure to rotate previously compromised API tokens.