
github.com/nikolaikovalenko/stepper
A simple, efficient, concurrent task runner.
go get github.com/NikolaiKovalenko/stepper
go get github.com/NikolaiKovalenko/stepper/engines/mongo
or
go get github.com/NikolaiKovalenko/stepper/engines/pg
package main
import (
"context"
"fmt"
"log"

"github.com/NikolaiKovalenko/stepper"
"github.com/NikolaiKovalenko/stepper/engines/mongo"
)
func main() {
mongoEngine, err := mongo.NewMongo("mongodb://localhost:27017", "example_database")
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
service := stepper.NewService(mongoEngine)
// Will publish a task on startup
if err := service.Publish(ctx, "example-task", []byte("Hello world")); err != nil {
log.Fatal(err)
}
service.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
fmt.Println(string(data))
return nil
})
if err := service.Listen(ctx); err != nil {
log.Fatal(err)
}
}
Or you can use PostgreSQL:
import "github.com/NikolaiKovalenko/stepper/engines/pg"
engine, err := pg.NewPG("postgres://postgres:test@localhost:5432/postgres")
if err != nil {
log.Fatal(err)
}
Most of your work with stepper comes down to two things: publishing tasks and executing them. Let's start with publishing.
service.Publish(context.Background(), "example-task", []byte("hello"))
This is the simplest way to publish a task. The code publishes a task named example-task with the content hello.
Stepper also accepts additional options when publishing.
If you don't want the task to run immediately, you can set a delay:
service.Publish(
context.Background(),
"example-task",
[]byte("hello"),
stepper.SetDelay(time.Minute * 1),
)
Or you can schedule the task for a particular date:
service.Publish(
context.Background(),
"example-task",
[]byte("hello"),
stepper.LaunchAt(time.Now().Add(time.Minute * 10)),
)
The second part of stepper is executing the tasks in the queue.
s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
fmt.Println(string(data))
return nil
})
This is the simplest way to handle a task.
If your handler returns an error, the task is returned to the queue and held there for 10 seconds before the next attempt. You can also set the retry delay manually:
s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
ctx.SetRetryAfter(time.Minute) // will be returned in 1 minute
return fmt.Errorf("some error")
})
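The retry rule described above can be sketched with the standard library alone. This is only an illustration of the rule, not stepper's implementation: `retryDelay`, `defaultRetryDelay`, and `errExample` are hypothetical names, and treating a non-positive `retryAfter` as "no custom delay requested" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultRetryDelay mirrors the 10-second hold described above.
const defaultRetryDelay = 10 * time.Second

// errExample stands in for any error returned by a handler.
var errExample = errors.New("some error")

// retryDelay reports whether a task should be requeued after a handler
// result and, if so, how long it stays in the queue before the next attempt.
// retryAfter <= 0 means the handler did not request a custom delay (assumption).
func retryDelay(handlerErr error, retryAfter time.Duration) (time.Duration, bool) {
	if handlerErr == nil {
		return 0, false // success: nothing to retry
	}
	if retryAfter <= 0 {
		retryAfter = defaultRetryDelay
	}
	return retryAfter, true
}

func main() {
	delay, _ := retryDelay(errExample, 0)
	fmt.Println(delay) // 10s

	delay, _ = retryDelay(errExample, time.Minute)
	fmt.Println(delay) // 1m0s

	_, requeue := retryDelay(nil, 0)
	fmt.Println(requeue) // false
}
```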
If you have a long-running task, you can bind state to it (a cursor, for example). If the task fails, the next run can continue from the last saved state:
s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
var lastId int
if err := ctx.BindState(&lastId); err != nil {
return err
}
iter := getSomethingFromId(lastId) // something like a mongodb iterator or anything else
for iter.Next() {
lastId = ... // do something
if err := ctx.SetState(lastId); err != nil {
return err
}
}
return nil
})
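To see why saved state makes a failed task resumable, here is a minimal in-memory sketch. It assumes the state is serialized as JSON and that binding with no saved state leaves the variable untouched; neither is confirmed for stepper. `stateStore` and `process` are hypothetical names, and only the method names mirror the handler above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// stateStore is a hypothetical in-memory stand-in for persisted task state.
type stateStore struct{ raw []byte }

// BindState decodes the saved state into v; with no saved state, v is untouched.
func (s *stateStore) BindState(v any) error {
	if s.raw == nil {
		return nil
	}
	return json.Unmarshal(s.raw, v)
}

// SetState saves v so that a later run can resume from it.
func (s *stateStore) SetState(v any) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	s.raw = raw
	return nil
}

// process handles the items after the saved cursor and fails when it reaches
// failAt (a negative failAt disables the simulated failure).
// It returns the items handled in this run.
func process(store *stateStore, items []int, failAt int) ([]int, error) {
	var lastID int
	if err := store.BindState(&lastID); err != nil {
		return nil, err
	}
	var handled []int
	for _, id := range items {
		if id <= lastID {
			continue // already handled in a previous run
		}
		if id == failAt {
			return handled, errors.New("simulated failure")
		}
		handled = append(handled, id)
		if err := store.SetState(id); err != nil {
			return handled, err
		}
	}
	return handled, nil
}

func main() {
	store := &stateStore{}
	items := []int{1, 2, 3, 4, 5}

	first, err := process(store, items, 4)
	fmt.Println(first, err) // [1 2 3] simulated failure

	second, err := process(store, items, -1)
	fmt.Println(second, err) // [4 5] <nil>
}
```

The second run skips items 1 to 3 because the cursor was saved after each of them, which is the behavior the handler above relies on.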
The most powerful feature of stepper is subtasks. They let you split a long-running task into separate tasks that run on different nodes. When all subtasks have completed, stepper calls the OnFinish hook of the parent task.
The following example shows how to spawn subtasks within a main task.
s.TaskHandler("task-with-threads", func(ctx stepper.Context, data []byte) error {
fmt.Println("have received the word for splitting: ", string(data))
for _, symbol := range strings.Split(string(data), "") {
ctx.CreateSubtask(stepper.CreateTask{
Data: []byte(symbol),
})
}
return nil
}).Subtask(func(ctx stepper.Context, data []byte) error {
fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
fmt.Println("subtasks are over")
return nil
})
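The fan-out and finish-hook flow can be illustrated in a single process with goroutines. This is only an analogy: stepper runs subtasks as separate queued tasks that may land on different nodes, whereas the hypothetical `runWithSubtasks` helper below uses a `sync.WaitGroup` to show the ordering guarantee, namely that the finish hook runs once, after every subtask has returned.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// runWithSubtasks splits word into symbols, handles each one in its own
// goroutine, and calls onFinish exactly once after all of them have returned.
func runWithSubtasks(word string, subtask func(symbol string), onFinish func()) {
	var wg sync.WaitGroup
	for _, symbol := range strings.Split(word, "") {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			subtask(s)
		}(symbol)
	}
	wg.Wait()
	onFinish()
}

func main() {
	var (
		mu   sync.Mutex
		seen []string
	)
	runWithSubtasks("step",
		func(symbol string) {
			mu.Lock()
			defer mu.Unlock()
			seen = append(seen, symbol)
		},
		func() {
			sort.Strings(seen) // goroutine completion order is not deterministic
			fmt.Println("subtasks are over:", seen) // subtasks are over: [e p s t]
		},
	)
}
```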
Or you can use an existing task as a subtask:
ctx.CreateSubtask(stepper.CreateTask{
Name: "some-task",
Data: []byte(symbol),
})
If you want to run a repeated task (cron), you can use jobs:
s.RegisterJob(context.Background(), &stepper.JobConfig{
Name: "log-job",
Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
fmt.Println("wake up the log-job")
return nil
})
See https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format for more information about the pattern format.
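In the robfig/cron format, the part after `@every` is a duration string accepted by Go's `time.ParseDuration`. The sketch below shows how such a pattern maps to an interval. `parseEvery` is a hypothetical helper written for illustration; it handles only the `@every` descriptor and is not how stepper or robfig/cron parses patterns.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseEvery extracts the interval from an "@every <duration>" pattern.
// It supports only this one descriptor, not full cron expressions.
func parseEvery(pattern string) (time.Duration, error) {
	const prefix = "@every "
	if !strings.HasPrefix(pattern, prefix) {
		return 0, fmt.Errorf("unsupported pattern %q", pattern)
	}
	return time.ParseDuration(strings.TrimSpace(strings.TrimPrefix(pattern, prefix)))
}

func main() {
	every, err := parseEvery("@every 10s")
	fmt.Println(every, err) // 10s <nil>

	_, err = parseEvery("*/5 * * * *")
	fmt.Println(err != nil) // true
}
```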
You can also create subtasks from a job:
s.RegisterJob(context.Background(), &stepper.JobConfig{
Name: "log-job",
Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
fmt.Println("wake up the log-job")
ctx.CreateSubtask(stepper.CreateTask{
Name: "log-subtask",
Data: []byte("Hello 1 subtask"),
})
return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
fmt.Println("success job log-job")
return nil
})
The retry middleware allows you to limit the number of retries.
s := stepper.NewService(db)
s.UseMiddleware(middlewares.Retry(middlewares.RetryOptions{
Interval: time.Second * 5,
MaxRetries: 3,
}))
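As a rough mental model of a retry limit, consider the decision made each time a task fails. The sketch below is not the middleware's code: `retryPolicy` and `decide` are hypothetical, and it assumes that `MaxRetries` counts retries after the first attempt, which the text above does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// retryPolicy is a hypothetical stand-in for the middleware options.
type retryPolicy struct {
	Interval   time.Duration
	MaxRetries int
}

// decide reports whether a failed task should be requeued and after what delay.
// retries is the number of retries already performed for the task.
func (p retryPolicy) decide(retries int) (delay time.Duration, requeue bool) {
	if retries >= p.MaxRetries {
		return 0, false // limit reached: give up on the task
	}
	return p.Interval, true
}

func main() {
	policy := retryPolicy{Interval: 5 * time.Second, MaxRetries: 3}

	attempts := 0
	for retries := 0; ; retries++ {
		attempts++ // the handler runs and fails
		delay, requeue := policy.decide(retries)
		if !requeue {
			break
		}
		fmt.Printf("retry %d scheduled in %s\n", retries+1, delay)
	}
	fmt.Println("total attempts:", attempts) // total attempts: 4
}
```

Under this assumption, a handler that always fails runs four times in total: the initial attempt plus three retries.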
The Prometheus middleware collects task metrics, which you can expose over HTTP:
s := stepper.NewService(db)
prometheusMiddleware := middlewares.NewPrometheus()
s.UseMiddleware(prometheusMiddleware.GetMiddleware())
go func() {
http.ListenAndServe(":3999", promhttp.HandlerFor(prometheusMiddleware.GetRegistry(), promhttp.HandlerOpts{}))
}()
if err := s.Listen(context.Background()); err != nil {
log.Fatal(err)
}
The Prometheus middleware provides the following metrics:
prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "stepper_task_execution",
Help: "Count of all task executions",
}, []string{"task", "status"})
prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "stepper_task_duration_seconds",
Help: "Duration of all executions",
Buckets: []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"task", "status"})
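When reading the duration histogram, keep in mind that Prometheus buckets are cumulative upper bounds (the `le` label) with an implicit `+Inf` bucket at the end. The sketch below uses the bucket list above to show the lowest bucket that counts a given duration. `smallestBucket` is a hypothetical helper for illustration and is not part of stepper or the Prometheus client.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// buckets repeats the upper bounds (in seconds) configured above.
var buckets = []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30}

// smallestBucket returns the lowest upper bound that contains seconds,
// or +Inf when the value exceeds every configured bound.
// bounds must be sorted in ascending order.
func smallestBucket(bounds []float64, seconds float64) float64 {
	i := sort.SearchFloat64s(bounds, seconds) // first index with bounds[i] >= seconds
	if i == len(bounds) {
		return math.Inf(1)
	}
	return bounds[i]
}

func main() {
	fmt.Println(smallestBucket(buckets, 0.3)) // 0.5
	fmt.Println(smallestBucket(buckets, 10))  // 10
	fmt.Println(smallestBucket(buckets, 45))  // +Inf
}
```

Because buckets are cumulative, a 0.3-second execution is also counted in every bucket above 0.5; tasks that regularly exceed 30 seconds show up only in `+Inf`, which may be a sign that the bounds need adjusting.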