@iobroker/db-base
Advanced tools
Comparing version 4.0.0-alpha.45-20220114-88aaebdb to 4.0.0-alpha.46-20220119-9e487e55
const Resp = require('respjs'); | ||
const { EventEmitter } = require('events'); | ||
const { QUEUED_STR_BUF, OK_STR_BUF } = require('./constants'); | ||
@@ -43,3 +44,3 @@ /** | ||
if (this.initialized) { | ||
this.sendError(null, new Error('PARSER ERROR ' + err)); // TODO | ||
this.sendError(null, new Error(`PARSER ERROR ${err}`)); // TODO | ||
} else { | ||
@@ -103,7 +104,6 @@ this.close(); | ||
} | ||
if (command === 'info') { | ||
this.initialized = true; | ||
} | ||
const t = process.hrtime(); | ||
const responseId = t[0] * 1e3 + t[1] / 1e6; | ||
if (this.options.enhancedLogging) { | ||
@@ -113,3 +113,3 @@ this.log.silly( | ||
JSON.stringify(data).length > 1024 | ||
? JSON.stringify(data).substring(0, 100) + ' -- ' + JSON.stringify(data).length + ' bytes' | ||
? `${JSON.stringify(data).substring(0, 100)} -- ${JSON.stringify(data).length} bytes` | ||
: JSON.stringify(data) | ||
@@ -119,7 +119,32 @@ }` | ||
} | ||
this.writeQueue.push({ id: responseId, data: false }); | ||
if (command === 'multi') { | ||
this._handleMulti(); | ||
return; | ||
} | ||
// multi active and exec not called yet | ||
if (this.multiActive && !this.execCalled && command !== 'exec') { | ||
// store all response ids so we know which need to be in the multi call | ||
this.multiResponseIds.push(responseId); | ||
// add it for the correct order will be overwritten with correct response | ||
this.multiResponseMap.set(responseId, null); | ||
} else { | ||
// multi response ids should not be pushed - we will answer combined | ||
this.writeQueue.push({ id: responseId, data: false }); | ||
} | ||
if (command === 'exec') { | ||
this._handleExec(responseId); | ||
return; | ||
} | ||
if (command === 'info') { | ||
this.initialized = true; | ||
} | ||
if (this.listenerCount(command) !== 0) { | ||
setImmediate(() => this.emit(command, data, responseId)); | ||
} else { | ||
this.sendError(responseId, new Error(command + ' NOT SUPPORTED')); | ||
this.sendError(responseId, new Error(`${command} NOT SUPPORTED`)); | ||
} | ||
@@ -137,2 +162,3 @@ } | ||
let idx = 0; | ||
while (this.writeQueue.length && idx < this.writeQueue.length) { | ||
@@ -156,3 +182,3 @@ // we found the queue entry that matches with the responseId, so store the data so be sent out | ||
response.data.length > 1024 | ||
? data.length + ' bytes' | ||
? `${data.length} bytes` | ||
: response.data.toString().replace(/[\r\n]+/g, '') | ||
@@ -162,2 +188,3 @@ }` | ||
} | ||
this._write(response.data); | ||
@@ -209,4 +236,5 @@ // We sended out first queue entry but no further response is ready | ||
this.log.warn(`${this.socketId} Not able to write ${JSON.stringify(data)}`); | ||
data = Resp.encodeError(new Error('INVALID RESPONSE: ' + JSON.stringify(data))); | ||
data = Resp.encodeError(new Error(`INVALID RESPONSE: ${JSON.stringify(data)}`)); | ||
} | ||
setImmediate(() => this._sendQueued(responseId, data)); | ||
@@ -237,2 +265,7 @@ } | ||
sendNull(responseId) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeNull()); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeNull()); | ||
@@ -246,2 +279,7 @@ } | ||
sendNullArray(responseId) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeNullArray()); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeNullArray()); | ||
@@ -256,2 +294,7 @@ } | ||
sendString(responseId, str) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeString(str)); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeString(str)); | ||
@@ -267,2 +310,8 @@ } | ||
this.log.warn(`${this.socketId} Error from InMemDB: ${error}`); | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeError(error)); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeError(error)); | ||
@@ -277,2 +326,7 @@ } | ||
sendInteger(responseId, num) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeInteger(num)); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeInteger(num)); | ||
@@ -287,2 +341,7 @@ } | ||
sendBulk(responseId, str) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeBulk(str)); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeBulk(str)); | ||
@@ -297,2 +356,7 @@ } | ||
sendBufBulk(responseId, buf) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeBufBulk(buf)); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeBufBulk(buf)); | ||
@@ -329,6 +393,75 @@ } | ||
sendArray(responseId, arr) { | ||
if (this.multiActive && this.multiResponseIds.includes(responseId)) { | ||
this._handleMultiResponse(responseId, Resp.encodeArray(this.encodeRespArray(arr))); | ||
return; | ||
} | ||
this.sendResponse(responseId, Resp.encodeArray(this.encodeRespArray(arr))); | ||
} | ||
/** | ||
* Handles a 'multi' command | ||
* | ||
* @private | ||
*/ | ||
_handleMulti() { | ||
if (this.multiActive) { | ||
this.log.warn(`${this.socketId} Conflicting multi call`); | ||
} | ||
this.multiActive = true; | ||
this.execCalled = false; | ||
this.multiResponseIds = []; | ||
this.multiResponseCount = 0; | ||
this.multiResponseMap = new Map(); | ||
} | ||
/** | ||
* Handles an 'exec' command | ||
* | ||
* @param {number} responseId ID of the response | ||
* @private | ||
*/ | ||
_handleExec(responseId) { | ||
this.execCalled = true; | ||
this.execId = responseId; | ||
// maybe we have all fullfilled yet | ||
if (this.multiResponseCount === this.multiResponseIds.length) { | ||
this._sendExecReponse(); | ||
} | ||
} | ||
/** | ||
* Builds up the exec response and sends it | ||
* | ||
* @private | ||
*/ | ||
_sendExecReponse() { | ||
this.multiActive = false; | ||
// collect all 'QUEUED' answers | ||
const queuedStrArr = new Array(this.multiResponseCount).fill(QUEUED_STR_BUF); | ||
this._sendQueued( | ||
this.execId, | ||
Buffer.concat([OK_STR_BUF, ...queuedStrArr, Resp.encodeArray(Array.from(this.multiResponseMap.values()))]) | ||
); | ||
} | ||
/** | ||
* Handles a multi response | ||
* | ||
* @param {number} responseId ID of the response | ||
* @param {Buffer} buf buffer to include in response | ||
* @private | ||
*/ | ||
_handleMultiResponse(responseId, buf) { | ||
this.multiResponseMap.set(responseId, buf); | ||
this.multiResponseCount++; | ||
if (this.execCalled && this.multiResponseCount === this.multiResponseIds.length) { | ||
this._sendExecReponse(); | ||
} | ||
} | ||
} | ||
module.exports = RedisHandler; |
{ | ||
"name": "@iobroker/db-base", | ||
"version": "4.0.0-alpha.45-20220114-88aaebdb", | ||
"version": "4.0.0-alpha.46-20220119-9e487e55", | ||
"engines": { | ||
@@ -8,3 +8,3 @@ "node": ">=12.0.0" | ||
"dependencies": { | ||
"@iobroker/js-controller-common": "4.0.0-alpha.45-20220114-88aaebdb", | ||
"@iobroker/js-controller-common": "4.0.0-alpha.46-20220119-9e487e55", | ||
"deep-clone": "^3.0.3", | ||
@@ -39,3 +39,3 @@ "fs-extra": "^10.0.0", | ||
], | ||
"gitHead": "e2fd3e7af91eb83c30f8ae45f71d6e178edc7b28" | ||
"gitHead": "e081e38bf6d24e767e16a0bb5a8d8202f65a9302" | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
34426
8
832