
goworker2 is a Go-based background job processing library with pluggable components. It provides a clean, modular architecture supporting multiple queue backends, serializers, and statistics providers.
Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework that can work with Redis, RabbitMQ, and custom backends.
Note: This is a complete rewrite and modernization of the original goworker library by Benjamin Manns, designed as a new project rather than a backwards-compatible upgrade. We're grateful for the inspiration and foundation provided by the original work.
The easiest way to get started is with pre-configured engines:
package main

import (
	"context"
	"log"
	"time"

	"github.com/BranchIntl/goworker2/engines"
)

func emailJob(queue string, args ...interface{}) error {
	// Process email job
	return nil
}

func main() {
	options := engines.DefaultResqueOptions()
	options.Queues = []string{"email", "default"}
	options.PollInterval = 3 * time.Second

	engine := engines.NewResqueEngine(options)
	engine.Register("EmailJob", emailJob)

	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
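The worker registered as "EmailJob" runs when a job naming that class arrives on one of the configured queues. Resque-style producers conventionally enqueue a JSON object with `class` and `args` fields. The sketch below builds such a payload with the standard library only. It assumes the Resque engine follows that convention (the wire format is not shown here), and `resquePayload` and `encodeJob` are hypothetical names, not part of goworker2.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// resquePayload mirrors the conventional Resque job envelope:
// a class name plus a positional argument list.
// Hypothetical type, not part of goworker2.
type resquePayload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// encodeJob builds the JSON document a producer would push onto a queue.
func encodeJob(class string, args ...interface{}) (string, error) {
	if args == nil {
		args = []interface{}{} // encode as [] rather than null
	}
	b, err := json.Marshal(resquePayload{Class: class, Args: args})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := encodeJob("EmailJob", "user@example.com", 42)
	if err != nil {
		panic(err)
	}
	fmt.Println(payload) // {"class":"EmailJob","args":["user@example.com",42]}
}
```

The elements of `args` reach the worker function as its variadic `args` parameter, in the same order.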

package main

import (
	"context"
	"log"

	"github.com/BranchIntl/goworker2/engines"
)

func imageProcessor(queue string, args ...interface{}) error {
	// Process image
	return nil
}

func main() {
	options := engines.DefaultSneakersOptions()
	options.Queues = []string{"images", "default"}

	engine := engines.NewSneakersEngine(options)
	engine.Register("ImageProcessor", imageProcessor)

	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
For more control, you can configure components manually:
package main

import (
	"context"
	"log"
	"time"

	"github.com/BranchIntl/goworker2/brokers/redis"
	"github.com/BranchIntl/goworker2/core"
	"github.com/BranchIntl/goworker2/registry"
	// Both packages below are named resque, so alias them to avoid a name clash.
	resqueserializer "github.com/BranchIntl/goworker2/serializers/resque"
	resquestats "github.com/BranchIntl/goworker2/statistics/resque"
)

func main() {
	// Configure broker with queues
	brokerOpts := redis.DefaultOptions()
	brokerOpts.Queues = []string{"critical", "default"}
	brokerOpts.PollInterval = 5 * time.Second

	// Create components
	serializer := resqueserializer.NewSerializer()
	broker := redis.NewBroker(brokerOpts, serializer)
	stats := resquestats.NewStatistics(resquestats.DefaultOptions())
	registry := registry.NewRegistry()

	// Create engine with custom options
	engine := core.NewEngine(
		broker,
		stats,
		registry,
		core.WithConcurrency(10),
		core.WithShutdownTimeout(30*time.Second),
		core.WithJobBufferSize(200),
	)

	// Register workers
	registry.Register("MyJob", func(queue string, args ...interface{}) error {
		// Handle job
		return nil
	})

	// Start processing
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
go get github.com/BranchIntl/goworker2
This project was inspired by and builds upon the concepts from the original goworker library by Benjamin Manns. While this is a complete rewrite with different architecture and capabilities, we acknowledge and appreciate the foundational work that made this project possible.
goworker2 uses a modular architecture with dependency injection:
┌─────────────────┐
│ Engine │ ← Orchestrates components
├─────────────────┤
│ Broker │ ← Queue backend with job consumption
│ Statistics │ ← Metrics and monitoring
│ Registry │ ← Worker function registry
│ Serializer │ ← Job serialization format
│ WorkerPool │ ← Manages concurrent workers
└─────────────────┘
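To make the dependency-injection idea concrete, here is a minimal, self-contained sketch of an engine that depends only on small interfaces and receives its implementations from the caller. Every type here (`Job`, `Broker`, `Registry`, `sliceBroker`, `mapRegistry`, `Engine`, `Drain`) is hypothetical and much simpler than goworker2's real components. Only the worker function signature is taken from this document.

```go
package main

import (
	"errors"
	"fmt"
)

// WorkerFunc matches the worker signature used throughout this document.
type WorkerFunc func(queue string, args ...interface{}) error

// Job, Broker, and Registry are hypothetical stand-ins, not goworker2's types.
type Job struct {
	Queue string
	Class string
	Args  []interface{}
}

type Broker interface {
	// Next returns the next job, or false when none remain.
	Next() (Job, bool)
}

type Registry interface {
	Lookup(class string) (WorkerFunc, bool)
}

// sliceBroker serves jobs from an in-memory slice.
type sliceBroker struct{ jobs []Job }

func (b *sliceBroker) Next() (Job, bool) {
	if len(b.jobs) == 0 {
		return Job{}, false
	}
	j := b.jobs[0]
	b.jobs = b.jobs[1:]
	return j, true
}

// mapRegistry resolves class names from a map.
type mapRegistry map[string]WorkerFunc

func (r mapRegistry) Lookup(class string) (WorkerFunc, bool) {
	fn, ok := r[class]
	return fn, ok
}

// Engine depends only on the interfaces it is handed.
type Engine struct {
	broker   Broker
	registry Registry
}

func NewEngine(b Broker, r Registry) *Engine {
	return &Engine{broker: b, registry: r}
}

// Drain runs queued jobs in order and returns how many succeeded.
// It stops at the first unregistered class or worker error.
func (e *Engine) Drain() (int, error) {
	done := 0
	for {
		job, ok := e.broker.Next()
		if !ok {
			return done, nil
		}
		fn, found := e.registry.Lookup(job.Class)
		if !found {
			return done, errors.New("no worker registered for " + job.Class)
		}
		if err := fn(job.Queue, job.Args...); err != nil {
			return done, err
		}
		done++
	}
}

func main() {
	reg := mapRegistry{"MyJob": func(queue string, args ...interface{}) error { return nil }}
	eng := NewEngine(&sliceBroker{jobs: []Job{{Queue: "default", Class: "MyJob"}}}, reg)
	n, err := eng.Drain()
	fmt.Println(n, err) // 1 <nil>
}
```

Because the engine sees only interfaces, an in-memory broker like this one can replace a real backend in unit tests.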
See the engines/ directory for detailed engine documentation.
engine := core.NewEngine(
	broker, stats, registry, serializer,
	core.WithConcurrency(25),                 // Number of workers
	core.WithShutdownTimeout(30*time.Second), // Graceful shutdown timeout
	core.WithJobBufferSize(100),              // Job channel buffer
)
options := redis.DefaultOptions()
options.URI = "redis://localhost:6379/"
options.Namespace = "jobs:"
options.Queues = []string{"high", "low"} // Queues to consume from
options.PollInterval = 5 * time.Second // Polling frequency
options.MaxConnections = 10
options := rabbitmq.DefaultOptions()
options.URI = "amqp://guest:guest@localhost:5672/"
options.Exchange = "jobs"
options.Queues = []string{"high", "low"} // Queues to consume from
options.PrefetchCount = 1
goworker2 uses Go's standard log/slog library for structured logging. By default, it uses the default slog logger. To customize logging, configure your logger before creating the engine using slog.SetDefault(logger). For example: slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))).
Worker functions must match this signature:
func(queue string, args ...interface{}) error
Use Go type assertions to handle job arguments:
func processUser(queue string, args ...interface{}) error {
	if len(args) != 2 {
		return fmt.Errorf("expected 2 arguments, got %d", len(args))
	}

	userID, ok := args[0].(float64) // JSON numbers are float64
	if !ok {
		return fmt.Errorf("invalid user ID type")
	}

	action, ok := args[1].(string)
	if !ok {
		return fmt.Errorf("invalid action type")
	}

	// Process user
	return processUserAction(int(userID), action)
}
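The `float64` assertion above reflects how Go's `encoding/json` decodes numbers into `interface{}` values. The following sketch demonstrates that behavior and wraps the conversion in a small helper that also rejects fractional values. It assumes arguments are decoded with `encoding/json`, as the comment in the example suggests. `argInt` is a hypothetical helper, not part of goworker2.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// argInt converts a decoded JSON argument to an int, rejecting
// non-numbers and numbers with a fractional part.
// It does not guard against values outside the range of int.
func argInt(arg interface{}) (int, error) {
	f, ok := arg.(float64)
	if !ok {
		return 0, fmt.Errorf("expected number, got %T", arg)
	}
	if f != math.Trunc(f) {
		return 0, fmt.Errorf("expected integer, got %v", f)
	}
	return int(f), nil
}

func main() {
	var args []interface{}
	if err := json.Unmarshal([]byte(`[42, "activate"]`), &args); err != nil {
		panic(err)
	}
	fmt.Printf("%T %T\n", args[0], args[1]) // float64 string

	id, err := argInt(args[0])
	fmt.Println(id, err) // 42 <nil>
}
```

Including the dynamic type in the error message (`%T`) makes mismatched payloads easier to diagnose than a fixed "invalid type" string.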
goworker2 handles SIGINT and SIGTERM automatically:
// Automatic signal handling
engine.Run(ctx) // Blocks until SIGINT/SIGTERM
// Manual control
engine.Start(ctx)
// ... custom signal handling ...
engine.Stop()
For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.
Complete working examples are available in the examples/ directory covering both pre-configured engines and manual component setup.
health := engine.Health()
fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Active Workers: %d\n", health.ActiveWorkers)
for queue, count := range health.QueuedJobs {
	fmt.Printf("Queue %s: %d jobs\n", queue, count)
}

stats, err := engine.GetStats().GetGlobalStats(ctx)
if err == nil {
	fmt.Printf("Total Processed: %d\n", stats.TotalProcessed)
	fmt.Printf("Total Failed: %d\n", stats.TotalFailed)
}
To contribute, create a feature branch, commit your changes, and push the branch:
git checkout -b feature/amazing-feature
git commit -am 'Add amazing feature'
git push origin feature/amazing-feature
This project is licensed under the MIT License; see the LICENSE file for details.