github.com/james-johnston-thumbtack/kafka-message-scheduler

  • v0.0.4
Kafka message scheduler

Kafka message scheduler lets you send a message with a given payload to a target topic at a specific time.

Why?

Applications often need to trigger an action at a given time. If your company already uses Kafka for data processing, it is natural to trigger that processing with Kafka messages.

Use cases

For example, on the TF1 website, videos are put online on a given date and taken offline on another date. We use Kafka consumers on specific topics to perform this activation/deactivation.

Another use case is retriable actions, for example when a consumer uses the messages of a topic to call an external API. This API can be temporarily down, in which case the failed messages can be rescheduled for a retry by the scheduler. Of course, you have to ensure idempotence, based on the original timestamp.

You can also use the scheduler to delete inactive users: each time a user logs in, you schedule an event dated now + 1 year. If the user has not logged in for a long time, the scheduled message is triggered and the user can be deleted by a dedicated consumer.
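As a rough illustration of the "now + 1 year" computation in the inactive-user use case, the sketch below derives the epoch value in seconds. The function name deletionEpoch is hypothetical and not part of the scheduler; producing the schedule message itself is left out.

```go
package main

import (
	"fmt"
	"time"
)

// deletionEpoch returns the Unix timestamp (in seconds) one calendar year
// after the given login time. The name is hypothetical, chosen for this sketch.
func deletionEpoch(lastLoginUnix int64) int64 {
	// Work in UTC so the result does not depend on the local time zone.
	lastLogin := time.Unix(lastLoginUnix, 0).UTC()
	return lastLogin.AddDate(1, 0, 0).Unix()
}

func main() {
	// On each login, this value would be used as the date of the schedule.
	fmt.Println(deletionEpoch(time.Now().Unix()))
}
```

Presumably each login would reuse the same message key, so that the newer schedule supersedes the older one in the compacted topic; that is an assumption, not something stated here.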

A lot of use cases can be found... it depends on your imagination ;)

How does it work?

Kafka message scheduler simply uses Kafka topics. These topics contain all the schedules to trigger. Schedules are regular Kafka messages with headers and a payload, but they must contain specific headers:

  • scheduler-epoch: the date of the schedule as a Unix epoch (number of seconds since 1970-01-01 UTC)
  • scheduler-target-topic: the topic to send the message to
  • scheduler-target-key: the key to use when sending the triggered schedule to the target topic

That is all you need; the payload will be the one defined in the schedule message. Warning: if the payload of the message changes, a new schedule message should be sent to the scheduler topic.
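A minimal sketch of how a producer might assemble and sanity-check these headers before sending a schedule. The map-based representation and the names scheduleHeaders and validate are illustrative only; the sketch also assumes that all three headers are mandatory, which is not stated explicitly here. A real producer would attach them to a message using its Kafka client or the clientlib.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Header names taken from the list above.
const (
	headerEpoch       = "scheduler-epoch"
	headerTargetTopic = "scheduler-target-topic"
	headerTargetKey   = "scheduler-target-key"
)

// scheduleHeaders builds the three scheduler headers (hypothetical helper).
func scheduleHeaders(epoch int64, targetTopic, targetKey string) map[string]string {
	return map[string]string{
		headerEpoch:       strconv.FormatInt(epoch, 10),
		headerTargetTopic: targetTopic,
		headerTargetKey:   targetKey,
	}
}

// validate returns an error if a header is missing or the epoch is not an
// integer. Treating all three headers as required is an assumption.
func validate(h map[string]string) error {
	for _, name := range []string{headerEpoch, headerTargetTopic, headerTargetKey} {
		if h[name] == "" {
			return errors.New("missing header " + name)
		}
	}
	if _, err := strconv.ParseInt(h[headerEpoch], 10, 64); err != nil {
		return fmt.Errorf("invalid %s: %w", headerEpoch, err)
	}
	return nil
}

func main() {
	h := scheduleHeaders(1893456000, "online-videos", "vid1")
	h["customer-header"] = "dummy" // custom headers can be added alongside
	fmt.Println(h, validate(h))
}
```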

Example:

Schedule message:

Headers:
    scheduler-epoch: 1893456000
    scheduler-target-topic: online-videos
    scheduler-target-key: vid1
    customer-header: dummy
Timestamp: 1607918336
Key: vid1-online
Value: "video 1"

Triggered message in topic 'online-videos':

Headers:
    scheduler-timestamp: 1607918336 # original message timestamp
    scheduler-key: vid1-online
    scheduler-topic: schedules
    customer-header: dummy
Key: vid1
Value: "video 1"

The same message will also be produced to the 'history' topic for auditing. Once the message is triggered, a tombstone message (nil payload) is also produced to the scheduler topic to delete the schedule.
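The mapping shown in the example above can be sketched as follows. This is inferred from the example only, not taken from the scheduler's source: the message type is a simplified stand-in for a Kafka record, trigger is a hypothetical name, and dropping every scheduler-* control header while copying the others is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// message is a simplified, hypothetical stand-in for a Kafka record.
type message struct {
	Topic     string
	Key       string
	Value     []byte // nil marks a tombstone
	Timestamp int64
	Headers   map[string]string
}

// trigger derives the message for the target topic and the tombstone that
// deletes the schedule from its source topic.
func trigger(s message) (out, tombstone message) {
	headers := map[string]string{
		"scheduler-timestamp": strconv.FormatInt(s.Timestamp, 10),
		"scheduler-key":       s.Key,
		"scheduler-topic":     s.Topic,
	}
	for k, v := range s.Headers {
		// Assumption: control headers are dropped, custom headers are copied.
		if !strings.HasPrefix(k, "scheduler-") {
			headers[k] = v
		}
	}
	out = message{
		Topic:   s.Headers["scheduler-target-topic"],
		Key:     s.Headers["scheduler-target-key"],
		Value:   s.Value,
		Headers: headers,
	}
	tombstone = message{Topic: s.Topic, Key: s.Key, Value: nil}
	return out, tombstone
}

func main() {
	schedule := message{
		Topic: "schedules", Key: "vid1-online", Value: []byte("video 1"),
		Timestamp: 1607918336,
		Headers: map[string]string{
			"scheduler-epoch":        "1893456000",
			"scheduler-target-topic": "online-videos",
			"scheduler-target-key":   "vid1",
			"customer-header":        "dummy",
		},
	}
	out, tomb := trigger(schedule)
	fmt.Printf("%s %s %q %v\n", out.Topic, out.Key, out.Value, out.Headers)
	fmt.Printf("%s %s tombstone=%t\n", tomb.Topic, tomb.Key, tomb.Value == nil)
}
```

The copy sent to the 'history' topic would carry the same content as the triggered message; it is omitted here to keep the sketch short.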

For Go, there is a clientlib for wrapping your Kafka messages; check clientlib for more details.

High availability

You can launch multiple instances of the scheduler. Schedules will be load-balanced across them, which reduces the memory pressure on each instance when the schedules topic is large. The scheduler can work with several incoming topics, but schedule keys must be unique across all these topics.

Fail over

If a scheduler crashes or is down for a long period of time, it will resynchronize all schedules assigned to it, trigger missed schedules first, as soon as possible, and manage all live incoming schedules on the fly.

Observability

The scheduler exposes metrics on a dedicated port (8001 by default) at the URI /metrics, which a Prometheus server can scrape. Available metrics are the numbers of missed, invalid, deleted, planned and triggered schedules. The metric name is kafka_scheduler_event_total.
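For a quick check without a Prometheus server, the text returned by /metrics can be parsed by hand. The sketch below is a deliberately simplistic parser for the Prometheus text format; parseCounter is a hypothetical helper, and the label name "type" in the sample body is an assumption, since the actual label names are not documented here. In practice the body would come from an HTTP GET on the metrics address.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCounter returns the samples of one metric, keyed by the raw label
// text between braces. It ignores comments and does not handle braces
// inside quoted label values.
func parseCounter(body, name string) map[string]float64 {
	samples := make(map[string]float64)
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") || !strings.HasPrefix(line, name) {
			continue
		}
		rest := line[len(name):]
		labels := ""
		if strings.HasPrefix(rest, "{") {
			end := strings.Index(rest, "}")
			if end < 0 {
				continue
			}
			labels = rest[1:end]
			rest = rest[end+1:]
		} else if !strings.HasPrefix(rest, " ") {
			continue // a different metric that merely shares the prefix
		}
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			continue
		}
		if v, err := strconv.ParseFloat(fields[0], 64); err == nil {
			samples[labels] = v
		}
	}
	return samples
}

func main() {
	// Sample body; the "type" label is assumed for illustration.
	body := "# TYPE kafka_scheduler_event_total counter\n" +
		"kafka_scheduler_event_total{type=\"triggered\"} 3\n" +
		"kafka_scheduler_event_total{type=\"missed\"} 1\n"
	fmt.Println(parseCounter(body, "kafka_scheduler_event_total"))
}
```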

Configuration

The scheduler can be configured with environment variables:

Env. Variable        Default           Description
BOOTSTRAP_SERVERS    localhost:9092    Kafka bootstrap servers list separated by comma
SCHEDULES_TOPICS     schedules         Topic list for incoming schedules separated by comma
SINCE_DELTA          0                 Number of days to go back for considering missed schedules (0: today, -1: yesterday, etc.)
GROUP_ID             scheduler-cg      Consumer group id for the scheduler consumer
METRICS_HTTP_ADDR    :8001             HTTP address where prometheus metrics will be exposed (URI /metrics)
HISTORY_TOPIC        history           Topic name where a copy of triggered schedules will be kept for auditing
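To make the defaults concrete, here is a sketch of how these variables could be read in Go. It is not the scheduler's actual configuration code: the config type and loadConfig are hypothetical, and treating an empty value as unset is an assumption. The lookup function is passed in so the logic can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// config mirrors the variables in the table above (hypothetical type).
type config struct {
	BootstrapServers []string
	SchedulesTopics  []string
	SinceDelta       int
	GroupID          string
	MetricsHTTPAddr  string
	HistoryTopic     string
}

// loadConfig reads each variable through lookup and falls back to the
// documented default when the variable is unset or empty.
func loadConfig(lookup func(string) (string, bool)) (config, error) {
	get := func(name, def string) string {
		if v, ok := lookup(name); ok && v != "" {
			return v
		}
		return def
	}
	delta, err := strconv.Atoi(get("SINCE_DELTA", "0"))
	if err != nil {
		return config{}, fmt.Errorf("invalid SINCE_DELTA: %w", err)
	}
	return config{
		BootstrapServers: strings.Split(get("BOOTSTRAP_SERVERS", "localhost:9092"), ","),
		SchedulesTopics:  strings.Split(get("SCHEDULES_TOPICS", "schedules"), ","),
		SinceDelta:       delta,
		GroupID:          get("GROUP_ID", "scheduler-cg"),
		MetricsHTTPAddr:  get("METRICS_HTTP_ADDR", ":8001"),
		HistoryTopic:     get("HISTORY_TOPIC", "history"),
	}, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```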

Usage

You can run Kafka message scheduler with go run, with Docker, or as code in your Go program:

  • go run :
BOOTSTRAP_SERVERS="kafka:9092" go run ./cmd/kafka
  • docker run :
docker run -e BOOTSTRAP_SERVERS="kafka:9092" etf1/kafka-message-scheduler
  • as code in your go program:
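import (
    "log"

    runner "github.com/etf1/kafka-message-scheduler/runner/kafka"
)

...
kafkaRunner := runner.DefaultRunner()
// Start the scheduler in its own goroutine; the trailing () invokes the function.
go func() {
    if err := kafkaRunner.Start(); err != nil {
        log.Printf("failed to start scheduler: %v", err)
    }
}()
...
// Stop the scheduler when your program shuts down.
kafkaRunner.Close()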

Schedules topics

The topic used by the scheduler must be compacted with unlimited retention, otherwise you will lose schedules. If you plan to run multiple instances of the scheduler, for example 3, you need at least 3 partitions in the topic.

kafka-topics --bootstrap-server "${BOOTSTRAP_SERVERS}" --create --topic schedules \
             --partitions 3 --config "cleanup.policy=compact" --config "retention.ms=-1"

History topic

Triggered schedules are stored in the history topic specified by the env. variable HISTORY_TOPIC.

kafka-topics --bootstrap-server "${BOOTSTRAP_SERVERS}" --create --topic history

Scheduler mini

For integration tests, you can start a "mini" version of the scheduler that has no external dependencies (no Kafka needed). This version is useful for testing against the scheduler REST API.

  • go run :
go run ./cmd/mini
  • docker run :
docker run etf1/kafka-message-scheduler:mini

Package last updated on 31 Aug 2021
