
github.com/CrisisTextLine/modular/modules/eventbus
The EventBus Module provides a publish-subscribe messaging system for Modular applications with support for multiple concurrent engines, topic-based routing, and flexible configuration. It enables decoupled communication between components through a powerful event-driven architecture.
Topics can be matched with wildcard patterns such as user.* or analytics.*.
import (
	"github.com/CrisisTextLine/modular"
	"github.com/CrisisTextLine/modular/modules/eventbus"
)

// Register the eventbus module with your Modular application
app.RegisterModule(eventbus.NewModule())
eventbus:
  engine: memory                # Event bus engine (memory, redis, kafka, kinesis)
  maxEventQueueSize: 1000       # Maximum events to queue per topic
  defaultEventBufferSize: 10    # Default buffer size for subscription channels
  workerCount: 5                # Worker goroutines for async event processing
  eventTTL: 3600s               # TTL for events (duration)
  retentionDays: 7              # Days to retain event history
  externalBrokerURL: ""         # URL for external message broker
  externalBrokerUser: ""        # Username for authentication
  externalBrokerPassword: ""    # Password for authentication
eventbus:
  engines:
    - name: "memory-fast"
      type: "memory"
      config:
        maxEventQueueSize: 500
        defaultEventBufferSize: 10
        workerCount: 3
        retentionDays: 1
    - name: "redis-durable"
      type: "redis"
      config:
        url: "redis://localhost:6379"
        db: 0
        poolSize: 10
    - name: "kafka-analytics"
      type: "kafka"
      config:
        brokers: ["localhost:9092"]
        groupId: "eventbus-analytics"
    - name: "kinesis-stream"
      type: "kinesis"
      config:
        region: "us-east-1"
        streamName: "events-stream"
        shardCount: 2
    - name: "custom-engine"
      type: "custom"
      config:
        enableMetrics: true
        metricsInterval: "30s"
  routing:
    - topics: ["user.*", "auth.*"]
      engine: "memory-fast"
    - topics: ["analytics.*", "metrics.*"]
      engine: "kafka-analytics"
    - topics: ["stream.*"]
      engine: "kinesis-stream"
    - topics: ["*"]              # Fallback for all other topics
      engine: "redis-durable"
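To make the routing rules above more concrete, here is a minimal sketch of first-match topic routing. It assumes that rules are evaluated in the order listed, that `*` matches every topic, and that `prefix.*` matches any topic starting with `prefix.`; the module's real matching semantics may differ. `Rule`, `matchTopic`, `engineFor`, and `exampleRules` are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Rule pairs topic patterns with an engine name, mirroring one routing entry.
// Hypothetical type for illustration only.
type Rule struct {
	Topics []string
	Engine string
}

// matchTopic reports whether topic matches pattern under the assumed semantics:
// "*" matches everything, "prefix.*" matches topics starting with "prefix.",
// and any other pattern must match the topic exactly.
func matchTopic(pattern, topic string) bool {
	if pattern == "*" {
		return true
	}
	if strings.HasSuffix(pattern, ".*") {
		return strings.HasPrefix(topic, strings.TrimSuffix(pattern, "*"))
	}
	return pattern == topic
}

// engineFor returns the engine of the first rule containing a matching pattern.
func engineFor(rules []Rule, topic string) (string, bool) {
	for _, r := range rules {
		for _, p := range r.Topics {
			if matchTopic(p, topic) {
				return r.Engine, true
			}
		}
	}
	return "", false
}

// exampleRules reproduces the routing section of the configuration above.
func exampleRules() []Rule {
	return []Rule{
		{Topics: []string{"user.*", "auth.*"}, Engine: "memory-fast"},
		{Topics: []string{"analytics.*", "metrics.*"}, Engine: "kafka-analytics"},
		{Topics: []string{"stream.*"}, Engine: "kinesis-stream"},
		{Topics: []string{"*"}, Engine: "redis-durable"},
	}
}

func main() {
	for _, topic := range []string{"user.login", "analytics.click", "custom.event"} {
		engine, _ := engineFor(exampleRules(), topic)
		fmt.Printf("%s -> %s\n", topic, engine)
	}
}
```

Under these assumptions, the catch-all `*` rule only behaves as a fallback if it is listed last, since an earlier `*` would shadow every rule after it.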
// Get the eventbus service
var eventBus *eventbus.EventBusModule
err := app.GetService("eventbus.provider", &eventBus)
if err != nil {
	return fmt.Errorf("failed to get eventbus service: %w", err)
}

// Publish an event
err = eventBus.Publish(ctx, "user.created", userData)
if err != nil {
	return fmt.Errorf("failed to publish event: %w", err)
}

// Subscribe to events
subscription, err := eventBus.Subscribe(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
	// Use the comma-ok form so an unexpected payload type returns an error instead of panicking
	user, ok := event.Payload.(UserData)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", event.Payload)
	}
	fmt.Printf("User created: %s\n", user.Name)
	return nil
})
if err != nil {
	return fmt.Errorf("failed to subscribe: %w", err)
}
// Events are automatically routed based on configured rules
eventBus.Publish(ctx, "user.login", userData) // -> memory-fast engine
eventBus.Publish(ctx, "analytics.click", clickData) // -> kafka-analytics engine
eventBus.Publish(ctx, "custom.event", customData) // -> redis-durable engine (fallback)
// Check which engine handles a specific topic
router := eventBus.GetRouter()
engine := router.GetEngineForTopic("user.created")
fmt.Printf("Topic 'user.created' routes to engine: %s\n", engine)
// Register a custom engine type
eventbus.RegisterEngine("myengine", func(config map[string]interface{}) (eventbus.EventBus, error) {
	return NewMyCustomEngine(config), nil
})
# Use in configuration
engines:
  - name: "my-custom"
    type: "myengine"
    config:
      customSetting: "value"
See examples/multi-engine-eventbus/ for a complete application demonstrating a multi-engine setup with topic-based routing. To run it:
cd examples/multi-engine-eventbus
go run main.go
Sample output:
🚀 Started Multi-Engine EventBus Demo in development environment
📊 Multi-Engine EventBus Configuration:
- memory-fast: Handles user.* and auth.* topics
- memory-reliable: Handles analytics.*, metrics.*, and fallback topics
🔵 [MEMORY-FAST] User registered: user123 (action: register)
📈 [MEMORY-RELIABLE] Page view: /dashboard (session: sess123)
⚙️ [MEMORY-RELIABLE] System info: database - Connection established
The module includes comprehensive BDD tests. To run them:
cd modules/eventbus
go test ./... -v
Existing single-engine configurations continue to work unchanged. To migrate to multi-engine:
# Before (single-engine)
eventbus:
  engine: memory
  workerCount: 5

# After (multi-engine with same behavior)
eventbus:
  engines:
    - name: "default"
      type: "memory"
      config:
        workerCount: 5
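The before/after mapping can be expressed as a small conversion function. The sketch below is illustrative only: `LegacyConfig`, `EngineConfig`, and `toMultiEngine` are hypothetical names, and the module may handle backward compatibility quite differently internally.

```go
package main

import "fmt"

// LegacyConfig models the single-engine form shown above (hypothetical type).
type LegacyConfig struct {
	Engine      string
	WorkerCount int
}

// EngineConfig models one entry of the multi-engine "engines" list (hypothetical type).
type EngineConfig struct {
	Name   string
	Type   string
	Config map[string]interface{}
}

// toMultiEngine wraps a single-engine config in a one-element engines list
// named "default", matching the "After" example above.
func toMultiEngine(legacy LegacyConfig) []EngineConfig {
	return []EngineConfig{{
		Name: "default",
		Type: legacy.Engine,
		Config: map[string]interface{}{
			"workerCount": legacy.WorkerCount,
		},
	}}
}

func main() {
	engines := toMultiEngine(LegacyConfig{Engine: "memory", WorkerCount: 5})
	for _, e := range engines {
		fmt.Printf("name=%s type=%s workerCount=%v\n", e.Name, e.Type, e.Config["workerCount"])
	}
	// prints "name=default type=memory workerCount=5"
}
```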
# High-throughput configuration
eventbus:
  engines:
    - name: "high-perf"
      type: "memory"
      config:
        maxEventQueueSize: 10000
        defaultEventBufferSize: 100
        workerCount: 20
When contributing to the eventbus module:
This module is part of the Modular framework and follows the same license terms.