
Research
Malicious npm Package Brand-Squats TanStack to Exfiltrate Environment Variables
A brand-squatted TanStack npm package used postinstall scripts to steal .env files and exfiltrate developer secrets to an attacker-controlled endpoint.
github.com/golang-queue/queue
Advanced tools
Queue is a Golang library designed to help you create and manage a pool of Goroutines (lightweight threads). It allows you to efficiently run multiple tasks in parallel, utilizing the full CPU capacity of your machine.
A simple queue service using a ring buffer as the default backend.
Easily switch the queue service to use NSQ, NATS, or Redis.
Supports multiple producers and consumers.
Go version 1.22 or above
To install the stable version:
go get github.com/golang-queue/queue
To install the latest version:
go get github.com/golang-queue/queue@master
By calling the QueueTask() method, you can schedule tasks to be executed by workers (Goroutines) in the pool.
package main
import (
"context"
"fmt"
"time"
"github.com/golang-queue/queue"
)
func main() {
taskN := 100
rets := make(chan string, taskN)
// initialize the queue pool
q := queue.NewPool(5)
// shut down the service and notify all workers
// wait until all jobs are complete
defer q.Release()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.QueueTask(func(ctx context.Context) error {
rets <- fmt.Sprintf("Hi Gopher, handle the job: %02d", +i)
return nil
}); err != nil {
panic(err)
}
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(20 * time.Millisecond)
}
}
Define a new message struct and implement the Bytes() function to encode the message. Use the WithFn function to handle messages from the queue.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Name string
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// initialize the queue pool
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- "Hi, " + v.Name + ", " + v.Message
return nil
}))
// shut down the service and notify all workers
// wait until all jobs are complete
defer q.Release()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.Queue(&job{
Name: "Gopher",
Message: fmt.Sprintf("handle the job: %d", i+1),
}); err != nil {
log.Println(err)
}
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
}
Refer to the NSQ documentation for more details.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nsq"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := nsq.NewWorker(
nsq.WithAddr("127.0.0.1:4150"),
nsq.WithTopic("example"),
nsq.WithChannel("foobar"),
// concurrent job number
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q := queue.NewPool(
5,
queue.WithWorker(w),
)
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
// shut down the service and notify all workers
q.Release()
}
Refer to the NATS documentation for more details.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nats"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := nats.NewWorker(
nats.WithAddr("127.0.0.1:4222"),
nats.WithSubj("example"),
nats.WithQueue("foobar"),
nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q, err := queue.NewQueue(
queue.WithWorkerCount(10),
queue.WithWorker(w),
)
if err != nil {
log.Fatal(err)
}
// start the workers
q.Start()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
// shut down the service and notify all workers
q.Release()
}
Refer to the Redis documentation for more details.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/redisdb"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := redisdb.NewWorker(
redisdb.WithAddr("127.0.0.1:6379"),
redisdb.WithChannel("foobar"),
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var v job
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q, err := queue.NewQueue(
queue.WithWorkerCount(10),
queue.WithWorker(w),
)
if err != nil {
log.Fatal(err)
}
// start the workers
q.Start()
// assign tasks to the queue
for i := 0; i < taskN; i++ {
go func(i int) {
q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
})
}(i)
}
// wait until all tasks are done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
// shut down the service and notify all workers
q.Release()
}
FAQs
Unknown package
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Research
A brand-squatted TanStack npm package used postinstall scripts to steal .env files and exfiltrate developer secrets to an attacker-controlled endpoint.

Research
Compromised SAP CAP npm packages download and execute unverified binaries, creating urgent supply chain risk for affected developers and CI/CD environments.

Company News
Socket has acquired Secure Annex to expand extension security across browsers, IDEs, and AI tools.