Go Redis Queue Manager
Redis 队列控制器，支持多 worker 并发队列，
支持使用 Topic、Group 进行队列数据隔离。
例子
package go_redis_queue_manager
import (
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/go-redis/redis"
)
// StandaloneQueue is a stateless demo handler; the example registers it under
// the "STANDALONE" handler key.
type StandaloneQueue struct{}

// Execute does not inspect payload. It always returns
// NewQueueResult(true, "StandaloneQueue.ok", nil).
func (StandaloneQueue) Execute(payload *QueuePayload) *QueueResult {
	const message = "StandaloneQueue.ok"
	result := NewQueueResult(true, message, nil)
	return result
}
// DemoDemoQueue is a stateless demo handler; the example registers it under
// the "DEMO::DEMO" handler key.
type DemoDemoQueue struct{}

// Execute does not inspect payload. It always returns
// NewQueueResult(true, "DemoDemoQueue.ok", nil).
func (DemoDemoQueue) Execute(payload *QueuePayload) *QueueResult {
	const message = "DemoDemoQueue.ok"
	result := NewQueueResult(true, message, nil)
	return result
}
// DemoDemo2Queue is a stateless demo handler; the example registers it under
// the "DEMO::DEMO2" handler key.
type DemoDemo2Queue struct{}

// Execute does not inspect payload. It always returns
// NewQueueResult(true, "DemoDemoQueue2.ok ", nil); the trailing space in the
// message is part of the original output and is kept as-is.
func (DemoDemo2Queue) Execute(payload *QueuePayload) *QueueResult {
	const message = "DemoDemoQueue2.ok "
	result := NewQueueResult(true, message, nil)
	return result
}
// TestNewQueueManager is an end-to-end demo of the queue manager against a
// Redis instance at 127.0.0.1:6379. It wires three handlers into a manager
// with four workers, starts a producer that keeps pushing fast payloads for
// the DEMO/DEMO, DEMO/DEMO2 and STANDALONE channels, starts the per-channel
// handlers and the runner, and lets everything run for a fixed period.
//
// The original version ended in `select {}` and therefore never returned;
// the run is now bounded so `go test` can finish on its own.
func TestNewQueueManager(t *testing.T) {
	// How long the demo keeps producing and consuming before the test returns.
	const runFor = 3 * time.Second

	// Connection settings are published through the environment and read back,
	// mirroring how a deployment would configure the bus.
	settings := [][2]string{
		{"DB_REDIS_BUS_HOST", "127.0.0.1"},
		{"DB_REDIS_BUS_PORT", "6379"},
		{"DB_REDIS_BUS_PASSWORD", ""},
	}
	for _, kv := range settings {
		if err := os.Setenv(kv[0], kv[1]); err != nil {
			t.Fatalf("setting %s: %v", kv[0], err)
		}
	}

	host := os.Getenv("DB_REDIS_BUS_HOST")
	port := os.Getenv("DB_REDIS_BUS_PORT")
	password := os.Getenv("DB_REDIS_BUS_PASSWORD")

	rds := redis.NewClient(&redis.Options{
		Addr:     host + ":" + port,
		Password: password,
	})

	qm := NewQueueManager()
	qm.UseRedis(rds)
	qm.WorkerNum = 4
	// NOTE(review): handler keys look like "Topic::Group", or just "Topic"
	// when Group is empty — confirm against the manager implementation.
	qm.Handlers = map[string]Queueable{
		"DEMO::DEMO":  DemoDemoQueue{},
		"DEMO::DEMO2": DemoDemo2Queue{},
		"STANDALONE":  StandaloneQueue{},
	}
	qm.RegisterOnInterrupt(func(stack string) {
		fmt.Println(stack)
	})

	// stop tells the producer goroutine to exit once the demo period is over,
	// so it does not keep pushing after the test has returned.
	stop := make(chan struct{})

	go func() {
		for {
			select {
			case <-stop:
				return
			default:
			}

			qm.QueuePush(&QueuePayload{
				IsFast: true,
				Topic:  "DEMO",
				Group:  "DEMO",
				Body:   "do the job on channel[DEMO::DEMO]",
			})
			qm.QueuePush(&QueuePayload{
				IsFast: true,
				Topic:  "DEMO",
				Group:  "DEMO2",
				Body:   "do the job on channel[DEMO]",
			})
			qm.QueuePush(&QueuePayload{
				IsFast: true,
				Topic:  "STANDALONE",
				Group:  "",
				Body:   "do the job on STANDALONE",
			})
		}
	}()

	// NOTE(review): the manager API visible here offers no way to stop these
	// goroutines; they are left running until the test process exits.
	go qm.QueueHandler("DEMO", "DEMO")
	go qm.QueueHandler("DEMO", "DEMO2")
	go qm.QueueHandler("STANDALONE", "")
	go qm.QueueRunner()

	// Let the demo run for a bounded time instead of blocking forever.
	<-time.After(runFor)
	close(stop)
}
执行效果:
Worker 0 FastQueues {a418c276-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO2 do the job on channel[DEMO] 0 0} true DemoDemoQueue2.ok
Worker 3 FastQueues {a418c9ce-a95b-11ea-bd41-34363bd057d6 true STANDALONE do the job on STANDALONE 0 0} true StandaloneQueue.ok
Worker 2 FastQueues {a418cff0-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO do the job on channel[DEMO::DEMO] 0 0} true DemoDemoQueue.ok
Worker 1 FastQueues {a418d8b0-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO2 do the job on channel[DEMO] 0 0} true DemoDemoQueue2.ok
Worker 0 FastQueues {a418e062-a95b-11ea-bd41-34363bd057d6 true STANDALONE do the job on STANDALONE 0 0} true StandaloneQueue.ok
Worker 3 FastQueues {a418e918-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO do the job on channel[DEMO::DEMO] 0 0} true DemoDemoQueue.ok
Worker 2 FastQueues {a418f8cc-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO2 do the job on channel[DEMO] 0 0} true DemoDemoQueue2.ok
Worker 1 FastQueues {a418fd68-a95b-11ea-bd41-34363bd057d6 true STANDALONE do the job on STANDALONE 0 0} true StandaloneQueue.ok
Worker 0 FastQueues {a419060a-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO do the job on channel[DEMO::DEMO] 0 0} true DemoDemoQueue.ok
Worker 3 FastQueues {a4190e66-a95b-11ea-bd41-34363bd057d6 true DEMO DEMO2 do the job on channel[DEMO] 0 0} true DemoDemoQueue2.ok
Worker 2 FastQueues {a41914ec-a95b-11ea-bd41-34363bd057d6 true STANDALONE do the job on STANDALONE 0 0} true StandaloneQueue.ok