@cubejs-backend/query-orchestrator
Advanced tools
Comparing version 0.17.9 to 0.17.10
@@ -6,2 +6,13 @@ # Change Log | ||
## [0.17.10](https://github.com/cube-js/cube.js/compare/v0.17.9...v0.17.10) (2020-02-20) | ||
### Features | ||
* Support external rollups from readonly source ([#395](https://github.com/cube-js/cube.js/issues/395)) ([b17e841](https://github.com/cube-js/cube.js/commit/b17e841)) | ||
## [0.17.9](https://github.com/cube-js/cube.js/compare/v0.17.8...v0.17.9) (2020-02-18) | ||
@@ -8,0 +19,0 @@ |
@@ -41,9 +41,17 @@ const { reduce } = require('ramda'); | ||
testConnection() { | ||
throw 'Not implemented'; | ||
throw new Error('Not implemented'); | ||
} | ||
query() { | ||
throw 'Not implemented'; | ||
throw new Error('Not implemented'); | ||
} | ||
downloadQueryResults() { | ||
throw new Error('Not implemented'); | ||
} | ||
readOnly() { | ||
return false; | ||
} | ||
tablesSchema() { | ||
@@ -138,3 +146,3 @@ const query = this.informationSchemaQuery(); | ||
columns.data_type | ||
FROM information_schema.columns | ||
FROM information_schema.columns | ||
WHERE table_name = ${this.param(0)} AND table_schema = ${this.param(1)}`, | ||
@@ -141,0 +149,0 @@ [name, schema] |
@@ -390,33 +390,8 @@ const crypto = require('crypto'); | ||
return (client) => { | ||
const [loadSql, params] = | ||
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; | ||
let queryPromise = null; | ||
const refreshImpl = async () => { | ||
if (this.preAggregation.external) { // TODO optimize | ||
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); | ||
} | ||
queryPromise = client.loadPreAggregationIntoTable( | ||
this.targetTableName(newVersionEntry), | ||
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables) | ||
.replace( | ||
this.preAggregation.tableName, | ||
this.targetTableName(newVersionEntry) | ||
), | ||
params | ||
); | ||
await queryPromise; | ||
if (this.preAggregation.external) { | ||
await this.loadExternalPreAggregation(client, newVersionEntry); | ||
} else { | ||
await this.createIndexes(client, newVersionEntry); | ||
} | ||
await this.loadCache.reset(this.preAggregation); | ||
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry)); | ||
if (!this.preAggregation.external) { | ||
await this.loadCache.reset(this.preAggregation); | ||
} | ||
}; | ||
const resultPromise = refreshImpl(); | ||
resultPromise.cancel = () => queryPromise.cancel(); // TODO cancel for external upload | ||
let refreshStrategy = this.refreshImplStoreInSourceStrategy; | ||
if (this.preAggregation.external) { | ||
refreshStrategy = client.readOnly() ? this.refreshImplStreamExternalStrategy : this.refreshImplTempTableExternalStrategy; | ||
} | ||
const resultPromise = refreshStrategy.bind(this)(client, newVersionEntry); | ||
resultPromise.cancel = () => {} // TODO implement cancel (loading rollup into table and external upload) | ||
return resultPromise; | ||
@@ -426,3 +401,56 @@ }; | ||
async loadExternalPreAggregation(client, newVersionEntry) { | ||
async refreshImplStoreInSourceStrategy(client, newVersionEntry) { | ||
const [loadSql, params] = | ||
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; | ||
await client.loadPreAggregationIntoTable( | ||
this.targetTableName(newVersionEntry), | ||
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables) | ||
.replace( | ||
this.preAggregation.tableName, | ||
this.targetTableName(newVersionEntry) | ||
), | ||
params | ||
); | ||
await this.createIndexes(client, newVersionEntry); | ||
await this.loadCache.reset(this.preAggregation); | ||
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry)); | ||
await this.loadCache.reset(this.preAggregation); | ||
} | ||
async refreshImplTempTableExternalStrategy(client, newVersionEntry) { | ||
const [loadSql, params] = | ||
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; | ||
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); | ||
await client.loadPreAggregationIntoTable( | ||
this.targetTableName(newVersionEntry), | ||
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables) | ||
.replace( | ||
this.preAggregation.tableName, | ||
this.targetTableName(newVersionEntry) | ||
), | ||
params | ||
); | ||
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry); | ||
await this.uploadExternalPreAggregation(tableData, newVersionEntry); | ||
await this.loadCache.reset(this.preAggregation); | ||
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry)); | ||
} | ||
async refreshImplStreamExternalStrategy(client, newVersionEntry) { | ||
const [sql, params] = | ||
Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; | ||
if (!client.downloadQueryResults) { | ||
throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadQueryResults()`); | ||
} | ||
this.logger('Downloading external pre-aggregation via query', { | ||
preAggregation: this.preAggregation, | ||
requestId: this.requestId | ||
}); | ||
const tableData = await client.downloadQueryResults(sql, params); | ||
await this.uploadExternalPreAggregation(tableData, newVersionEntry); | ||
await this.loadCache.reset(this.preAggregation); | ||
} | ||
async downloadTempExternalPreAggregation(client, newVersionEntry) { | ||
if (!client.downloadTable) { | ||
@@ -437,3 +465,8 @@ throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`); | ||
const tableData = await client.downloadTable(table); | ||
const columns = await client.tableColumnTypes(table); | ||
tableData.types = await client.tableColumnTypes(table); | ||
return tableData; | ||
} | ||
async uploadExternalPreAggregation(tableData, newVersionEntry) { | ||
const table = this.targetTableName(newVersionEntry); | ||
const externalDriver = await this.externalDriverFactory(); | ||
@@ -447,3 +480,3 @@ if (!externalDriver.uploadTable) { | ||
}); | ||
await externalDriver.uploadTable(table, columns, tableData); | ||
await externalDriver.uploadTable(table, tableData.types, tableData); | ||
await this.createIndexes(externalDriver, newVersionEntry); | ||
@@ -450,0 +483,0 @@ await this.loadCache.reset(this.preAggregation); |
@@ -5,3 +5,3 @@ { | ||
"author": "Statsbot, Inc.", | ||
"version": "0.17.9", | ||
"version": "0.17.10", | ||
"repository": { | ||
@@ -29,3 +29,3 @@ "type": "git", | ||
"license": "Apache-2.0", | ||
"gitHead": "9f044d37191ba6e7c6fe6e178a01dbc52e06efa5" | ||
"gitHead": "70d41c2d5aa7b3e2061730ab7fe9bcbc125bd220" | ||
} |
104798
2077