New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

amqplib-init

Package Overview
Dependencies
Maintainers
1
Versions
24
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqplib-init

消息队列初始化 - 移除PM2依赖,改用Supervisor管理 (支持15分钟长任务)

latest
npmnpm
Version
1.3.4
Version published
Weekly downloads
0
-100%
Maintainers
1
Weekly downloads
 
Created
Source

amqplib-init

一个强大且易用的 RabbitMQ 消息队列初始化和管理库,支持自动重连、消息处理、长任务执行等功能。

🚀 特性

  • 模块化架构 - 清晰的模块分离,易于维护和扩展
  • 自动重连 - 连接断开时自动重连,确保服务稳定性
  • 消息处理 - 内置消息确认/拒绝机制,支持延迟确认
  • 长任务支持 - 支持最长15分钟的消息处理任务
  • 错误处理 - 智能处理空消息和格式错误的消息,防止阻塞
  • 自动重载 - 支持自动重载监控
  • 配置灵活 - 支持多种配置方式和自动配置获取
  • 代码保护 - 打包后代码经过混淆和压缩,保护源码安全

📦 安装

npm install amqplib-init

🔧 快速开始

CommonJS (Node.js require)

const { init } = require('amqplib-init');

// 初始化消息队列
await init({
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  callback: async (message) => {
    console.log('收到消息:', message);
    await processMessage(message);
  }
});

ES Module (import)

import { init, AMQPInitializer } from 'amqplib-init';

// 方式1: 使用 init 函数
await init({
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  callback: async (message) => {
    console.log('收到消息:', message);
  }
});

// 方式2: 使用类实例
const initializer = new AMQPInitializer();
await initializer.init({
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  callback: async (message) => {
    console.log('收到消息:', message);
  }
});

TypeScript

import { init, AmqplibInitOptions } from 'amqplib-init';

const options: AmqplibInitOptions = {
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  prefetch: 10,
  callback: async (message: any) => {
    console.log('收到消息:', message);
  }
};

await init(options);

高级用法

const { AMQPInitializer } = require('amqplib-init');

const initializer = new AMQPInitializer();

await initializer.init({
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  prefetch: 1,
  durable: true,
  delay: 1000,
  heartbeat: 60,
  timeout: 300000,
  messageTimeout: 900000,
  callback: async (message) => {
    // 消息处理逻辑
    return processMessage(message);
  },
  finish: () => {
    console.log('初始化完成');
  },
  initHook: async ({ channel, connection }) => {
    // 初始化钩子
    console.log('连接已建立');
  }
});

// 获取状态信息
console.log(initializer.getStatus());

// 优雅关闭
await initializer.shutdown();

⚙️ 配置参数

参数类型默认值描述
channelNamestring'node-test-channel'队列名称
amqpLinkstring''RabbitMQ 连接地址
amqpAutoLinkstring''自动获取连接地址的API
prefetchnumber1预取消息数量
durablebooleantrue队列持久化
delaynumber0消息确认延迟时间(ms)
heartbeatnumber60心跳间隔(秒)
timeoutnumber300000连接超时时间(ms)
messageTimeoutnumber900000消息处理超时时间(ms)
reconnectDelaynumber5000重连延迟时间(ms)
maxReconnectAttemptsnumber10最大重连次数
autoReloadnumber0自动重载间隔(秒)
callbackfunction() => {}消息处理回调
finishfunction() => {}初始化完成回调
initHookfunction() => {}初始化钩子
queryHookfunction() => {}查询钩子

🛡️ 错误处理

v1.2.3+ 版本增强了错误处理能力:

  • 空消息处理 - 自动识别并确认空消息,防止消费阻塞
  • JSON解析错误 - 智能处理格式错误的消息,避免无限循环
  • 连接异常 - 自动重连机制,确保服务稳定性
// 库会自动处理以下情况:
// - 空消息内容
// - null 或 undefined 消息
// - 无效的 JSON 格式
// - 连接断开重连

🔍 状态监控

const status = initializer.getStatus();
console.log({
  isConnected: status.isConnected,        // 连接状态
  processingCount: status.processingCount, // 处理中消息数量
  autoReloaderActive: status.autoReloaderActive, // 自动重载状态
  channelName: status.channelName         // 当前队列名
});

🚨 注意事项

  • 长任务支持 - 支持最长15分钟的消息处理,超时会自动处理
  • 消息确认 - 消息处理成功后会自动确认,失败会重新排队
  • 重连机制 - 连接断开时会自动重连并恢复消息处理
  • 内存管理 - 内置消息状态跟踪,防止内存泄漏

📝 更新日志

v1.2.3 (2025-08-25)

  • 🐛 修复 JSON 解析错误导致的消息处理阻塞
  • ✨ 增加空消息和无效消息的自动确认机制
  • 📝 完善错误日志输出,便于问题排查

v1.2.1

  • 🔧 模块化架构重构
  • ✨ 支持15分钟长任务处理
  • 🚀 优化自动重连逻辑

📄 许可证

ISC

🤝 贡献

欢迎提交 Issue 和 Pull Request!

📞 支持

如果您在使用过程中遇到问题,请通过以下方式获取支持:

  • 查看本文档的常见问题
  • 提交 GitHub Issue
  • 查看示例代码 demo.js

Keywords

rabbitmq

FAQs

Package last updated on 24 Jan 2026

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts