@cubejs-backend/query-orchestrator
Comparing version 0.19.0 to 0.19.5
@@ -6,2 +6,18 @@ # Change Log
+## [0.19.5](https://github.com/cube-js/cube.js/compare/v0.19.4...v0.19.5) (2020-04-13)
+
+### Bug Fixes
+
+* Broken query and pre-aggregation cancel ([aa82256](https://github.com/cube-js/cube.js/commit/aa82256))
+
+### Features
+
+* Log queue state on Waiting for query ([395c63c](https://github.com/cube-js/cube.js/commit/395c63c))
+
 # [0.19.0](https://github.com/cube-js/cube.js/compare/v0.18.32...v0.19.0) (2020-04-09)
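The cancel bug fix above is what most of the hunks below implement: instead of returning a refresh promise whose `cancel` was a no-op TODO, the pre-aggregation loader now builds its result with `cancelCombinator` and threads a `saveCancelFn` callback through every driver call, so cancelling the refresh cancels whatever driver operation is in flight. The contract this relies on (visible in the test mock at the bottom of the diff) is that a driver call returns a promise carrying its own `cancel()` method. A minimal, self-contained sketch of that promise shape, not taken from the package:

// Sketch only: the kind of cancellable promise the orchestrator expects from a driver.
// The 3-second timer stands in for a long-running database statement.
function fakeLongRunningQuery(sql) {
  let timer;
  const promise = new Promise((resolve) => {
    timer = setTimeout(() => resolve([{ sql }]), 3000);
  });
  promise.cancel = async () => clearTimeout(timer); // "kill" the in-flight work
  return promise;
}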
 const { reduce } = require('ramda');
+const { cancelCombinator } = require('./utils');
@@ -172,4 +173,8 @@ const sortByKeys = (unordered) => {
   }
+
+  cancelCombinator(fn) {
+    return cancelCombinator(fn);
+  }
 }

 module.exports = BaseDriver;
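BaseDriver now exposes `cancelCombinator` from `./utils`, which the pre-aggregation code below uses. The utility itself is not part of this diff; the following is a minimal sketch of how such a combinator could work, assuming each wrapped promise may carry a `cancel()` method (the shipped implementation in `./utils` may differ):

// Sketch of a cancelCombinator-style helper (illustrative, not the package's code).
const cancelCombinator = (fn) => {
  const cancelFns = [];
  // saveCancelFn remembers the cancel handle of every driver promise passed through it
  const saveCancelFn = (promise) => {
    if (promise.cancel) {
      cancelFns.push(promise.cancel);
    }
    return promise;
  };
  const combined = fn(saveCancelFn);
  // cancelling the combined promise cancels every registered in-flight operation
  combined.cancel = () => Promise.all(cancelFns.map((cancel) => cancel()));
  return combined;
};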
 const crypto = require('crypto');
 const R = require('ramda');
+const { cancelCombinator } = require('../driver/utils');
 const RedisCacheDriver = require('./RedisCacheDriver');
@@ -383,9 +384,9 @@ const LocalCacheDriver = require('./LocalCacheDriver');
       }
-      const resultPromise = refreshStrategy.bind(this)(client, newVersionEntry);
-      resultPromise.cancel = () => {}; // TODO implement cancel (loading rollup into table and external upload)
-      return resultPromise;
+      return cancelCombinator(
+        saveCancelFn => refreshStrategy.bind(this)(client, newVersionEntry, saveCancelFn)
+      );
     };
   }

-  async refreshImplStoreInSourceStrategy(client, newVersionEntry) {
+  async refreshImplStoreInSourceStrategy(client, newVersionEntry, saveCancelFn) {
     const [loadSql, params] =
@@ -406,14 +407,14 @@ Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
     });
-    await client.loadPreAggregationIntoTable(
+    await saveCancelFn(client.loadPreAggregationIntoTable(
       targetTableName,
       query,
       params
-    );
-    await this.createIndexes(client, newVersionEntry);
+    ));
+    await this.createIndexes(client, newVersionEntry, saveCancelFn);
     await this.loadCache.reset(this.preAggregation);
-    await this.dropOrphanedTables(client, targetTableName);
+    await this.dropOrphanedTables(client, targetTableName, saveCancelFn);
     await this.loadCache.reset(this.preAggregation);
   }

-  async refreshImplTempTableExternalStrategy(client, newVersionEntry) {
+  async refreshImplTempTableExternalStrategy(client, newVersionEntry, saveCancelFn) {
     const [loadSql, params] =
@@ -435,14 +436,14 @@ Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
     });
-    await client.loadPreAggregationIntoTable(
+    await saveCancelFn(client.loadPreAggregationIntoTable(
       targetTableName,
       query,
       params
-    );
-    const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry);
-    await this.uploadExternalPreAggregation(tableData, newVersionEntry);
+    ));
+    const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn);
+    await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
     await this.loadCache.reset(this.preAggregation);
-    await this.dropOrphanedTables(client, targetTableName);
+    await this.dropOrphanedTables(client, targetTableName, saveCancelFn);
   }

-  async refreshImplStreamExternalStrategy(client, newVersionEntry) {
+  async refreshImplStreamExternalStrategy(client, newVersionEntry, saveCancelFn) {
     const [sql, params] =
@@ -458,8 +459,8 @@ Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []];
     });
-    const tableData = await client.downloadQueryResults(sql, params);
-    await this.uploadExternalPreAggregation(tableData, newVersionEntry);
+    const tableData = await saveCancelFn(client.downloadQueryResults(sql, params));
+    await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
     await this.loadCache.reset(this.preAggregation);
   }

-  async downloadTempExternalPreAggregation(client, newVersionEntry) {
+  async downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn) {
     if (!client.downloadTable) {
@@ -473,8 +474,8 @@ throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`);
     });
-    const tableData = await client.downloadTable(table);
-    tableData.types = await client.tableColumnTypes(table);
+    const tableData = await saveCancelFn(client.downloadTable(table));
+    tableData.types = await saveCancelFn(client.tableColumnTypes(table));
     return tableData;
   }

-  async uploadExternalPreAggregation(tableData, newVersionEntry) {
+  async uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn) {
     const table = this.targetTableName(newVersionEntry);
@@ -489,9 +490,9 @@ const externalDriver = await this.externalDriverFactory();
     });
-    await externalDriver.uploadTable(table, tableData.types, tableData);
-    await this.createIndexes(externalDriver, newVersionEntry);
+    await saveCancelFn(externalDriver.uploadTable(table, tableData.types, tableData));
+    await this.createIndexes(externalDriver, newVersionEntry, saveCancelFn);
     await this.loadCache.reset(this.preAggregation);
-    await this.dropOrphanedTables(externalDriver, table);
+    await this.dropOrphanedTables(externalDriver, table, saveCancelFn);
   }

-  async createIndexes(driver, newVersionEntry) {
+  async createIndexes(driver, newVersionEntry, saveCancelFn) {
     if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) {
@@ -512,3 +513,3 @@ return;
      });
-      await driver.query(
+      await saveCancelFn(driver.query(
        QueryCache.replacePreAggregationTableNames(
@@ -522,7 +523,7 @@ query,
        params
-      );
+      ));
     }
   }

-  async dropOrphanedTables(client, justCreatedTable) {
+  async dropOrphanedTables(client, justCreatedTable, saveCancelFn) {
     await this.preAggregations.addTableUsed(justCreatedTable);
@@ -547,3 +548,3 @@ const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema);
     });
-    await Promise.all(toDrop.map(table => client.dropTable(table)));
+    await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table))));
   }
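The pattern in this file is mechanical: every method that awaits a driver operation gains a trailing `saveCancelFn` parameter and wraps the driver call in it, so the combinator collects a cancel handle for each in-flight statement (load, index creation, upload, download, orphan cleanup). A condensed illustration of the wiring, reusing the `cancelCombinator` sketch above with a toy client standing in for a real driver (none of these names are package code):

// Illustration only: how refresh() composes a cancellable strategy out of
// individually cancellable driver calls.
const toyClient = {
  loadPreAggregationIntoTable(table, sql) {
    let timer;
    const promise = new Promise(resolve => { timer = setTimeout(resolve, 5000); });
    promise.cancel = async () => clearTimeout(timer); // stop the simulated load
    return promise;
  }
};

const refreshPromise = cancelCombinator(saveCancelFn =>
  // each long-running driver call goes through saveCancelFn so its cancel handle is kept
  saveCancelFn(toyClient.loadPreAggregationIntoTable('stb_pre_aggregations.example', 'CREATE TABLE ...'))
);

// Later, e.g. when the queue times the refresh out, this cancels the in-flight load:
refreshPromise.cancel();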
@@ -76,2 +76,4 @@ const R = require('ramda');
         requestId: options.requestId,
+        activeQueryKeys: active,
+        toProcessQueryKeys: toProcess,
         active: active.indexOf(redisClient.redisHash(queryKey)) !== -1,
@@ -278,9 +280,12 @@ queueIndex: toProcess.indexOf(redisClient.redisHash(queryKey)),
       if (e instanceof TimeoutError) {
-        this.logger('Cancelling query due to timeout', {
-          processingId,
-          queryKey: query.queryKey,
-          queuePrefix: this.redisQueuePrefix,
-          requestId: query.requestId
-        });
-        await this.sendCancelMessageFn(query);
+        const queryWithCancelHandle = await redisClient.getQueryDef(queryKey);
+        if (queryWithCancelHandle) {
+          this.logger('Cancelling query due to timeout', {
+            processingId,
+            queryKey: queryWithCancelHandle.queryKey,
+            queuePrefix: this.redisQueuePrefix,
+            requestId: queryWithCancelHandle.requestId
+          });
+          await this.sendCancelMessageFn(queryWithCancelHandle);
+        }
       }
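Two things change in the queue. The "Waiting for query" log entry now includes the whole queue state (`activeQueryKeys`, `toProcessQueryKeys`) alongside the per-query fields, which is the "Log queue state" feature from the changelog. And on a `TimeoutError` the handler re-reads the query definition from Redis via `getQueryDef` before cancelling: the variable name `queryWithCancelHandle` suggests that only the persisted definition carries the cancel handle saved by the node actually executing the query, and if the definition is already gone there is nothing left to cancel, so the branch is skipped. For orientation, a timeout of this kind can be produced by racing the execution promise against a timer; a minimal sketch, not the package's implementation (the seconds unit is inferred from the test below, which pairs `executionTimeout: 1` with a 3000 ms mock delay):

// Sketch only: surface a queue execution timeout as a TimeoutError.
class TimeoutError extends Error {}

const withExecutionTimeout = (promise, executionTimeoutSeconds) => {
  let timer;
  const timeout = new Promise((resolve, reject) => {
    timer = setTimeout(
      () => reject(new TimeoutError('Query execution timeout')),
      executionTimeoutSeconds * 1000
    );
  });
  // whichever settles first wins; always clear the timer afterwards
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
};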
@@ -5,3 +5,3 @@ {
   "author": "Statsbot, Inc.",
-  "version": "0.19.0",
+  "version": "0.19.5",
   "repository": {
@@ -29,3 +29,3 @@ "type": "git",
   "license": "Apache-2.0",
-  "gitHead": "ae0584d2f0c3bda6208597911e135f5bd639ee4f"
+  "gitHead": "2083d46c5f71f872bba3cbe99bc8b45e43982d2b"
 }
@@ -1,2 +1,2 @@
-/* globals describe, it, should, before */
+/* globals describe, beforeAll, afterAll, test, expect */
 const QueryOrchestrator = require('../orchestrator/QueryOrchestrator');
@@ -8,7 +8,15 @@
     this.executedQueries = [];
+    this.cancelledQueries = [];
   }

-  async query(query) {
+  query(query) {
     this.executedQueries.push(query);
-    return [query];
+    let promise = Promise.resolve([query]);
+    if (query.match(`orders_too_big`)) {
+      promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000)));
+    }
+    promise.cancel = async () => {
+      this.cancelledQueries.push(query);
+    };
+    return promise;
   }
@@ -25,4 +33,5 @@
-  async loadPreAggregationIntoTable(preAggregationTableName) {
+  loadPreAggregationIntoTable(preAggregationTableName, loadSql) {
     this.tables.push(preAggregationTableName.substring(0, 100));
+    return this.query(loadSql);
   }
@@ -32,2 +41,3 @@
     this.tables = this.tables.filter(t => t !== tableName.split('.')[1]);
+    return this.query(`DROP TABLE ${tableName}`);
   }
@@ -39,3 +49,11 @@ }
   const queryOrchestrator = new QueryOrchestrator(
-    'TEST', async () => mockDriver, (msg, params) => console.log(msg, params)
+    'TEST',
+    async () => mockDriver,
+    (msg, params) => console.log(msg, params), {
+      preAggregationsOptions: {
+        queueOptions: {
+          executionTimeout: 1
+        }
+      }
+    }
   );
@@ -126,2 +144,27 @@
   });

+  test('cancel pre-aggregation', async () => {
+    const query = {
+      query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20181101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000",
+      values: ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"],
+      cacheKeyQueries: {
+        renewalThreshold: 21600,
+        queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
+      },
+      preAggregations: [{
+        preAggregationsSchema: "stb_pre_aggregations",
+        tableName: "stb_pre_aggregations.orders_number_and_count20181101",
+        loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20181101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders_too_big AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"]],
+        invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
+      }],
+      renewQuery: true,
+      requestId: 'cancel pre-aggregation'
+    };
+    try {
+      await queryOrchestrator.fetchQuery(query);
+    } catch (e) {
+      expect(e.toString()).toMatch(/timeout/);
+    }
+    expect(mockDriver.cancelledQueries[0]).toMatch(/orders_too_big/);
+  });
 });
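The new test drives the whole path end to end: the mock driver delays any statement touching `orders_too_big` by 3000 ms and records whatever gets `cancel()`-ed, while the orchestrator's pre-aggregation queue runs with `executionTimeout: 1`, so the rollup build outlives the timeout, the queue sends the cancel message, and the assertion finds the in-flight `CREATE TABLE ... orders_too_big ...` statement in `cancelledQueries`. The same behaviour, reduced to a standalone sketch with hypothetical names and no Cube.js code, in case the moving parts are hard to follow in diff form:

// Standalone sketch: a slow, cancellable "load" raced against a short timeout
// and cancelled when it loses — the shape of what the test above asserts.
const cancelledQueries = [];

const slowLoad = (sql) => {
  let timer;
  const promise = new Promise(resolve => { timer = setTimeout(() => resolve([sql]), 3000); });
  promise.cancel = async () => { clearTimeout(timer); cancelledQueries.push(sql); };
  return promise;
};

(async () => {
  const loadPromise = slowLoad('CREATE TABLE ... FROM public.orders_too_big ...');
  try {
    await Promise.race([
      loadPromise,
      new Promise((_, reject) => setTimeout(() => reject(new Error('Query execution timeout')), 1000))
    ]);
  } catch (e) {
    await loadPromise.cancel(); // what the queue's cancel message ultimately triggers
  }
  console.log(cancelledQueries); // ['CREATE TABLE ... FROM public.orders_too_big ...']
})();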