Security News
PyPI’s New Archival Feature Closes a Major Security Gap
PyPI now allows maintainers to archive projects, improving security and helping users make informed decisions about their dependencies.
amqp-as-promised
Advanced tools
A high-level promise-based API built on
amqplib
extended with functions for AMQP-based RPC.
Old versions of this package were based on node-amqp.
Syntax to access the library has been changed in 5.0 to improve connection management. See the Running-section for instructions.
The underlying amqp library was changed from
node-amqp
to amqplib
. Efforts have been made to keep everything as
backwards compatible as possible, but some things have changed:
queue.shift()
is no longer supported.Q
has been dropped in favor of native promises. As a result,
support for promise progress notifications over RPC is no longer
supported.npm install amqp-as-promised
5.0+
conf = require './myconf.json' # see example conf below
((require 'amqp-as-promised') conf.amqp).then (amqpc) ->
Earlier versions
conf = require './myconf.json' # see example conf below
amqpc = (require 'amqp-as-promised') conf.amqp
As of version 0.1.0, the following config parameters are accepted, although we also try to keep backwards compatibility with the older format.
connection
Connection settings accepted by node-amqp. You need to at minimum specify either
host
vhost
login
password
or
url
.rpc
timeout
: timeout in ms for rpc calls. Default: 1000mslogLevel
logLevel
: sets the log level. Defaults to INFO
. Possible levels
are DEBUG
, INFO
, WARN
, ERROR
errorHandler
Since 2.0.0 connection errors are rethrown to crash process.
errorHandler
: sets a handler function to receive the error instead
of throwing to process. This option is deprecated, as a better way to
do this is to attach an error
event handler.waitForConnection
Since 4.1.0
waitForConnection
: on startup, keeps retrying to connect until
successful. Will not attempt reconnect after established connection.{
"connection": {
"host": "192.168.0.10",
"vhost": "test",
"login": "test",
"password": "supersecret"
},
"logLevel": "warn",
"rpc": {
"timeout": 2000
}
}
Or with url:
{
"connection": {
"url": "amqp://myuser:supersecret@192.168.0.10/test"
},
"logLevel": "warn"
}
Amqp-as-promised emits error
events on unexpected network errors,
for example then the connection to the server has been lost. It is up
to the client to handle these errors, as amqp-as-promised doesn't
reconnect automatically. Keep in mind that error recovery can be
tricky, and the best option might be to just crash and restart the
application on error.
This is a simple but effective error handler:
amqpc.on 'error', (err) ->
console.log err
process.exit 1
If there are no error handlers attached (either using amqp.on()
or
setting the errorHandler
in the configuration), amqp-as-promised
will as a last resort throw the error. This will most likely result in
an application crash unless there is an uncaught exception handler set
on the process
.
amqpc
to publishamqpc.exchange('myexchange').then (ex) ->
msg = {}
msg.domain = domain
ex.publish('mytopic.foo', msg).then ->
console.log 'published message!'
amqpc
to bindThis is shorthand for binding and subscribing.
amqpc.bind 'myexchange', 'myqueue', 'mytopic.#', (msg, headers, del) ->
console.log 'received message', msg
To bind an anonymous queue.
amqpc.bind 'myexchange', '', 'mytopic.#', (msg, headers, del) ->
console.log 'received message', msg
Or even shorter
amqpc.bind 'myexchange', 'mytopic.#', (msg, headers, del) ->
console.log 'received message', msg
To bind the queue to the exchange without subscribing to it, skip the
last parameter (the subscription callback). This is essentially the
same as queue.bind myexchange, 'mytopic'
, except the exchange and
queue are specified by their names:
amqpc.bind 'myexchange', 'myqueue', 'mytopic.#'
amqpc
to get an anomymous queueTo create an anomymous queue.
amqpc.queue().then (q) -> console.log 'my queue', q
amqpc
to do RPC-style callsto send a message to a service that honors the replyTo/correlationId contract:
amqpc.rpc('myexchange', 'routing.key', msg, [headers], [options]).then (response) ->
console.log 'received message', response
headers
is an optional parameter holding any custom headers to be
passed on the RPC service.options
supports the following settings
timeout
- the timeout in ms for this callNote! In earlier versions the response was an array that included the response headers. As of version 0.1.0, this is no longer the case.
amqpc
to serve RPC-style callsTo set up a message consumer that automatically honors the replyTo/correlationId contract:
amqpc.serve 'myexchange', 'mytopic.#', (msg, headers, del) ->
return { result: 'ok' }
The value returned from the handler will be sent back on the queue
specified by the replyTo
header, with the correlationId
set.
If an exception is thrown by the handler, it will be propagated back to the client as an object:
{
"error": {
"message": <exception.message>,
[ "code": <exception.code>, ]
[ "errno": <exception.errno> ]
}
}
To rate limit the rpc calls to 5 concurrent, we use an options object
to set {ack:true, prefetchCount:5}
.
Notice that the message acking is handled by the rpc backend wrapper.
amqpc.serve 'myexchange', 'mytopic.#', {ack:true, prefetchCount:5}, (msg, headers, del) ->
return { result: 'ok' }
graceful = (opts) ->
log.info 'Shutting down'
amqpc.shutdown().then ->
process.exit 0
process.on 'SIGINT', graceful
process.on 'SIGTERM', graceful
amqpc
objectamqpc.on(event, handler)
Attach an event handler. Currently only error
events are supported.
amqpc.exchange(name, opts)
A promise for an exchange. If opts
is omitted, then passive:true
is assumed.
amqpc.queue(qname, opts)
A promise for a queue. If qname
is omitted, ""
is used. If opts is
omitted, then exclusive:true
is assumed if the name is empty, or
passive:true
if not.
Thus, amqpc.queue()
will create a new exclusive, anonymous, queue
that is automatically deleted on disconnect, while
amqpc.queue('my-queue')
will try to passively declare the existing
queue my-queue
.
See queue.*
below.
amqpc.bind(exchange, queue, topic[, callback])
Shorthand for
exchange
is a string, then look up the existing exchange with
that name.queue
is a string, then look up the existing queue with that name.exchange/topic
.callback
to queue (optional).exchange
- an exchange object or a string with the name of an
exchangequeue
- a queue object or a string with the name of a queuetopic
- a string with the topic name.callback
- see queue.subscribe
below.amqpc.shutdown()
Will unbind all queues and unsubscribe all callbacks then gracefully shut down the socket connection.
exchange
objectexchange.publish(routingKey, msg, options)
Publishes a message, returning a promise.
queue
objectqueue.bind(exchange, topic)
Binds the queue to the given exchange (object, or string). Will unbind if queue was already bound.
queue.unbind()
Unbinds the queue (if currently bound).
queue.subscribe(opts, callback)
Subscribes the callback to this queue. Will unsubscribe any previous
callback. If opts is omitted, defaults to ack: false, prefetchCount: 1
The callback will be called with arguments (msg, headers, deliveryinfo, actions)
, where actions
is an object that holds these methods:
acknowledge()
: returns a Promise to acknowledge the message. This is
only relevant if opts.ack
is false (which is the default).queue.unsubscribe()
Unsubscribes current callback (if any).
queue.name
Read only property with the queue name.
amqpc.rpc(exchange, routingKey, msg, [headers], [options])
Perform an AMQP-based remote procedure call, and returns a promise for the return value:
exchange
, with the
specified routingkey
, headers
and options
. The replyTo
and
correlationId
headers are set automatically.error
property set are assumed to be remote
errors, and will result in a rejected promise.exchange
- the name of an exchange, or an exchange objectroutingkey
headers
- AMQP headers to be sent with the message. See exchange.publish().options
- valid options are:
timeout
- timeout in milliseconds. If none is specified, the
default value specified when creating the client is used.compress
- set to true
to use payload compressionSince 0.4.0
The RPC mechanism has a transparent payload gzip compression of JSON
objects Buffer. When activated both request and response are
compressed. To activate, the rpc client must ask for compression by setting
the compress
option.
Example
amqpc.rpc('myexchange', 'routing.key', msg, [headers], {compress:true}).then (response) ->
console.log 'received message', response
FAQs
A promise-based AMQP API build on node-amqp
The npm package amqp-as-promised receives a total of 330 weekly downloads. As such, amqp-as-promised popularity was classified as not popular.
We found that amqp-as-promised demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 0 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
PyPI now allows maintainers to archive projects, improving security and helping users make informed decisions about their dependencies.
Research
Security News
Malicious npm package postcss-optimizer delivers BeaverTail malware, targeting developer systems; similarities to past campaigns suggest a North Korean connection.
Security News
CISA's KEV data is now on GitHub, offering easier access, API integration, commit history tracking, and automated updates for security teams and researchers.