🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Sign inDemoInstall
Socket

github.com/ryan406/delayqueue

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/ryan406/delayqueue

v0.0.13
Source
Go
Version published
Created
Source

延时队列

Build Status GoDoc

基于Golang实现的延时队列

功能

延时队列是指在指定的时间点进行消息消费,具体消费逻辑由用户自己来实现。

传统解决方法一般采用cron来实现,但有以下缺点:

  • 轮训效率太低,每次都需要扫库。
  • 如果扫库频率太高,则后端数据库压力过大,如果频率太低,则存在有效性时间差较大的问题

使用场景

  • 用户网上购买时后,如果收货后,15天内未对交易进行评论,则系统进行默认5星评论。
  • 订单超过30分钟未支付,则系统进行自动取消

实现原理

主要使用到的两个数据结构是 环形队列 和 集合。其中环形队列是由数组来实现。

基本概念

currentSlot 表示当前操作的环位置,这里是数组的索引值
timer 定时器,默认每秒移动一个slot

系统主要由三部分组成,分别为slot、Elements和Element。

Slots 代表一个环, 由多个slot组成,每个slot对应一个Elements
Elements slot对应的值
Element 组成Elements集合的元素

每个环节点slot就是一个数据集合 Elements,这个集合内的数据则表示当前时间点需要进行消费的信息集合,有可能是下次循环到这个节点的时间进行消费。

环与集合的关系
slots[0] = Elements
slots[1] = Elements
slots[...] = ...

一个Elements是由一个或多个 Element 元素组成,每个 Element 元素都有一个 cycleNum 字段,用来表示此元素是立即消费还是以后消费,其值也可以理解成环的循环周期。 如果cycleNum字段值为0,则表示立即消费,如果cycleNum=2则表示还需要两个环周期才能消费,每次循环都进行 cycleNum-- 操作,直到为0时结束。

集合与元素的关系
Elements = {Element、Element、Element}

所以整个延时队列看起来是这个样子:

slots[0] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[1] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[2] = []*Elements{*Element{}, *Element{}, *Element{}...}
...

实现原理

系统会有一个定时器timer,每1秒(可通过delayqueue.WithFrequency 函数调整)会移动一个slot, 此时currentSlot的值加1,表示下一个节点位置。
然后遍历当前环点中的所有元素,如果当前元素生命周期cycleNum=0,则立即消费,否则将cycleNum--, 直到循环完集合中的所有元素。

同时每次添加新元素时,都要以当前时间所在的slot位置为起点,假如当前时间为 00:05:10, 在第 310 (560+10) 个slot, 这时添加一个元素时间为 00:02:50,
由于每秒移动一个slot, 而新添加元素时间slot为179(2
60+50), 则将这个元素放在当前位置后往数的第179个slot, 即这个环的第 310+179=489个slot中。 如果添加的时间大于当前时间的多个环周期时,只需要将环周期对应的slot个数减去即可,环的周期数使用 cycleNum 值来表示。

演示代码

package main

import (
	"fmt"
	"time"

	"github.com/cfanbo/delayqueue"
)

func consume(entry delayqueue.Entry) {
	fmt.Println("当前:", time.Now().Format("2006-01-02 15:04:05"))
	fmt.Println("消费:", entry.ConsumeTime().Format("2006-01-02 15:04:05"))
	fmt.Println("消费内容", entry.Body())
	fmt.Println("=======================")
}

func main() {
	q := delayqueue.New()
	q.Put(time.Now().Add(time.Second*2), "2秒后")
	q.Put(time.Now().Add(time.Second*15), "15秒后")
	q.Put(time.Now().Add(time.Second*8), "8秒后")
	q.Put(time.Now().Add(time.Second*43), "43秒后")
	q.Put(time.Now().Add(time.Second*50), "50秒后")
	q.Put(time.Now().Add(time.Second*28), "28秒后")

	q.Run(consume)
}

支持用户自定义间隔时间,如每分钟,每小时,只要是time.NewTicker()支持的 time.Duration 类型即可。
调用方法如下:

// 在New() 函数里调用 WithFrequency() 函数即可
q := delayqueue.New(delayqueue.WithFrequency(time.Minute))
q.Put(time.Now().Add(time.Minute * 2), "2分钟后消费此内容")

说明

  • 系统支持频率周期类型是time.Duration, 如 time.Second、time.Minute 和 time.Hour。
  • 队列暂不支持数据持久化,所以若停止服务或者退出重启,则队列数据将全部丢失。
    建议在数据消费后对其状态进行变更存储,以便在下次服务启动成功后,立即将需要处理的数据写入延时队列。

FAQs

Package last updated on 11 Jun 2020

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts