
Research
Security News
The Landscape of Malicious Open Source Packages: 2025 Mid‑Year Threat Report
A look at the top trends in how threat actors are weaponizing open source packages to deliver malware and persist across the software supply chain.
github.com/ryan406/delayqueue
基于Golang实现的延时队列
延时队列是指在指定的时间点进行消息消费,具体消费逻辑由用户自己来实现。
传统解决方法一般采用cron来实现,但有以下缺点:
主要使用到的两个数据结构是 环形队列 和 集合。其中环形队列是由数组来实现。
currentSlot 表示当前操作的环位置,这里是数组的索引值
timer 定时器,默认每秒移动一个slot
系统主要由三部分组成,分别为slot、Elements和Element。
Slots 代表一个环, 由多个slot组成,每个slot对应一个Elements
Elements slot对应的值
Element 组成Elements集合的元素
每个环节点slot就是一个数据集合 Elements,这个集合内的数据则表示当前时间点需要进行消费的信息集合,有可能是下次循环到这个节点的时间进行消费。
环与集合的关系
slots[0] = Elements
slots[1] = Elements
slots[...] = ...
一个Elements是由一个或多个 Element 元素组成,每个 Element 元素都有一个 cycleNum 字段,用来表示此元素是立即消费还是以后消费,其值也可以理解成环的循环周期。 如果cycleNum字段值为0,则表示立即消费,如果cycleNum=2则表示还需要两个环周期才能消费,每次循环都进行 cycleNum-- 操作,直到为0时结束。
集合与元素的关系
Elements = {Element、Element、Element}
所以整个延时队列看起来是这个样子:
slots[0] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[1] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[2] = []*Elements{*Element{}, *Element{}, *Element{}...}
...
系统会有一个定时器timer,每1秒(可通过delayqueue.WithFrequency 函数调整)会移动一个slot, 此时currentSlot的值加1,表示下一个节点位置。
然后遍历当前环点中的所有元素,如果当前元素生命周期cycleNum=0,则立即消费,否则将cycleNum--, 直到循环完集合中的所有元素。
同时每次添加新元素时,都要以当前时间所在的slot位置为起点,假如当前时间为 00:05:10, 在第 310 (560+10) 个slot, 这时添加一个元素时间为 00:02:50,
由于每秒移动一个slot, 而新添加元素时间slot为179(260+50), 则将这个元素放在当前位置后往数的第179个slot, 即这个环的第 310+179=489个slot中。
如果添加的时间大于当前时间的多个环周期时,只需要将环周期对应的slot个数减去即可,环的周期数使用 cycleNum 值来表示。
package main
import (
"fmt"
"time"
"github.com/cfanbo/delayqueue"
)
func consume(entry delayqueue.Entry) {
fmt.Println("当前:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("消费:", entry.ConsumeTime().Format("2006-01-02 15:04:05"))
fmt.Println("消费内容", entry.Body())
fmt.Println("=======================")
}
func main() {
q := delayqueue.New()
q.Put(time.Now().Add(time.Second*2), "2秒后")
q.Put(time.Now().Add(time.Second*15), "15秒后")
q.Put(time.Now().Add(time.Second*8), "8秒后")
q.Put(time.Now().Add(time.Second*43), "43秒后")
q.Put(time.Now().Add(time.Second*50), "50秒后")
q.Put(time.Now().Add(time.Second*28), "28秒后")
q.Run(consume)
}
支持用户自定义间隔时间,如每分钟,每小时,只要是time.NewTicker()支持的 time.Duration 类型即可。
调用方法如下:
// 在New() 函数里调用 WithFrequency() 函数即可
q := delayqueue.New(delayqueue.WithFrequency(time.Minute))
q.Put(time.Now().Add(time.Minute * 2), "2分钟后消费此内容")
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
A look at the top trends in how threat actors are weaponizing open source packages to deliver malware and persist across the software supply chain.
Security News
ESLint now supports HTML linting with 48 new rules, expanding its language plugin system to cover more of the modern web development stack.
Security News
CISA is discontinuing official RSS support for KEV and cybersecurity alerts, shifting updates to email and social media, disrupting automation workflows.