
Research
Security News
Malicious npm Package Wipes Codebases with Remote Trigger
A malicious npm typosquat uses remote commands to silently delete entire project directories after a single mistyped install.
github.com/NikolaiKovalenko/stepper
A simple, efficient, concurrent task runner.
go get github.com/NikolaiKovalenko/stepper
go get github.com/NikolaiKovalenko/stepper/engines/mongo
or
go get github.com/NikolaiKovalenko/stepper/engines/pg
package main
import (
"log"
"github.com/NikolaiKovalenko/stepper"
"github.com/NikolaiKovalenko/stepper/engines/mongo"
)
func main() {
mongoEngine, err := mongo.NewMongo("mongodb://localhost:27017", "example_database")
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
service := stepper.NewService(mongoEngine)
// Will publish a task on startup
if err := service.Publish(ctx, "example-task", []byte("Hello world")); err != nil {
log.Fatal(err)
}
service.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
fmt.Println(string(data))
return nil
})
if err := service.Listen(ctx); err != nil {
log.Fatal(err)
}
}
Or you can use PostgresQL:
import "github.com/NikolaiKovalenko/stepper/engines/pg"
engine, err := pg.NewPG("postgres://postgres:test@localhost:5432/postgres")
if err != nil {
log.Fatal(err)
}
If you use the stepper you will use a lot of things but first of all you will publish and execute tasks. Let's discuss how you can publish tasks.
service.Publish(context.Background(), "example-task", []byte("hello"))
The example shows the simple way to publish a task. The code will publish a task with a name example-task and content hello.
But also the stepper allows you to use additional options.
If you don't want to execute a task immediately you can set up a delay.
service.Publish(
context.Background(),
"example-task",
[]byte("hello"),
stepper.SetDelay(time.Minute * 1),
)
Or you can use particular a date
service.Publish(
context.Background(),
"example-task",
[]byte("hello"),
stepper.LaunchAt(time.Now().Add(time.Minute * 10)),
)
The second part of the Stepper is execution of tasks in queue.
s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
fmt.Println(string(data))
return nil
})
The example shows the simple way to execute a task.
If your handler returns an error, a task will be returned to the queue. And the task will be held in the queue for 10 seconds. But you can set up a delay manually.
s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
ctx.SetRetryAfter(time.Minute) // will be returned in 1 minute
return fmt.Errorf("some error")
})
If you have a log running task, you can bind a state of task (cursor for example), and if your task failed you will be able to continue the task with the last state
s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
var lastId int
if err := ctx.BindState(&lastId); err != nil {
return err
}
iter := getSomethingFromId(lastId) // something like a mongodb iterator or anything else
for iter.Next() {
lastId = ... // do something
if err := ctx.SetState(lastId); err != nil {
return err
}
}
return nil
})
The most powerful feature of the stepper is creating subtasks. The feature allows you to split a long-running task into separate tasks which will run on different nodes. And when all subtasks will be completed the stepper will call a onFinish
hook of parent task.
The following example shows how to spawn subtasks within a main task.
s.TaskHandler("task-with-threads", func(ctx stepper.Context, data []byte) error {
fmt.Println("have received the word for splitting: ", string(data))
for _, symbol := range strings.Split(string(data), "") {
ctx.CreateSubtask(stepper.CreateTask{
Data: []byte(symbol),
})
}
return nil
}).Subtask(func(ctx stepper.Context, data []byte) error {
fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
fmt.Println("subtasks are over")
return nil
})
Or you can use existing a subtask:
ctx.CreateSubtask(stepper.CreateTask{
Name: "some-task",
Data: []byte(symbol),
})
If you want to run repeatead task (cron) you can use jobs
s.RegisterJob(context.Background(), &stepper.JobConfig{
Name: "log-job",
Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
fmt.Println("wake up the log-job")
return nil
})
Read https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format for more information about a pattern.
Also you can create subtasks from a job:
s.RegisterJob(context.Background(), &stepper.JobConfig{
Name: "log-job",
Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
fmt.Println("wake up the log-job")
ctx.CreateSubtask(stepper.CreateTask{
Name: "log-subtask",
Data: []byte("Hello 1 subtask"),
})
return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
fmt.Println("success job log-job")
return nil
})
The retry middleware allows you to limit a number of retries.
service := stepper.NewService(db)
s.UseMiddleware(middlewares.Retry(middlewares.RetryOptions{
Interval: time.Second * 5,
MaxRetries: 3,
}))
service := stepper.NewService(db)
prometheusMiddleware := middlewares.NewPrometheus()
s.UseMiddleware(prometheusMiddleware.GetMiddleware())
go func() {
http.ListenAndServe(":3999", promhttp.HandlerFor(prometheusMiddleware.GetRegistry(), promhttp.HandlerOpts{}))
}()
if err := s.Listen(context.Background()); err != nil {
log.Fatal(err)
}
The prometheus middleware provides following metrics:
prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "stepper_task_execution",
Help: "Count of all task executions",
}, []string{"task", "status"})
prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "stepper_task_duration_seconds",
Help: "Duration of all executions",
Buckets: []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"task", "status"})
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
A malicious npm typosquat uses remote commands to silently delete entire project directories after a single mistyped install.
Research
Security News
Malicious PyPI package semantic-types steals Solana private keys via transitive dependency installs using monkey patching and blockchain exfiltration.
Security News
New CNA status enables OpenJS Foundation to assign CVEs for security vulnerabilities in projects like ESLint, Fastify, Electron, and others, while leaving disclosure responsibility with individual maintainers.