Queue
Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers based on the limited CPU capacity of the machine.
Features
Queue Scenario
A simple queue service using a ring buffer as the default backend.
Easily switch the queue service to use NSQ, NATS, or Redis.
Supports multiple producers and consumers.
Requirements
Go version 1.22 or above
Installation
Install the stable version:
go get github.com/golang-queue/queue
Install the latest version:
go get github.com/golang-queue/queue@master
Usage
Basic Usage of Pool (using the Task function)
By calling the QueueTask()
method, tasks are scheduled to be executed by workers (goroutines) in the pool.
package main
import (
"context"
"fmt"
"time"
"github.com/golang-queue/queue"
)
func main() {
taskN := 100
rets := make(chan string, taskN)
q := queue.NewPool(5)
defer q.Release()
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.QueueTask(func(ctx context.Context) error {
rets <- fmt.Sprintf("Hi Gopher, handle the job: %02d", +i)
return nil
}); err != nil {
panic(err)
}
}(i)
}
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(20 * time.Millisecond)
}
}
Basic Usage of Pool (using a message queue)
Define a new message struct and implement the Bytes()
function to encode the message. Use the WithFn
function to handle the message from the queue.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Name string
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- "Hi, " + v.Name + ", " + v.Message
return nil
}))
defer q.Release()
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.Queue(&job{
Name: "Gopher",
Message: fmt.Sprintf("handle the job: %d", i+1),
}); err != nil {
log.Println(err)
}
}(i)
}
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
}
Using NSQ as a Queue
Refer to the NSQ documentation.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nsq"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
w := nsq.NewWorker(
nsq.WithAddr("127.0.0.1:4150"),
nsq.WithTopic("example"),
nsq.WithChannel("foobar"),
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
q := queue.NewPool(
5,
queue.WithWorker(w),
)
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
q.Release()
}
Using NATS as a Queue
Refer to the NATS documentation.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nats"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
w := nats.NewWorker(
nats.WithAddr("127.0.0.1:4222"),
nats.WithSubj("example"),
nats.WithQueue("foobar"),
nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorkerCount(10),
queue.WithWorker(w),
)
if err != nil {
log.Fatal(err)
}
q.Start()
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
q.Release()
}
Using Redis (Pub/Sub) as a Queue
Refer to the Redis documentation.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/redisdb"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
w := redisdb.NewWorker(
redisdb.WithAddr("127.0.0.1:6379"),
redisdb.WithChannel("foobar"),
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorkerCount(10),
queue.WithWorker(w),
)
if err != nil {
log.Fatal(err)
}
q.Start()
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
q.Release()
}