Stepper

A simple, efficient, concurrent task runner.

  • Simple. Run tasks and schedule jobs with Go.
  • Database agnostic. Stepper supports MongoDB and PostgreSQL (beta).
  • Concurrent. Stepper can be used in an unlimited number of instances.
  • Scalable. Split one task into small subtasks which will run on different nodes.

Install

go get github.com/NikolaiKovalenko/stepper
go get github.com/NikolaiKovalenko/stepper/engines/mongo

or

go get github.com/NikolaiKovalenko/stepper/engines/pg

Getting started

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/NikolaiKovalenko/stepper"
    "github.com/NikolaiKovalenko/stepper/engines/mongo"
)

func main() {
    mongoEngine, err := mongo.NewMongo("mongodb://localhost:27017", "example_database")
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    service := stepper.NewService(mongoEngine)

    // Will publish a task on startup
    if err := service.Publish(ctx, "example-task", []byte("Hello world")); err != nil {
        log.Fatal(err)
    }

    service.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
        fmt.Println(string(data))

        return nil
    })

    if err := service.Listen(ctx); err != nil {
        log.Fatal(err)
    }
}

Or you can use PostgreSQL:

import "github.com/NikolaiKovalenko/stepper/engines/pg"
engine, err := pg.NewPG("postgres://postgres:test@localhost:5432/postgres")
if err != nil {
    log.Fatal(err)
}

Publish task

Stepper offers many features, but its core workflow is publishing and executing tasks. This section covers publishing.

Simple way

service.Publish(context.Background(), "example-task", []byte("hello"))

The example above is the simplest way to publish a task: it publishes a task named example-task with the content hello.

Stepper also supports additional publish options.

Publish with delay

If you don't want a task to run immediately, you can set a delay:

service.Publish(
    context.Background(),
    "example-task",
    []byte("hello"),
    stepper.SetDelay(time.Minute * 1),
)

Or you can schedule the task for a particular date:

service.Publish(
    context.Background(),
    "example-task",
    []byte("hello"),
    stepper.LaunchAt(time.Now().Add(time.Minute * 10)),
)
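
The two options above differ only in how the launch time is expressed: relative to the moment of publishing, or as an absolute time. The following is a minimal standard-library sketch of that relationship using the functional options pattern. The names publishOptions, setDelay, launchAt, resolveLaunchTime and exampleDelays are hypothetical and are not taken from Stepper's source; they only illustrate the idea.

```go
package main

import (
	"fmt"
	"time"
)

// publishOptions is a hypothetical container for scheduling settings.
type publishOptions struct {
	launchAt time.Time
}

// publishOption adjusts the settings; now is the moment of publishing.
type publishOption func(now time.Time, o *publishOptions)

// setDelay schedules the task a fixed duration after it is published.
func setDelay(d time.Duration) publishOption {
	return func(now time.Time, o *publishOptions) { o.launchAt = now.Add(d) }
}

// launchAt schedules the task for an absolute point in time.
func launchAt(t time.Time) publishOption {
	return func(_ time.Time, o *publishOptions) { o.launchAt = t }
}

// resolveLaunchTime applies the options; with none, the task is due immediately.
func resolveLaunchTime(now time.Time, opts ...publishOption) time.Time {
	o := publishOptions{launchAt: now}
	for _, opt := range opts {
		opt(now, &o)
	}
	return o.launchAt
}

// exampleDelays returns, in seconds, how long after publishing a task would
// launch with no option, a one-minute delay, and a launch time ten minutes ahead.
func exampleDelays() (none, delayed, absolute int) {
	now := time.Date(2024, 4, 10, 12, 0, 0, 0, time.UTC)
	secs := func(opts ...publishOption) int {
		return int(resolveLaunchTime(now, opts...).Sub(now) / time.Second)
	}
	return secs(), secs(setDelay(time.Minute)), secs(launchAt(now.Add(10 * time.Minute)))
}

func main() {
	fmt.Println(exampleDelays())
}
```

In this sketch a relative delay and an absolute date both resolve to a single launch timestamp, which is all a scheduler would need to store.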

Execute a task

The second part of Stepper is executing the tasks in the queue.

Simple way

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    fmt.Println(string(data))

    return nil
})

The example above is the simplest way to execute a task: register a handler for the task name.

Error handling

If your handler returns an error, the task is returned to the queue and held there for 10 seconds before it is retried. You can also set the retry delay manually:

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    ctx.SetRetryAfter(time.Minute) // will be returned in 1 minute
    return fmt.Errorf("some error")
})
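
To make the retry rule concrete, here is a small standard-library sketch of the decision a runner could make after a handler returns. It assumes the 10-second default described above; taskContext, retrySeconds and the three sample handlers are hypothetical stand-ins, not Stepper's actual types.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultRetryAfter mirrors the 10-second default described in the text.
const defaultRetryAfter = 10 * time.Second

// taskContext is a hypothetical stand-in for the handler context.
type taskContext struct {
	retryAfter time.Duration
}

// SetRetryAfter overrides the delay used if the handler fails.
func (c *taskContext) SetRetryAfter(d time.Duration) { c.retryAfter = d }

type handler func(ctx *taskContext, data []byte) error

// retrySeconds runs h once and returns the re-queue delay in seconds,
// or -1 when the handler succeeded and nothing is re-queued.
func retrySeconds(h handler) int {
	ctx := &taskContext{retryAfter: defaultRetryAfter}
	if err := h(ctx, []byte("hello")); err != nil {
		return int(ctx.retryAfter / time.Second)
	}
	return -1
}

func succeeding(_ *taskContext, _ []byte) error { return nil }

func failing(_ *taskContext, _ []byte) error { return errors.New("some error") }

func failingWithDelay(ctx *taskContext, _ []byte) error {
	ctx.SetRetryAfter(time.Minute) // custom delay instead of the default
	return errors.New("some error")
}

func main() {
	fmt.Println(retrySeconds(succeeding), retrySeconds(failing), retrySeconds(failingWithDelay))
}
```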

Bind a state

If you have a long-running task, you can bind a state to it (a cursor, for example). If the task fails, it can resume from the last saved state:

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    var lastId int

    if err := ctx.BindState(&lastId); err != nil {
        return err
    }

    iter := getSomethingFromId(lastId) // something like a mongodb iterator or anything else

    for iter.Next() {
        lastId = ... // do something

        if err := ctx.SetState(lastId); err != nil {
            return err
        }
    }

    return nil
})
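
The resume behaviour can be illustrated without a database. The sketch below assumes the state is stored as JSON between attempts, which is an assumption made for illustration and not a statement about how Stepper persists it; taskState, process and resumeDemo are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// taskState is a hypothetical stand-in for per-task state storage.
type taskState struct {
	raw []byte // JSON-encoded state; nil until SetState is called
}

// BindState decodes the saved state into v and leaves v untouched
// when no state has been saved yet.
func (s *taskState) BindState(v interface{}) error {
	if s.raw == nil {
		return nil
	}
	return json.Unmarshal(s.raw, v)
}

// SetState encodes v and stores it as the latest checkpoint.
func (s *taskState) SetState(v interface{}) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	s.raw = raw
	return nil
}

// process handles every id after the saved cursor and simulates a
// failure when it reaches failAt (0 disables the failure).
func process(s *taskState, items []int, failAt int, seen *[]int) error {
	var lastID int
	if err := s.BindState(&lastID); err != nil {
		return err
	}
	for _, id := range items {
		if id <= lastID {
			continue // already handled in a previous attempt
		}
		if id == failAt {
			return errors.New("simulated failure")
		}
		*seen = append(*seen, id)
		if err := s.SetState(id); err != nil {
			return err
		}
	}
	return nil
}

// resumeDemo runs a failing attempt followed by a retry and returns
// the ids that were processed, in order.
func resumeDemo() []int {
	state := &taskState{}
	items := []int{1, 2, 3, 4, 5}
	var seen []int
	_ = process(state, items, 3, &seen) // first attempt stops before id 3
	_ = process(state, items, 0, &seen) // retry resumes after id 2
	return seen
}

func main() {
	fmt.Println(resumeDemo())
}
```

Because the cursor is saved after each item, the retry skips ids 1 and 2 and every id is processed exactly once.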

Subtasks

The most powerful feature of Stepper is subtasks. It lets you split a long-running task into separate tasks that run on different nodes. When all subtasks have completed, Stepper calls the OnFinish hook of the parent task.

Create a subtask

The following example shows how to spawn subtasks within a main task.

s.TaskHandler("task-with-threads", func(ctx stepper.Context, data []byte) error {
    fmt.Println("have received the word for splitting: ", string(data))

    for _, symbol := range strings.Split(string(data), "") {
        ctx.CreateSubtask(stepper.CreateTask{
            Data: []byte(symbol),
        })
    }

    return nil
}).Subtask(func(ctx stepper.Context, data []byte) error {
    fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
    return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
    fmt.Println("subtasks are over")
    return nil
})

Or you can use an existing task as a subtask:

ctx.CreateSubtask(stepper.CreateTask{
    Name: "some-task",
    Data: []byte(symbol),
})
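
The fan-out and finish-hook flow can be sketched in a single process. This sketch uses goroutines and a sync.WaitGroup in place of separate nodes, so it shows only the control flow (split, run subtasks, call the finish hook once) and not how Stepper distributes work; splitAndRun and countDemo are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// splitAndRun fans a word out into one subtask per symbol, waits for
// all of them, then calls onFinish exactly once.
func splitAndRun(word string, subtask func(symbol string), onFinish func()) {
	var wg sync.WaitGroup
	for _, symbol := range strings.Split(word, "") {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			subtask(s)
		}(symbol)
	}
	wg.Wait() // the parent finishes only after every subtask is done
	onFinish()
}

// countDemo returns how many subtasks ran and how many times the
// finish hook was called for the given word.
func countDemo(word string) (subtasks, finishes int) {
	var mu sync.Mutex
	splitAndRun(word,
		func(string) {
			mu.Lock()
			subtasks++
			mu.Unlock()
		},
		func() { finishes++ },
	)
	return subtasks, finishes
}

func main() {
	fmt.Println(countDemo("hello"))
}
```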

Repeated tasks

If you want to run a repeated task (cron), you can use jobs:

s.RegisterJob(context.Background(), &stepper.JobConfig{
    Name:    "log-job",
    Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
    fmt.Println("wake up the log-job")
    return nil
})

See https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format for more information about the pattern format.
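
The "@every <duration>" form used in the example takes a duration string in the syntax accepted by Go's time.ParseDuration. As a rough illustration of what that pattern means, the sketch below extracts the interval with the standard library only. parseEvery and everySeconds are hypothetical helpers and handle this one descriptor; they are not a replacement for the cron parser.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseEvery extracts the interval from an "@every <duration>" pattern.
// It does not handle full cron expressions or other descriptors.
func parseEvery(pattern string) (time.Duration, error) {
	const prefix = "@every "
	if !strings.HasPrefix(pattern, prefix) {
		return 0, fmt.Errorf("unsupported pattern %q", pattern)
	}
	return time.ParseDuration(strings.TrimPrefix(pattern, prefix))
}

// everySeconds returns the interval in whole seconds, or -1 when the
// pattern is not a valid "@every" descriptor.
func everySeconds(pattern string) int {
	d, err := parseEvery(pattern)
	if err != nil {
		return -1
	}
	return int(d / time.Second)
}

func main() {
	fmt.Println(everySeconds("@every 10s"))
	fmt.Println(everySeconds("@every 1h30m"))
}
```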

You can also create subtasks from a job:

s.RegisterJob(context.Background(), &stepper.JobConfig{
    Name:    "log-job",
    Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
    fmt.Println("wake up the log-job")

    ctx.CreateSubtask(stepper.CreateTask{
        Name: "log-subtask",
        Data: []byte("Hello 1 subtask"),
    })

    return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
    fmt.Println("success job log-job")

    return nil
})

Middlewares

Retry

The retry middleware lets you limit the number of retries.

s := stepper.NewService(db)

s.UseMiddleware(middlewares.Retry(middlewares.RetryOptions{
    Interval:   time.Second * 5,
    MaxRetries: 3,
}))
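
As an illustration of what a retry limit means, the sketch below runs a handler until it succeeds or the retry budget is used up. It assumes MaxRetries counts retries after the first execution, which the text above does not state, and it skips the waiting interval entirely; withRetryLimit, alwaysFail and failNTimes are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type handler func(data []byte) error

// withRetryLimit runs h until it succeeds or maxRetries retries have
// been used. It returns the number of executions and the last error.
func withRetryLimit(h handler, maxRetries int, data []byte) (runs int, err error) {
	for runs = 1; ; runs++ {
		if err = h(data); err == nil || runs > maxRetries {
			return runs, err
		}
	}
}

// alwaysFail never succeeds, so it exhausts the retry budget.
func alwaysFail([]byte) error { return errors.New("permanent error") }

// failNTimes returns a handler that fails n times and then succeeds.
func failNTimes(n int) handler {
	return func([]byte) error {
		if n > 0 {
			n--
			return errors.New("temporary error")
		}
		return nil
	}
}

func main() {
	fmt.Println(withRetryLimit(alwaysFail, 3, nil))
	fmt.Println(withRetryLimit(failNTimes(2), 3, nil))
}
```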

Prometheus

s := stepper.NewService(db)

prometheusMiddleware := middlewares.NewPrometheus()

s.UseMiddleware(prometheusMiddleware.GetMiddleware())

go func() {
    http.ListenAndServe(":3999", promhttp.HandlerFor(prometheusMiddleware.GetRegistry(), promhttp.HandlerOpts{}))
}()

if err := s.Listen(context.Background()); err != nil {
    log.Fatal(err)
}

The Prometheus middleware provides the following metrics:

prometheus.NewCounterVec(prometheus.CounterOpts{
    Name: "stepper_task_execution",
    Help: "Count of all task executions",
}, []string{"task", "status"})

prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Name:    "stepper_task_duration_seconds",
    Help:    "Duration of all executions",
    Buckets: []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"task", "status"})
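
Prometheus histogram buckets are inclusive upper bounds in seconds, and an observation is counted in every bucket whose bound is at least the observed value. To help read the bucket list above, this standard-library sketch finds the smallest bound that a given duration falls under. upperBound is a hypothetical helper and does not use the Prometheus client.

```go
package main

import (
	"fmt"
	"sort"
)

// buckets repeats the bounds from the histogram definition above.
var buckets = []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30}

// upperBound returns the smallest bucket bound that is >= seconds.
// ok is false when the value exceeds every bound, in which case it is
// only counted in the implicit +Inf bucket.
func upperBound(seconds float64) (bound float64, ok bool) {
	i := sort.SearchFloat64s(buckets, seconds)
	if i == len(buckets) {
		return 0, false
	}
	return buckets[i], true
}

func main() {
	fmt.Println(upperBound(0.3))
	fmt.Println(upperBound(45))
}
```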

Last updated on 10 Apr 2024
