@cubejs-backend/query-orchestrator
Advanced tools
Comparing version 0.13.0 to 0.13.2
@@ -6,2 +6,13 @@ # Change Log | ||
## [0.13.2](https://github.com/cube-js/cube.js/compare/v0.13.1...v0.13.2) (2019-12-13) | ||
### Features | ||
* Propagate `requestId` for trace logging ([24d7b41](https://github.com/cube-js/cube.js/commit/24d7b41)) | ||
# [0.13.0](https://github.com/cube-js/cube.js/compare/v0.12.3...v0.13.0) (2019-12-10) | ||
@@ -8,0 +19,0 @@ |
@@ -64,3 +64,3 @@ const R = require('ramda'); | ||
const queryQueueObj = { | ||
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority | ||
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority, requestId: options.requestId | ||
}; | ||
@@ -67,0 +67,0 @@ const key = this.redisHash(queryKey); |
@@ -168,2 +168,3 @@ const crypto = require('crypto'); | ||
this.externalDriverFactory = preAggregations.externalDriverFactory; | ||
this.requestId = options.requestId; | ||
} | ||
@@ -183,3 +184,7 @@ | ||
if (!(e instanceof ContinueWaitError)) { | ||
this.logger('Error loading pre-aggregation', { error: (e.stack || e), preAggregation: this.preAggregation }); | ||
this.logger('Error loading pre-aggregation', { | ||
error: (e.stack || e), | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
} | ||
@@ -253,3 +258,6 @@ }); | ||
if (versionEntry.structure_version !== newVersionEntry.structure_version) { | ||
this.logger('Invalidating pre-aggregation structure', { preAggregation: this.preAggregation }); | ||
this.logger('Invalidating pre-aggregation structure', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
await this.executeInQueue(invalidationKeys, 10, newVersionEntry); | ||
@@ -259,3 +267,6 @@ return mostRecentTargetTableName(); | ||
if (this.waitForRenew) { | ||
this.logger('Waiting for pre-aggregation renew', { preAggregation: this.preAggregation }); | ||
this.logger('Waiting for pre-aggregation renew', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
await this.executeInQueue(invalidationKeys, 0, newVersionEntry); | ||
@@ -275,3 +286,6 @@ return mostRecentTargetTableName(); | ||
} else { | ||
this.logger('Creating pre-aggregation from scratch', { preAggregation: this.preAggregation }); | ||
this.logger('Creating pre-aggregation from scratch', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
await this.executeInQueue(invalidationKeys, 10, newVersionEntry); | ||
@@ -291,3 +305,6 @@ return mostRecentTargetTableName(); | ||
scheduleRefresh(invalidationKeys, newVersionEntry) { | ||
this.logger('Refreshing pre-aggregation content', { preAggregation: this.preAggregation }); | ||
this.logger('Refreshing pre-aggregation content', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
this.executeInQueue(invalidationKeys, 0, newVersionEntry) | ||
@@ -306,5 +323,7 @@ .then(() => { | ||
} | ||
this.logger('Error refreshing pre-aggregation', { error: (e.stack || e), preAggregation: this.preAggregation }) | ||
this.logger('Error refreshing pre-aggregation', { | ||
error: (e.stack || e), preAggregation: this.preAggregation, requestId: this.requestId | ||
}); | ||
} | ||
}) | ||
}); | ||
} | ||
@@ -319,3 +338,4 @@ | ||
preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables, | ||
newVersionEntry | ||
newVersionEntry, | ||
requestId: this.requestId | ||
}, | ||
@@ -371,3 +391,6 @@ priority, | ||
const table = this.targetTableName(newVersionEntry); | ||
this.logger('Downloading external pre-aggregation', { preAggregation: this.preAggregation }); | ||
this.logger('Downloading external pre-aggregation', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
const tableData = await client.downloadTable(table); | ||
@@ -379,3 +402,6 @@ const columns = await client.tableColumnTypes(table); | ||
} | ||
this.logger('Uploading external pre-aggregation', { preAggregation: this.preAggregation }); | ||
this.logger('Uploading external pre-aggregation', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
await externalDriver.uploadTable(table, columns, tableData); | ||
@@ -402,3 +428,6 @@ await this.loadCache.reset(this.preAggregation); | ||
.filter(t => tablesToSave.indexOf(t) === -1); | ||
this.logger('Dropping orphaned tables', { tablesToDrop: JSON.stringify(toDrop) }); | ||
this.logger('Dropping orphaned tables', { | ||
tablesToDrop: JSON.stringify(toDrop), | ||
requestId: this.requestId | ||
}); | ||
await Promise.all(toDrop.map(table => client.dropTable(table))); | ||
@@ -448,3 +477,3 @@ } | ||
loadCache, | ||
{ waitForRenew: queryBody.renewQuery } | ||
{ waitForRenew: queryBody.renewQuery, requestId: queryBody.requestId } | ||
); | ||
@@ -463,3 +492,5 @@ const preAggregationPromise = () => loader.loadPreAggregation().then(async targetTableName => { | ||
this.queue = QueryCache.createQueue(`SQL_PRE_AGGREGATIONS_${this.redisPrefix}`, this.driverFactory, (client, q) => { | ||
const { preAggregation, preAggregationsTablesToTempTables, newVersionEntry } = q; | ||
const { | ||
preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId | ||
} = q; | ||
const loader = new PreAggregationLoader( | ||
@@ -473,3 +504,4 @@ this.redisPrefix, | ||
preAggregationsTablesToTempTables, | ||
new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this) | ||
new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this), | ||
{ requestId } | ||
); | ||
@@ -476,0 +508,0 @@ return loader.refresh(newVersionEntry)(client); |
@@ -44,3 +44,8 @@ const crypto = require('crypto'); | ||
if (!cacheKeyQueries) { | ||
return { data: await this.queryWithRetryAndRelease(query, values, queryBody.external) }; | ||
return { | ||
data: await this.queryWithRetryAndRelease(query, values, { | ||
external: queryBody.external, | ||
requestId: queryBody.requestId | ||
}) | ||
}; | ||
} | ||
@@ -50,5 +55,6 @@ const cacheKey = QueryCache.queryCacheKey(queryBody); | ||
if (queryBody.renewQuery) { | ||
this.logger('Requested renew', { cacheKey }); | ||
this.logger('Requested renew', { cacheKey, requestId: queryBody.requestId }); | ||
return this.renewQuery(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, { | ||
external: queryBody.external | ||
external: queryBody.external, | ||
requestId: queryBody.requestId | ||
}); | ||
@@ -64,3 +70,4 @@ } | ||
forceNoCache, | ||
external: queryBody.external | ||
external: queryBody.external, | ||
requestId: queryBody.requestId | ||
} | ||
@@ -71,3 +78,4 @@ ); | ||
this.startRenewCycle(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, { | ||
external: queryBody.external | ||
external: queryBody.external, | ||
requestId: queryBody.requestId | ||
}); | ||
@@ -103,5 +111,10 @@ } | ||
queryWithRetryAndRelease(query, values, priority, cacheKey, external) { | ||
queryWithRetryAndRelease(query, values, { | ||
priority, cacheKey, external, requestId | ||
}) { | ||
const queue = external ? this.getExternalQueue() : this.getQueue(); | ||
return queue.executeInQueue('query', cacheKey, { query, values }, priority, { stageQueryKey: cacheKey }); | ||
return queue.executeInQueue('query', cacheKey, { query, values }, priority, { | ||
stageQueryKey: cacheKey, | ||
requestId | ||
}); | ||
} | ||
@@ -182,3 +195,5 @@ | ||
if (!(e instanceof ContinueWaitError)) { | ||
this.logger('Error while renew cycle', { query, query_values: values, error: e.stack || e }); | ||
this.logger('Error while renew cycle', { | ||
query, query_values: values, error: e.stack || e, requestId: options.requestId | ||
}); | ||
} | ||
@@ -196,3 +211,8 @@ }); | ||
expireSecs, | ||
{ renewalThreshold: this.options.refreshKeyRenewalThreshold || 2 * 60, renewalKey: q, waitForRenew: true } | ||
{ | ||
renewalThreshold: this.options.refreshKeyRenewalThreshold || 2 * 60, | ||
renewalKey: q, | ||
waitForRenew: true, | ||
requestId: options.requestId | ||
} | ||
)) | ||
@@ -204,3 +224,3 @@ ) | ||
} | ||
this.logger('Error fetching cache key queries', { error: e.stack || e }); | ||
this.logger('Error fetching cache key queries', { error: e.stack || e, requestId: options.requestId }); | ||
return []; | ||
@@ -220,3 +240,4 @@ }) | ||
waitForRenew: true, | ||
external: options.external | ||
external: options.external, | ||
requestId: options.requestId | ||
} | ||
@@ -236,3 +257,5 @@ ), | ||
const fetchNew = () => ( | ||
this.queryWithRetryAndRelease(query, values, options.priority, cacheKey, options.external).then(res => { | ||
this.queryWithRetryAndRelease(query, values, { | ||
priority: options.priority, cacheKey, external: options.external, requestId: options.requestId | ||
}).then(res => { | ||
const result = { | ||
@@ -245,3 +268,3 @@ time: (new Date()).getTime(), | ||
.then(() => { | ||
this.logger('Renewed', { cacheKey }); | ||
this.logger('Renewed', { cacheKey, requestId: options.requestId }); | ||
return res; | ||
@@ -251,5 +274,9 @@ }); | ||
if (!(e instanceof ContinueWaitError)) { | ||
this.logger('Dropping Cache', { cacheKey, error: e.stack || e }); | ||
this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId }); | ||
this.cacheDriver.remove(redisKey) | ||
.catch(err => this.logger('Error removing key', { cacheKey, error: err.stack || err })); | ||
.catch(err => this.logger('Error removing key', { | ||
cacheKey, | ||
error: err.stack || err, | ||
requestId: options.requestId | ||
})); | ||
} | ||
@@ -261,3 +288,3 @@ throw e; | ||
if (options.forceNoCache) { | ||
this.logger('Force no cache for', { cacheKey }); | ||
this.logger('Force no cache for', { cacheKey, requestId: options.requestId }); | ||
return fetchNew(); | ||
@@ -276,3 +303,4 @@ } | ||
newRenewalKey: renewalKey, | ||
renewalThreshold | ||
renewalThreshold, | ||
requestId: options.requestId | ||
}); | ||
@@ -288,9 +316,9 @@ if ( | ||
if (options.waitForRenew) { | ||
this.logger('Waiting for renew', { cacheKey, renewalThreshold }); | ||
this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId }); | ||
return fetchNew(); | ||
} else { | ||
this.logger('Renewing existing key', { cacheKey, renewalThreshold }); | ||
this.logger('Renewing existing key', { cacheKey, renewalThreshold, requestId: options.requestId }); | ||
fetchNew().catch(e => { | ||
if (!(e instanceof ContinueWaitError)) { | ||
this.logger('Error renewing', { cacheKey, error: e.stack || e }); | ||
this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId }); | ||
} | ||
@@ -300,6 +328,6 @@ }); | ||
} | ||
this.logger('Using cache for', { cacheKey }); | ||
this.logger('Using cache for', { cacheKey, requestId: options.requestId }); | ||
return parsedResult.result; | ||
} else { | ||
this.logger('Missing cache for', { cacheKey }); | ||
this.logger('Missing cache for', { cacheKey, requestId: options.requestId }); | ||
return fetchNew(); | ||
@@ -306,0 +334,0 @@ } |
@@ -57,3 +57,7 @@ const R = require('ramda'); | ||
this.logger('Added to queue', { | ||
priority, queueSize, queryKey, queuePrefix: this.redisQueuePrefix | ||
priority, | ||
queueSize, | ||
queryKey, | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: options.requestId | ||
}); | ||
@@ -95,3 +99,7 @@ } | ||
if (query) { | ||
this.logger('Removing orphaned query', { queryKey: query.queryKey, queuePrefix: this.redisQueuePrefix }); | ||
this.logger('Removing orphaned query', { | ||
queryKey: query.queryKey, | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
await this.sendCancelMessageFn(query); | ||
@@ -178,3 +186,4 @@ } | ||
queryKey: query.queryKey, | ||
queuePrefix: this.redisQueuePrefix | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
@@ -199,3 +208,4 @@ await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }); | ||
error: e.stack || e, | ||
queuePrefix: this.redisQueuePrefix | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
@@ -212,3 +222,4 @@ } | ||
queryKey: query.queryKey, | ||
queuePrefix: this.redisQueuePrefix | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
@@ -222,3 +233,4 @@ } catch (e) { | ||
error: (e.stack || e).toString(), | ||
queuePrefix: this.redisQueuePrefix | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
@@ -230,3 +242,4 @@ if (e instanceof TimeoutError) { | ||
queryKey: query.queryKey, | ||
queuePrefix: this.redisQueuePrefix | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
@@ -242,3 +255,8 @@ await this.sendCancelMessageFn(query); | ||
} else { | ||
this.logger('Query cancelled in-flight', { queueSize, queryKey, queuePrefix: this.redisQueuePrefix }); | ||
this.logger('Query cancelled in-flight', { | ||
queueSize, | ||
queryKey, | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
await redisClient.removeQuery(queryKey); | ||
@@ -265,3 +283,4 @@ } | ||
error: e.stack || e, | ||
queuePrefix: this.redisQueuePrefix | ||
queuePrefix: this.redisQueuePrefix, | ||
requestId: query.requestId | ||
}); | ||
@@ -268,0 +287,0 @@ } |
@@ -37,3 +37,3 @@ const R = require('ramda'); | ||
JSON.stringify({ | ||
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority | ||
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority, requestId: options.requestId | ||
}) | ||
@@ -40,0 +40,0 @@ ]) |
@@ -5,3 +5,3 @@ { | ||
"author": "Statsbot, Inc.", | ||
"version": "0.13.0", | ||
"version": "0.13.2", | ||
"repository": { | ||
@@ -29,3 +29,3 @@ "type": "git", | ||
"license": "Apache-2.0", | ||
"gitHead": "61e21aec1489a441a0b8aeceb9ee8cb7fbd6efb1" | ||
"gitHead": "43689f13802742bfcbfce0cecc27dfc5bfcfebfa" | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
90451
1823
11