KQ: Kafka Job Queue for Python

KQ (Kafka Queue) is a lightweight Python library that lets you enqueue and
execute jobs asynchronously using Apache Kafka. It uses
kafka-python under the hood.
Announcements
- Support for Python 3.5 will be dropped from KQ version 3.0.0.
- See releases for the latest updates.
Requirements
Installation
Install using pip:
pip install kq
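To verify the install, you can print the package version from Python. This is a minimal sketch that assumes the kq package exposes a __version__ attribute; if yours does not, pip show kq reports the same information.
import kq

# Print the installed KQ version (assumes kq exposes __version__).
print(kq.__version__)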
Getting Started
Start your Kafka instance.
Example using Docker:
docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
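Before moving on, it can help to confirm that the broker is reachable from Python. The snippet below is a minimal sketch using kafka-python (already a KQ dependency); the broker address matches the Docker command above.
from kafka import KafkaConsumer

# Connect to the local broker and list its topics. This raises
# kafka.errors.NoBrokersAvailable if the broker is not reachable yet.
consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092")
print(consumer.topics())
consumer.close()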
Define your KQ worker module my_worker.py:
import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",
    group_id="group",
    auto_offset_reset="latest"
)

# Set up a worker that executes jobs published to "topic".
worker = Worker(topic="topic", consumer=consumer)
worker.start()
Start your worker:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
Enqueue a function call:
import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer and a queue on the topic the worker consumes from.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
queue = Queue(topic="topic", producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, "https://google.com")

# Optionally specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "https://google.com")
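Each enqueue call returns a Job namedtuple describing what was serialized and sent to Kafka. The field names below are assumptions based on KQ's Job namedtuple; see the KQ documentation for the authoritative list.
# Inspect the enqueued job (field names assumed; check the KQ docs).
print(job.id)       # identifier assigned to the job
print(job.func)     # function to be executed, here requests.get
print(job.args)     # positional arguments, here ("https://google.com",)
print(job.timeout)  # per-job timeout in seconds, set via queue.using(timeout=5)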
The worker executes the job in the background:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("https://google.com")
[INFO] Job c7bf2359 returned: <Response [200]>
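Workers can also report job results back to your code through an optional callback. The sketch below extends the my_worker.py module above; it assumes the six-argument callback form described in the KQ documentation, so double-check the signature against your KQ version.
# Invoked by the worker after each job attempt (signature assumed from the KQ docs).
def callback(status, message, job, result, exception, stacktrace):
    # status is one of "invalid", "failure", "timeout" or "success".
    if status == "success":
        print("Job {} returned: {}".format(job.id, result))
    else:
        print("Job did not succeed (status={}): {}".format(status, exception))

# Pass the callback when constructing the worker (replaces the Worker line above).
worker = Worker(topic="topic", consumer=consumer, callback=callback)
worker.start()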
See the documentation for more information.