
Company News
Socket Named Top Sales Organization by RepVue
Socket won two 2026 Reppy Awards from RepVue, ranking in the top 5% of all sales orgs. AE Alexandra Lister shares what it's like to grow a sales career here.
github.com/mapped/taskq/v3
Advanced tools
taskq supports 2 last Go versions and requires a Go version with modules support. So make sure to initialize a Go module:
go mod init github.com/my/repo
And then install taskq/v3 (note v3 in the import; omitting it is a popular mistake):
go get github.com/vmihailenco/taskq/v3
I recommend that you split your app into the two parts:
This way you can:
There is an api_worker example that demonstrates this approach using Redis as a backend:
cd example/api_worker
go run worker/worker.go
go run api/api.go
You start by choosing a backend to use - in our case Redis:
package api_worker
var QueueFactory = redisq.NewFactory()
Using that factory you create a queue that contains tasks:
var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
Name: "api-worker",
Redis: Redis, // go-redis client
})
Using the queue you create a task with handler that does some useful work:
var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
Name: "counter",
Handler: func() error {
IncrLocalCounter()
return nil
},
})
Then in an API binary you use tasks to add messages/jobs to queues:
ctx := context.Background()
for {
// call task handler without any args
err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
if err != nil {
log.Fatal(err)
}
}
And in a worker binary you start processing queues:
err := api_worker.MainQueue.Start(context.Background())
if err != nil {
log.Fatal(err)
}
t := myQueue.RegisterTask(&taskq.TaskOptions{
Name: "greeting",
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Say "Hello World".
err := myQueue.Add(t.WithArgs(context.Background(), "World"))
if err != nil {
panic(err)
}
// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)
// Say "Hello World" once.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World")
msg.Name = "hello-world" // unique
_ = myQueue.Add(msg)
}
// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World")
msg.Name = "hello-world"
msg.Delay = time.Hour
_ = myQueue.Add(msg)
}
// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
_ = myQueue.Add(msg)
}
// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
_ = myQueue.Add(msg)
}
If a Message has a Name then this will be used as unique identifier and
messages with the same name will be deduplicated (i.e. not processed again)
within a 24 hour period (or possibly longer if not evicted from local cache
after that period). Where Name is omitted then non deduplication occurs and
each message will be processed. Task's WithMessage and WithArgs both
produces messages with no Name so will not be deduplicated. OnceWithArgs
sets a name based off a consistent hash of the arguments and a quantised period
of time (i.e. 'this hour', 'today') passed to OnceWithArgs a period. This
guarantees that the same function will not be called with the same arguments
during `period'.
A Handler and FallbackHandler are supplied to RegisterTask in the
TaskOptions.
There are three permitted types of signature:
*Message argumentIf a task is registered with a handler that takes a Go context.Context as its
first argument then when that handler is invoked it will be passed the same
Context that was passed to Consumer.Start(ctx). This can be used to transmit
a signal to abort to all tasks being processed:
var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
Name: "SomethingLongwinded",
Handler: func(ctx context.Context) error {
for range time.Tick(time.Second) {
select {
case <-ctx.Done():
return ctx.Err()
default:
fmt.Println("Wee!")
}
}
return nil
},
})
If error returned by handler implements Delay() time.Duration interface then
that delay is used to postpone message processing.
type RateLimitError string
func (e RateLimitError) Error() string {
return string(e)
}
func (RateLimitError) Delay() time.Duration {
return time.Hour
}
func handler() error {
return RateLimitError("calm down")
}
taskq supports tracing out-of-the-box using OpenTelemetry API. To instrument a queue, use the following code:
import "github.com/vmihailenco/taskq/extra/taskqotel"
consumer := queue.Consumer()
consumer.AddHook(&taskqotel.OpenTelemetryHook{})
or using a taskq.Factory:
factory.Range(func(q taskq.Queue) bool {
consumer := q.Consumer()
consumer.AddHook(&taskqext.OpenTelemetryHook{})
return true
})
We recommend using Uptrace.dev as a tracing backend.
FAQs
Unknown package
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Company News
Socket won two 2026 Reppy Awards from RepVue, ranking in the top 5% of all sales orgs. AE Alexandra Lister shares what it's like to grow a sales career here.

Security News
NIST will stop enriching most CVEs under a new risk-based model, narrowing the NVD's scope as vulnerability submissions continue to surge.

Company News
/Security News
Socket is an initial recipient of OpenAI's Cybersecurity Grant Program, which commits $10M in API credits to defenders securing open source software.