@cubejs-backend/query-orchestrator
Advanced tools
Comparing version 0.4.4 to 0.5.0
@@ -6,4 +6,15 @@ # Change Log | ||
# [0.5.0](https://github.com/statsbotco/cubejs-client/compare/v0.4.6...v0.5.0) (2019-04-01) | ||
### Features | ||
* use local queue and cache for local dev server instead of Redis one ([50f1bbb](https://github.com/statsbotco/cubejs-client/commit/50f1bbb)) | ||
## [0.4.4](https://github.com/statsbotco/cubejs-client/compare/v0.4.3...v0.4.4) (2019-03-17) | ||
**Note:** Version bump only for package @cubejs-backend/query-orchestrator |
const crypto = require('crypto'); | ||
const R = require('ramda'); | ||
const redis = require('redis'); | ||
const RedisCacheDriver = require('./RedisCacheDriver'); | ||
const LocalCacheDriver = require('./LocalCacheDriver'); | ||
@@ -53,11 +54,11 @@ const QueryCache = require('./QueryCache'); | ||
this.queryResults = {}; | ||
this.redisClient = preAggregations.redisClient; | ||
this.cacheDriver = preAggregations.cacheDriver; | ||
} | ||
async tablesFromCache(schema) { | ||
let tables = JSON.parse(await this.redisClient.getAsync(this.tablesRedisKey())); | ||
let tables = await this.cacheDriver.get(this.tablesRedisKey()); | ||
if (!tables) { | ||
const client = await this.driverFactory(); | ||
tables = await client.getTablesQuery(schema); | ||
await this.redisClient.setAsync(this.tablesRedisKey(), JSON.stringify(tables), 'EX', 120); | ||
await this.cacheDriver.set(this.tablesRedisKey(), tables, 120); | ||
} | ||
@@ -110,3 +111,3 @@ return tables; | ||
this.versionEnries = undefined; | ||
await this.redisClient.delAsync(this.tablesRedisKey()); | ||
await this.cacheDriver.remove(this.tablesRedisKey()); | ||
} | ||
@@ -317,3 +318,5 @@ } | ||
this.tablesUsedInQuery = {}; // TODO should be in redis | ||
this.redisClient = redis.createClient(process.env.REDIS_URL); | ||
this.cacheDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? | ||
new RedisCacheDriver() : | ||
new LocalCacheDriver(); | ||
} | ||
@@ -320,0 +323,0 @@ |
@@ -1,5 +0,6 @@ | ||
const redis = require('redis'); | ||
const crypto = require('crypto'); | ||
const QueryQueue = require('./QueryQueue'); | ||
const ContinueWaitError = require('./ContinueWaitError'); | ||
const RedisCacheDriver = require('./RedisCacheDriver'); | ||
const LocalCacheDriver = require('./LocalCacheDriver'); | ||
@@ -12,3 +13,5 @@ class QueryCache { | ||
this.logger = logger; | ||
this.redisClient = redis.createClient(process.env.REDIS_URL); | ||
this.cacheDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? | ||
new RedisCacheDriver() : | ||
new LocalCacheDriver(); | ||
} | ||
@@ -197,3 +200,3 @@ | ||
}; | ||
return this.redisClient.setAsync(redisKey, JSON.stringify(result), 'EX', expiration) | ||
return this.cacheDriver.set(redisKey, result, expiration) | ||
.then(() => { | ||
@@ -206,3 +209,3 @@ this.logger('Renewed', { cacheKey }); | ||
this.logger('Dropping Cache', { cacheKey, error: e.stack || e }); | ||
this.redisClient.delAsync(redisKey) | ||
this.cacheDriver.remove(redisKey) | ||
.catch(e => this.logger('Error removing key', { cacheKey, error: e.stack || e })); | ||
@@ -219,5 +222,5 @@ } | ||
return this.redisClient.getAsync(redisKey).then(res => { | ||
return this.cacheDriver.get(redisKey).then(res => { | ||
if (res) { | ||
const parsedResult = JSON.parse(res); | ||
const parsedResult = res; | ||
const renewedAgo = (new Date()).getTime() - parsedResult.time; | ||
@@ -224,0 +227,0 @@ this.logger('Found cache entry', { |
@@ -1,11 +0,7 @@ | ||
const redis = require('redis'); | ||
const crypto = require('crypto'); | ||
const promisifyAll = require('util-promisifyall'); | ||
const R = require('ramda'); | ||
const TimeoutError = require('./TimeoutError'); | ||
const ContinueWaitError = require('./ContinueWaitError'); | ||
const RedisQueueDriver = require('./RedisQueueDriver'); | ||
const LocalQueueDriver = require('./LocalQueueDriver'); | ||
promisifyAll(redis.RedisClient.prototype); | ||
promisifyAll(redis.Multi.prototype); | ||
class QueryQueue { | ||
@@ -19,8 +15,18 @@ constructor(redisQueuePrefix, options) { | ||
this.heartBeatInterval = options.heartBeatInterval || 30; | ||
this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey) => { this.processQuery(queryKey) }); | ||
this.sendCancelMessageFn = options.sendCancelMessageFn || ((query) => { this.processCancel(query) }); | ||
this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey) => { this.processQuery(queryKey); }); | ||
this.sendCancelMessageFn = options.sendCancelMessageFn || ((query) => { this.processCancel(query); }); | ||
this.queryHandlers = options.queryHandlers; | ||
this.cancelHandlers = options.cancelHandlers; | ||
this.createRedisClient = options.createdRedisClient || (() => redis.createClient(process.env.REDIS_URL)); | ||
this.logger = options.logger || ((message, event) => console.log(`${message} ${JSON.stringify(event)}`)); | ||
const queueDriverOptions = { | ||
redisQueuePrefix: this.redisQueuePrefix, | ||
concurrency: this.concurrency, | ||
continueWaitTimeout: this.continueWaitTimeout, | ||
orphanedTimeout: this.orphanedTimeout, | ||
heartBeatTimeout: this.heartBeatInterval * 4, | ||
createRedisClient: options.createRedisClient | ||
}; | ||
this.queueDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL || !!options.createRedisClient ? | ||
new RedisQueueDriver(queueDriverOptions) : | ||
new LocalQueueDriver(queueDriverOptions); | ||
} | ||
@@ -30,3 +36,3 @@ | ||
options = options || {}; | ||
const redisClient = this.createRedisClient(); | ||
const redisClient = this.queueDriver.createConnection(); | ||
try { | ||
@@ -39,4 +45,3 @@ if (priority == null) { | ||
} | ||
const resultListKey = this.resultListKey(queryKey); | ||
let result = await redisClient.rpopAsync(resultListKey); | ||
let result = await redisClient.getResult(queryKey); | ||
if (result) { | ||
@@ -48,12 +53,6 @@ return this.parseResult(result); | ||
const [added, b, c, queueSize] = await redisClient.multi() | ||
.zadd([this.toProcessRedisKey(), 'NX', keyScore, this.redisHash(queryKey)]) | ||
.zadd([this.recentRedisKey(), time, this.redisHash(queryKey)]) | ||
.hsetnx([ | ||
this.queriesDefKey(), | ||
this.redisHash(queryKey), | ||
JSON.stringify({ queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority }) | ||
]) | ||
.zcard(this.toProcessRedisKey()) | ||
.execAsync(); | ||
// eslint-disable-next-line no-unused-vars | ||
const [added, b, c, queueSize] = await redisClient.addToQueue( | ||
keyScore, queryKey, time, queryHandler, query, priority, options | ||
); | ||
@@ -65,9 +64,9 @@ if (added > 0) { | ||
await this.reconcileQueue(redisClient); | ||
result = await redisClient.brpopAsync([resultListKey, this.continueWaitTimeout]); | ||
result = await redisClient.getResultBlocking(queryKey); | ||
if (!result) { | ||
throw new ContinueWaitError(); | ||
} | ||
return this.parseResult(result[1]); | ||
return this.parseResult(result); | ||
} finally { | ||
redisClient.quit(); | ||
redisClient.release(); | ||
} | ||
@@ -80,6 +79,6 @@ } | ||
} | ||
result = JSON.parse(result); | ||
if (result.error) { | ||
throw new Error(result.error); // TODO | ||
} else { | ||
// eslint-disable-next-line consistent-return | ||
return result.result; | ||
@@ -89,27 +88,12 @@ } | ||
queriesDefKey() { | ||
return this.queryRedisKey('QUERIES'); | ||
} | ||
resultListKey(queryKey) { | ||
return this.queryRedisKey(queryKey, 'RESULT'); | ||
} | ||
async reconcileQueue(redisClient) { | ||
const toCancel = ( | ||
await redisClient.zrangebyscoreAsync([this.activeRedisKey(), 0, (new Date().getTime() - this.heartBeatInterval * 4 * 1000)]) | ||
).concat( | ||
await redisClient.zrangebyscoreAsync([this.recentRedisKey(), 0, (new Date().getTime() - this.orphanedTimeout * 1000)]) | ||
); | ||
await redisClient.getStalledQueries() | ||
).concat( | ||
await redisClient.getOrphanedQueries() | ||
); | ||
await Promise.all(toCancel.map(async queryKey => { | ||
let [query] = await redisClient.multi() | ||
.hget([this.queriesDefKey(), this.redisHash(queryKey)]) | ||
.zrem([this.activeRedisKey(), this.redisHash(queryKey)]) | ||
.zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) | ||
.zrem([this.recentRedisKey(), this.redisHash(queryKey)]) | ||
.hdel([this.queriesDefKey(), this.redisHash(queryKey)]) | ||
.execAsync(); | ||
const [query] = await redisClient.getQueryAndRemove(queryKey); | ||
if (query) { | ||
query = JSON.parse(query); | ||
this.logger('Removing orphaned query', { queryKey: query.queryKey }); | ||
@@ -120,4 +104,4 @@ await this.sendCancelMessageFn(query); | ||
const active = await redisClient.zrangeAsync([this.activeRedisKey(), 0, -1]); | ||
const toProcess = await redisClient.zrangeAsync([this.toProcessRedisKey(), 0, -1]); | ||
const active = await redisClient.getActiveQueries(); | ||
const toProcess = await redisClient.getToProcessQueries(); | ||
await Promise.all( | ||
@@ -134,15 +118,15 @@ R.pipe( | ||
let timeout; | ||
const executionTimeout = this.executionTimeout; | ||
const { executionTimeout } = this; | ||
return Promise.race([ | ||
promise, | ||
new Promise(function(resolve, reject) { | ||
timeout = setTimeout(function() { | ||
new Promise((resolve, reject) => { | ||
timeout = setTimeout(() => { | ||
reject(new TimeoutError(`Query execution timeout after ${executionTimeout / 60} min of waiting`)); | ||
}, executionTimeout * 1000); | ||
}), | ||
]).then(function(v) { | ||
]).then((v) => { | ||
clearTimeout(timeout); | ||
return v; | ||
}, function(err) { | ||
}, (err) => { | ||
clearTimeout(timeout); | ||
@@ -154,12 +138,7 @@ throw err; | ||
async fetchQueryStageState() { | ||
const redisClient = this.createRedisClient(); | ||
const redisClient = this.queueDriver.createConnection(); | ||
try { | ||
const [active, toProcess, allQueryDefs] = await redisClient.multi() | ||
.zrange([this.activeRedisKey(), 0, -1]) | ||
.zrange([this.toProcessRedisKey(), 0, -1]) | ||
.hgetall(this.queriesDefKey()) | ||
.execAsync(); | ||
return [active, toProcess, R.map(q => JSON.parse(q), allQueryDefs || {})] | ||
return redisClient.getQueryStageState(); | ||
} finally { | ||
redisClient.quit(); | ||
redisClient.release(); | ||
} | ||
@@ -172,6 +151,4 @@ } | ||
const queryDefs = toProcess.map(k => allQueryDefs[k]).filter(q => !!q); | ||
const queryInQueue = queryDefs.find(q => | ||
this.redisHash(q.stageQueryKey) === this.redisHash(stageQueryKey) && | ||
(priorityFilter != null ? q.priority === priorityFilter : true) | ||
); | ||
const queryInQueue = queryDefs.find(q => this.redisHash(q.stageQueryKey) === this.redisHash(stageQueryKey) && | ||
(priorityFilter != null ? q.priority === priorityFilter : true)); | ||
@@ -195,21 +172,17 @@ if (queryInQueue) { | ||
async processQuery(queryKey) { | ||
const redisClient = this.createRedisClient(); | ||
const redisClient = this.queueDriver.createConnection(); | ||
try { | ||
const [insertedCount, removedCount, activeKeys, queueSize] = await redisClient.multi() | ||
.zadd([this.activeRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)]) | ||
.zremrangebyrank([this.activeRedisKey(), this.concurrency, -1]) | ||
.zrange([this.activeRedisKey(), 0, this.concurrency - 1]) | ||
.zcard(this.toProcessRedisKey()) | ||
.execAsync(); | ||
// eslint-disable-next-line no-unused-vars | ||
const [insertedCount, removedCount, activeKeys, queueSize] = | ||
await redisClient.retrieveForProcessing(queryKey); | ||
if (insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1) { | ||
let query = await redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]); | ||
let query = await redisClient.getQueryDef(queryKey); | ||
if (query) { | ||
query = JSON.parse(query); | ||
let executionResult; | ||
const startQueryTime = (new Date()).getTime(); | ||
this.logger('Performing query', { queueSize, queryKey: query.queryKey }); | ||
await this.optimisticQueryUpdate(redisClient, queryKey, { startQueryTime }); | ||
await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }); | ||
const heartBeatTimer = setInterval( | ||
() => redisClient.zaddAsync([this.activeRedisKey(), new Date().getTime(), this.redisHash(queryKey)]), | ||
() => redisClient.updateHeartBeat(queryKey), | ||
this.heartBeatInterval * 1000 | ||
@@ -224,6 +197,7 @@ ); | ||
try { | ||
return this.optimisticQueryUpdate(redisClient, queryKey, { cancelHandler }); | ||
return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }); | ||
} catch (e) { | ||
this.logger(`Error while query update`, { queryKey: queryKey, error: e.stack || e }); | ||
this.logger(`Error while query update`, { queryKey, error: e.stack || e }); | ||
} | ||
return null; | ||
} | ||
@@ -240,6 +214,6 @@ ) | ||
if (e instanceof TimeoutError) { | ||
query = await redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]); | ||
query = await redisClient.getQueryDef(queryKey); | ||
if (query) { | ||
this.logger('Cancelling query due to timeout', { queryKey: query.queryKey }); | ||
await this.sendCancelMessageFn(JSON.parse(query)); | ||
await this.sendCancelMessageFn(query); | ||
} | ||
@@ -251,17 +225,6 @@ } | ||
await redisClient.multi() | ||
.lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) | ||
.zrem([this.activeRedisKey(), this.redisHash(queryKey)]) | ||
.zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) | ||
.zrem([this.recentRedisKey(), this.redisHash(queryKey)]) | ||
.hdel([this.queriesDefKey(), this.redisHash(queryKey)]) | ||
.execAsync(); | ||
await redisClient.setResultAndRemoveQuery(queryKey, executionResult); | ||
} else { | ||
this.logger('Query cancelled in-flight', { queueSize, queryKey }); | ||
await redisClient.multi() | ||
.zrem([this.activeRedisKey(), this.redisHash(queryKey)]) | ||
.zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) | ||
.zrem([this.recentRedisKey(), this.redisHash(queryKey)]) | ||
.hdel([this.queriesDefKey(), this.redisHash(queryKey)]) | ||
.execAsync(); | ||
await redisClient.removeQuery(queryKey); | ||
} | ||
@@ -272,27 +235,8 @@ | ||
} finally { | ||
redisClient.quit(); | ||
redisClient.release(); | ||
} | ||
} | ||
async optimisticQueryUpdate(redisClient, queryKey, toUpdate) { | ||
let query = await redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]); | ||
for (let i = 0; i < 10; i++) { | ||
if (query) { | ||
const parsedQuery = JSON.parse(query); | ||
const [beforeUpdate] = await redisClient | ||
.multi() | ||
.hget([this.queriesDefKey(), this.redisHash(queryKey)]) | ||
.hset([this.queriesDefKey(), this.redisHash(queryKey), JSON.stringify({ ...parsedQuery, ...toUpdate })]) | ||
.execAsync(); | ||
if (query === beforeUpdate) { | ||
return true; | ||
} | ||
query = beforeUpdate; | ||
} | ||
} | ||
throw new Error(`Can't update ${queryKey} with ${JSON.stringify(toUpdate)}`); | ||
} | ||
async processCancel(query) { | ||
const queryHandler = query.queryHandler; | ||
const { queryHandler } = query; | ||
try { | ||
@@ -308,33 +252,7 @@ if (!this.cancelHandlers[queryHandler]) { | ||
stageRedisKey(stageQueryKey) { | ||
return this.queryRedisKey(stageQueryKey, 'STAGE'); | ||
} | ||
toProcessRedisKey() { | ||
return this.queueRedisKey('QUEUE'); | ||
} | ||
recentRedisKey() { | ||
return this.queueRedisKey('RECENT'); | ||
} | ||
activeRedisKey() { | ||
return this.queueRedisKey('ACTIVE'); | ||
} | ||
redisHash(queryKey) { | ||
return typeof queryKey === 'string' && queryKey.length < 256 ? | ||
queryKey : | ||
crypto.createHash('md5').update(JSON.stringify(queryKey)).digest("hex"); | ||
return this.queueDriver.redisHash(queryKey); | ||
} | ||
queryRedisKey(queryKey, suffix) { | ||
return `${this.redisQueuePrefix}_${this.redisHash(queryKey)}_${suffix}` | ||
} | ||
queueRedisKey(suffix) { | ||
return `${this.redisQueuePrefix}_${suffix}`; | ||
} | ||
} | ||
module.exports = QueryQueue; |
@@ -5,3 +5,3 @@ { | ||
"author": "Statsbot, Inc.", | ||
"version": "0.4.4", | ||
"version": "0.5.0", | ||
"engines": { | ||
@@ -25,3 +25,3 @@ "node": ">=8.11.1" | ||
"license": "Apache-2.0", | ||
"gitHead": "a6fd93c6e0ecb0addb5abc5c3e6248a23ff5d250" | ||
"gitHead": "0837734afe8c7b3583d4f707b6423c4202b13d7b" | ||
} |
const QueryQueue = require('../orchestrator/QueryQueue'); | ||
const should = require('should'); | ||
const redis = require('redis'); | ||
describe('QueryQueue', function () { | ||
this.timeout(5000); | ||
const QueryQueueTest = (name, options) => { | ||
describe(`QueryQueue${name}`, function () { | ||
this.timeout(5000); | ||
let delayCount = 0; | ||
const delayFn = (result, delay) => new Promise(resolve => setTimeout(() => resolve(result), delay)); | ||
let cancelledQuery; | ||
const queue = new QueryQueue('test_query_queue', { | ||
queryHandlers: { | ||
foo: async (query) => `${query[0]} bar`, | ||
delay: async (query, setCancelHandler) => { | ||
const result = query.result + delayCount; | ||
delayCount += 1; | ||
await setCancelHandler(result); | ||
return await delayFn(result, query.delay); | ||
} | ||
}, | ||
cancelHandlers: { | ||
delay: (query) => { | ||
console.log(`cancel call: ${JSON.stringify(query)}`); | ||
cancelledQuery = query.queryKey; | ||
} | ||
}, | ||
continueWaitTimeout: 1, | ||
executionTimeout: 1, | ||
orphanedTimeout: 2, | ||
concurrency: 1 | ||
}); | ||
let delayCount = 0; | ||
const delayFn = (result, delay) => new Promise(resolve => setTimeout(() => resolve(result), delay)); | ||
let cancelledQuery; | ||
const queue = new QueryQueue('test_query_queue', { | ||
queryHandlers: { | ||
foo: async (query) => `${query[0]} bar`, | ||
delay: async (query, setCancelHandler) => { | ||
const result = query.result + delayCount; | ||
delayCount += 1; | ||
await setCancelHandler(result); | ||
return await delayFn(result, query.delay); | ||
} | ||
}, | ||
cancelHandlers: { | ||
delay: (query) => { | ||
console.log(`cancel call: ${JSON.stringify(query)}`); | ||
cancelledQuery = query.queryKey; | ||
} | ||
}, | ||
continueWaitTimeout: 1, | ||
executionTimeout: 2, | ||
orphanedTimeout: 2, | ||
concurrency: 1, | ||
...options | ||
}); | ||
it('gutter', async () => { | ||
const query = ['select * from']; | ||
const result = await queue.executeInQueue('foo', query, query); | ||
should(result).be.eql('select * from bar'); | ||
}); | ||
it('gutter', async () => { | ||
const query = ['select * from']; | ||
const result = await queue.executeInQueue('foo', query, query); | ||
should(result).be.eql('select * from bar'); | ||
}); | ||
it('priority', async () => { | ||
delayCount = 0; | ||
const result = await Promise.all([ | ||
queue.executeInQueue('delay', `11`, { delay: 200, result: '1' }, 1), | ||
queue.executeInQueue('delay', `12`, { delay: 300, result: '2' }, 0), | ||
queue.executeInQueue('delay', `13`, { delay: 400, result: '3' }, 10) | ||
]); | ||
should(result).be.eql(['11', '22', '30']); | ||
}); | ||
it('priority', async () => { | ||
delayCount = 0; | ||
const result = await Promise.all([ | ||
queue.executeInQueue('delay', `11`, { delay: 200, result: '1' }, 1), | ||
queue.executeInQueue('delay', `12`, { delay: 300, result: '2' }, 0), | ||
queue.executeInQueue('delay', `13`, { delay: 400, result: '3' }, 10) | ||
]); | ||
should(result).be.eql(['11', '22', '30']); | ||
}); | ||
it('timeout', async () => { | ||
delayCount = 0; | ||
const query = ['select * from 2']; | ||
await queue.executeInQueue('delay', query, { delay: 100, result: '1' }); | ||
try { | ||
await queue.executeInQueue('delay', query, { delay: 1500, result: '1' }); | ||
throw new Error('Another'); | ||
} catch (e) { | ||
should(e.toString().indexOf('timeout')).not.be.eql(-1); | ||
} | ||
should(cancelledQuery).be.eql(query); | ||
}); | ||
it('timeout', async () => { | ||
delayCount = 0; | ||
const query = ['select * from 2']; | ||
let errorString = ''; | ||
for (let i = 0; i < 5; i++) { | ||
try { | ||
await queue.executeInQueue('delay', query, { delay: 3000, result: '1' }); | ||
} catch (e) { | ||
if (e.message === 'Continue wait') { | ||
continue; | ||
} | ||
errorString = e.toString(); | ||
break; | ||
} | ||
} | ||
should(errorString.indexOf('timeout')).not.be.eql(-1); | ||
}); | ||
it('stage reporting', async () => { | ||
delayCount = 0; | ||
const resultPromise = queue.executeInQueue('delay', '1', { delay: 50, result: '1' }, 0, { stageQueryKey: '1' }); | ||
await delayFn(null, 10); | ||
should((await queue.getQueryStage('1')).stage).be.eql('Executing query'); | ||
await resultPromise; | ||
should(await queue.getQueryStage('1')).be.eql(undefined); | ||
}); | ||
it('stage reporting', async () => { | ||
delayCount = 0; | ||
const resultPromise = queue.executeInQueue('delay', '1', { delay: 50, result: '1' }, 0, { stageQueryKey: '1' }); | ||
await delayFn(null, 10); | ||
should((await queue.getQueryStage('1')).stage).be.eql('Executing query'); | ||
await resultPromise; | ||
should(await queue.getQueryStage('1')).be.eql(undefined); | ||
}); | ||
it('priority stage reporting', async () => { | ||
delayCount = 0; | ||
const resultPromise = queue.executeInQueue('delay', '31', { delay: 100, result: '1' }, 20, { stageQueryKey: '12' }); | ||
await delayFn(null, 10); | ||
const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 100, result: '1' }, 10, { stageQueryKey: '12' }); | ||
await delayFn(null, 10); | ||
should((await queue.getQueryStage('12', 10)).stage).be.eql('#1 in queue'); | ||
await resultPromise; | ||
await resultPromise2; | ||
should(await queue.getQueryStage('12')).be.eql(undefined); | ||
}); | ||
it('priority stage reporting', async () => { | ||
delayCount = 0; | ||
const resultPromise = queue.executeInQueue('delay', '31', { delay: 100, result: '1' }, 20, { stageQueryKey: '12' }); | ||
await delayFn(null, 10); | ||
const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 100, result: '1' }, 10, { stageQueryKey: '12' }); | ||
await delayFn(null, 10); | ||
should((await queue.getQueryStage('12', 10)).stage).be.eql('#1 in queue'); | ||
await resultPromise; | ||
await resultPromise2; | ||
should(await queue.getQueryStage('12')).be.eql(undefined); | ||
}); | ||
it('orphaned', async () => { | ||
for (let i = 1; i <= 4; i++) { | ||
await queue.executeInQueue('delay', `11` + i, { delay: 50, result: '' + i }, 0); | ||
} | ||
cancelledQuery = null; | ||
delayCount = 0; | ||
it('orphaned', async () => { | ||
for (let i = 1; i <= 4; i++) { | ||
await queue.executeInQueue('delay', `11` + i, { delay: 50, result: '' + i }, 0); | ||
} | ||
cancelledQuery = null; | ||
delayCount = 0; | ||
let result = queue.executeInQueue('delay', `111`, { delay: 800, result: '1' }, 0); | ||
delayFn(null, 50).then(() => queue.executeInQueue('delay', `112`, { delay: 800, result: '2' }, 0)).catch(e => e); | ||
delayFn(null, 60).then(() => queue.executeInQueue('delay', `113`, { delay: 500, result: '3' }, 0)).catch(e => e); | ||
delayFn(null, 70).then(() => queue.executeInQueue('delay', `114`, { delay: 900, result: '4' }, 0)).catch(e => e); | ||
let result = queue.executeInQueue('delay', `111`, { delay: 800, result: '1' }, 0); | ||
delayFn(null, 50).then(() => queue.executeInQueue('delay', `112`, { delay: 800, result: '2' }, 0)).catch(e => e); | ||
delayFn(null, 60).then(() => queue.executeInQueue('delay', `113`, { delay: 500, result: '3' }, 0)).catch(e => e); | ||
delayFn(null, 70).then(() => queue.executeInQueue('delay', `114`, { delay: 900, result: '4' }, 0)).catch(e => e); | ||
should(await result).be.eql('10'); | ||
await queue.executeInQueue('delay', `112`, { delay: 800, result: '2' }, 0); | ||
result = await queue.executeInQueue('delay', `113`, { delay: 900, result: '3' }, 0); | ||
should(result).be.eql('32'); | ||
await delayFn(null, 200); | ||
should(cancelledQuery).be.eql('114'); | ||
await queue.executeInQueue('delay', `114`, { delay: 50, result: '4' }, 0); | ||
should(await result).be.eql('10'); | ||
await queue.executeInQueue('delay', `112`, { delay: 800, result: '2' }, 0); | ||
result = await queue.executeInQueue('delay', `113`, { delay: 900, result: '3' }, 0); | ||
should(result).be.eql('32'); | ||
await delayFn(null, 200); | ||
should(cancelledQuery).be.eql('114'); | ||
await queue.executeInQueue('delay', `114`, { delay: 50, result: '4' }, 0); | ||
}); | ||
}); | ||
}); | ||
}; | ||
QueryQueueTest('Local'); | ||
QueryQueueTest('Redis', { createRedisClient: () => redis.createClient() }); |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain risk: This package accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
66143
17
1406
9