@uwdata/mosaic-core
Advanced tools
Comparing version 0.9.0 to 0.10.0
{ | ||
"name": "@uwdata/mosaic-core", | ||
"version": "0.9.0", | ||
"version": "0.10.0", | ||
"description": "Scalable and extensible linked data views.", | ||
@@ -13,3 +13,3 @@ "keywords": [ | ||
"license": "BSD-3-Clause", | ||
"author": "Jeffrey Heer (http://idl.cs.washington.edu)", | ||
"author": "Jeffrey Heer (https://idl.uw.edu)", | ||
"type": "module", | ||
@@ -32,10 +32,10 @@ "main": "src/index.js", | ||
"dependencies": { | ||
"@duckdb/duckdb-wasm": "^1.28.1-dev195.0", | ||
"@uwdata/mosaic-sql": "^0.9.0", | ||
"apache-arrow": "^15.0.2" | ||
"@duckdb/duckdb-wasm": "^1.28.1-dev232.0", | ||
"@uwdata/mosaic-sql": "^0.10.0", | ||
"apache-arrow": "^16.1.0" | ||
}, | ||
"devDependencies": { | ||
"@uwdata/mosaic-duckdb": "^0.9.0" | ||
"@uwdata/mosaic-duckdb": "^0.10.0" | ||
}, | ||
"gitHead": "89bb9b0dfa747aed691eaeba35379525a6764c61" | ||
"gitHead": "94fc4f0d4efc622001f6afd6714d1e9dda745be2" | ||
} |
import { socketConnector } from './connectors/socket.js'; | ||
import { FilterGroup } from './FilterGroup.js'; | ||
import { DataCubeIndexer } from './DataCubeIndexer.js'; | ||
import { QueryManager, Priority } from './QueryManager.js'; | ||
@@ -7,2 +7,6 @@ import { queryFieldInfo } from './util/field-info.js'; | ||
/** | ||
* The singleton Coordinator instance. | ||
* @type {Coordinator} | ||
*/ | ||
let _instance; | ||
@@ -24,11 +28,27 @@ | ||
/** | ||
* A Mosaic Coordinator manages all database communication for clients and | ||
* handles selection updates. The Coordinator also performs optimizations | ||
* including query caching, consolidation, and data cube indexing. | ||
* @param {*} [db] Database connector. Defaults to a web socket connection. | ||
* @param {object} [options] Coordinator options. | ||
* @param {*} [options.logger=console] The logger to use, defaults to `console`. | ||
* @param {*} [options.manager] The query manager to use. | ||
* @param {boolean} [options.cache=true] Boolean flag to enable/disable query caching. | ||
* @param {boolean} [options.consolidate=true] Boolean flag to enable/disable query consolidation. | ||
* @param {object} [options.indexes] Data cube indexer options. | ||
*/ | ||
export class Coordinator { | ||
constructor(db = socketConnector(), options = {}) { | ||
const { | ||
logger = console, | ||
manager = new QueryManager() | ||
} = options; | ||
constructor(db = socketConnector(), { | ||
logger = console, | ||
manager = new QueryManager(), | ||
cache = true, | ||
consolidate = true, | ||
indexes = {} | ||
} = {}) { | ||
this.manager = manager; | ||
this.manager.cache(cache); | ||
this.manager.consolidate(consolidate); | ||
this.dataCubeIndexer = new DataCubeIndexer(this, indexes); | ||
this.logger(logger); | ||
this.configure(options); | ||
this.databaseConnector(db); | ||
@@ -38,31 +58,15 @@ this.clear(); | ||
logger(logger) { | ||
if (arguments.length) { | ||
this._logger = logger || voidLogger(); | ||
this.manager.logger(this._logger); | ||
} | ||
return this._logger; | ||
} | ||
/** | ||
* Set configuration options for this coordinator. | ||
* @param {object} [options] Configration options. | ||
* @param {boolean} [options.cache=true] Boolean flag to enable/disable query caching. | ||
* @param {boolean} [options.consolidate=true] Boolean flag to enable/disable query consolidation. | ||
* @param {boolean|object} [options.indexes=true] Boolean flag to enable/disable | ||
* automatic data cube indexes or an index options object. | ||
* Clear the coordinator state. | ||
* @param {object} [options] Options object. | ||
* @param {boolean} [options.clients=true] If true, disconnect all clients. | ||
* @param {boolean} [options.cache=true] If true, clear the query cache. | ||
*/ | ||
configure({ cache = true, consolidate = true, indexes = true } = {}) { | ||
this.manager.cache(cache); | ||
this.manager.consolidate(consolidate); | ||
this.indexes = indexes; | ||
} | ||
clear({ clients = true, cache = true } = {}) { | ||
this.manager.clear(); | ||
if (clients) { | ||
this.filterGroups?.forEach(group => group.disconnect()); | ||
this.filterGroups = new Map; | ||
this.clients?.forEach(client => this.disconnect(client)); | ||
this.filterGroups?.forEach(group => group.finalize()); | ||
this.clients = new Set; | ||
this.filterGroups = new Map; | ||
} | ||
@@ -72,2 +76,7 @@ if (cache) this.manager.cache().clear(); | ||
/** | ||
* Get or set the database connector. | ||
* @param {*} [db] The database connector to use. | ||
* @returns The current database connector. | ||
*/ | ||
databaseConnector(db) { | ||
@@ -77,4 +86,23 @@ return this.manager.connector(db); | ||
/** | ||
* Get or set the logger. | ||
* @param {*} logger The logger to use. | ||
* @returns The current logger | ||
*/ | ||
logger(logger) { | ||
if (arguments.length) { | ||
this._logger = logger || voidLogger(); | ||
this.manager.logger(this._logger); | ||
} | ||
return this._logger; | ||
} | ||
// -- Query Management ---- | ||
/** | ||
* Cancel previosuly submitted query requests. These queries will be | ||
* canceled if they are queued but have not yet been submitted. | ||
* @param {import('./util/query-result.js').QueryResult[]} requests An array | ||
* of query result objects, such as those returned by the `query` method. | ||
*/ | ||
cancel(requests) { | ||
@@ -84,2 +112,11 @@ this.manager.cancel(requests); | ||
/** | ||
* Issue a query for which no result (return value) is needed. | ||
* @param {import('@uwdata/mosaic-sql').Query | string} query The query. | ||
* @param {object} [options] An options object. | ||
* @param {number} [options.priority] The query priority, defaults to | ||
* `Priority.Normal`. | ||
* @returns {import('./util/query-result.js').QueryResult} A query result | ||
* promise. | ||
*/ | ||
exec(query, { priority = Priority.Normal } = {}) { | ||
@@ -90,2 +127,14 @@ query = Array.isArray(query) ? query.join(';\n') : query; | ||
/** | ||
* Issue a query to the backing database. The submitted query may be | ||
* consolidate with other queries and its results may be cached. | ||
* @param {import('@uwdata/mosaic-sql').Query | string} query The query. | ||
* @param {object} [options] An options object. | ||
* @param {'arrow' | 'json'} [options.type] The query result format type. | ||
* @param {boolean} [options.cache=true] If true, cache the query result. | ||
* @param {number} [options.priority] The query priority, defaults to | ||
* `Priority.Normal`. | ||
* @returns {import('./util/query-result.js').QueryResult} A query result | ||
* promise. | ||
*/ | ||
query(query, { | ||
@@ -100,2 +149,11 @@ type = 'arrow', | ||
/** | ||
* Issue a query to prefetch data for later use. The query result is cached | ||
* for efficient future access. | ||
* @param {import('@uwdata/mosaic-sql').Query | string} query The query. | ||
* @param {object} [options] An options object. | ||
* @param {'arrow' | 'json'} [options.type] The query result format type. | ||
* @returns {import('./util/query-result.js').QueryResult} A query result | ||
* promise. | ||
*/ | ||
prefetch(query, options = {}) { | ||
@@ -117,12 +175,31 @@ return this.query(query, { ...options, cache: true, priority: Priority.Low }); | ||
/** | ||
* Update client data by submitting the given query and returning the | ||
* data (or error) to the client. | ||
* @param {import('./MosaicClient.js').MosaicClient} client A Mosaic client. | ||
* @param {import('@uwdata/mosaic-sql').Query | string} query The data query. | ||
* @param {number} [priority] The query priority. | ||
* @returns {Promise} A Promise that resolves upon completion of the update. | ||
*/ | ||
updateClient(client, query, priority = Priority.Normal) { | ||
client.queryPending(); | ||
return this.query(query, { priority }).then( | ||
data => client.queryResult(data).update(), | ||
err => { client.queryError(err); this._logger.error(err); } | ||
); | ||
return this.query(query, { priority }) | ||
.then( | ||
data => client.queryResult(data).update(), | ||
err => { this._logger.error(err); client.queryError(err); } | ||
) | ||
.catch(err => this._logger.error(err)); | ||
} | ||
/** | ||
* Issue a query request for a client. If the query is null or undefined, | ||
* the client is simply updated. Otherwise `updateClient` is called. As a | ||
* side effect, this method clears the current data cube indexer state. | ||
* @param {import('./MosaicClient.js').MosaicClient} client The client | ||
* to update. | ||
* @param {import('@uwdata/mosaic-sql').Query | string | null} [query] | ||
* The query to issue. | ||
*/ | ||
requestQuery(client, query) { | ||
this.filterGroups.get(client.filterBy)?.reset(); | ||
this.dataCubeIndexer.clear(); | ||
return query | ||
@@ -135,6 +212,7 @@ ? this.updateClient(client, query) | ||
* Connect a client to the coordinator. | ||
* @param {import('./MosaicClient.js').MosaicClient} client the client to disconnect | ||
* @param {import('./MosaicClient.js').MosaicClient} client The Mosaic | ||
* client to connect. | ||
*/ | ||
async connect(client) { | ||
const { clients, filterGroups, indexes } = this; | ||
const { clients } = this; | ||
@@ -153,12 +231,4 @@ if (clients.has(client)) { | ||
// connect filters | ||
const filter = client.filterBy; | ||
if (filter) { | ||
if (filterGroups.has(filter)) { | ||
filterGroups.get(filter).add(client); | ||
} else { | ||
const group = new FilterGroup(this, filter, indexes); | ||
filterGroups.set(filter, group.add(client)); | ||
} | ||
} | ||
// connect filter selection | ||
connectSelection(this, client.filterBy, client); | ||
@@ -170,4 +240,4 @@ client.requestQuery(); | ||
* Disconnect a client from the coordinator. | ||
* | ||
* @param {import('./MosaicClient.js').MosaicClient} client the client to disconnect | ||
* @param {import('./MosaicClient.js').MosaicClient} client The Mosaic | ||
* client to disconnect. | ||
*/ | ||
@@ -178,5 +248,80 @@ disconnect(client) { | ||
clients.delete(client); | ||
filterGroups.get(client.filterBy)?.remove(client); | ||
client.coordinator = null; | ||
const group = filterGroups.get(client.filterBy); | ||
if (group) { | ||
group.clients.delete(client); | ||
} | ||
} | ||
} | ||
/** | ||
* Connect a selection-client pair to the coordinator to process updates. | ||
* @param {Coordinator} mc The Mosaic coordinator. | ||
* @param {import('./Selection.js').Selection} selection A selection. | ||
* @param {import('./MosaicClient.js').MosaicClient} client A Mosiac | ||
* client that is filtered by the given selection. | ||
*/ | ||
function connectSelection(mc, selection, client) { | ||
if (!selection) return; | ||
let entry = mc.filterGroups.get(selection); | ||
if (!entry) { | ||
const activate = clause => activateSelection(mc, selection, clause); | ||
const value = () => updateSelection(mc, selection); | ||
selection.addEventListener('activate', activate); | ||
selection.addEventListener('value', value); | ||
entry = { | ||
selection, | ||
clients: new Set, | ||
disconnect() { | ||
selection.removeEventListener('activate', activate); | ||
selection.removeEventListener('value', value); | ||
} | ||
}; | ||
mc.filterGroups.set(selection, entry); | ||
} | ||
entry.clients.add(client); | ||
} | ||
/** | ||
* Activate a selection, providing a clause indicative of potential | ||
* next updates. Activation provides a preview of likely next events, | ||
* enabling potential precomputation to optimize updates. | ||
* @param {Coordinator} mc The Mosaic coordinator. | ||
* @param {import('./Selection.js').Selection} selection A selection. | ||
* @param {import('./util/selection-types.js').SelectionClause} clause A | ||
* selection clause representative of the activation. | ||
*/ | ||
function activateSelection(mc, selection, clause) { | ||
const { dataCubeIndexer, filterGroups } = mc; | ||
const { clients } = filterGroups.get(selection); | ||
for (const client of clients) { | ||
dataCubeIndexer.index(client, selection, clause); | ||
} | ||
} | ||
/** | ||
* Process an updated selection value, querying filtered data for any | ||
* associated clients. | ||
* @param {Coordinator} mc The Mosaic coordinator. | ||
* @param {import('./Selection.js').Selection} selection A selection. | ||
* @returns {Promise} A Promise that resolves when the update completes. | ||
*/ | ||
function updateSelection(mc, selection) { | ||
const { dataCubeIndexer, filterGroups } = mc; | ||
const { clients } = filterGroups.get(selection); | ||
const { active } = selection; | ||
return Promise.allSettled(Array.from(clients, client => { | ||
const info = dataCubeIndexer.index(client, selection, active); | ||
const filter = info ? null : selection.predicate(client); | ||
// skip due to cross-filtering | ||
if (info?.skip || (!info && !filter)) return; | ||
// @ts-ignore | ||
const query = info?.query(active.predicate) ?? client.query(filter); | ||
return mc.updateClient(client, query); | ||
})); | ||
} |
@@ -1,5 +0,9 @@ | ||
import { Query, and, asColumn, create, isBetween, scaleTransform, sql } from '@uwdata/mosaic-sql'; | ||
import { | ||
Query, and, asColumn, create, isBetween, scaleTransform, sql | ||
} from '@uwdata/mosaic-sql'; | ||
import { indexColumns } from './util/index-columns.js'; | ||
import { fnv_hash } from './util/hash.js'; | ||
import { indexColumns } from './util/index-columns.js'; | ||
const Skip = { skip: true, result: null }; | ||
/** | ||
@@ -9,181 +13,183 @@ * Build and query optimized indices ("data cubes") for fast computation of | ||
* A data cube contains pre-aggregated data for a Mosaic client, subdivided | ||
* by possible query values from an active view. Indexes are realized as | ||
* as temporary database tables that can be queried for rapid updates. | ||
* Compatible client queries must pull data from the same backing table and | ||
* must consist of only groupby dimensions and supported aggregates. | ||
* Compatible selections must contain an active clause that exposes metadata | ||
* for an interval or point value predicate. | ||
* by possible query values from an active selection clause. These cubes are | ||
* realized as as database tables that can be queried for rapid updates. | ||
* Compatible client queries must consist of only groupby dimensions and | ||
* supported aggregate functions. Compatible selections must contain an active | ||
* clause that exposes metadata for an interval or point value predicate. | ||
*/ | ||
export class DataCubeIndexer { | ||
/** | ||
* | ||
* @param {import('./Coordinator.js').Coordinator} mc a Mosaic coordinator | ||
* @param {*} options Options hash to configure the data cube indexes and pass selections to the coordinator. | ||
* Create a new data cube index table manager. | ||
* @param {import('./Coordinator.js').Coordinator} coordinator A Mosaic coordinator. | ||
* @param {object} [options] Indexer options. | ||
* @param {boolean} [options.enabled=true] Flag to enable/disable indexer. | ||
* @param {boolean} [options.temp=true] Flag to indicate if generated data | ||
* cube index tables should be temporary tables. | ||
*/ | ||
constructor(mc, { selection, temp = true }) { | ||
/** @type import('./Coordinator.js').Coordinator */ | ||
this.mc = mc; | ||
this.selection = selection; | ||
constructor(coordinator, { | ||
enabled = true, | ||
temp = true | ||
} = {}) { | ||
/** @type {Map<import('./MosaicClient.js').MosaicClient, DataCubeInfo | Skip | null>} */ | ||
this.indexes = new Map(); | ||
this.active = null; | ||
this.temp = temp; | ||
this.reset(); | ||
this.mc = coordinator; | ||
this._enabled = enabled; | ||
} | ||
reset() { | ||
this.enabled = false; | ||
this.clients = null; | ||
this.indices = null; | ||
this.active = null; | ||
/** | ||
* Set the enabled state of this indexer. If false, any cached state is | ||
* cleared and subsequent index calls will return null until re-enabled. | ||
* @param {boolean} state The enabled state. | ||
*/ | ||
enabled(state) { | ||
if (state === undefined) { | ||
return this._enabled; | ||
} else if (this._enabled !== state) { | ||
if (!state) this.clear(); | ||
this._enabled = state; | ||
} | ||
} | ||
/** | ||
* Clear the cache of data cube index table entries for the current active | ||
* selection clause. This method will also cancel any queued data cube table | ||
* creation queries that have not yet been submitted to the database. This | ||
* method does _not_ drop any existing data cube tables. | ||
*/ | ||
clear() { | ||
if (this.indices) { | ||
this.mc.cancel(Array.from(this.indices.values(), index => index?.result)); | ||
this.indices = null; | ||
} | ||
this.mc.cancel(Array.from(this.indexes.values(), info => info?.result)); | ||
this.indexes.clear(); | ||
this.active = null; | ||
} | ||
index(clients, activeClause) { | ||
if (this.clients !== clients) { | ||
// test client views for compatibility | ||
const cols = Array.from(clients, indexColumns).filter(x => x); | ||
const from = cols[0]?.from; | ||
this.enabled = cols.length && cols.every(c => c.from === from); | ||
this.clients = clients; | ||
this.active = null; | ||
this.clear(); | ||
} | ||
if (!this.enabled) return false; // client views are not indexable | ||
/** | ||
* Return data cube index table information for the active state of a | ||
* client-selection pair, or null if the client is not indexable. This | ||
* method has multiple possible side effects, including data cube table | ||
* generation and updating internal caches. | ||
* @param {import('./MosaicClient.js').MosaicClient} client A Mosaic client. | ||
* @param {import('./Selection.js').Selection} selection A Mosaic selection | ||
* to filter the client by. | ||
* @param {import('./util/selection-types.js').SelectionClause} activeClause | ||
* A representative active selection clause for which to (possibly) generate | ||
* data cube index tables. | ||
* @returns {DataCubeInfo | Skip | null} Data cube index table | ||
* information and query generator, or null if the client is not indexable. | ||
*/ | ||
index(client, selection, activeClause) { | ||
// if not enabled, do nothing | ||
if (!this._enabled) return null; | ||
activeClause = activeClause || this.selection.active; | ||
const { indexes, mc, temp } = this; | ||
const { source } = activeClause; | ||
// exit early if indexes already set up for active view | ||
if (source && source === this.active?.source) return true; | ||
this.clear(); | ||
if (!source) return false; // nothing to work with | ||
const active = this.active = activeColumns(activeClause); | ||
if (!active) return false; // active selection clause not compatible | ||
// if there is no clause source to track, do nothing | ||
if (!source) return null; | ||
const logger = this.mc.logger(); | ||
logger.warn('DATA CUBE INDEX CONSTRUCTION'); | ||
// if we have cached active columns, check for updates or exit | ||
if (this.active) { | ||
// if the active clause source has changed, clear indexer state | ||
// this cancels outstanding requests and clears the index cache | ||
// a clear also sets this.active to null | ||
if (this.active.source !== source) this.clear(); | ||
// if we've seen this source and it's not indexable, do nothing | ||
if (this.active?.source === null) return null; | ||
} | ||
// create a selection with the active source removed | ||
const sel = this.selection.remove(source); | ||
// the current active columns cache value | ||
let { active } = this; | ||
// generate data cube indices | ||
const indices = this.indices = new Map; | ||
const { mc, temp } = this; | ||
for (const client of clients) { | ||
// determine if client should be skipped due to cross-filtering | ||
if (sel.skip(client, activeClause)) { | ||
indices.set(client, null); | ||
continue; | ||
} | ||
// if cached active columns are unset, analyze the active clause | ||
if (!active) { | ||
// generate active data cube dimension columns to select over | ||
// will return an object with null source if not indexable | ||
this.active = active = activeColumns(activeClause); | ||
// if the active clause is not indexable, exit now | ||
if (active.source === null) return null; | ||
} | ||
// generate column definitions for data cube and cube queries | ||
const index = indexColumns(client); | ||
// if we have cached data cube index table info, return that | ||
if (indexes.has(client)) { | ||
return indexes.get(client); | ||
} | ||
// skip if client is not indexable | ||
if (!index) { | ||
continue; | ||
} | ||
// get non-active data cube index table columns | ||
const indexCols = indexColumns(client); | ||
// build index table construction query | ||
const query = client.query(sel.predicate(client)) | ||
.select({ ...active.columns, ...index.aux }) | ||
.groupby(Object.keys(active.columns)); | ||
// ensure active view columns are selected by subqueries | ||
const [subq] = query.subqueries; | ||
if (subq) { | ||
const cols = Object.values(active.columns).flatMap(c => c.columns); | ||
subqueryPushdown(subq, cols); | ||
} | ||
// push orderby criteria to later cube queries | ||
const order = query.orderby(); | ||
query.query.orderby = []; | ||
const sql = query.toString(); | ||
const id = (fnv_hash(sql) >>> 0).toString(16); | ||
const table = `cube_index_${id}`; | ||
const result = mc.exec(create(table, sql, { temp })); | ||
result.catch(e => logger.error(e)); | ||
indices.set(client, { table, result, order, ...index }); | ||
let info; | ||
if (!indexCols) { | ||
// if client is not indexable, record null index | ||
info = null; | ||
} else if (selection.skip(client, activeClause)) { | ||
// skip client if untouched by cross-filtering | ||
info = Skip; | ||
} else { | ||
// generate data cube index table | ||
const filter = selection.remove(source).predicate(client); | ||
info = dataCubeInfo(client.query(filter), active, indexCols); | ||
info.result = mc.exec(create(info.table, info.create, { temp })); | ||
info.result.catch(e => mc.logger().error(e)); | ||
} | ||
// index creation successful | ||
return true; | ||
indexes.set(client, info); | ||
return info; | ||
} | ||
async update() { | ||
const { clients, selection, active } = this; | ||
const filter = active.predicate(selection.active.predicate); | ||
return Promise.all( | ||
Array.from(clients).map(client => this.updateClient(client, filter)) | ||
); | ||
} | ||
async updateClient(client, filter) { | ||
const { mc, indices, selection } = this; | ||
// if client has no index, perform a standard update | ||
if (!indices.has(client)) { | ||
filter = selection.predicate(client); | ||
return mc.updateClient(client, client.query(filter)); | ||
}; | ||
const index = this.indices.get(client); | ||
// skip update if cross-filtered | ||
if (!index) return; | ||
// otherwise, query a data cube index table | ||
const { table, dims, aggr, order = [] } = index; | ||
const query = Query | ||
.select(dims, aggr) | ||
.from(table) | ||
.groupby(dims) | ||
.where(filter) | ||
.orderby(order); | ||
return mc.updateClient(client, query); | ||
} | ||
} | ||
/** | ||
* Determines the active data cube dimension columns to select over. Returns | ||
* an object with the clause source, column definitions, and a predicate | ||
* generator function for the active dimensions of a data cube index table. If | ||
* the active clause is not indexable or is missing metadata, this method | ||
* returns an object with a null source property. | ||
* @param {import('./util/selection-types.js').SelectionClause} clause The | ||
* active selection clause to analyze. | ||
*/ | ||
function activeColumns(clause) { | ||
const { source, meta } = clause; | ||
let columns = clause.predicate?.columns; | ||
if (!meta || !columns) return null; | ||
const { type, scales, bin, pixelSize = 1 } = meta; | ||
const clausePred = clause.predicate; | ||
const clauseCols = clausePred?.columns; | ||
let predicate; | ||
let columns; | ||
if (type === 'interval' && scales) { | ||
if (!meta || !clauseCols) { | ||
return { source: null, columns, predicate }; | ||
} | ||
// @ts-ignore | ||
const { type, scales, bin, pixelSize = 1 } = meta; | ||
if (type === 'point') { | ||
predicate = x => x; | ||
columns = Object.fromEntries( | ||
clauseCols.map(col => [`${col}`, asColumn(col)]) | ||
); | ||
} else if (type === 'interval' && scales) { | ||
// determine pixel-level binning | ||
const bins = scales.map(s => binInterval(s, pixelSize, bin)); | ||
// bail if the scale type is unsupported | ||
if (bins.some(b => b == null)) return null; | ||
if (bins.length === 1) { | ||
if (bins.some(b => !b)) { | ||
// bail if a scale type is unsupported | ||
} else if (bins.length === 1) { | ||
// single interval selection | ||
predicate = p => p ? isBetween('active0', p.range.map(bins[0])) : []; | ||
columns = { active0: bins[0](clause.predicate.field) }; | ||
// @ts-ignore | ||
columns = { active0: bins[0](clausePred.field) }; | ||
} else { | ||
// multiple interval selection | ||
predicate = p => p | ||
? and(p.children.map(({ range }, i) => isBetween(`active${i}`, range.map(bins[i])))) | ||
? and(p.children.map( | ||
({ range }, i) => isBetween(`active${i}`, range.map(bins[i])) | ||
)) | ||
: []; | ||
columns = Object.fromEntries( | ||
clause.predicate.children.map((p, i) => [`active${i}`, bins[i](p.field)]) | ||
// @ts-ignore | ||
clausePred.children.map((p, i) => [`active${i}`, bins[i](p.field)]) | ||
); | ||
} | ||
} else if (type === 'point') { | ||
predicate = x => x; | ||
columns = Object.fromEntries(columns.map(col => [`${col}`, asColumn(col)])); | ||
} else { | ||
// unsupported selection type | ||
return null; | ||
} | ||
return { source, columns, predicate }; | ||
return { source: columns ? source : null, columns, predicate }; | ||
} | ||
@@ -193,2 +199,13 @@ | ||
/** | ||
* Returns a bin function generator to discretize a selection interval domain. | ||
* @param {import('./util/selection-types.js').Scale} scale A scale that maps | ||
* domain values to the output range (typically pixels). | ||
* @param {number} pixelSize The interactive pixel size. This value indicates | ||
* the bin step size and may be greater than an actual screen pixel. | ||
* @param {import('./util/selection-types.js').BinMethod} bin The binning | ||
* method to apply, one of `floor`, `ceil', or `round`. | ||
* @returns {(value: any) => import('@uwdata/mosaic-sql').SQLExpression} | ||
* A bin function generator. | ||
*/ | ||
function binInterval(scale, pixelSize, bin) { | ||
@@ -206,2 +223,47 @@ const { type, domain, range, apply, sqlApply } = scaleTransform(scale); | ||
/** | ||
* Generate data cube table query information. | ||
* @param {Query} clientQuery The original client query. | ||
* @param {*} active Active (selected) column definitions. | ||
* @param {*} indexCols Data cube index column definitions. | ||
* @returns {DataCubeInfo} | ||
*/ | ||
function dataCubeInfo(clientQuery, active, indexCols) { | ||
const { dims, aggr, aux } = indexCols; | ||
const { columns } = active; | ||
// build index table construction query | ||
const query = clientQuery | ||
.select({ ...columns, ...aux }) | ||
.groupby(Object.keys(columns)); | ||
// ensure active clause columns are selected by subqueries | ||
const [subq] = query.subqueries; | ||
if (subq) { | ||
const cols = Object.values(columns).flatMap(c => c.columns); | ||
subqueryPushdown(subq, cols); | ||
} | ||
// push orderby criteria to later cube queries | ||
const order = query.orderby(); | ||
query.query.orderby = []; | ||
// generate creation query string and hash id | ||
const create = query.toString(); | ||
const id = (fnv_hash(create) >>> 0).toString(16); | ||
const table = `cube_index_${id}`; | ||
// generate data cube select query | ||
const select = Query | ||
.select(dims, aggr) | ||
.from(table) | ||
.groupby(dims) | ||
.orderby(order); | ||
return new DataCubeInfo({ table, create, active, select }); | ||
} | ||
/** | ||
* Push column selections down to subqueries. | ||
*/ | ||
function subqueryPushdown(query, cols) { | ||
@@ -219,1 +281,44 @@ const memo = new Set; | ||
} | ||
/** | ||
* Metadata and query generator for a data cube index table. This | ||
* object provides the information needed to generate and query | ||
* a data cube index table for a client-selection pair relative to | ||
* a specific active clause and selection state. | ||
*/ | ||
export class DataCubeInfo { | ||
/** | ||
* Create a new DataCubeInfo instance. | ||
* @param {object} options | ||
*/ | ||
constructor({ table, create, active, select } = {}) { | ||
/** The name of the data cube index table. */ | ||
this.table = table; | ||
/** The SQL query used to generate the data cube index table. */ | ||
this.create = create; | ||
/** A result promise returned for the data cube creation query. */ | ||
this.result = null; | ||
/** | ||
* Definitions and predicate function for the active columns, | ||
* which are dynamically filtered by the active clause. | ||
*/ | ||
this.active = active; | ||
/** Select query (sans where clause) for data cube tables. */ | ||
this.select = select; | ||
/** | ||
* Boolean flag indicating a client that should be skipped. | ||
* This value is always false for completed data cube info. | ||
*/ | ||
this.skip = false; | ||
} | ||
/** | ||
* Generate a data cube index table query for the given predicate. | ||
* @param {import('@uwdata/mosaic-sql').SQLExpression} predicate The current | ||
* active clause predicate. | ||
* @returns {Query} A data cube index table query. | ||
*/ | ||
query(predicate) { | ||
return this.select.clone().where(this.active.predicate(predicate)); | ||
} | ||
} |
@@ -6,3 +6,2 @@ export { MosaicClient } from './MosaicClient.js'; | ||
export { Priority } from './QueryManager.js'; | ||
export { point, points, interval, intervals, match } from './SelectionClause.js'; | ||
@@ -14,2 +13,10 @@ export { restConnector } from './connectors/rest.js'; | ||
export { | ||
clauseInterval, | ||
clauseIntervals, | ||
clausePoint, | ||
clausePoints, | ||
clauseMatch | ||
} from './SelectionClause.js'; | ||
export { | ||
isArrowTable, | ||
@@ -20,4 +27,6 @@ convertArrowArrayType, | ||
} from './util/convert-arrow.js' | ||
export { distinct } from './util/distinct.js'; | ||
export { synchronizer } from './util/synchronizer.js'; | ||
export { throttle } from './util/throttle.js'; | ||
export { toDataColumns } from './util/to-data-columns.js' |
@@ -97,4 +97,4 @@ import { throttle } from './util/throttle.js'; | ||
*/ | ||
queryError(error) { | ||
console.error(error); | ||
queryError(error) { // eslint-disable-line no-unused-vars | ||
// do nothing, the coordinator logs the error | ||
return this; | ||
@@ -126,3 +126,3 @@ } | ||
* For example to (re-)render an interface component. | ||
* | ||
* | ||
* @returns {this | Promise<any>} | ||
@@ -129,0 +129,0 @@ */ |
import { Query, Ref, isDescribeQuery } from '@uwdata/mosaic-sql'; | ||
import { queryResult } from './util/query-result.js'; | ||
import { QueryResult } from './util/query-result.js'; | ||
@@ -136,3 +136,3 @@ function wait(callback) { | ||
}, | ||
result: (group.result = queryResult()) | ||
result: (group.result = new QueryResult()) | ||
}); | ||
@@ -139,0 +139,0 @@ } else { |
import { consolidator } from './QueryConsolidator.js'; | ||
import { lruCache, voidCache } from './util/cache.js'; | ||
import { priorityQueue } from './util/priority-queue.js'; | ||
import { queryResult } from './util/query-result.js'; | ||
import { QueryResult } from './util/query-result.js'; | ||
@@ -99,3 +99,3 @@ export const Priority = { High: 0, Normal: 1, Low: 2 }; | ||
request(request, priority = Priority.Normal) { | ||
const result = queryResult(); | ||
const result = new QueryResult(); | ||
const entry = { request, result }; | ||
@@ -112,3 +112,5 @@ if (this._consolidate) { | ||
const set = new Set(requests); | ||
this.queue.remove(({ result }) => set.has(result)); | ||
if (set.size) { | ||
this.queue.remove(({ result }) => set.has(result)); | ||
} | ||
} | ||
@@ -115,0 +117,0 @@ |
@@ -25,6 +25,9 @@ import { or } from '@uwdata/mosaic-sql'; | ||
* be applied to the clients they are associated with. | ||
* @param {boolean} [options.empty=false] Boolean flag indicating if a lack | ||
* of clauses should correspond to an empty selection with no records. This | ||
* setting determines the default selection state. | ||
* @returns {Selection} The new Selection instance. | ||
*/ | ||
static intersect({ cross = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross })); | ||
static intersect({ cross = false, empty = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross, empty })); | ||
} | ||
@@ -39,6 +42,9 @@ | ||
* be applied to the clients they are associated with. | ||
* @param {boolean} [options.empty=false] Boolean flag indicating if a lack | ||
* of clauses should correspond to an empty selection with no records. This | ||
* setting determines the default selection state. | ||
* @returns {Selection} The new Selection instance. | ||
*/ | ||
static union({ cross = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross, union: true })); | ||
static union({ cross = false, empty = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross, empty, union: true })); | ||
} | ||
@@ -53,6 +59,9 @@ | ||
* be applied to the clients they are associated with. | ||
* @param {boolean} [options.empty=false] Boolean flag indicating if a lack | ||
* of clauses should correspond to an empty selection with no records. This | ||
* setting determines the default selection state. | ||
* @returns {Selection} The new Selection instance. | ||
*/ | ||
static single({ cross = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross, single: true })); | ||
static single({ cross = false, empty = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross, empty, single: true })); | ||
} | ||
@@ -63,6 +72,10 @@ | ||
* cross-filtered intersect resolution strategy. | ||
* @param {object} [options] The selection options. | ||
* @param {boolean} [options.empty=false] Boolean flag indicating if a lack | ||
* of clauses should correspond to an empty selection with no records. This | ||
* setting determines the default selection state. | ||
* @returns {Selection} The new Selection instance. | ||
*/ | ||
static crossfilter() { | ||
return new Selection(new SelectionResolver({ cross: true })); | ||
static crossfilter({ empty = false } = {}) { | ||
return new Selection(new SelectionResolver({ cross: true, empty })); | ||
} | ||
@@ -239,7 +252,11 @@ | ||
* @param {boolean} [options.single=false] Boolean flag to indicate single clauses only. | ||
* @param {boolean} [options.empty=false] Boolean flag indicating if a lack | ||
* of clauses should correspond to an empty selection with no records. This | ||
* setting determines the default selection state. | ||
*/ | ||
constructor({ union, cross, single } = {}) { | ||
constructor({ union, cross, single, empty } = {}) { | ||
this.union = !!union; | ||
this.cross = !!cross; | ||
this.single = !!single; | ||
this.empty = !!empty; | ||
} | ||
@@ -282,4 +299,8 @@ | ||
predicate(clauseList, active, client) { | ||
const { union } = this; | ||
const { empty, union } = this; | ||
if (empty && !clauseList.length) { | ||
return ['FALSE']; | ||
} | ||
// do nothing if cross-filtering and client is currently active | ||
@@ -286,0 +307,0 @@ if (this.skip(client, active)) return undefined; |
@@ -27,3 +27,6 @@ import { | ||
*/ | ||
export function point(field, value, { source, clients = undefined }) { | ||
export function clausePoint(field, value, { | ||
source, | ||
clients = source ? new Set([source]) : undefined | ||
}) { | ||
/** @type {SQLExpression | null} */ | ||
@@ -54,3 +57,6 @@ const predicate = value !== undefined | ||
*/ | ||
export function points(fields, value, { source, clients = undefined }) { | ||
export function clausePoints(fields, value, { | ||
source, | ||
clients = source ? new Set([source]) : undefined | ||
}) { | ||
/** @type {SQLExpression | null} */ | ||
@@ -88,4 +94,8 @@ let predicate = null; | ||
*/ | ||
export function interval(field, value, { | ||
source, clients, bin, scale, pixelSize = 1 | ||
export function clauseInterval(field, value, { | ||
source, | ||
clients = source ? new Set([source]) : undefined, | ||
bin, | ||
scale, | ||
pixelSize = 1 | ||
}) { | ||
@@ -95,3 +105,3 @@ /** @type {SQLExpression | null} */ | ||
/** @type {import('./util/selection-types.js').IntervalMetadata} */ | ||
const meta = { type: 'interval', scales: [scale], bin, pixelSize }; | ||
const meta = { type: 'interval', scales: scale && [scale], bin, pixelSize }; | ||
return { meta, source, clients, value, predicate }; | ||
@@ -115,4 +125,8 @@ } | ||
*/ | ||
export function intervals(fields, value, { | ||
source, clients, bin, scales = [], pixelSize = 1 | ||
export function clauseIntervals(fields, value, { | ||
source, | ||
clients = source ? new Set([source]) : undefined, | ||
bin, | ||
scales = [], | ||
pixelSize = 1 | ||
}) { | ||
@@ -143,3 +157,3 @@ /** @type {SQLExpression | null} */ | ||
*/ | ||
export function match(field, value, { | ||
export function clauseMatch(field, value, { | ||
source, clients = undefined, method = 'contains' | ||
@@ -146,0 +160,0 @@ }) { |
/** | ||
* Event dispatcher supporting asynchronous updates. | ||
* If an event handler callback returns a Promise, this dispatcher will | ||
* wait for all such Promises to settle before dispatching future events | ||
* of the same type. | ||
* Event dispatcher supporting asynchronous updates. If an event handler | ||
* callback returns a Promise, the dispatcher waits for all such Promises | ||
* to settle before dispatching future events of the same type. | ||
*/ | ||
@@ -66,3 +65,3 @@ export class AsyncDispatch { | ||
* any other unemitted event values should be dropped (that is, all | ||
* queued events are filtered) | ||
* queued events are filtered). | ||
* @param {string} type The event type. | ||
@@ -88,2 +87,13 @@ * @param {*} value The new event value that will be enqueued. | ||
/** | ||
* Returns a promise that resolves when any pending updates complete for | ||
* the event of the given type currently being processed. The Promise will | ||
* resolve immediately if the queue for the given event type is empty. | ||
* @param {string} type The event type to wait for. | ||
* @returns {Promise} A pending event promise. | ||
*/ | ||
async pending(type) { | ||
await this._callbacks.get(type)?.pending; | ||
} | ||
/** | ||
* Emit an event value to listeners for the given event type. | ||
@@ -90,0 +100,0 @@ * If a previous emit has not yet resolved, the event value |
import { Query, agg, sql } from '@uwdata/mosaic-sql'; | ||
import { MosaicClient } from '../MosaicClient.js'; | ||
export const NO_INDEX = { from: NaN }; | ||
/** | ||
@@ -10,12 +8,13 @@ * Determine data cube index columns for a given Mosaic client. | ||
* @returns An object with necessary column data to generate data | ||
* cube index columns, null if an invalid or unsupported expression | ||
* is encountered, or NO_INDEX if the client is not indexable. | ||
* cube index columns, or null if the client is not indexable or | ||
* the client query contains an invalid or unsupported expression. | ||
*/ | ||
export function indexColumns(client) { | ||
if (!client.filterIndexable) return NO_INDEX; | ||
if (!client.filterIndexable) return null; | ||
const q = client.query(); | ||
const from = getBaseTable(q); | ||
if (typeof from !== 'string' || !q.groupby) return NO_INDEX; | ||
const g = new Set(q.groupby().map(c => c.column)); | ||
// bail if no base table or the query is not analyzable | ||
if (typeof from !== 'string' || !q.select) return null; | ||
const aggr = []; // list of output aggregate columns | ||
@@ -131,3 +130,3 @@ const dims = []; // list of grouping dimension columns | ||
default: | ||
if (g.has(as)) dims.push(as); | ||
if (!aggregate) dims.push(as); | ||
else return null; // unsupported aggregate | ||
@@ -137,2 +136,5 @@ } | ||
// bail if the query has no aggregates | ||
if (!aggr.length) return null; | ||
return { from, dims, aggr, aux }; | ||
@@ -139,0 +141,0 @@ } |
@@ -1,9 +0,42 @@ | ||
export function queryResult() { | ||
let resolve; | ||
let reject; | ||
const p = new Promise((r, e) => { resolve = r; reject = e; }); | ||
return Object.assign(p, { | ||
fulfill: value => (resolve(value), p), | ||
reject: err => (reject(err), p) | ||
}); | ||
/** | ||
* A query result Promise that can allows external callers | ||
* to resolve or reject the Promise. | ||
*/ | ||
export class QueryResult extends Promise { | ||
/** | ||
* Create a new query result Promise. | ||
*/ | ||
constructor() { | ||
let resolve; | ||
let reject; | ||
super((r, e) => { | ||
resolve = r; | ||
reject = e; | ||
}); | ||
this._resolve = resolve; | ||
this._reject = reject; | ||
} | ||
/** | ||
* Resolve the result Promise with the provided value. | ||
* @param {*} value The result value. | ||
* @returns {this} | ||
*/ | ||
fulfill(value) { | ||
this._resolve(value); | ||
return this; | ||
} | ||
/** | ||
* Rejects the result Promise with the provided error. | ||
* @param {*} error The error value. | ||
* @returns {this} | ||
*/ | ||
reject(error) { | ||
this._reject(error); | ||
return this; | ||
} | ||
} | ||
// necessary to make Promise subclass act like a Promise | ||
QueryResult.prototype.constructor = Promise; |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1494855
31982
+ Added@uwdata/mosaic-sql@0.10.0(transitive)
+ Addedapache-arrow@16.1.0(transitive)
- Removed@uwdata/mosaic-sql@0.9.0(transitive)
- Removedapache-arrow@15.0.2(transitive)
- Removedflatbuffers@23.5.26(transitive)
Updated@uwdata/mosaic-sql@^0.10.0
Updatedapache-arrow@^16.1.0