haraka-plugin-redis
Advanced tools
Comparing version 2.0.6 to 2.0.7
345
index.js
@@ -1,204 +0,212 @@ | ||
'use strict'; | ||
'use strict' | ||
/* global server */ | ||
const redis = require('redis'); | ||
const redis = require('redis') | ||
exports.register = function () { | ||
this.load_redis_ini(); | ||
this.load_redis_ini() | ||
// another plugin has called us with: inherits('haraka-plugin-redis') | ||
if (this.name !== 'redis') return; | ||
// another plugin has called us with: inherits('haraka-plugin-redis') | ||
if (this.name !== 'redis') return | ||
// register when 'redis' is declared in config/plugins | ||
this.register_hook('init_master', 'init_redis_shared'); | ||
this.register_hook('init_child', 'init_redis_shared'); | ||
// register when 'redis' is declared in config/plugins | ||
this.register_hook('init_master', 'init_redis_shared') | ||
this.register_hook('init_child', 'init_redis_shared') | ||
} | ||
const defaultOpts = { socket: { host: '127.0.0.1', port: '6379' } } | ||
const socketOpts = [ 'host', 'port', 'path', 'tls', 'connectTimeout', 'noDelay', 'keepAlive', 'reconnectStrategy' ] | ||
const socketOpts = [ | ||
'host', | ||
'port', | ||
'path', | ||
'tls', | ||
'connectTimeout', | ||
'noDelay', | ||
'keepAlive', | ||
'reconnectStrategy', | ||
] | ||
exports.load_redis_ini = function () { | ||
const plugin = this; | ||
const plugin = this | ||
// store redis cfg at redisCfg, to avoid conflicting with plugins that | ||
// inherit this plugin and have *their* config at plugin.cfg | ||
plugin.redisCfg = plugin.config.get('redis.ini', function () { | ||
plugin.load_redis_ini(); | ||
}); | ||
// store redis cfg at redisCfg, to avoid conflicting with plugins that | ||
// inherit this plugin and have *their* config at plugin.cfg | ||
plugin.redisCfg = plugin.config.get('redis.ini', () => { | ||
plugin.load_redis_ini() | ||
}) | ||
// backwards compat | ||
if (plugin.redisCfg?.server?.ip && !plugin.redisCfg?.server?.host) { | ||
plugin.redisCfg.server.host = plugin.redisCfg.server.ip | ||
delete plugin.redisCfg.server.ip | ||
} | ||
if (plugin.redisCfg.db && !plugin.redisCfg.database) { | ||
plugin.redisCfg.database = plugin.redisCfg.db | ||
delete plugin.redisCfg.db | ||
} | ||
// backwards compat | ||
if (plugin.redisCfg?.server?.ip && !plugin.redisCfg?.server?.host) { | ||
plugin.redisCfg.server.host = plugin.redisCfg.server.ip | ||
delete plugin.redisCfg.server.ip | ||
} | ||
if (plugin.redisCfg.db && !plugin.redisCfg.database) { | ||
plugin.redisCfg.database = plugin.redisCfg.db | ||
delete plugin.redisCfg.db | ||
} | ||
plugin.redisCfg.server = Object.assign({}, defaultOpts, plugin.redisCfg.opts, plugin.redisCfg.server); | ||
plugin.redisCfg.pubsub = Object.assign({}, defaultOpts, plugin.redisCfg.opts, plugin.redisCfg.pubsub); | ||
plugin.redisCfg.server = | ||
{ ...defaultOpts, ...plugin.redisCfg.opts, ...plugin.redisCfg.server }; | ||
plugin.redisCfg.pubsub = | ||
{ ...defaultOpts, ...plugin.redisCfg.opts, ...plugin.redisCfg.pubsub }; | ||
// socket options. In redis < 4, the options like host and port were | ||
// top level, now they're in socket.*. Permit legacy configs to still work | ||
for (const s of [ 'server', 'pubsub' ]) { | ||
for (const o of socketOpts) { | ||
if (plugin.redisCfg[s][o]) plugin.redisCfg[s].socket[o] = plugin.redisCfg[s][o] | ||
delete plugin.redisCfg[s][o] | ||
} | ||
// socket options. In redis < 4, the options like host and port were | ||
// top level, now they're in socket.*. Permit legacy configs to still work | ||
for (const s of ['server', 'pubsub']) { | ||
for (const o of socketOpts) { | ||
if (plugin.redisCfg[s][o]) | ||
plugin.redisCfg[s].socket[o] = plugin.redisCfg[s][o] | ||
delete plugin.redisCfg[s][o] | ||
} | ||
} | ||
} | ||
exports.merge_redis_ini = function () { | ||
if (!this.cfg) this.cfg = {} // no <plugin>.ini loaded? | ||
if (!this.cfg.redis) this.cfg.redis = {} // no [redis] in <plugin>.ini file | ||
if (!this.redisCfg) this.load_redis_ini() | ||
if (!this.cfg) this.cfg = {}; // no <plugin>.ini loaded? | ||
if (!this.cfg.redis) this.cfg.redis = {}; // no [redis] in <plugin>.ini file | ||
if (!this.redisCfg) this.load_redis_ini(); | ||
this.cfg.redis = Object.assign({}, this.redisCfg.server, this.cfg.redis) | ||
this.cfg.redis = Object.assign({}, this.redisCfg.server, this.cfg.redis); | ||
// backwards compatibility | ||
for (const o of socketOpts) { | ||
if (this.cfg.redis[o] === undefined) continue | ||
this.cfg.redis.socket[o] = this.cfg.redis[o] | ||
delete this.cfg.redis[o] | ||
} | ||
if (this.cfg.redis.db && !this.cfg.redis.database) { | ||
this.cfg.redis.database = this.cfg.redis.db | ||
delete this.cfg.redis.db | ||
} | ||
// backwards compatibility | ||
for (const o of socketOpts) { | ||
if (this.cfg.redis[o] === undefined) continue | ||
this.cfg.redis.socket[o] = this.cfg.redis[o] | ||
delete this.cfg.redis[o] | ||
} | ||
if (this.cfg.redis.db && !this.cfg.redis.database) { | ||
this.cfg.redis.database = this.cfg.redis.db | ||
delete this.cfg.redis.db | ||
} | ||
} | ||
exports.init_redis_shared = function (next, server) { | ||
let calledNext = false | ||
function nextOnce(e) { | ||
if (e) this.logerror(`Redis error: ${e.message}`) | ||
if (calledNext) return | ||
calledNext = true | ||
next() | ||
} | ||
let calledNext = false; | ||
function nextOnce (e) { | ||
if (e) this.logerror(`Redis error: ${e.message}`); | ||
if (calledNext) return; | ||
calledNext = true; | ||
next(); | ||
} | ||
// this is the server-wide redis, shared by plugins that don't | ||
// specify a db ID. | ||
if (!server.notes.redis) { | ||
this.get_redis_client(this.redisCfg.server).then((client) => { | ||
server.notes.redis = client | ||
nextOnce() | ||
}) | ||
return | ||
} | ||
// this is the server-wide redis, shared by plugins that don't | ||
// specificy a db ID. | ||
if (!server.notes.redis) { | ||
this.get_redis_client(this.redisCfg.server).then(client => { | ||
server.notes.redis = client | ||
nextOnce() | ||
}) | ||
return | ||
} | ||
server.notes.redis.ping((err) => { | ||
if (err) return nextOnce(err) | ||
server.notes.redis.ping((err, res) => { | ||
if (err) return nextOnce(err); | ||
this.loginfo('already connected'); | ||
nextOnce(); // connection is good | ||
}); | ||
this.loginfo('already connected') | ||
nextOnce() // connection is good | ||
}) | ||
} | ||
exports.init_redis_plugin = function (next, server) { | ||
const plugin = this; | ||
const plugin = this | ||
// this function is called by plugins at init_*, to establish their | ||
// shared or unique redis db handle. | ||
// this function is called by plugins at init_*, to establish their | ||
// shared or unique redis db handle. | ||
let calledNext=false; | ||
function nextOnce () { | ||
if (calledNext) return; | ||
calledNext = true; | ||
next(); | ||
} | ||
let calledNext = false | ||
function nextOnce() { | ||
if (calledNext) return | ||
calledNext = true | ||
next() | ||
} | ||
// for tests that do not load a shared config | ||
if (!plugin.cfg) { | ||
plugin.cfg = { redis: {} }; | ||
if (plugin.redisCfg) plugin.cfg.redis = JSON.parse(JSON.stringify(plugin.redisCfg)) | ||
} | ||
if (!server) server = { notes: {} }; | ||
// for tests that do not load a shared config | ||
if (!plugin.cfg) { | ||
plugin.cfg = { redis: {} } | ||
if (plugin.redisCfg) | ||
plugin.cfg.redis = JSON.parse(JSON.stringify(plugin.redisCfg)) | ||
} | ||
if (!server) server = { notes: {} } | ||
const pidb = plugin.cfg.redis.database; | ||
if (server.notes.redis) { // server-wide redis is available | ||
// and the DB not specified or is the same as server-wide | ||
if (pidb === undefined || pidb === plugin.redisCfg.db) { | ||
server.loginfo(plugin, 'using server.notes.redis'); | ||
plugin.db = server.notes.redis; | ||
nextOnce(); | ||
return; | ||
} | ||
const pidb = plugin.cfg.redis.database | ||
if (server.notes.redis) { | ||
// server-wide redis is available | ||
// and the DB not specified or is the same as server-wide | ||
if (pidb === undefined || pidb === plugin.redisCfg.db) { | ||
server.loginfo(plugin, 'using server.notes.redis') | ||
plugin.db = server.notes.redis | ||
nextOnce() | ||
return | ||
} | ||
} | ||
plugin.get_redis_client(plugin.cfg.redis).then(client => { | ||
plugin.db = client | ||
nextOnce() | ||
}) | ||
plugin.get_redis_client(plugin.cfg.redis).then((client) => { | ||
plugin.db = client | ||
nextOnce() | ||
}) | ||
} | ||
exports.shutdown = function () { | ||
if (this.db) this.db.quit(); | ||
if (this.db) this.db.quit() | ||
if (server && server.notes && server.notes.redis) { | ||
server.notes.redis.quit(); | ||
} | ||
if (server && server.notes && server.notes.redis) { | ||
server.notes.redis.quit() | ||
} | ||
} | ||
exports.redis_ping = async function () { | ||
this.redis_pings = false | ||
if (!this.db) throw new Error('redis not initialized') | ||
this.redis_pings=false; | ||
if (!this.db) throw new Error('redis not initialized'); | ||
const r = await this.db.ping() | ||
if (r !== 'PONG') throw new Error('not PONG') | ||
this.redis_pings = true | ||
const r = await this.db.ping() | ||
if (r !== 'PONG') throw new Error('not PONG'); | ||
this.redis_pings=true | ||
return true | ||
return true | ||
} | ||
function getUriStr (client, opts) { | ||
let msg = `redis://${opts?.socket?.host}:${opts?.socket?.port}`; | ||
if (opts?.database) msg += `/${opts?.database}`; | ||
if (client?.server_info?.redis_version) { | ||
msg += `\tv${client?.server_info?.redis_version}`; | ||
} | ||
return msg; | ||
function getUriStr(client, opts) { | ||
let msg = `redis://${opts?.socket?.host}:${opts?.socket?.port}` | ||
if (opts?.database) msg += `/${opts?.database}` | ||
if (client?.server_info?.redis_version) { | ||
msg += `\tv${client?.server_info?.redis_version}` | ||
} | ||
return msg | ||
} | ||
exports.get_redis_client = async function (opts) { | ||
const client = redis.createClient(opts) | ||
const client = redis.createClient(opts) | ||
let urlStr | ||
let urlStr | ||
client | ||
.on('error', (err) => { | ||
this.logerror(err.message) | ||
}) | ||
.on('end', () => { | ||
this.loginfo(`Disconnected from ${urlStr}`) | ||
}) | ||
client | ||
.on('error', (err) => { | ||
this.logerror(err.message); | ||
}) | ||
.on('end', () => { | ||
this.loginfo(`Disconnected from ${urlStr}`); | ||
}) | ||
try { | ||
await client.connect() | ||
try { | ||
await client.connect() | ||
if (opts?.database) client.dbid = opts?.database | ||
if (opts?.database) client.dbid = opts?.database | ||
client.server_info = await client.info() | ||
urlStr = getUriStr(client, opts) | ||
this.loginfo(`connected to ${urlStr}`) | ||
client.server_info = await client.info() | ||
urlStr = getUriStr(client, opts) | ||
this.loginfo(`connected to ${urlStr}`); | ||
return client | ||
} | ||
catch (e) { | ||
console.error(e) | ||
this.logerror(e); | ||
} | ||
return client | ||
} catch (e) { | ||
console.error(e) | ||
this.logerror(e) | ||
} | ||
} | ||
exports.get_redis_pub_channel = function (conn) { | ||
return `result-${conn.transaction ? conn.transaction.uuid : conn.uuid}`; | ||
return `result-${conn.transaction ? conn.transaction.uuid : conn.uuid}` | ||
} | ||
exports.get_redis_sub_channel = function (conn) { | ||
return `result-${conn.uuid}*`; | ||
return `result-${conn.uuid}*` | ||
} | ||
@@ -208,10 +216,9 @@ | ||
exports.redis_subscribe_pattern = async function (pattern, onMessage) { | ||
if (this.redis) return // already subscribed? | ||
if (this.redis) return // already subscribed? | ||
this.redis = redis.createClient(this.redisCfg.pubsub) | ||
await this.redis.connect() | ||
this.redis = redis.createClient(this.redisCfg.pubsub) | ||
await this.redis.connect() | ||
await this.redis.pSubscribe(pattern, onMessage); | ||
this.logdebug(this, `pSubscribed to ${pattern}`); | ||
await this.redis.pSubscribe(pattern, onMessage) | ||
this.logdebug(this, `pSubscribed to ${pattern}`) | ||
} | ||
@@ -221,33 +228,31 @@ | ||
exports.redis_subscribe = async function (connection, onMessage) { | ||
if (connection.notes.redis) { | ||
connection.logdebug(this, `redis already subscribed`) | ||
return // another plugin has already called this. | ||
} | ||
if (connection.notes.redis) { | ||
connection.logdebug(this, `redis already subscribed`); | ||
return; // another plugin has already called this. | ||
} | ||
const timer = setTimeout(() => { | ||
connection.logerror('redis subscribe timed out') | ||
}, 3 * 1000) | ||
const timer = setTimeout(() => { | ||
connection.logerror('redis subscribe timed out'); | ||
}, 3 * 1000); | ||
connection.notes.redis = redis.createClient(this.redisCfg.pubsub) | ||
await connection.notes.redis.connect() | ||
connection.notes.redis = redis.createClient(this.redisCfg.pubsub) | ||
await connection.notes.redis.connect() | ||
clearTimeout(timer) | ||
clearTimeout(timer); | ||
const pattern = this.get_redis_sub_channel(connection) | ||
connection.notes.redis.pSubscribe(pattern, onMessage); | ||
connection.logdebug(this, `pSubscribed to ${pattern}`); | ||
const pattern = this.get_redis_sub_channel(connection) | ||
connection.notes.redis.pSubscribe(pattern, onMessage) | ||
connection.logdebug(this, `pSubscribed to ${pattern}`) | ||
} | ||
exports.redis_unsubscribe = async function (connection) { | ||
if (!connection.notes.redis) { | ||
connection.logerror(this, `redis_unsubscribe called when no redis`) | ||
return | ||
} | ||
if (!connection.notes.redis) { | ||
connection.logerror(this, `redis_unsubscribe called when no redis`) | ||
return; | ||
} | ||
const pattern = this.get_redis_sub_channel(connection) | ||
await connection.notes.redis.unsubscribe(pattern); | ||
connection.logdebug(this, `unsubsubscribed from ${pattern}`); | ||
connection.notes.redis.quit(); | ||
const pattern = this.get_redis_sub_channel(connection) | ||
await connection.notes.redis.unsubscribe(pattern) | ||
connection.logdebug(this, `unsubsubscribed from ${pattern}`) | ||
connection.notes.redis.quit() | ||
} |
{ | ||
"name": "haraka-plugin-redis", | ||
"version": "2.0.6", | ||
"version": "2.0.7", | ||
"description": "Redis plugin for Haraka & other plugins to inherit from", | ||
"main": "index.js", | ||
"files": [ | ||
"config" | ||
], | ||
"directories": { | ||
@@ -10,16 +13,17 @@ "test": "test" | ||
"dependencies": { | ||
"redis": "4" | ||
"redis": "^4.6.0" | ||
}, | ||
"devDependencies": { | ||
"eslint": "8", | ||
"eslint-plugin-haraka": "*", | ||
"haraka-test-fixtures": "*", | ||
"mocha": "9" | ||
"@haraka/eslint-config": "^1.1.3", | ||
"haraka-test-fixtures": "1.3.7" | ||
}, | ||
"scripts": { | ||
"lint": "npx eslint *.js test", | ||
"lintfix": "npx eslint --fix *.js test", | ||
"cover": "NODE_ENV=cov npx nyc --reporter=lcovonly npm run test", | ||
"format": "npm run prettier:fix && npm run lint:fix", | ||
"lint": "npx eslint@^8 *.js test", | ||
"lint:fix": "npx eslint@^8 *.js test --fix", | ||
"prettier": "npx prettier . --check", | ||
"prettier:fix": "npx prettier . --write --log-level=warn", | ||
"test": "npx mocha@10", | ||
"versions": "npx dependency-version-checker check", | ||
"test": "npx mocha" | ||
"versions:fix": "npx dependency-version-checker update && npm run prettier:fix" | ||
}, | ||
@@ -32,2 +36,3 @@ "repository": { | ||
"haraka", | ||
"haraka-plugin", | ||
"mail", | ||
@@ -34,0 +39,0 @@ "smtp", |
@@ -75,12 +75,12 @@ # haraka-plugin-redis | ||
exports.register = function () { | ||
this.inherits('redis'); | ||
this.inherits('redis') | ||
this.cfg = this.config.get('my-plugin.ini'); | ||
this.cfg = this.config.get('my-plugin.ini') | ||
// populate plugin.cfg.redis with defaults from redis.ini | ||
this.merge_redis_ini(); | ||
// populate plugin.cfg.redis with defaults from redis.ini | ||
this.merge_redis_ini() | ||
// cluster aware redis connection(s) | ||
this.register_hook('init_master', 'init_redis_plugin'); | ||
this.register_hook('init_child', 'init_redis_plugin'); | ||
// cluster aware redis connection(s) | ||
this.register_hook('init_master', 'init_redis_plugin') | ||
this.register_hook('init_child', 'init_redis_plugin') | ||
} | ||
@@ -91,3 +91,3 @@ ``` | ||
```` | ||
``` | ||
[INFO] [-] [redis] connected to redis://172.16.15.16:6379 v3.2.6 | ||
@@ -97,3 +97,3 @@ [INFO] [-] [limit] connected to redis://172.16.15.16:6379/1 v3.2.6 | ||
[INFO] [-] [known-senders] connected to redis://172.16.15.16:6379/3 v3.2.6 | ||
```` | ||
``` | ||
@@ -103,3 +103,2 @@ Notice the database ID numbers appended to each plugins redis connection | ||
[ci-img]: https://github.com/haraka/haraka-plugin-redis/actions/workflows/ci.yml/badge.svg | ||
@@ -106,0 +105,0 @@ [ci-url]: https://github.com/haraka/haraka-plugin-redis/actions/workflows/ci.yml |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2
12566
5
208
104
1
Updatedredis@^4.6.0