
Product
Introducing Scala and Kotlin Support in Socket
Socket now supports Scala and Kotlin, bringing AI-powered threat detection to JVM projects with easy manifest generation and fast, accurate scans.
github.com/ryan406/delayqueue
基于Golang实现的延时队列
延时队列是指在指定的时间点进行消息消费,具体消费逻辑由用户自己来实现。
传统解决方法一般采用cron来实现,但有以下缺点:
主要使用到的两个数据结构是 环形队列 和 集合。其中环形队列是由数组来实现。
currentSlot 表示当前操作的环位置,这里是数组的索引值
timer 定时器,默认每秒移动一个slot
系统主要由三部分组成,分别为slot、Elements和Element。
Slots 代表一个环, 由多个slot组成,每个slot对应一个Elements
Elements slot对应的值
Element 组成Elements集合的元素
每个环节点slot就是一个数据集合 Elements,这个集合内的数据则表示当前时间点需要进行消费的信息集合,有可能是下次循环到这个节点的时间进行消费。
环与集合的关系
slots[0] = Elements
slots[1] = Elements
slots[...] = ...
一个Elements是由一个或多个 Element 元素组成,每个 Element 元素都有一个 cycleNum 字段,用来表示此元素是立即消费还是以后消费,其值也可以理解成环的循环周期。 如果cycleNum字段值为0,则表示立即消费,如果cycleNum=2则表示还需要两个环周期才能消费,每次循环都进行 cycleNum-- 操作,直到为0时结束。
集合与元素的关系
Elements = {Element、Element、Element}
所以整个延时队列看起来是这个样子:
slots[0] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[1] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[2] = []*Elements{*Element{}, *Element{}, *Element{}...}
...
系统会有一个定时器timer,每1秒(可通过delayqueue.WithFrequency 函数调整)会移动一个slot, 此时currentSlot的值加1,表示下一个节点位置。
然后遍历当前环点中的所有元素,如果当前元素生命周期cycleNum=0,则立即消费,否则将cycleNum--, 直到循环完集合中的所有元素。
同时每次添加新元素时,都要以当前时间所在的slot位置为起点,假如当前时间为 00:05:10, 在第 310 (560+10) 个slot, 这时添加一个元素时间为 00:02:50,
由于每秒移动一个slot, 而新添加元素时间slot为179(260+50), 则将这个元素放在当前位置后往数的第179个slot, 即这个环的第 310+179=489个slot中。
如果添加的时间大于当前时间的多个环周期时,只需要将环周期对应的slot个数减去即可,环的周期数使用 cycleNum 值来表示。
package main
import (
"fmt"
"time"
"github.com/cfanbo/delayqueue"
)
func consume(entry delayqueue.Entry) {
fmt.Println("当前:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("消费:", entry.ConsumeTime().Format("2006-01-02 15:04:05"))
fmt.Println("消费内容", entry.Body())
fmt.Println("=======================")
}
func main() {
q := delayqueue.New()
q.Put(time.Now().Add(time.Second*2), "2秒后")
q.Put(time.Now().Add(time.Second*15), "15秒后")
q.Put(time.Now().Add(time.Second*8), "8秒后")
q.Put(time.Now().Add(time.Second*43), "43秒后")
q.Put(time.Now().Add(time.Second*50), "50秒后")
q.Put(time.Now().Add(time.Second*28), "28秒后")
q.Run(consume)
}
支持用户自定义间隔时间,如每分钟,每小时,只要是time.NewTicker()支持的 time.Duration 类型即可。
调用方法如下:
// 在New() 函数里调用 WithFrequency() 函数即可
q := delayqueue.New(delayqueue.WithFrequency(time.Minute))
q.Put(time.Now().Add(time.Minute * 2), "2分钟后消费此内容")
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Product
Socket now supports Scala and Kotlin, bringing AI-powered threat detection to JVM projects with easy manifest generation and fast, accurate scans.
Application Security
/Security News
Socket CEO Feross Aboukhadijeh and a16z partner Joel de la Garza discuss vibe coding, AI-driven software development, and how the rise of LLMs, despite their risks, still points toward a more secure and innovative future.
Research
/Security News
Threat actors hijacked Toptal’s GitHub org, publishing npm packages with malicious payloads that steal tokens and attempt to wipe victim systems.