New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cubejs-backend/query-orchestrator

Package Overview
Dependencies
Maintainers
2
Versions
493
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cubejs-backend/query-orchestrator - npm Package Compare versions

Comparing version 0.19.0 to 0.19.5

driver/utils.js

16

CHANGELOG.md

@@ -6,2 +6,18 @@ # Change Log

## [0.19.5](https://github.com/cube-js/cube.js/compare/v0.19.4...v0.19.5) (2020-04-13)
### Bug Fixes
* Broken query and pre-aggregation cancel ([aa82256](https://github.com/cube-js/cube.js/commit/aa82256))
### Features
* Log queue state on Waiting for query ([395c63c](https://github.com/cube-js/cube.js/commit/395c63c))
# [0.19.0](https://github.com/cube-js/cube.js/compare/v0.18.32...v0.19.0) (2020-04-09)

@@ -8,0 +24,0 @@

const { reduce } = require('ramda');
const { cancelCombinator } = require('./utils');

@@ -172,4 +173,8 @@ const sortByKeys = (unordered) => {

}
cancelCombinator(fn) {
return cancelCombinator(fn);
}
}
module.exports = BaseDriver;

59

orchestrator/PreAggregations.js
const crypto = require('crypto');
const R = require('ramda');
const { cancelCombinator } = require('../driver/utils');
const RedisCacheDriver = require('./RedisCacheDriver');

@@ -383,9 +384,9 @@ const LocalCacheDriver = require('./LocalCacheDriver');

}
const resultPromise = refreshStrategy.bind(this)(client, newVersionEntry);
resultPromise.cancel = () => {}; // TODO implement cancel (loading rollup into table and external upload)
return resultPromise;
return cancelCombinator(
saveCancelFn => refreshStrategy.bind(this)(client, newVersionEntry, saveCancelFn)
);
};
}
async refreshImplStoreInSourceStrategy(client, newVersionEntry) {
async refreshImplStoreInSourceStrategy(client, newVersionEntry, saveCancelFn) {
const [loadSql, params] =

@@ -406,14 +407,14 @@ Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];

});
await client.loadPreAggregationIntoTable(
await saveCancelFn(client.loadPreAggregationIntoTable(
targetTableName,
query,
params
);
await this.createIndexes(client, newVersionEntry);
));
await this.createIndexes(client, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(client, targetTableName);
await this.dropOrphanedTables(client, targetTableName, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
}
async refreshImplTempTableExternalStrategy(client, newVersionEntry) {
async refreshImplTempTableExternalStrategy(client, newVersionEntry, saveCancelFn) {
const [loadSql, params] =

@@ -435,14 +436,14 @@ Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];

});
await client.loadPreAggregationIntoTable(
await saveCancelFn(client.loadPreAggregationIntoTable(
targetTableName,
query,
params
);
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry);
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
));
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn);
await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(client, targetTableName);
await this.dropOrphanedTables(client, targetTableName, saveCancelFn);
}
async refreshImplStreamExternalStrategy(client, newVersionEntry) {
async refreshImplStreamExternalStrategy(client, newVersionEntry, saveCancelFn) {
const [sql, params] =

@@ -458,8 +459,8 @@ Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []];

});
const tableData = await client.downloadQueryResults(sql, params);
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
const tableData = await saveCancelFn(client.downloadQueryResults(sql, params));
await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
}
async downloadTempExternalPreAggregation(client, newVersionEntry) {
async downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn) {
if (!client.downloadTable) {

@@ -473,8 +474,8 @@ throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`);

});
const tableData = await client.downloadTable(table);
tableData.types = await client.tableColumnTypes(table);
const tableData = await saveCancelFn(client.downloadTable(table));
tableData.types = await saveCancelFn(client.tableColumnTypes(table));
return tableData;
}
async uploadExternalPreAggregation(tableData, newVersionEntry) {
async uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn) {
const table = this.targetTableName(newVersionEntry);

@@ -489,9 +490,9 @@ const externalDriver = await this.externalDriverFactory();

});
await externalDriver.uploadTable(table, tableData.types, tableData);
await this.createIndexes(externalDriver, newVersionEntry);
await saveCancelFn(externalDriver.uploadTable(table, tableData.types, tableData));
await this.createIndexes(externalDriver, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(externalDriver, table);
await this.dropOrphanedTables(externalDriver, table, saveCancelFn);
}
async createIndexes(driver, newVersionEntry) {
async createIndexes(driver, newVersionEntry, saveCancelFn) {
if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) {

@@ -512,3 +513,3 @@ return;

});
await driver.query(
await saveCancelFn(driver.query(
QueryCache.replacePreAggregationTableNames(

@@ -522,7 +523,7 @@ query,

params
);
));
}
}
async dropOrphanedTables(client, justCreatedTable) {
async dropOrphanedTables(client, justCreatedTable, saveCancelFn) {
await this.preAggregations.addTableUsed(justCreatedTable);

@@ -547,3 +548,3 @@ const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema);

});
await Promise.all(toDrop.map(table => client.dropTable(table)));
await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table))));
}

@@ -550,0 +551,0 @@ }

@@ -76,2 +76,4 @@ const R = require('ramda');

requestId: options.requestId,
activeQueryKeys: active,
toProcessQueryKeys: toProcess,
active: active.indexOf(redisClient.redisHash(queryKey)) !== -1,

@@ -278,9 +280,12 @@ queueIndex: toProcess.indexOf(redisClient.redisHash(queryKey)),

if (e instanceof TimeoutError) {
this.logger('Cancelling query due to timeout', {
processingId,
queryKey: query.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: query.requestId
});
await this.sendCancelMessageFn(query);
const queryWithCancelHandle = await redisClient.getQueryDef(queryKey);
if (queryWithCancelHandle) {
this.logger('Cancelling query due to timeout', {
processingId,
queryKey: queryWithCancelHandle.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: queryWithCancelHandle.requestId
});
await this.sendCancelMessageFn(queryWithCancelHandle);
}
}

@@ -287,0 +292,0 @@ }

@@ -5,3 +5,3 @@ {

"author": "Statsbot, Inc.",
"version": "0.19.0",
"version": "0.19.5",
"repository": {

@@ -29,3 +29,3 @@ "type": "git",

"license": "Apache-2.0",
"gitHead": "ae0584d2f0c3bda6208597911e135f5bd639ee4f"
"gitHead": "2083d46c5f71f872bba3cbe99bc8b45e43982d2b"
}

@@ -1,2 +0,2 @@

/* globals describe, it, should, before */
/* globals describe, beforeAll, afterAll, test, expect */
const QueryOrchestrator = require('../orchestrator/QueryOrchestrator');

@@ -8,7 +8,15 @@

this.executedQueries = [];
this.cancelledQueries = [];
}
async query(query) {
query(query) {
this.executedQueries.push(query);
return [query];
let promise = Promise.resolve([query]);
if (query.match(`orders_too_big`)) {
promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000)));
}
promise.cancel = async () => {
this.cancelledQueries.push(query);
};
return promise;
}

@@ -25,4 +33,5 @@

async loadPreAggregationIntoTable(preAggregationTableName) {
loadPreAggregationIntoTable(preAggregationTableName, loadSql) {
this.tables.push(preAggregationTableName.substring(0, 100));
return this.query(loadSql);
}

@@ -32,2 +41,3 @@

this.tables = this.tables.filter(t => t !== tableName.split('.')[1]);
return this.query(`DROP TABLE ${tableName}`);
}

@@ -39,3 +49,11 @@ }

const queryOrchestrator = new QueryOrchestrator(
'TEST', async () => mockDriver, (msg, params) => console.log(msg, params)
'TEST',
async () => mockDriver,
(msg, params) => console.log(msg, params), {
preAggregationsOptions: {
queueOptions: {
executionTimeout: 1
}
}
}
);

@@ -126,2 +144,27 @@

});
test('cancel pre-aggregation', async () => {
const query = {
query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20181101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000",
values: ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"],
cacheKeyQueries: {
renewalThreshold: 21600,
queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
},
preAggregations: [{
preAggregationsSchema: "stb_pre_aggregations",
tableName: "stb_pre_aggregations.orders_number_and_count20181101",
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20181101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders_too_big AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"]],
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
}],
renewQuery: true,
requestId: 'cancel pre-aggregation'
};
try {
await queryOrchestrator.fetchQuery(query);
} catch (e) {
expect(e.toString()).toMatch(/timeout/);
}
expect(mockDriver.cancelledQueries[0]).toMatch(/orders_too_big/);
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc