@iobroker/db-states-redis
Advanced tools
Comparing version 4.0.0-alpha.9-20211115-5dac659e to 4.0.3
@@ -5,4 +5,4 @@ module.exports = { | ||
getDefaultPort: host => { | ||
return (host.includes(',')) ? 26379 : 6379; | ||
return host.includes(',') ? 26379 : 6379; | ||
} | ||
}; |
/** | ||
* States DB in redis - Client | ||
* | ||
* Copyright 2013-2021 bluefox <dogafox@gmail.com> | ||
* Copyright 2013-2022 bluefox <dogafox@gmail.com> | ||
* Copyright 2013-2014 hobbyquaker | ||
@@ -25,3 +25,10 @@ * | ||
function bufferJsonDecoder(key, value) { | ||
if (value && typeof value === 'object' && typeof value.type === 'string' && value.type === 'Buffer' && value.data && Array.isArray(value.data)) { | ||
if ( | ||
value && | ||
typeof value === 'object' && | ||
typeof value.type === 'string' && | ||
value.type === 'Buffer' && | ||
value.data && | ||
Array.isArray(value.data) | ||
) { | ||
return Buffer.from(value.data); | ||
@@ -33,3 +40,2 @@ } | ||
class StateRedisClient { | ||
constructor(settings) { | ||
@@ -42,2 +48,3 @@ this.settings = settings || {}; | ||
this.namespaceSession = (this.settings.namespaceSession || 'session') + '.'; | ||
this.metaNamespace = (this.settings.metaNamespace || 'meta') + '.'; | ||
@@ -48,2 +55,4 @@ this.globalMessageId = Math.round(Math.random() * 100000000); | ||
this.supportedProtocolVersions = ['4']; | ||
this.stop = false; | ||
@@ -61,2 +70,35 @@ this.client = null; | ||
/** | ||
* Checks if we are allowed to start and sets the protocol version accordingly | ||
* | ||
* @returns {Promise<void>} | ||
* @private | ||
*/ | ||
async _determineProtocolVersion() { | ||
let protoVersion; | ||
try { | ||
protoVersion = await this.client.get(`${this.metaNamespace}states.protocolVersion`); | ||
} catch (e) { | ||
if (e.message.includes('GET-UNSUPPORTED')) { | ||
// secondary updated and primary < 4.0 | ||
return; | ||
} | ||
} | ||
if (!protoVersion) { | ||
// if no proto version existent yet, we set ours | ||
const highestVersion = Math.max(...this.supportedProtocolVersions); | ||
await this.setProtocolVersion(highestVersion); | ||
this.activeProtocolVersion = highestVersion.toString(); | ||
return; | ||
} | ||
// check if we can support this version | ||
if (this.supportedProtocolVersions.includes(protoVersion)) { | ||
this.activeProtocolVersion = protoVersion; | ||
} else { | ||
throw new Error(`This host does not support protocol version "${protoVersion}"`); | ||
} | ||
} | ||
connectDb() { | ||
@@ -68,4 +110,2 @@ this.settings.connection = this.settings.connection || {}; | ||
const ioRegExp = new RegExp('^' + this.namespaceRedis.replace(/\./g, '\\.') + '[_A-Za-z0-9ÄÖÜäöüа-яА-Я]+'); // io.[_A-Za-z0-9]+ | ||
let ready = false; | ||
@@ -123,7 +163,11 @@ let initError = false; | ||
if (this.settings.connection.port === 0) { // Port = 0 means unix socket | ||
if (this.settings.connection.port === 0) { | ||
// Port = 0 means unix socket | ||
// initiate a unix socket connection | ||
this.settings.connection.options.path = this.settings.connection.host; | ||
this.log.debug(`${this.namespace} Redis States: Use File Socket for connection: ${this.settings.connection.options.path}`); | ||
} else if (Array.isArray(this.settings.connection.host)) { // Host is an array means we use a sentinel | ||
this.log.debug( | ||
`${this.namespace} Redis States: Use File Socket for connection: ${this.settings.connection.options.path}` | ||
); | ||
} else if (Array.isArray(this.settings.connection.host)) { | ||
// Host is an array means we use a sentinel | ||
const defaultPort = Array.isArray(this.settings.connection.port) ? null : this.settings.connection.port; | ||
@@ -134,8 +178,16 @@ this.settings.connection.options.sentinels = this.settings.connection.host.map((redisNode, idx) => ({ | ||
})); | ||
this.settings.connection.options.name = this.settings.connection.sentinelName ? this.settings.connection.sentinelName : 'mymaster'; | ||
this.log.debug(`${this.namespace} Redis States: Use Sentinel for connection: ${this.settings.connection.options.name}, ${JSON.stringify(this.settings.connection.options.sentinels)}`); | ||
this.settings.connection.options.name = this.settings.connection.sentinelName | ||
? this.settings.connection.sentinelName | ||
: 'mymaster'; | ||
this.log.debug( | ||
`${this.namespace} Redis States: Use Sentinel for connection: ${ | ||
this.settings.connection.options.name | ||
}, ${JSON.stringify(this.settings.connection.options.sentinels)}` | ||
); | ||
} else { | ||
this.settings.connection.options.host = this.settings.connection.host; | ||
this.settings.connection.options.port = this.settings.connection.port; | ||
this.log.debug(`${this.namespace} Redis States: Use Redis connection: ${this.settings.connection.options.host}:${this.settings.connection.options.port}`); | ||
this.log.debug( | ||
`${this.namespace} Redis States: Use Redis connection: ${this.settings.connection.options.host}:${this.settings.connection.options.port}` | ||
); | ||
} | ||
@@ -148,3 +200,4 @@ if (this.settings.connection.options.db === undefined) { | ||
} | ||
this.settings.connection.options.password = this.settings.connection.options.auth_pass || this.settings.connection.pass || null; | ||
this.settings.connection.options.password = | ||
this.settings.connection.options.auth_pass || this.settings.connection.pass || null; | ||
this.settings.connection.options.autoResubscribe = false; // We do our own resubscribe because other sometimes not work | ||
@@ -157,3 +210,6 @@ // REDIS does not allow whitespaces, we have some because of pid | ||
this.client.on('error', error => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} Redis ERROR States: (${this.stop}) ${error.message} / ${error.stack}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly( | ||
`${this.namespace} Redis ERROR States: (${this.stop}) ${error.message} / ${error.stack}` | ||
); | ||
if (this.stop) { | ||
@@ -166,3 +222,5 @@ return; | ||
if (error.message.startsWith('Protocol error, got "H" as reply type byte.')) { | ||
this.log.error(`${this.namespace} Could not connect to states database at ${this.settings.connection.options.host}:${this.settings.connection.options.port} (invalid protocol). Please make sure the configured IP and port points to a host running JS-Controller >= 2.0. and that the port is not occupied by other software!`); | ||
this.log.error( | ||
`${this.namespace} Could not connect to states database at ${this.settings.connection.options.host}:${this.settings.connection.options.port} (invalid protocol). Please make sure the configured IP and port points to a host running JS-Controller >= 2.0. and that the port is not occupied by other software!` | ||
); | ||
} | ||
@@ -176,3 +234,4 @@ return; | ||
this.client.on('end', () => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis Event end (stop=${this.stop})`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} States-Redis Event end (stop=${this.stop})`); | ||
if (ready && typeof this.settings.disconnected === 'function') { | ||
@@ -184,3 +243,4 @@ this.settings.disconnected(); | ||
this.client.on('connect', () => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis Event connect (stop=${this.stop})`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} States-Redis Event connect (stop=${this.stop})`); | ||
connected = true; | ||
@@ -194,3 +254,4 @@ if (errorLogged) { | ||
this.client.on('close', () => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis Event close (stop=${this.stop})`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} States-Redis Event close (stop=${this.stop})`); | ||
//if (ready && typeof this.settings.disconnected === 'function') this.settings.disconnected(); | ||
@@ -203,5 +264,11 @@ }); | ||
} | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis Event reconnect (reconnectCounter=${reconnectCounter}, stop=${this.stop})`); | ||
if (reconnectCounter > 2) { // fallback logic for nodejs <10 | ||
this.log.error(`${this.namespace} The DB port ${this.settings.connection.options.port} is occupied by something that is not a Redis protocol server. Please check other software running on this port or, if you use iobroker, make sure to update to js-controller 2.0 or higher!`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly( | ||
`${this.namespace} States-Redis Event reconnect (reconnectCounter=${reconnectCounter}, stop=${this.stop})` | ||
); | ||
if (reconnectCounter > 2) { | ||
// fallback logic for nodejs <10 | ||
this.log.error( | ||
`${this.namespace} The DB port ${this.settings.connection.options.port} is occupied by something that is not a Redis protocol server. Please check other software running on this port or, if you use iobroker, make sure to update to js-controller 2.0 or higher!` | ||
); | ||
return; | ||
@@ -214,3 +281,4 @@ } | ||
this.client.on('ready', async () => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis Event ready (stop=${this.stop})`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} States-Redis Event ready (stop=${this.stop})`); | ||
if (this.stop) { | ||
@@ -225,5 +293,7 @@ return; | ||
try { | ||
await this.client.config('set', ['notify-keyspace-events', 'Exe']);// enable Expiry/Evicted events in server | ||
await this.client.config('set', ['notify-keyspace-events', 'Exe']); // enable Expiry/Evicted events in server | ||
} catch (e) { | ||
this.log.warn(`${this.namespace} Unable to enable Expiry Keyspace events from Redis Server: ${e.message}`); | ||
this.log.warn( | ||
`${this.namespace} Unable to enable Expiry Keyspace events from Redis Server: ${e.message}` | ||
); | ||
} | ||
@@ -238,6 +308,26 @@ | ||
setImmediate(() => { | ||
this.log.silly(`${this.namespace} States system redis pmessage ${pattern}/${channel}:${message}`); | ||
this.log.silly( | ||
`${this.namespace} States system redis pmessage ${pattern}/${channel}:${message}` | ||
); | ||
if (channel.startsWith(this.metaNamespace)) { | ||
if ( | ||
channel === `${this.metaNamespace}states.protocolVersion` && | ||
message !== this.activeProtocolVersion | ||
) { | ||
if (typeof this.settings.disconnected === 'function') { | ||
// protocol version has changed, restart controller | ||
this.log.info( | ||
`${this.namespace} States protocol version has changed, disconnecting!` | ||
); | ||
this.settings.disconnected(); | ||
} | ||
} | ||
return; | ||
} | ||
try { | ||
message = message ? JSON.parse(message, message.includes('"Buffer"') ? bufferJsonDecoder: undefined) : null; | ||
message = message | ||
? JSON.parse(message, message.includes('"Buffer"') ? bufferJsonDecoder : undefined) | ||
: null; | ||
} catch { | ||
@@ -249,3 +339,3 @@ this.log.warn(`${this.namespace} Cannot parse system pmessage "${message}"`); | ||
try { | ||
if (ioRegExp.test(channel)) { | ||
if (channel.startsWith(this.namespaceRedis) && channel.length > this.namespaceRedisL) { | ||
onChange(channel.substring(this.namespaceRedisL), message); | ||
@@ -256,3 +346,7 @@ } else { | ||
} catch (e) { | ||
this.log.warn(`${this.namespace} States system pmessage ${channel} ${JSON.stringify(message)} ${e.message}`); | ||
this.log.warn( | ||
`${this.namespace} States system pmessage ${channel} ${JSON.stringify(message)} ${ | ||
e.message | ||
}` | ||
); | ||
this.log.warn(`${this.namespace} ${e.stack}`); | ||
@@ -267,6 +361,8 @@ } | ||
setImmediate(() => { | ||
this.log.silly(this.namespace + ' redis message expired/evicted ' + channel + ':' + message); | ||
this.log.silly(`${this.namespace} redis message expired/evicted ${channel}:${message}`); | ||
try { | ||
if (channel === `__keyevent@${this.settings.connection.options.db}__:evicted`) { | ||
this.log.warn(this.namespace + ' Redis has evicted state ' + message + '. Please check your maxMemory settings for your redis instance!'); | ||
this.log.warn( | ||
`${this.namespace} Redis has evicted state ${message}. Please check your maxMemory settings for your redis instance!` | ||
); | ||
} else if (channel !== `__keyevent@${this.settings.connection.options.db}__:expired`) { | ||
@@ -278,3 +374,5 @@ this.log.warn(`${this.namespace} Unknown user message ${channel} ${message}`); | ||
// Find deleted states and notify user | ||
const found = Object.values(this.subSystem.ioBrokerSubscriptions).find(regex => regex !== true && regex.test(message)); | ||
const found = Object.values(this.subSystem.ioBrokerSubscriptions).find( | ||
regex => regex !== true && regex.test(message) | ||
); | ||
found && onChange(message.substring(this.namespaceRedisL), null); | ||
@@ -284,3 +382,5 @@ } | ||
// Find deleted states and notify user | ||
const found = Object.values(this.sub.ioBrokerSubscriptions).find(regex => regex !== true && regex.test(message)); | ||
const found = Object.values(this.sub.ioBrokerSubscriptions).find( | ||
regex => regex !== true && regex.test(message) | ||
); | ||
found && onChangeUser(message.substring(this.namespaceRedisL), null); | ||
@@ -292,6 +392,8 @@ } | ||
} | ||
})); | ||
}) | ||
); | ||
} | ||
this.subSystem.on('end', () => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis System Event end sub (stop=${this.stop})`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} States-Redis System Event end sub (stop=${this.stop})`); | ||
ready && typeof this.settings.disconnected === 'function' && this.settings.disconnected(); | ||
@@ -304,3 +406,6 @@ }); | ||
} | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} Sub-Client States System No redis connection: ${JSON.stringify(error)}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly( | ||
`${this.namespace} Sub-Client States System No redis connection: ${JSON.stringify(error)}` | ||
); | ||
}); | ||
@@ -310,9 +415,18 @@ | ||
this.subSystem.on('connect', () => | ||
this.log.silly(`${this.namespace} PubSub client States-Redis System Event connect (stop=${this.stop})`)); | ||
this.log.silly( | ||
`${this.namespace} PubSub client States-Redis System Event connect (stop=${this.stop})` | ||
) | ||
); | ||
this.subSystem.on('close', () => | ||
this.log.silly(`${this.namespace} PubSub client States-Redis System Event close (stop=${this.stop})`)); | ||
this.log.silly( | ||
`${this.namespace} PubSub client States-Redis System Event close (stop=${this.stop})` | ||
) | ||
); | ||
this.subSystem.on('reconnecting', reconnectCounter => | ||
this.log.silly(`${this.namespace} PubSub client States-Redis System Event reconnect (reconnectCounter=${reconnectCounter}, stop=${this.stop})`)); | ||
this.log.silly( | ||
`${this.namespace} PubSub client States-Redis System Event reconnect (reconnectCounter=${reconnectCounter}, stop=${this.stop})` | ||
) | ||
); | ||
} | ||
@@ -322,18 +436,45 @@ | ||
try { | ||
this.subSystem && await this.subSystem.subscribe(`__keyevent@${this.settings.connection.options.db}__:expired`); | ||
this.subSystem && | ||
(await this.subSystem.subscribe( | ||
`__keyevent@${this.settings.connection.options.db}__:expired` | ||
)); | ||
} catch (e) { | ||
this.log.warn(`${this.namespace} Unable to subscribe to expiry Keyspace events from Redis Server: ${e.message}`); | ||
this.log.warn( | ||
`${this.namespace} Unable to subscribe to expiry Keyspace events from Redis Server: ${e.message}` | ||
); | ||
} | ||
try { | ||
this.subSystem && await this.subSystem.subscribe(`__keyevent@${this.settings.connection.options.db}__:evicted`); | ||
this.subSystem && | ||
(await this.subSystem.subscribe( | ||
`__keyevent@${this.settings.connection.options.db}__:evicted` | ||
)); | ||
} catch (e) { | ||
this.log.warn(`${this.namespace} Unable to subscribe to evicted Keyspace events from Redis Server: ${e.message}`); | ||
this.log.warn( | ||
`${this.namespace} Unable to subscribe to evicted Keyspace events from Redis Server: ${e.message}` | ||
); | ||
} | ||
// subscribe to meta changes | ||
try { | ||
this.subSystem && (await this.subSystem.psubscribe(`${this.metaNamespace}*`)); | ||
} catch (e) { | ||
this.log.warn( | ||
`${this.namespace} Unable to subscribe to meta namespace "${this.metaNamespace}" changes: ${e.message}` | ||
); | ||
} | ||
if (--initCounter < 1) { | ||
if (this.settings.connection.port === 0) { | ||
this.log.debug(`${this.namespace} States ${ready ? 'system re' : ''}connected to redis: ${this.settings.connection.host}`); | ||
this.log.debug( | ||
`${this.namespace} States ${ready ? 'system re' : ''}connected to redis: ${ | ||
this.settings.connection.host | ||
}` | ||
); | ||
} else { | ||
this.log.debug(`${this.namespace} States ${ready ? 'system re' : ''}connected to redis: ${this.settings.connection.host}:${this.settings.connection.port}`); | ||
this.log.debug( | ||
`${this.namespace} States ${ready ? 'system re' : ''}connected to redis: ${ | ||
this.settings.connection.host | ||
}:${this.settings.connection.port}` | ||
); | ||
} | ||
@@ -368,3 +509,5 @@ !ready && typeof this.settings.connected === 'function' && this.settings.connected(); | ||
try { | ||
message = message ? JSON.parse(message, message.includes('"Buffer"') ? bufferJsonDecoder: undefined) : null; | ||
message = message | ||
? JSON.parse(message, message.includes('"Buffer"') ? bufferJsonDecoder : undefined) | ||
: null; | ||
} catch { | ||
@@ -376,3 +519,3 @@ this.log.warn(`${this.namespace} Cannot parse user pmessage "${message}"`); | ||
try { | ||
if (ioRegExp.test(channel)) { | ||
if (channel.startsWith(this.namespaceRedis) && channel.length > this.namespaceRedisL) { | ||
onChangeUser(channel.substring(this.namespaceRedisL), message); | ||
@@ -383,3 +526,7 @@ } else { | ||
} catch (e) { | ||
this.log.warn(`${this.namespace} States user pmessage ${channel} ${JSON.stringify(message)} ${e.message}`); | ||
this.log.warn( | ||
`${this.namespace} States user pmessage ${channel} ${JSON.stringify(message)} ${ | ||
e.message | ||
}` | ||
); | ||
this.log.warn(`${this.namespace} ${e.stack}`); | ||
@@ -391,3 +538,4 @@ } | ||
this.sub.on('end', () => { | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} States-Redis User Event end sub (stop=${this.stop})`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} States-Redis User Event end sub (stop=${this.stop})`); | ||
if (ready && typeof this.settings.disconnected === 'function') { | ||
@@ -403,3 +551,5 @@ this.settings.disconnected(); | ||
if (this.settings.connection.enhancedLogging) { | ||
this.log.silly(`${this.namespace} Sub-Client States User No redis connection: ${JSON.stringify(error)}`); | ||
this.log.silly( | ||
`${this.namespace} Sub-Client States User No redis connection: ${JSON.stringify(error)}` | ||
); | ||
} | ||
@@ -410,11 +560,17 @@ }); | ||
this.sub.on('connect', () => { | ||
this.log.silly(`${this.namespace} PubSub client States-Redis User Event connect (stop=${this.stop})`); | ||
this.log.silly( | ||
`${this.namespace} PubSub client States-Redis User Event connect (stop=${this.stop})` | ||
); | ||
}); | ||
this.sub.on('close', () => { | ||
this.log.silly(`${this.namespace} PubSub client States-Redis User Event close (stop=${this.stop})`); | ||
this.log.silly( | ||
`${this.namespace} PubSub client States-Redis User Event close (stop=${this.stop})` | ||
); | ||
}); | ||
this.sub.on('reconnecting', reconnectCounter => { | ||
this.log.silly(`${this.namespace} PubSub client States-Redis User Event reconnect (reconnectCounter=${reconnectCounter}, stop=${this.stop})`); | ||
this.log.silly( | ||
`${this.namespace} PubSub client States-Redis User Event reconnect (reconnectCounter=${reconnectCounter}, stop=${this.stop})` | ||
); | ||
}); | ||
@@ -426,5 +582,13 @@ } | ||
if (this.settings.connection.port === 0) { | ||
this.log.debug(`${this.namespace} States ${ready ? 'user re' : ''}connected to redis: ${this.settings.connection.host}`); | ||
this.log.debug( | ||
`${this.namespace} States ${ready ? 'user re' : ''}connected to redis: ${ | ||
this.settings.connection.host | ||
}` | ||
); | ||
} else { | ||
this.log.debug(`${this.namespace} States ${ready ? 'user re' : ''}connected to redis: ${this.settings.connection.host}:${this.settings.connection.port}`); | ||
this.log.debug( | ||
`${this.namespace} States ${ready ? 'user re' : ''}connected to redis: ${ | ||
this.settings.connection.host | ||
}:${this.settings.connection.port}` | ||
); | ||
} | ||
@@ -445,7 +609,22 @@ !ready && typeof this.settings.connected === 'function' && this.settings.connected(); | ||
try { | ||
await this._determineProtocolVersion(); | ||
} catch (e) { | ||
this.log.error(`${this.namespace} ${e.message}`); | ||
throw new Error('States DB is not allowed to start in the current Multihost environment'); | ||
} | ||
if (initCounter < 1) { | ||
if (this.settings.connection.port === 0) { | ||
this.log.debug(`${this.namespace} States ${ready ? 'client re' : ''}connected to redis: ${this.settings.connection.host}`); | ||
this.log.debug( | ||
`${this.namespace} States ${ready ? 'client re' : ''}connected to redis: ${ | ||
this.settings.connection.host | ||
}` | ||
); | ||
} else { | ||
this.log.debug(`${this.namespace} States ${ready ? 'client re' : ''}connected to redis: ${this.settings.connection.host}:${this.settings.connection.port}`); | ||
this.log.debug( | ||
`${this.namespace} States ${ready ? 'client re' : ''}connected to redis: ${ | ||
this.settings.connection.host | ||
}:${this.settings.connection.port}` | ||
); | ||
} | ||
@@ -455,3 +634,2 @@ !ready && typeof this.settings.connected === 'function' && this.settings.connected(); | ||
} | ||
}); | ||
@@ -461,3 +639,3 @@ } | ||
getStatus() { | ||
return {type: 'redis', server: false}; | ||
return { type: 'redis', server: false }; | ||
} | ||
@@ -526,3 +704,3 @@ | ||
if (!oldObj) { | ||
oldObj = {val: null}; | ||
oldObj = { val: null }; | ||
} else { | ||
@@ -533,3 +711,3 @@ try { | ||
this.log.warn(`${this.namespace} Cannot parse "${oldObj}"`); | ||
oldObj = {val: null}; | ||
oldObj = { val: null }; | ||
} | ||
@@ -551,5 +729,5 @@ } | ||
if (state.ts !== undefined) { | ||
obj.ts = (state.ts < 946681200000) ? state.ts * 1000 : state.ts; // if less 2000.01.01 00:00:00 | ||
obj.ts = state.ts < 946681200000 ? state.ts * 1000 : state.ts; // if less 2000.01.01 00:00:00 | ||
} else { | ||
obj.ts = (new Date()).getTime(); | ||
obj.ts = new Date().getTime(); | ||
} | ||
@@ -595,3 +773,4 @@ | ||
// publish event in redis | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis publish ${this.namespaceRedis}${id} ${objString}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis publish ${this.namespaceRedis}${id} ${objString}`); | ||
await this.client.publish(this.namespaceRedis + id, objString); | ||
@@ -606,3 +785,4 @@ return tools.maybeCallbackWithError(callback, null, id); | ||
// publish event in redis | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis publish ${this.namespaceRedis}${id} ${objString}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis publish ${this.namespaceRedis}${id} ${objString}`); | ||
await this.client.publish(this.namespaceRedis + id, objString); | ||
@@ -653,3 +833,4 @@ return tools.maybeCallbackWithError(callback, null, id); | ||
* @param {String} id | ||
* @param callback | ||
* @param {function?} callback | ||
* @return {Promise<object>} | ||
*/ | ||
@@ -689,18 +870,6 @@ async getState(id, callback) { | ||
getStateAsync(id) { | ||
return new Promise((resolve, reject) => { | ||
this.getState(id, (err, res) => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
resolve(res); | ||
} | ||
}); | ||
}); | ||
return this.getState(id); | ||
} | ||
async getStates(keys, callback, dontModify) { | ||
if (typeof callback !== 'function') { | ||
this.log.warn(`${this.namespace} redis getStates no callback`); | ||
return; | ||
} | ||
if (!keys || !Array.isArray(keys)) { | ||
@@ -725,5 +894,6 @@ return tools.maybeCallbackWithError(callback, 'no keys', null); | ||
obj = await this.client.mget(_keys); | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis mget ${(!obj) ? 0 : obj.length} ${_keys.length}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis mget ${!obj ? 0 : obj.length} ${_keys.length}`); | ||
} catch (e) { | ||
this.log.warn(`${this.namespace} redis mget ${(!obj) ? 0 : obj.length} ${_keys.length}, err: ${e.message}`); | ||
this.log.warn(`${this.namespace} redis mget ${!obj ? 0 : obj.length} ${_keys.length}, err: ${e.message}`); | ||
} | ||
@@ -800,3 +970,2 @@ const result = []; | ||
} | ||
} | ||
@@ -848,3 +1017,4 @@ if (this.subSystem) { | ||
} | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis keys ${obj.length} ${pattern}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis keys ${obj.length} ${pattern}`); | ||
if (obj && !dontModify) { | ||
@@ -880,6 +1050,9 @@ const len = this.namespaceRedisL; | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis psubscribe ${this.namespaceRedis}${pattern}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis psubscribe ${this.namespaceRedis}${pattern}`); | ||
try { | ||
await subClient.psubscribe(this.namespaceRedis + pattern); | ||
subClient.ioBrokerSubscriptions[this.namespaceRedis + pattern] = new RegExp(tools.pattern2RegEx(this.namespaceRedis + pattern)); | ||
subClient.ioBrokerSubscriptions[this.namespaceRedis + pattern] = new RegExp( | ||
tools.pattern2RegEx(this.namespaceRedis + pattern) | ||
); | ||
return tools.maybeCallback(callback); | ||
@@ -901,6 +1074,12 @@ } catch (e) { | ||
unsubscribe(pattern, subClient, callback) { | ||
/** | ||
* Unsubscribe pattern | ||
* @param {string} pattern | ||
* @param {object?} subClient | ||
* @param {(err: Error) => void?} callback | ||
* @return {Promise<void>} | ||
*/ | ||
async unsubscribe(pattern, subClient, callback) { | ||
if (!pattern || typeof pattern !== 'string') { | ||
typeof callback === 'function' && setImmediate(callback, `invalid pattern ${JSON.stringify(pattern)}`); | ||
return; | ||
return tools.maybeCallbackWithError(callback, `invalid pattern ${JSON.stringify(pattern)}`); | ||
} | ||
@@ -914,12 +1093,17 @@ if (typeof subClient === 'function') { | ||
if (!subClient) { | ||
return typeof callback === 'function' && setImmediate(callback, tools.ERRORS.ERROR_DB_CLOSED); | ||
return tools.maybeCallbackWithRedisError(callback, tools.ERRORS.ERROR_DB_CLOSED); | ||
} | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis punsubscribe ${this.namespaceRedis}${pattern}`); | ||
subClient.punsubscribe(this.namespaceRedis + pattern, err => { | ||
if (!err && subClient.ioBrokerSubscriptions[this.namespaceRedis + pattern] !== undefined) { | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis punsubscribe ${this.namespaceRedis}${pattern}`); | ||
try { | ||
await subClient.punsubscribe(this.namespaceRedis + pattern); | ||
if (subClient.ioBrokerSubscriptions[this.namespaceRedis + pattern] !== undefined) { | ||
delete subClient.ioBrokerSubscriptions[this.namespaceRedis + pattern]; | ||
} | ||
typeof callback === 'function' && callback(err); | ||
}); | ||
return tools.maybeCallback(callback); | ||
} catch (e) { | ||
return tools.maybeCallbackWithRedisError(callback, e); | ||
} | ||
} | ||
@@ -930,4 +1114,5 @@ | ||
* | ||
* @param pattern | ||
* @param {function} callback callback function (optional) | ||
* @param {string} pattern | ||
* @param {function?} callback callback function (optional) | ||
* @return {Promise<void>} | ||
*/ | ||
@@ -948,3 +1133,3 @@ unsubscribeUser(pattern, callback) { | ||
state._id = this.globalMessageId++; | ||
if (this.globalMessageId >= 0xFFFFFFFF) { | ||
if (this.globalMessageId >= 0xffffffff) { | ||
this.globalMessageId = 0; | ||
@@ -972,3 +1157,4 @@ } | ||
} | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis subscribeMessage ${this.namespaceMsg}${id}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis subscribeMessage ${this.namespaceMsg}${id}`); | ||
try { | ||
@@ -995,3 +1181,4 @@ await this.subSystem.psubscribe(this.namespaceMsg + id); | ||
} | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis unsubscribeMessage ${this.namespaceMsg}${id}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis unsubscribeMessage ${this.namespaceMsg}${id}`); | ||
try { | ||
@@ -1014,3 +1201,3 @@ await this.subSystem.punsubscribe(this.namespaceMsg + id); | ||
log._id = this.globalLogId++; | ||
if (this.globalLogId >= 0xFFFFFFFF) { | ||
if (this.globalLogId >= 0xffffffff) { | ||
this.globalLogId = 0; | ||
@@ -1067,3 +1254,4 @@ } | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis subscribeMessage ${this.namespaceLog}${id}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis subscribeMessage ${this.namespaceLog}${id}`); | ||
try { | ||
@@ -1087,3 +1275,4 @@ await this.subSystem.psubscribe(this.namespaceLog + id); | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis unsubscribeMessage ${this.namespaceLog}${id}`); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis unsubscribeMessage ${this.namespaceLog}${id}`); | ||
try { | ||
@@ -1137,3 +1326,4 @@ await this.subSystem.punsubscribe(this.namespaceLog + id); | ||
await this.client.setex(this.namespaceSession + id, expire, JSON.stringify(obj)); | ||
this.settings.connection.enhancedLogging && this.log.silly(`${this.namespace} redis setex`, id, expire, obj); | ||
this.settings.connection.enhancedLogging && | ||
this.log.silly(`${this.namespace} redis setex`, id, expire, obj); | ||
return tools.maybeCallback(callback); | ||
@@ -1219,4 +1409,37 @@ } catch (e) { | ||
} | ||
/** | ||
* Returns the protocol version from DB | ||
* | ||
* @returns {Promise<string>} | ||
*/ | ||
getProtocolVersion() { | ||
if (!this.client) { | ||
throw new Error(tools.ERRORS.ERROR_DB_CLOSED); | ||
} | ||
return this.client.get(`${this.metaNamespace}states.protocolVersion`); | ||
} | ||
/** | ||
* Sets the protocol version to the DB | ||
* @param {number} version - protocol version | ||
* @returns {Promise<void>} | ||
*/ | ||
async setProtocolVersion(version) { | ||
if (!this.client) { | ||
throw new Error(tools.ERRORS.ERROR_DB_CLOSED); | ||
} | ||
version = version.toString(); | ||
// we can only set a protocol if we actually support it | ||
if (this.supportedProtocolVersions.includes(version)) { | ||
await this.client.set(`${this.metaNamespace}states.protocolVersion`, version); | ||
await this.client.publish(`${this.metaNamespace}states.protocolVersion`, version); | ||
} else { | ||
throw new Error('Cannot set an unsupported protocol version on the current host'); | ||
} | ||
} | ||
} | ||
module.exports = StateRedisClient; |
{ | ||
"name": "@iobroker/db-states-redis", | ||
"version": "4.0.0-alpha.9-20211115-5dac659e", | ||
"version": "4.0.3", | ||
"engines": { | ||
@@ -8,4 +8,4 @@ "node": ">=12.0.0" | ||
"dependencies": { | ||
"@iobroker/db-base": "4.0.0-alpha.9-20211115-5dac659e", | ||
"ioredis": "^4.28.0" | ||
"@iobroker/db-base": "4.0.3", | ||
"ioredis": "^4.28.2" | ||
}, | ||
@@ -37,3 +37,3 @@ "keywords": [ | ||
], | ||
"gitHead": "fe67b6f4f17e34f417ec8b4a11a57a107124c475" | ||
"gitHead": "79a76a082db91399d0e432ef02bf2981a6739107" | ||
} |
@@ -7,3 +7,3 @@ # Redis DB states classes for ioBroker | ||
Copyright (c) 2014-2020 bluefox <dogafox@gmail.com>, | ||
Copyright (c) 2014-2022 bluefox <dogafox@gmail.com>, | ||
Copyright (c) 2014 hobbyquaker |
Sorry, the diff of this file is not supported yet
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
57135
1241
1
+ Added@datalust/winston-seq@1.0.2(transitive)
+ Added@iobroker/db-base@4.0.3(transitive)
+ Added@iobroker/js-controller-common@4.0.3(transitive)
+ Addedbuffer-equal-constant-time@1.0.1(transitive)
+ Addedecdsa-sig-formatter@1.0.11(transitive)
+ Addedjsonwebtoken@8.5.1(transitive)
+ Addedjwa@1.4.1(transitive)
+ Addedjws@3.2.2(transitive)
+ Addedlodash.includes@4.3.0(transitive)
+ Addedlodash.isboolean@3.0.3(transitive)
+ Addedlodash.isinteger@4.0.4(transitive)
+ Addedlodash.isnumber@3.0.3(transitive)
+ Addedlodash.isplainobject@4.0.6(transitive)
+ Addedlodash.isstring@4.0.1(transitive)
+ Addedlodash.once@4.1.1(transitive)
+ Addednode-forge@1.3.1(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedseq-logging@1.1.2(transitive)
- Removed@iobroker/db-base@4.0.0-alpha.9-20211115-5dac659e(transitive)
- Removed@iobroker/js-controller-common@4.0.0-alpha.9-20211115-5dac659e(transitive)
- Removednode-forge@0.10.0(transitive)
- Removedseq-logging@0.4.6(transitive)
- Removedwinston-seq-updated@1.0.4(transitive)
Updated@iobroker/db-base@4.0.3
Updatedioredis@^4.28.2