github.com/CrisisTextLine/modular/modules/eventbus

v0.1.0

EventBus Module

The EventBus Module provides a publish-subscribe messaging system for Modular applications. It enables decoupled communication between components through a flexible event-driven architecture.

Features

  • In-memory event publishing and subscription
  • Support for both synchronous and asynchronous event handling
  • Topic-based routing
  • Event history tracking
  • Configurable worker pool for asynchronous event processing
  • Extensible design with support for external message brokers

Installation

import (
    "github.com/CrisisTextLine/modular"
    "github.com/CrisisTextLine/modular/modules/eventbus"
)

// Register the eventbus module with your Modular application
app.RegisterModule(eventbus.NewModule())

Configuration

The eventbus module can be configured using the following options:

eventbus:
  engine: memory              # Event bus engine (memory, redis, kafka)
  maxEventQueueSize: 1000     # Maximum events to queue per topic
  defaultEventBufferSize: 10  # Default buffer size for subscription channels
  workerCount: 5              # Worker goroutines for async event processing
  eventTTL: 3600              # TTL for events in seconds (1 hour)
  retentionDays: 7            # Days to retain event history
  externalBrokerURL: ""       # URL for external message broker (if used)
  externalBrokerUser: ""      # Username for external message broker (if used)
  externalBrokerPassword: ""  # Password for external message broker (if used)
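
The options above can be mirrored in application code to catch bad values before the module starts. The following is a minimal sketch, not the module's own configuration type: the `Config` struct, its field names, `DefaultConfig`, `Validate`, and `TTL` are all hypothetical, and the rule that the `redis` and `kafka` engines need a broker URL is an assumption based on the "if used" comments.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the YAML keys shown above. The struct and its field names
// are hypothetical; the module's own configuration type may differ.
type Config struct {
	Engine                 string
	MaxEventQueueSize      int
	DefaultEventBufferSize int
	WorkerCount            int
	EventTTL               int // seconds
	RetentionDays          int
	ExternalBrokerURL      string
}

// DefaultConfig returns the values from the YAML example.
func DefaultConfig() Config {
	return Config{
		Engine:                 "memory",
		MaxEventQueueSize:      1000,
		DefaultEventBufferSize: 10,
		WorkerCount:            5,
		EventTTL:               3600,
		RetentionDays:          7,
	}
}

// Validate rejects values that cannot work. Requiring a broker URL for the
// non-memory engines is an assumption, not documented module behavior.
func (c Config) Validate() error {
	switch c.Engine {
	case "memory":
	case "redis", "kafka":
		if c.ExternalBrokerURL == "" {
			return fmt.Errorf("engine %q needs externalBrokerURL", c.Engine)
		}
	default:
		return fmt.Errorf("unknown engine %q", c.Engine)
	}
	if c.WorkerCount < 1 {
		return errors.New("workerCount must be at least 1")
	}
	if c.MaxEventQueueSize < 1 || c.DefaultEventBufferSize < 1 {
		return errors.New("queue and buffer sizes must be at least 1")
	}
	if c.EventTTL < 0 || c.RetentionDays < 0 {
		return errors.New("eventTTL and retentionDays must not be negative")
	}
	return nil
}

// TTL converts the eventTTL value, given in seconds, to a time.Duration.
func (c Config) TTL() time.Duration {
	return time.Duration(c.EventTTL) * time.Second
}

func main() {
	cfg := DefaultConfig()
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("event TTL:", cfg.TTL())
}
```

Keeping the TTL as an integer number of seconds matches the YAML; converting it once through a helper avoids unit mistakes elsewhere in the code.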

Usage

Accessing the EventBus Service

// In your module's Init function
func (m *MyModule) Init(app modular.Application) error {
    var eventBusService *eventbus.EventBusModule
    err := app.GetService("eventbus.provider", &eventBusService)
    if err != nil {
        return fmt.Errorf("failed to get event bus service: %w", err)
    }
    
    // Now you can use the event bus service
    m.eventBus = eventBusService
    return nil
}

Using Interface-Based Service Matching

// Define the service dependency
func (m *MyModule) RequiresServices() []modular.ServiceDependency {
    return []modular.ServiceDependency{
        {
            Name:               "eventbus",
            Required:           true,
            MatchByInterface:   true,
            SatisfiesInterface: reflect.TypeOf((*eventbus.EventBus)(nil)).Elem(),
        },
    }
}

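// Access the service in your constructor
func (m *MyModule) Constructor() modular.ModuleConstructor {
    return func(app modular.Application, services map[string]any) (modular.Module, error) {
        // Use the two-value type assertion so that a missing or mistyped
        // service returns an error instead of panicking.
        eventBusService, ok := services["eventbus"].(eventbus.EventBus)
        if !ok {
            return nil, fmt.Errorf("service %q does not implement eventbus.EventBus", "eventbus")
        }
        return &MyModule{eventBus: eventBusService}, nil
    }
}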

Publishing Events

// Publish a simple event
err := eventBusService.Publish(ctx, "user.created", user)
if err != nil {
    // Handle error
}

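// Publish an event with metadata.
// Note: Go has no method overloading, so this Event-based call cannot be made
// on the same value as the three-argument call above. It applies only to a
// value whose Publish method accepts an eventbus.Event; check the package
// reference for the signature exposed by the value you hold.
metadata := map[string]interface{}{
    "source":  "user-service",
    "version": "1.0",
}

event := eventbus.Event{
    Topic:    "user.created",
    Payload:  user,
    Metadata: metadata,
}

err = eventBusService.Publish(ctx, event)
if err != nil {
    // Handle error
}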

Subscribing to Events

// Synchronous subscription
subscription, err := eventBusService.Subscribe(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    // Check the payload type instead of panicking on an unexpected value
    user, ok := event.Payload.(User)
    if !ok {
        return fmt.Errorf("unexpected payload type %T", event.Payload)
    }
    fmt.Printf("User created: %s\n", user.Name)
    return nil
})
if err != nil {
    // Handle error
}

// Asynchronous subscription (handler runs in a worker goroutine)
asyncSub, err := eventBusService.SubscribeAsync(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    // This function is executed asynchronously
    user, ok := event.Payload.(User)
    if !ok {
        return fmt.Errorf("unexpected payload type %T", event.Payload)
    }
    time.Sleep(1 * time.Second) // Simulating work
    fmt.Printf("Processed user asynchronously: %s\n", user.Name)
    return nil
})
if err != nil {
    // Handle error
}

// Unsubscribe when done
defer eventBusService.Unsubscribe(ctx, subscription)
defer eventBusService.Unsubscribe(ctx, asyncSub)

Working with Topics

// List all active topics
topics := eventBusService.Topics()
fmt.Println("Active topics:", topics)

// Get subscriber count for a topic
count := eventBusService.SubscriberCount("user.created")
fmt.Printf("Subscribers for 'user.created': %d\n", count)

Event Handling Best Practices

  • Keep Handlers Lightweight: Event handlers should be quick and efficient, especially for synchronous subscriptions

  • Error Handling: Always handle errors in your event handlers, especially for async handlers

  • Topic Organization: Use hierarchical topics like "domain.event.action" for better organization

  • Type Safety: Consider defining type-safe wrappers around the event bus for specific event types

  • Context Usage: Use the provided context to implement cancellation and timeouts

Implementation Notes

  • The in-memory event bus uses channels to distribute events to subscribers
  • Asynchronous handlers are executed in a worker pool to limit concurrency
  • Event history is retained based on the configured retention period
  • The module is extensible to support external message brokers in the future
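
To make the first two notes concrete, here is a minimal sketch of a channel-fed worker pool that bounds handler concurrency. It illustrates the general technique only and is not the module's implementation: `Bus`, `NewBus`, `SubscribeAsync`, `Publish`, and `Close` are hypothetical names with simplified signatures (no context, no subscription handles).

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct {
	Topic   string
	Payload any
}

type Handler func(Event) error

// job pairs one event with one subscriber's handler.
type job struct {
	event   Event
	handler Handler
}

// Bus distributes events to handlers through a buffered channel that is
// drained by a fixed number of worker goroutines.
type Bus struct {
	mu       sync.RWMutex
	handlers map[string][]Handler
	jobs     chan job
	wg       sync.WaitGroup
}

// NewBus starts `workers` goroutines; at most that many handlers run at once.
func NewBus(workers, queueSize int) *Bus {
	b := &Bus{
		handlers: make(map[string][]Handler),
		jobs:     make(chan job, queueSize),
	}
	for i := 0; i < workers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for j := range b.jobs {
				if err := j.handler(j.event); err != nil {
					fmt.Println("handler error:", err)
				}
			}
		}()
	}
	return b
}

// SubscribeAsync registers a handler for a topic.
func (b *Bus) SubscribeAsync(topic string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish queues one job per subscriber. It blocks while the queue is full
// and must not be called after Close.
func (b *Bus) Publish(topic string, payload any) {
	b.mu.RLock()
	hs := append([]Handler(nil), b.handlers[topic]...)
	b.mu.RUnlock()
	for _, h := range hs {
		b.jobs <- job{event: Event{Topic: topic, Payload: payload}, handler: h}
	}
}

// Close stops accepting events and waits for queued work to finish.
func (b *Bus) Close() {
	close(b.jobs)
	b.wg.Wait()
}

func main() {
	bus := NewBus(2, 16)

	var mu sync.Mutex
	seen := 0
	bus.SubscribeAsync("user.created", func(e Event) error {
		mu.Lock()
		defer mu.Unlock()
		seen++
		return nil
	})

	for i := 0; i < 5; i++ {
		bus.Publish("user.created", i)
	}
	bus.Close()
	fmt.Println("handled events:", seen)
}
```

In this sketch a full queue applies backpressure to the publisher; a real bus might instead drop events or return an error, which is a design choice this README does not specify.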

Testing

The eventbus module includes tests for module initialization, configuration, and lifecycle management.

Package last updated on 10 Jul 2025
