
Security News
New CVE Forecasting Tool Predicts 47,000 Disclosures in 2025
CVEForecast.org uses machine learning to project a record-breaking surge in vulnerability disclosures in 2025.
github.com/golang-queue/queue
Queue is a Golang library that helps you create and manage a pool of Goroutines (lightweight threads). It allows you to efficiently run multiple tasks in parallel, utilizing the CPU capacity of your machine.
A simple queue service using a ring buffer as the default backend.
Easily switch the queue service to use NSQ, NATS, or Redis.
Supports multiple producers and consumers.
Go version 1.22 or above
To install the stable version:
go get github.com/golang-queue/queue
To install the latest version:
go get github.com/golang-queue/queue@master
By calling the QueueTask()
method, you can schedule tasks to be executed by workers (Goroutines) in the pool.
package main
import (
"context"
"fmt"
"time"
"github.com/golang-queue/queue"
)
func main() {
taskN := 100
rets := make(chan string, taskN)
// initialize the queue pool
q := queue.NewPool(5)
// shut down the service and notify all workers
// wait until all jobs are complete
defer q.Release()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.QueueTask(func(ctx context.Context) error {
rets <- fmt.Sprintf("Hi Gopher, handle the job: %02d", +i)
return nil
}); err != nil {
panic(err)
}
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(20 * time.Millisecond)
}
}
Define a new message struct and implement the Bytes()
function to encode the message. Use the WithFn
function to handle messages from the queue.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Name string
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// initialize the queue pool
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- "Hi, " + v.Name + ", " + v.Message
return nil
}))
// shut down the service and notify all workers
// wait until all jobs are complete
defer q.Release()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.Queue(&job{
Name: "Gopher",
Message: fmt.Sprintf("handle the job: %d", i+1),
}); err != nil {
log.Println(err)
}
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
}
Refer to the NSQ documentation for more details.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nsq"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := nsq.NewWorker(
nsq.WithAddr("127.0.0.1:4150"),
nsq.WithTopic("example"),
nsq.WithChannel("foobar"),
// concurrent job number
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q := queue.NewPool(
5,
queue.WithWorker(w),
)
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
// shut down the service and notify all workers
q.Release()
}
Refer to the NATS documentation for more details.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nats"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := nats.NewWorker(
nats.WithAddr("127.0.0.1:4222"),
nats.WithSubj("example"),
nats.WithQueue("foobar"),
nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q, err := queue.NewQueue(
queue.WithWorkerCount(10),
queue.WithWorker(w),
)
if err != nil {
log.Fatal(err)
}
// start the workers
q.Start()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
// shut down the service and notify all workers
q.Release()
}
Refer to the Redis documentation for more details.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/redisdb"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := redisdb.NewWorker(
redisdb.WithAddr("127.0.0.1:6379"),
redisdb.WithChannel("foobar"),
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q, err := queue.NewQueue(
queue.WithWorkerCount(10),
queue.WithWorker(w),
)
if err != nil {
log.Fatal(err)
}
// start the workers
q.Start()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
// shut down the service and notify all workers
q.Release()
}
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
CVEForecast.org uses machine learning to project a record-breaking surge in vulnerability disclosures in 2025.
Security News
Browserslist-rs now uses static data to reduce binary size by over 1MB, improving memory use and performance for Rust-based frontend tools.
Research
Security News
Eight new malicious Firefox extensions impersonate games, steal OAuth tokens, hijack sessions, and exploit browser permissions to spy on users.