
A minimal, elegant, and robust async RabbitMQ client for Node.js
coniglio (Italian for “rabbit”) is a modern wrapper around RabbitMQ designed to be dead-simple to use, resilient in production, and fully composable with async iterators and streaming libraries. Inspired by libraries like postgres, it gives you just the right abstraction for real-world systems without hiding the power of AMQP.
- for await…of streaming API: process messages naturally with backpressure
- ack() / nack(): total control over message flow
- Composable with p-map, exstream.js, highland, standard Node streams and more
- amqplib under the hood
npm install coniglio
import coniglio from 'coniglio'
const conn = await coniglio('amqp://localhost')
// Configure queues and exchanges
// (optional, useful for bootstrapping or tests)
await conn.configure({
  queues: [
    { name: 'jobs.email', durable: true }
  ],
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ]
})
// Publish messages to an exchange
// (started first: the consumer loop below runs until the connection closes)
setInterval(() => {
  conn.publish('domain.events', 'jobs.email', { message: 'hello world' })
}, 200)
// Consume messages from a queue
for await (const msg of conn.listen('jobs.email')) {
  await sendEmail(msg.data)
  conn.ack(msg)
}
If you’ve used amqplib, you know it’s the canonical RabbitMQ library for Node.js: powerful, stable, and low-level. But it leaves the rest of the wiring to you, including reconnects, JSON decoding, channel separation, and backpressure.
Coniglio wraps amqplib with a modern, minimal layer built for real apps.
You get the same underlying power, but with an API that feels natural and production-ready.
| Feature | amqplib | coniglio ✅ |
|---|---|---|
| Promise API | ⚠️ Basic (thenable) | ✅ Fully async/await |
| Manual reconnects | ❌ You handle it | ✅ Built-in |
| Message streaming | ❌ No | ✅ for await...of |
| Built-in JSON decoding | ❌ Raw Buffer | ✅ On by default |
| Safe manual ack() / nack() | ✅ Yes | ✅ Ergonomic handling |
| Channel separation (pub/sub) | ❌ Manual | ✅ Automatic |
| Backpressure-friendly | ❌ Needs plumbing | ✅ Native support |
| TypeScript types | ⚠️ Community | ✅ First-class |
🐇 Coniglio is Italian for “rabbit” — simple, fast, and alert.
for await (const msg of conn.listen('my-queue')) {
  try {
    await handle(msg.data)
    conn.ack(msg)
  } catch (err) {
    conn.nack(msg)
  }
}
const conn = await coniglio(url, opts?)
Creates a new connection instance.
const conn = await coniglio('amqp://localhost', {
  logger: console, // optional custom logger
  json: true, // default: true (parse JSON messages)
  prefetch: 10, // default: 10 (number of unacknowledged messages)
})
If you want to use a custom logger, it should implement the Logger interface:
export interface Logger {
  debug(...args: any[]): void
  info(...args: any[]): void
  warn(...args: any[]): void
  error(...args: any[]): void
}
Example with pino:
import pino from 'pino'
const log = pino({ level: 'debug' })
const conn = await coniglio('amqp://localhost', { logger: log })
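Any object with these four methods should fit, so a thin adapter is enough to decorate an existing logger. The sketch below is illustrative only: `prefixLogger` is a hypothetical helper, not part of coniglio, and it simply prepends a tag to every call.

```typescript
// Logger shape copied from the interface above.
interface Logger {
  debug(...args: any[]): void
  info(...args: any[]): void
  warn(...args: any[]): void
  error(...args: any[]): void
}

// Hypothetical helper (not part of coniglio): wraps any Logger and
// prepends a fixed tag to every call so broker logs are easy to grep.
function prefixLogger(base: Logger, tag: string): Logger {
  return {
    debug: (...args) => base.debug(tag, ...args),
    info: (...args) => base.info(tag, ...args),
    warn: (...args) => base.warn(tag, ...args),
    error: (...args) => base.error(tag, ...args),
  }
}

// console already satisfies the Logger shape.
const log = prefixLogger(console, '[amqp]')
log.info('connected')
```

The wrapped logger could then be passed as the `logger` option in place of `console`.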
conn.listen(queue, opts?)
Returns an async iterator of messages consumed from a queue.
interface ListenOptions<T> {
  json?: boolean // default: true (parse JSON)
  prefetch?: number // default: 10
  routingKeys?: (keyof T)[] // optional filter, TypeScript-safe
}
Each yielded msg: Message<T> has:
interface Message<T> {
  raw: amqp.ConsumeMessage // original AMQP message
  content: Buffer // raw message body
  data?: T // parsed JSON if json = true
  contentIsJson: boolean // true if JSON parsed successfully
  routingKey: string // the routing key
}
for await (const msg of conn.listen('my-queue', { prefetch: 100 })) {
  // msg.data is parsed JSON
  if (msg.contentIsJson) {
    processData(msg.data)
  } else {
    // handle parsing error
    console.warn('Failed to parse JSON:', msg.content.toString())
  }
  conn.ack(msg)
}
for await (const msg of conn.listen('my-queue', { json: false })) {
  // msg.data === undefined
  // use msg.content (Buffer) directly
  processRaw(msg.content)
  conn.ack(msg)
}
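To make the relationship between `content`, `data`, and `contentIsJson` concrete, here is a minimal sketch of a decoding rule that would produce the behavior described above. It is a guess at the logic, not coniglio's source; `decodeBody` and `Decoded` are hypothetical names.

```typescript
import { Buffer } from 'node:buffer'

// Subset of the Message fields documented above.
interface Decoded<T> {
  content: Buffer
  data?: T
  contentIsJson: boolean
}

// Hypothetical helper: an assumed decoding rule, not the library's code.
function decodeBody<T>(content: Buffer, json = true): Decoded<T> {
  // json = false: leave the body untouched and never set data.
  if (!json) return { content, contentIsJson: false }
  try {
    const data = JSON.parse(content.toString('utf8')) as T
    return { content, data, contentIsJson: true }
  } catch {
    // Invalid JSON: keep the raw bytes and flag the failure instead of throwing.
    return { content, contentIsJson: false }
  }
}

const ok = decodeBody<{ id: number }>(Buffer.from('{"id":1}'))
const bad = decodeBody(Buffer.from('not json'))
const raw = decodeBody(Buffer.from('{"id":1}'), false)
```

Under this rule a malformed body never throws inside the consumer loop; the handler decides what to do by checking `contentIsJson`.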
conn.ack(msg) / conn.nack(msg, requeue = false)
Manually acknowledge or reject a message.
conn.ack(msg)
conn.nack(msg, true) // requeue = true
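A common pattern is to ack on success and nack on failure, requeueing only when the error looks transient. The sketch below assumes `conn` exposes `ack` and `nack` with the signatures shown above; `settle`, `Acker`, and `isTransient` are hypothetical names introduced for illustration.

```typescript
// Minimal shape of the two calls used here; conn from coniglio() is assumed to fit.
interface Acker<M> {
  ack(msg: M): void
  nack(msg: M, requeue?: boolean): void
}

// Hypothetical helper: ack when the handler succeeds, nack when it throws.
// `isTransient` decides whether the broker should redeliver the message.
async function settle<M>(
  conn: Acker<M>,
  msg: M,
  handler: (msg: M) => Promise<void>,
  isTransient: (err: unknown) => boolean = () => false,
): Promise<boolean> {
  try {
    await handler(msg)
    conn.ack(msg)
    return true
  } catch (err) {
    conn.nack(msg, isTransient(err))
    return false
  }
}
```

Defaulting `isTransient` to `false` mirrors the `requeue = false` default above, which avoids redelivery loops for messages that can never succeed.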
conn.publish(exchange, routingKey, payload, opts?)
Publish a message. Payload is serialized with JSON.stringify under the hood.
await conn.publish('domain.events', 'user.created', { userId: '123' }, { priority: 5 })
conn.configure({ queues, exchanges })
Declare queues and exchanges explicitly. Useful for bootstrapping, tests, and dynamic setups.
await conn.configure({
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ],
  queues: [
    {
      name: 'jobs.email',
      durable: true,
      bindTo: [{ exchange: 'domain.events', routingKey: 'jobs.email' }]
    }
  ]
})
interface ConfigureOptions {
  exchanges?: {
    name: string
    type: 'topic' | 'fanout' | 'direct' | 'headers'
    durable?: boolean
    autoDelete?: boolean
    internal?: boolean
    arguments?: Record<string, any>
  }[]
  queues?: {
    name: string
    durable?: boolean
    exclusive?: boolean
    autoDelete?: boolean
    deadLetterExchange?: string
    messageTtl?: number
    maxLength?: number
    arguments?: Record<string, any>
    bindTo?: {
      exchange: string
      routingKey: string
      arguments?: Record<string, any>
    }[]
  }[]
}
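The `deadLetterExchange` and `messageTtl` fields suggest a dead-letter setup can be declared in one `configure()` call. The sketch below builds such a config object, assuming those two options map to RabbitMQ's `x-dead-letter-exchange` and `x-message-ttl` queue arguments. `withDeadLetter` and the `.dlx` / `.dead` naming are hypothetical, and the local types are trimmed copies of the interface above.

```typescript
// Trimmed copies of the ConfigureOptions shapes above (only the fields used here).
interface QueueSpec {
  name: string
  durable?: boolean
  deadLetterExchange?: string
  messageTtl?: number
  bindTo?: { exchange: string; routingKey: string }[]
}
interface ExchangeSpec {
  name: string
  type: 'topic' | 'fanout' | 'direct' | 'headers'
  durable?: boolean
}

// Hypothetical helper: a work queue whose rejected or expired messages
// are routed to a companion dead-letter queue.
function withDeadLetter(queue: string, ttlMs: number): { exchanges: ExchangeSpec[]; queues: QueueSpec[] } {
  const dlx = `${queue}.dlx`
  return {
    exchanges: [{ name: dlx, type: 'fanout', durable: true }],
    queues: [
      { name: queue, durable: true, deadLetterExchange: dlx, messageTtl: ttlMs },
      // fanout exchanges ignore the routing key, so an empty string is enough
      { name: `${queue}.dead`, durable: true, bindTo: [{ exchange: dlx, routingKey: '' }] },
    ],
  }
}

const config = withDeadLetter('jobs.email', 30_000)
// await conn.configure(config)
```

With a layout like this, a `conn.nack(msg)` without requeue would move the message to the `.dead` queue instead of dropping it, provided the option mapping assumed above holds.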
await conn.close()
Gracefully close all channels and the underlying connection. Use this during application shutdown.
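During shutdown, several signals can arrive in quick succession, so it helps to make the close call idempotent. The following is a sketch under that assumption; `makeShutdown` and `Closable` are hypothetical names, and only `close()` from the API above is used.

```typescript
// Only close() is needed here; conn from coniglio() is assumed to fit.
interface Closable {
  close(): Promise<void>
}

// Hypothetical helper: returns a shutdown function that closes the
// connection once, even if it is invoked several times.
function makeShutdown(conn: Closable): () => Promise<void> {
  let closing: Promise<void> | undefined
  // The first call starts close(); later calls reuse the same promise.
  return () => (closing ??= conn.close())
}

// Usage sketch:
// const shutdown = makeShutdown(conn)
// process.once('SIGTERM', shutdown)
// process.once('SIGINT', shutdown)
```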
coniglio doesn’t manage your concurrency. It just delivers messages as an async generator. Use your favorite tool:
import pMap from 'p-map'
await pMap(
  conn.listen('jobs.video'),
  async msg => {
    await transcode(msg.data)
    conn.ack(msg)
  },
  { concurrency: 5 }
)
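If you would rather not add a dependency, bounded concurrency over an async iterable fits in a few lines. This is a minimal sketch, not a replacement for p-map: `forEachConcurrent` is a hypothetical helper, and it relies on the source being an async generator (or another iterator that tolerates overlapping `next()` calls).

```typescript
// Hypothetical helper: runs `worker` over an async iterable with at most
// `limit` calls in flight. Each lane pulls the next item only when it is free.
async function forEachConcurrent<T>(
  source: AsyncIterable<T>,
  limit: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  const it = source[Symbol.asyncIterator]()
  const lane = async (): Promise<void> => {
    for (;;) {
      const { value, done } = await it.next()
      if (done) return
      await worker(value)
    }
  }
  // Start `limit` lanes that share one iterator.
  await Promise.all(Array.from({ length: limit }, () => lane()))
}
```

With coniglio this would be called as `forEachConcurrent(conn.listen('jobs.video'), 5, handler)`. Note that if one worker throws, the returned promise rejects while the other lanes keep running, so handlers should catch and nack their own errors.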
Or go reactive:
import { pipeline } from 'exstream'
await pipeline(
  conn.listen('metrics'),
  s => s.map(msg => parse(msg.data)),
  s => s.forEach(logMetric)
)
const prod = await coniglio('amqp://prod-host')
const qa = await coniglio('amqp://qa-host')
await prod.publish('events', 'prod.ready', { ok: true })
await qa.publish('events', 'qa.ready', { ok: true })
If you are using TypeScript and want full type safety over your messages, you can pass a generic type map to coniglio():
type RouteKeyMap = {
  'user.created': { userId: string }
  'invoice.sent': { invoiceId: string; total: number }
}
const conn = await coniglio<RouteKeyMap>('amqp://localhost')
// Consume all events from a queue
for await (const msg of conn.listen('queue.main')) {
  switch (msg.routingKey) {
    case 'user.created': {
      msg.data.userId // ✅ typed as string
      break
    }
    case 'invoice.sent': {
      msg.data.invoiceId // ✅ typed as string
      msg.data.total // ✅ typed as number
      break
    }
  }
}
// Or filter statically by known events (with narrow typing)
for await (const msg of conn.listen('queue.main', { routingKeys: ['user.created'] })) {
  msg.data.userId // ✅ typed as string
}
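The narrowing shown above can be modeled with a mapped type that turns the route map into a discriminated union. This sketch shows one way such typing could work; it is not taken from coniglio's type definitions, and `TypedMessage` and `summarize` are hypothetical names.

```typescript
type RouteKeyMap = {
  'user.created': { userId: string }
  'invoice.sent': { invoiceId: string; total: number }
}

// One union member per routing key, so checking `routingKey` narrows `data`.
type TypedMessage<M> = {
  [K in keyof M]: { routingKey: K; data: M[K] }
}[keyof M]

function summarize(msg: TypedMessage<RouteKeyMap>): string {
  switch (msg.routingKey) {
    case 'user.created':
      return `user ${msg.data.userId}` // data is { userId: string } here
    case 'invoice.sent':
      return `invoice ${msg.data.invoiceId} for ${msg.data.total}`
  }
}
```

Because the union is derived from the map, adding a new key to `RouteKeyMap` makes the compiler flag every `switch` that does not handle it.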
Coniglio is designed to thrive in real-world setups with high message throughput and resilience requirements.
✔️ Backpressure support
Using native for await...of, you get natural backpressure without buffering hell.
✔️ Controlled flow
Acknowledge or retry only when you're ready, with no leaking messages or auto-ack surprises.
✔️ Crash-safe reconnect
If RabbitMQ restarts, coniglio handles reconnection, re-initialization, and queue rebinds with zero config.
✔️ Works in microservices & monoliths
Use it in a Fastify server, a background worker, or a Kubernetes job. It's just an async iterator.
✔️ Easy to observe and test
You own the consumer loop. Add metrics, tracing, or mocks wherever you need: no magic, no black boxes.
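The backpressure point can be seen with a plain async generator, without a broker. In the sketch below, `source()` is a stand-in for `conn.listen()`, not the real thing: the generator is only resumed when the loop asks for the next item, so a slow consumer automatically slows the producer.

```typescript
const events: string[] = []

// Stand-in for conn.listen(): an async generator is only resumed when
// the consumer requests the next item.
async function* source(): AsyncGenerator<number> {
  for (let i = 1; i <= 3; i++) {
    events.push(`produce ${i}`)
    yield i
  }
}

async function consume(): Promise<string[]> {
  for await (const n of source()) {
    await new Promise(resolve => setTimeout(resolve, 10)) // simulate slow work
    events.push(`done ${n}`)
  }
  return events
}
```

Calling `consume()` yields the strictly interleaved order `produce 1, done 1, produce 2, done 2, produce 3, done 3`. With a real broker, the `prefetch` option presumably bounds how many messages are delivered ahead of the loop.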
conn.stream() for full ReadableStream interop (with AbortSignal)
License: MIT, as simple and open as the API itself.