Research
Security News
Malicious npm Packages Inject SSH Backdoors via Typosquatted Libraries
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
github.com/go-msgqueue/msgqueue
go get -u github.com/go-msgqueue/msgqueue
Options.Compress
.go-msgqueue is a thin wrapper for SQS and IronMQ clients that uses Redis to implement rate limiting and call once semantic.
go-msgqueue consists of following components:
rate limiting is implemented in the processor package using redis_rate. Call once is implemented in clients by checking if message name exists in Redis database.
import "github.com/go-msgqueue/msgqueue"
import "github.com/go-redis/redis"
import "golang.org/x/time/rate"
// Create in-memory queue that prints greetings.
q := memqueue.NewQueue(&msgqueue.Options{
// Handler is automatically retried on error.
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
RateLimit: rate.Every(time.Second),
// Redis is only needed for rate limiting and call once.
Redis: redis.NewClient(&redis.Options{
Addr: ":6379",
}),
})
// Invoke handler with arguments.
q.Call("World")
// Same using Message API.
q.Add(msgqueue.NewMessage("World"))
// Say "Hello World" with 1 hour delay.
msg := msgqueue.NewMessage("World")
msg.Delay = time.Hour
q.Add(msg)
// Say "Hello World" once.
for i := 0; i < 100; i++ {
msg := msgqueue.NewMessage("hello")
msg.Name = "hello-world"
q.Add(msg)
}
// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
msg := msgqueue.NewMessage("hello")
msg.Name = "hello-world"
msg.Delay = time.Hour
q.Add(msg)
}
// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
q.CallOnce(time.Hour, "hello")
}
// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
msg := msgqueue.NewMessage("hello")
msg.SetDelayName(delay, "europe") // set delay and autogenerate message name
q.Add(msg)
}
SQS, IronMQ, and memqueue share the same API and can be used interchangeably.
azsqs package uses Amazon Simple Queue Service as queue backend.
import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/azsqs"
import "github.com/aws/aws-sdk-go/service/sqs"
// Create queue.
awsAccountId := "123456789"
q := azsqs.NewQueue(awsSQS(), awsAccountId, &msgqueue.Options{
Name: "sqs-queue-name",
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Same using Manager.
man := azsqs.NewManager(awsSQS(), accountId)
q := man.NewQueue(&msgqueue.Options{...})
// Add message.
q.Call("World")
// Start processing queue.
p := q.Processor()
p.Start()
// Stop processing.
p.Stop()
ironmq package uses IronMQ as queue backend.
import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/ironmq"
import "github.com/iron-io/iron_go3/mq"
// Create queue.
q := ironmq.NewQueue(mq.New("ironmq-queue-name"), &msgqueue.Options{
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Same using manager.
cfg := iron_config.Config("iron_mq")
man := ironmq.NewManager(&cfg)
q := man.NewQueue(&msgqueue.Options{...})
// Add message.
q.Call("World")
// Start processing queue.
p := q.Processor()
p.Start()
// Stop processing.
p.Stop()
memqueue is in-memory queue backend implementation primarily useful for local development / unit testing. Unlike SQS and IronMQ it has running queue processor by default.
import "github.com/go-msgqueue/msgqueue"
// Create queue.
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Same using Manager.
man := memqueue.NewManager()
q := man.NewQueue(&msgqueue.Options{...})
// Stop processor if you don't need it.
p := q.Processor()
p.Stop()
// Process one message.
err := p.ProcessOne()
// Process all buffered messages.
err := p.ProcessAll()
If error returned by handler implements Delay() time.Duration
that delay is used to postpone message processing.
type RateLimitError string
func (e RateLimitError) Error() string {
return string(e)
}
func (RateLimitError) Delay() time.Duration {
return time.Hour
}
func handler() error {
return RateLimitError("calm down")
}
q := memqueue.NewQueue(&msgqueue.Options{
Handler: handler,
})
You can log local queue stats using following code:
func LogQueueStats(q msgqueue.Queue) {
p := q.Processor()
opt := p.Options()
var old *msgqueue.ProcessorStats
for _ = range time.Tick(3 * time.Second) {
st := p.Stats()
if st == nil {
break
}
if old != nil && st.Processed == old.Processed &&
st.Fails == old.Fails &&
st.Retries == old.Retries {
continue
}
old = st
glog.Infof(
"%s: buffered=%d/%d in_flight=%d/%d "+
"processed=%d fails=%d retries=%d "+
"avg_dur=%s min_dur=%s max_dur=%s",
q, st.Buffered, opt.BufferSize, st.InFlight, opt.WorkerNumber,
st.Processed, st.Fails, st.Retries,
st.AvgDuration, st.MinDuration, st.MaxDuration,
)
}
}
go LogQueueStats(myqueue)
which will log something like this
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=3/16 processed=16183872 fails=0 retries=0 avg_dur=44.8ms min_dur=100µs max_dur=5.102s
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=8/16 processed=16184022 fails=0 retries=0 avg_dur=42ms min_dur=100µs max_dur=5.102s
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source softare.