
Research
Security News
The Landscape of Malicious Open Source Packages: 2025 Mid‑Year Threat Report
A look at the top trends in how threat actors are weaponizing open source packages to deliver malware and persist across the software supply chain.
github.com/qbcheng/delayqueue
Delay queues that depend on Kafka
package main
import (
"context"
"fmt"
"github.com/QbCheng/delayqueue/consummer"
"github.com/QbCheng/delayqueue/producer"
"github.com/Shopify/sarama"
"log"
"sync"
"time"
)
// testDelayQueue Latency to be concerned about
var testDelayQueue = []time.Duration{
15 * time.Second,
30 * time.Second,
60 * time.Second,
10 * time.Minute,
20 * time.Minute,
30 * time.Minute,
time.Hour,
3 * time.Hour,
6 * time.Hour,
}
var testNet = []string{"192.168.1.133:9092"}
func main() {
dpConsumer, err := consummer.NewController("testDp", testNet, testDelayQueue)
if err != nil {
panic(err)
}
err = dpConsumer.Run(context.Background())
if err != nil {
panic(err)
}
dpProducer, err := producer.NewDQProducer("testDp", testDelayQueue, testNet)
if err != nil {
panic(err)
}
wg := &sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
for _, delayTime := range testDelayQueue {
_ = dpProducer.Send(delayTime, []byte("hell World"), "testTopic")
}
}
}()
}
handle := RealTopicHandle{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go handle.Run(ctx, []string{"testTopic"}, wg)
time.Sleep(24 * time.Hour)
dpConsumer.Close()
cancel()
_ = dpProducer.Close()
}
type RealTopicHandle struct {
}
func (h RealTopicHandle) Setup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h RealTopicHandle) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (h RealTopicHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
session.MarkMessage(msg, "")
}
return nil
}
func (h *RealTopicHandle) Run(ctx context.Context, topics []string, wg *sync.WaitGroup) {
defer wg.Done()
log.SetFlags(log.Lshortfile | log.Ltime | log.Ldate)
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, err := sarama.NewConsumerGroup(testNet, "RealTopic-group", config)
if err != nil {
panic(err)
}
defer func() { _ = group.Close() }()
// Track errors
go func() {
for err = range group.Errors() {
log.Print("err : ", err.Error())
}
}()
for {
err = group.Consume(ctx, topics, h)
if err == nil {
return
} else {
fmt.Println(err)
}
}
}
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
A look at the top trends in how threat actors are weaponizing open source packages to deliver malware and persist across the software supply chain.
Security News
ESLint now supports HTML linting with 48 new rules, expanding its language plugin system to cover more of the modern web development stack.
Security News
CISA is discontinuing official RSS support for KEV and cybersecurity alerts, shifting updates to email and social media, disrupting automation workflows.