Kafka REST API
Installation
If you are installing from a remote or public repository, run:
pip install kafka-rest-api
If you are installing from a wheel file in the local directory, run:
pip install {filename}.whl
replacing {filename} with the name of the .whl file.
Getting Started
Interactions with a Kafka cluster follow a Producer/Consumer paradigm. As such, two classes can be imported and used to publish and subscribe to topics in Kafka: Producer and Consumer.
Configuration
When using this package to access Merck API Gateway, you can define the following environment variables:
- KAFKA_REST_API_URL: Target Kafka REST API URL. Alternatively, you can pass the argument kafka_rest_api_url to the Producer and Consumer constructors (see the sketch after this list).
- X_API_KEY: The authorization token that validates API requests to the API Gateway. Alternatively, you can pass a dictionary with the format {"x-api-key": "your-api-key", "other-header-key": "other-header-value", ...} to the parameter auth_headers in both the Producer and Consumer constructors.
- TOPIC_ID: MSK Topic ID assigned to the user. Alternatively, you can pass a string with the topic ID to the argument topic_id in both the Producer and Consumer constructors.
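For example, a minimal sketch of constructor-based configuration, assuming only the parameter names listed above (the URL, API key, and topic values are placeholders):
from kafka_rest import Producer, Consumer

# Placeholder values: replace with your actual gateway URL, API key and topic ID.
producer = Producer(
    kafka_rest_api_url="https://your-gateway.example.com",
    auth_headers={"x-api-key": "your-api-key"},
    topic_id="your-topic-id",
)
consumer = Consumer(
    kafka_rest_api_url="https://your-gateway.example.com",
    auth_headers={"x-api-key": "your-api-key"},
    topic_id="your-topic-id",
)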
Producer
Produce JSON data
In the snippet below, the topic pke is used as an example. The pattern for the producer is the following:
from kafka_rest import Producer

producer = Producer()
# messages_to_pke_endpoint: a list of payloads accepted by the pke endpoint
new_keys = producer.produce(messages_to_pke_endpoint, "pke")
Please note that each message in the list sent to the target endpoint should correspond to the payload expected by that endpoint, i.e. what would otherwise be sent as a JSON object.
For example, a valid message to the pke endpoint is:
{
"text": "Genome engineering is a powerful tool.",
"algorithm": "TopicRank",
"n_best": 10
}
To find out which message format you should use for each endpoint, please consult the NLP API documentation.
The Producer.produce method automatically generates a unique key (UUID) for each message. Optionally, you can provide your own keys as well, by passing a list of keys (strings) to the argument keys.
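For instance, a minimal sketch of providing custom keys, assuming one key per message (the payloads and key values here are illustrative):
from kafka_rest import Producer

producer = Producer()
# Illustrative payloads for the pke endpoint, with one custom key per message.
messages = [
    {"text": "Genome engineering is a powerful tool.", "algorithm": "TopicRank", "n_best": 10},
    {"text": "CRISPR enables precise genome edits.", "algorithm": "TopicRank", "n_best": 10},
]
custom_keys = ["doc-001", "doc-002"]  # assumed to match messages in length and order
new_keys = producer.produce(messages, "pke", keys=custom_keys)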
Produce files
To produce files as inputs to a given endpoint, you can use the method produce_files. The required arguments for this method are:
- files: a list of absolute or relative paths to the input files;
- endpoint: the target endpoint (pdf2text, for example).
from kafka_rest import Producer
producer = Producer()
new_keys = producer.produce_files(files=list_of_files, endpoint="pdf2text")
Consumer
Pattern 1 - Iterator
Arguably, the most useful way of consuming messages with the Consumer class is as follows:
from kafka_rest import Consumer
with Consumer() as consumer:
    for data, remaining_keys in consumer.consume(keys):
        print((data, remaining_keys))
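As a usage sketch (the result handling below is illustrative, not part of the API), you can accumulate the consumed data until no keys remain:
from kafka_rest import Consumer

results = []
with Consumer() as consumer:
    for data, remaining_keys in consumer.consume(keys):
        # remaining_keys shrinks as messages arrive; collect non-empty data.
        if data:
            results.append(data)
print(f"Consumed {len(results)} results")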
Pattern 2 - Step-by-step instantiation (Chain)
You can also opt for a step-by-step instantiation and have finer control over each request sent by the Consumer to the NLP API:
from kafka_rest import Consumer
consumer = Consumer()
consumer.create()
consumer.subscribe()
for data, remaining_keys in consumer.consume(keys):
    print((data, remaining_keys))
consumer.delete()
Pattern 3 - Consume all
Optionally, the Consumer can return only when all keys are exhausted, i.e. when all messages have been consumed. For that, please use the consume_all method.
from kafka_rest import Consumer
with Consumer() as consumer:
    data = consumer.consume_all(keys)
Full example
Produce files as inputs and consume outputs.
from kafka_rest import Producer, Consumer
producer = Producer()
new_keys = producer.produce_files(["files/file1.pdf", "files/file2.pdf"], "pdf2text")
with Consumer() as consumer:
    for data, remaining_keys in consumer.consume(new_keys):
        if data:
            print(f"Data: {data} | Remaining Keys: {remaining_keys}")
For more snippets, please check the examples in kafka_rest/snippets in this repo.