@nldoc/baseworker

This repository contains a foundational worker implementation for TypeScript projects.

Version: 8.2.9 (latest, npm)
Maintainers: 3
NLdoc Base Worker for TypeScript

A foundational worker implementation for TypeScript projects in the NLdoc ecosystem. This library provides a robust base class for creating workers that handle AMQP message processing with built-in file storage capabilities.

Overview

The Base Worker provides a complete foundation for building distributed document processing workers. It handles:

  • AMQP Connection Management: Automatic connection setup, health monitoring, and graceful reconnection
  • Message Queue Processing: Structured job processing with configurable prefetch limits
  • File Storage Integration: Built-in MinIO integration for handling document assets
  • Worker Discovery: Automatic worker registration and health reporting
  • Error Handling: Comprehensive error types and graceful failure handling
  • Logging Integration: Built-in structured logging with the NLdoc logger

Getting Started

Prerequisites

  • Node.js >= 22.0.0
  • npm >= 10.0.0
  • Access to an AMQP broker (RabbitMQ)
  • Optional: MinIO instance for file storage

Installation

Install the package via npm:

npm install @nldoc/baseworker

Or using yarn:

yarn add @nldoc/baseworker

Basic Usage

import { logger } from '@nldoc/logger'
import { Worker, WorkerConfig, WorkerMessageHandler } from '@nldoc/baseworker'

try {
    if (!process.env.EXCHANGE) {
        throw new Error('No exchange prefix defined')
    }

    const config: WorkerConfig = {
        name: 'your-awesome-worker-name',
        amqpHost: process.env.AMQP_HOST ?? 'localhost',
        amqpPass: process.env.AMQP_PASS,
        amqpPort: process.env.AMQP_PORT ?? '5672',
        amqpProtocol: process.env.AMQP_PROTOCOL ?? 'amqp',
        amqpUser: process.env.AMQP_USER,
        exchangePrefix: process.env.EXCHANGE,
        workerInstanceName: process.env.HOSTNAME ?? '',
        prefetchLimit: 10,
    }

    const handler: WorkerMessageHandler = async ({ job, storage }) => {
        // Get files from storage
        const fileBuffer = await storage.getFileBuffer({
            bucketName: job.bucketName,
            filePath: job.filePath
        })

        // Process the document
        // Your document processing logic here...

        return {
            success: true,
            result: 'Processing completed successfully',
            confidence: 100,
        }
    }

    const worker = new Worker({
        logger,
        messageHandler: handler,
        config,
    })

    await worker.start()
} catch (error) {
    if (error instanceof Error) {
        logger.error('Worker failed to start', { error: error.message })
    }
    throw error
}

Configuration

Worker Configuration

For all available configuration options, see the WorkerConfig type definition in src/types/WorkerConfig.ts.

Environment Variables

Common environment variables for configuration:

AMQP_HOST=localhost
AMQP_PORT=5672
AMQP_PROTOCOL=amqp
AMQP_USER=guest
AMQP_PASS=guest
EXCHANGE=your-exchange-prefix
HOSTNAME=worker-instance-01
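
The mapping from these variables to configuration fields can be collected in one helper. The following is a minimal sketch, not part of the package: `configFromEnv`, `AmqpEnvConfig`, and `Env` are hypothetical names, and the local interface only mirrors the fields used in the Basic Usage example rather than the library's actual `WorkerConfig` definition.

```typescript
// Hypothetical stand-in type: mirrors the AMQP-related fields from the
// Basic Usage example, NOT the library's real WorkerConfig definition.
interface AmqpEnvConfig {
    amqpHost: string
    amqpPort: string
    amqpProtocol: string
    amqpUser: string | undefined
    amqpPass: string | undefined
    exchangePrefix: string
    workerInstanceName: string
}

type Env = Record<string, string | undefined>

// Hypothetical helper: reads the variables listed above and applies the
// same defaults as the Basic Usage example.
function configFromEnv(env: Env): AmqpEnvConfig {
    const exchangePrefix = env['EXCHANGE']
    // EXCHANGE has no sensible default, so fail fast when it is missing.
    if (!exchangePrefix) {
        throw new Error('No exchange prefix defined')
    }
    return {
        amqpHost: env['AMQP_HOST'] ?? 'localhost',
        amqpPort: env['AMQP_PORT'] ?? '5672',
        amqpProtocol: env['AMQP_PROTOCOL'] ?? 'amqp',
        amqpUser: env['AMQP_USER'],
        amqpPass: env['AMQP_PASS'],
        exchangePrefix,
        workerInstanceName: env['HOSTNAME'] ?? '',
    }
}
```

Taking the environment as a parameter (for example `configFromEnv(process.env)`) keeps the helper easy to unit test with a plain object.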

Message Handler

Your message handler receives a context object with:

  • job: The parsed job data (type: WorkerJob)
  • storage: File storage interface for accessing assets
  • logger: Structured logger instance

const handler: WorkerMessageHandler = async ({ job, storage, logger }) => {
    const startedAt = Date.now()
    logger.info('Processing job', { jobId: job.id })

    // Access job properties
    const { bucketName, filePath, metadata } = job

    // Get file from storage
    const fileBuffer = await storage.getFileBuffer({ bucketName, filePath })

    // Process and return result
    return {
        success: true,
        result: { processedData: 'example' },
        confidence: 95,
        // Elapsed wall-clock time in milliseconds
        metadata: { processingTime: Date.now() - startedAt }
    }
}

Worker Features

Automatic Health Monitoring

Workers automatically report their availability status and handle graceful shutdowns.

File Storage Integration

Built-in MinIO integration provides seamless file access:

// Get file as buffer
const buffer = await storage.getFileBuffer({ bucketName, filePath })

// Store result files
await storage.putFile({ bucketName: 'output', filePath: 'result.json', data: resultBuffer })
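
For unit tests it can help to run a handler without a MinIO instance. The sketch below is an in-memory stand-in that accepts the same call shapes as the two calls shown above. `InMemoryStorage` and `FileRef` are hypothetical names, the use of `Uint8Array` instead of a Node.js `Buffer` is a simplification, and whether the stub satisfies the library's actual storage interface is an assumption that should be checked against the package's type definitions.

```typescript
// Hypothetical shape of a file reference, taken from the calls shown above.
interface FileRef {
    bucketName: string
    filePath: string
}

// Hypothetical in-memory stand-in for the storage object. It is NOT the
// library's implementation; it only mimics getFileBuffer and putFile.
class InMemoryStorage {
    private readonly files = new Map<string, Uint8Array>()

    // JSON-encode the pair so 'a/b' + 'c' and 'a' + 'b/c' never collide.
    private key(ref: FileRef): string {
        return JSON.stringify([ref.bucketName, ref.filePath])
    }

    async putFile(args: FileRef & { data: Uint8Array }): Promise<void> {
        // Store a copy so later mutation of the caller's array has no effect.
        this.files.set(this.key(args), args.data.slice())
    }

    async getFileBuffer(ref: FileRef): Promise<Uint8Array> {
        const data = this.files.get(this.key(ref))
        if (data === undefined) {
            throw new Error(`File not found: ${ref.bucketName}/${ref.filePath}`)
        }
        return data
    }
}
```

A test can seed the stub with `putFile`, pass it to the handler as `storage`, and then read back whatever the handler wrote.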

Error Handling

The library defines the following error types:

  • ConnectionError: AMQP connection issues
  • InvalidJob: Malformed job data
  • WorkerError: General worker errors
  • ExternalFileStorageNotReadyError: Storage unavailable
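
One way to act on these error types is to branch on them with `instanceof`. The sketch below defines local stand-in classes with the same names so that it runs on its own. Whether the package exports these classes for `instanceof` checks, how they relate to each other, and which of them are worth retrying are all assumptions here; `NamedError`, `FailureAction`, and `classifyFailure` are hypothetical names.

```typescript
// Hypothetical base class for the stand-ins below (not from the library).
class NamedError extends Error {
    constructor(message: string) {
        super(message)
        this.name = new.target.name
        // Keeps instanceof working even when compiled to an ES5 target.
        Object.setPrototypeOf(this, new.target.prototype)
    }
}

// Local stand-ins named after the error types listed above.
class ConnectionError extends NamedError {}
class InvalidJob extends NamedError {}
class WorkerError extends NamedError {}
class ExternalFileStorageNotReadyError extends NamedError {}

type FailureAction = 'retry' | 'discard' | 'fail'

// Illustrative policy only: transient infrastructure problems are retried,
// malformed jobs are discarded, everything else is treated as a failure.
function classifyFailure(error: unknown): FailureAction {
    if (error instanceof ConnectionError || error instanceof ExternalFileStorageNotReadyError) {
        return 'retry'
    }
    if (error instanceof InvalidJob) {
        return 'discard'
    }
    return 'fail'
}
```

Keeping the policy in one function makes it straightforward to adjust once the library's real retry and acknowledgement behaviour is known.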

Development

Local Setup

  • Clone the repository:

    git clone https://gitlab.com/logius/nldoc/lib/typescript/baseworker.git
    cd baseworker
    
  • Install dependencies:

    npm install
    
  • Build the project:

    npm run build
    

Available Scripts

  • npm run test - Run the test suite with Vitest
  • npm run build - Build TypeScript to JavaScript
  • npm run check - Type check without emitting files
  • npm run lint - Lint the codebase using ESLint
  • npm run format - Format code using Prettier
  • npm run format:check - Check code formatting
  • npm run fix - Auto-fix linting and formatting issues

Testing

Run the test suite:

npm test

Contributing

We welcome contributions! Please ensure:

  • All tests pass (npm test)
  • Code is properly formatted (npm run format:check)
  • Linting rules are followed (npm run lint)
  • Type checking passes (npm run check)

License

This project is licensed under the European Union Public License 1.2 - see LICENSE for details.

Acknowledgements

  • Built on AMQP for reliable message queuing
  • Integrates with MinIO for object storage

Package last updated on 18 Aug 2025
