Security News
Fluent Assertions Faces Backlash After Abandoning Open Source Licensing
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.
github.com/wuzhc/gmq-redis
目前gmq
有两个版本,一个依赖于redis
特性,使用redis作为数据的存储载体https://github.com/wuzhc/gmq-redis,另一个不依赖于redis
,数据存储在磁盘.两者比较上,后者功能更加丰富,数据更加可靠,性能也更加优秀https://github.com/wuzhc/gmq/
gmq
是基于redis
提供的特性,使用go
语言开发的一个简单易用的队列,可参考Redis 实现队列;
gmq
的灵感和设计是基于有赞延迟队列设计,文章内容清晰而且很好理解,但是没有提供源码,在文章的最后也提到了一些未来架构方向; gmq
不是简单按照有赞延迟队列的设计实现功能,在它的基础上,做了一些修改和优化,主要如下:
dispatcher
调度分配各个bucket
,而不是由timer
bucket
维护一个timer
,而不是所有bucket一个timer
timer
每次扫描bucket
到期job
时,会一次性返回多个到期job
,而不是每次只返回一个job
timer
的扫描时钟由bucket
中下个job
到期时间决定,而不是每秒扫描一次(TTR)
没有执行完毕或程序被意外中断,则消息重新回到队列再次被消费,一般用于数据比较敏感,不容丢失的dispatcher
任务调度器,负责将job
分配到bucket
或直接推送到ready queue
bucket
任务桶,用于存放延迟任务;每个bucket
会维护一个timer
定时器,然后将到期的job
推送到ready queue
ready queue
存放已准备好的job
,等待被consumer
消费ready queue
,等待被消费ready queue
,等待被消费参考第一个图的流程,当job被消费者读取后,如果job.TTR>0
,即job设置了执行超时时间,那么job会在读取后会被添加到TTRBucket(专门存放设置了超时时间的job),并且设置job.delay = job.TTR
,如果在TTR时间内没有得到消费者ack确认然后删除job,job将在TTR时间之后添加到ready queue
,然后再次被消费(如果消费者在TTR时间之后才请求ack,会得到失败的响应)
主要和TTR的设置有关系,确认机制可以分为两种:
pop
出job时,即会自动删除job pool
中的job元数据pop
出job时开始到用户ack
确认删除结束这段时间,如果在这段时间没有ACK
,job会被再次加入到ready queue
,然后再次被消费,只有用户调用了ACK
,才会去删除job pool
中job元数据配置文件位于gmq/conf.ini
,可以根据自己项目需求修改配置
go get -v -u github.com/wuzhc/gmq
cd $GOPATH/src/github.com/wuzhc/gmq
go run main.go
目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq
# php
# 生产者
php producer.php
# 消费者
php consumer.php
# python
# 生产者
python producer.py
# 消费者
python consumer.py
{
"id": "xxxx", # 任务id,这个必须是一个唯一值,将作为redis的缓存键
"topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
"body": "xxx", # 消息内容
"delay": "111", # 延迟时间,单位秒
"TTR": "11111", # 执行超时时间,单位秒
"status": 1, # job执行状态,该字段由gmq生成
"consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '1800', // 单位秒,半个小时后执行
'TTR' => '0'
];
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
];
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
gmq
提供了一个简单web监控平台(后期会提供根据job.Id追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000
,例如:http://127.0.0.1:8000, 界面如下:
后台模板来源于https://github.com/george518/PPGo_Job
以下是开发遇到的问题,以及一些粗糙的解决方案
如果强行中止gmq
的运行,可能会导致一些数据丢失,例如下面一个例子:
如果发生上面的情况,就会出现job不在bucket
中,也不在ready queue
,这就出现了job丢失的情况,而且将没有任何机会去删除job pool
中已丢失的job,长久之后job pool
可能会堆积很多的已丢失job的元数据;所以安全退出需要在接收到退出信号时,应该等待所有goroutine
处理完手中的事情,然后再退出
gmq
退出流程
首先gmq
通过context传递关闭信号给dispatcher
,dispatcher
接收到信号会关闭dispatcher.closed
,每个bucket
会收到close
信号,然后先退出timer
检索,再退出bucket
,dispatcher
等待所有bucket退出后,然后退出
dispatcher
退出顺序流程: timer
-> bucket
-> dispatcher
不要使用kill -9 pid
来强制杀死进程,因为系统无法捕获SIGKILL信号,导致gmq可能执行到一半就被强制中止,应该使用kill -15 pid
,kill -1 pid
或kill -2 pid
,各个数字对应信号如下:
bucket
都会维护一个timer
,不同于有赞设计,timer
不是每秒轮询一次,而是根据bucket
下一个job到期时间来设置timer
的定时时间 ,这样的目的在于如果bucket
没有job或job到期时间要很久才会发生,就可以减少不必要的轮询;timer
只有处理完一次业务后才会重置定时器;,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了timer
就会频繁重置定时器时间,就目前使用来说,还没出现什么性能上的问题我们知道redis的命令是排队执行,在一个复杂的业务中可能会多次执行redis命令,如果在大并发的场景下,这个业务有可能中间插入了其他业务的命令,导致出现各种各样的问题;
redis保证整个事务原子性和一致性问题一般用multi/exec
或lua脚本
,gmq
在操作涉及复杂业务时使用的是lua脚本
,因为lua脚本
除了有multi/exec
的功能外,还有Pipepining
功能(主要打包命令,减少和redis server
通信次数),下面是一个gmq
定时器扫描bucket集合到期job的lua脚本:
-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do
if k%2~=0 then
local jobKey = string.format('%s:%s', ARGV[3], jobId)
local status = redis.call('hget', jobKey, 'status')
-- 检验job状态
if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
-- 先移除集合中到期的job,然后到期的job返回给timer
local isDel = redis.call('zrem', KEYS[1], jobId)
if isDel == 1 then
table.insert(res, jobId)
end
end
end
end
local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
nextTime = -1
else
nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
if nextTime < 0 then
nextTime = 1
end
end
table.insert(res,1,nextTime)
return res
可能一般phper写业务很少会接触到连接池,其实这是由php本身所决定他应用不大,当然在php的扩展swoole
还是很有用处的
gmq
的redis连接池是使用gomodule/redigo/redis
自带连接池,它带来的好处是限制redis连接数,通过复用redis连接来减少开销,另外可以防止tcp被消耗完,这在生产者大量生成数据时会很有用
// gmq/mq/redis.go
Redis = &RedisDB{
Pool: &redis.Pool{
MaxIdle: 30, // 最大空闲链接
MaxActive: 10000, // 最大链接
IdleTimeout: 240 * time.Second, // 空闲链接超时
Wait: true, // 当连接池耗尽时,是否阻塞等待
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
},
}
job pool
是唯一的,它将作为redis的缓存键;gmq
不自动为job生成唯一id值是为了用户可以根据自己生成的job.id来追踪job情况,如果job.id是重复的,push时会报重复id的错误ready queue
的速度取决与redis性能,而不是bucket数量netstat -anp | grep 9503 | wc -l
tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -
这个是正常现象,由tcp四次挥手可以知道,当接收到LAST_ACK发出的FIN后会处于TIME_WAIT
状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到ACK,会等待2MSL时间,这个时间是为了再次发送ACK,例如被动关闭方可能因为接收不到ACK而重传FIN;另外也是为了旧数据过期,不影响到下一个链接,; 如果要避免大量TIME_WAIT
的连接导致tcp被耗尽;一般方法如下:
TIME_WAIT
状态的连接FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.
Research
Security News
Socket researchers uncover the risks of a malicious Python package targeting Discord developers.
Security News
The UK is proposing a bold ban on ransomware payments by public entities to disrupt cybercrime, protect critical services, and lead global cybersecurity efforts.