
Security News
Deno 2.2 Improves Dependency Management and Expands Node.js Compatibility
Deno 2.2 enhances Node.js compatibility, improves dependency management, adds OpenTelemetry support, and expands linting and task automation for developers.
github.com/hey-xico/grabbit
Grabbit is a simplified and idiomatic wrapper around the RabbitMQ Go client, making it easier to consume messages using common AMQP patterns. It abstracts the boilerplate code involved in setting up consumers, exchanges, and queues, allowing developers to focus on writing business logic.
context.Context
for cancellation and timeouts, facilitating graceful shutdowns.To install Grabbit, use go get
:
go get github.com/hey-xico/grabbit
Ensure your project is using Go modules (a go.mod
file is present).
The Broker
is responsible for managing connections and consumers. You can create a new broker with the application's context:
ctx := context.Background()
broker := grabbit.NewBroker(ctx)
Customize the broker by setting an error handler or adjusting the backoff configuration:
broker.SetErrorHandler(func(err error) {
log.Printf("Broker error: %v", err)
})
broker.SetBackoffConfig(grabbit.BackoffConfig{
InitialInterval: 2 * time.Second,
MaxInterval: 1 * time.Minute,
Multiplier: 1.5,
})
Create a consumer by specifying a name and a handler function:
consumer := broker.Consumer("my_consumer", func(ctx *grabbit.Context) error {
// Process the message
fmt.Printf("Received message: %s\n", string(ctx.Body()))
return nil
})
Configure the consumer's exchange, queue, and binding:
consumer.
Exchange("my_exchange", grabbit.DirectExchange, grabbit.WithExchangeDurable(true)).
Queue("my_queue", grabbit.WithQueueDurable(true)).
Binding("routing_key")
Set consumer options and QoS settings:
consumer.
ConsumerOptions(grabbit.WithConsumerAutoAck(false)).
QoS(10) // Prefetch 10 messages
Grabbit supports middleware functions that can be applied to consumers or the broker:
// Define a middleware
func loggingMiddleware(next grabbit.HandlerFunc) grabbit.HandlerFunc {
return func(ctx *grabbit.Context) error {
log.Printf("Processing message: %s", string(ctx.Body()))
return next(ctx)
}
}
// Apply middleware to the consumer
consumer.Use(loggingMiddleware)
// Apply middleware to the broker (applies to all consumers)
broker.Use(loggingMiddleware)
To gracefully shut down the broker and all consumers, call Shutdown
:
// Start the broker in a separate goroutine
go func() {
if err := broker.Start("amqp://guest:guest@localhost:5672/"); err != nil {
log.Fatalf("Broker stopped: %v", err)
}
}()
// Listen for OS signals for graceful shutdown
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
<-signalChan
log.Println("Shutting down broker...")
if err := broker.Shutdown(); err != nil {
log.Fatalf("Failed to shut down broker: %v", err)
}
Here's a complete example of setting up a broker and a consumer:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/hey-xico/grabbit"
)
func main() {
ctx := context.Background()
broker := grabbit.NewBroker(ctx)
broker.SetErrorHandler(func(err error) {
log.Printf("Broker error: %v", err)
})
// Define the message handler
handler := func(ctx *grabbit.Context) error {
log.Printf("Received message: %s", string(ctx.Body()))
return nil
}
// Create a consumer
broker.Consumer("my_consumer", handler).
Exchange("my_exchange", grabbit.DirectExchange, grabbit.WithExchangeDurable(true)).
Queue("my_queue", grabbit.WithQueueDurable(true)).
Binding("routing_key").
ConsumerOptions(grabbit.WithConsumerAutoAck(false)).
QoS(10)
// Start the broker
go func() {
if err := broker.Start("amqp://guest:guest@localhost:5672/"); err != nil {
log.Fatalf("Broker stopped: %v", err)
}
}()
// Wait for termination signal
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
<-signalChan
log.Println("Shutting down broker...")
if err := broker.Shutdown(); err != nil {
log.Fatalf("Failed to shut down broker: %v", err)
}
log.Println("Broker shut down gracefully.")
}
To run this example:
Ensure RabbitMQ is running on your local machine at localhost:5672
.
Save the code to main.go
.
Run the program:
go run main.go
Set custom AMQP configurations, such as TLS settings:
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"github.com/rabbitmq/amqp091-go"
)
// Load client certificate and CA
cert, err := tls.LoadX509KeyPair("client_cert.pem", "client_key.pem")
if err != nil {
log.Fatalf("Failed to load certificates: %v", err)
}
caCert, err := ioutil.ReadFile("ca_cert.pem")
if err != nil {
log.Fatalf("Failed to read CA certificate: %v", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
amqpConfig := amqp.Config{
TLSClientConfig: tlsConfig,
}
broker.SetConfig(amqpConfig)
Monitor the broker's connection status using the StatusChan
channel:
go func() {
for status := range broker.StatusChan {
switch status {
case grabbit.StatusConnected:
log.Println("Broker connected")
case grabbit.StatusDisconnected:
log.Println("Broker disconnected")
case grabbit.StatusConnecting:
log.Println("Broker connecting")
}
}
}()
Message handlers can return errors to trigger retries or log issues:
handler := func(ctx *grabbit.Context) error {
// Process the message
if err := processMessage(ctx.Body()); err != nil {
// Optionally, Nack the message to requeue it
ctx.Nack(false, true)
return fmt.Errorf("failed to process message: %w", err)
}
return nil
}
Detailed documentation is available on pkg.go.dev.
Key types and functions:
Broker
: Manages connections and consumers.Consumer
: Represents a message consumer.Context
: Provides methods to interact with the incoming message.Contributions are welcome! Please follow these steps:
Fork the repository on GitHub.
Create a new feature branch:
git checkout -b feature/my-feature
Commit your changes with clear messages:
git commit -am 'Add new feature'
Push to the branch:
git push origin feature/my-feature
Create a new Pull Request on GitHub.
Please ensure that your code adheres to Go conventions and includes appropriate tests.
go fmt
for formatting.go test ./...
).golint
and go vet
to check for issues.Grabbit is released under the MIT License.
Need Help?
If you encounter any issues or have questions, feel free to:
Happy Coding!
Grabbit aims to simplify your experience with RabbitMQ in Go. By abstracting the complexities of AMQP setup, you can focus on building robust and efficient applications.
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Deno 2.2 enhances Node.js compatibility, improves dependency management, adds OpenTelemetry support, and expands linting and task automation for developers.
Security News
React's CRA deprecation announcement sparked community criticism over framework recommendations, leading to quick updates acknowledging build tools like Vite as valid alternatives.
Security News
Ransomware payment rates hit an all-time low in 2024 as law enforcement crackdowns, stronger defenses, and shifting policies make attacks riskier and less profitable.