@latitude-data/source-manager
Comparing version 1.1.0 to 1.2.0-canary.0
# @latitude-data/source-manager
## 1.2.0-canary.0
### Minor Changes
- d9d0326: Materialized queries now accept a TTL config. Running the materialize command will skip any previously materialized query that still has a valid TTL. To force rematerialization of valid cached queries, run the materialize command with the `--force` option (see the example below).
### Patch Changes
- a692a3a: Fix: Loading the same source from the config file and from a query resulted in two different source instances.
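For reference, a query opts into materialization and caching via its `{@config}` block; the snippet below is a minimal sketch written as a TypeScript fixture, mirroring the fixture used in this package's own tests:

```ts
// A cacheable materialized query. The ttl is in seconds, so this parquet
// file stays valid for one hour; `materialize --force` rematerializes it anyway.
const CACHEABLE_MATERIALIZABLE_SQL = `
{@config materialize = true}
{@config ttl = 3600}
SELECT * FROM users
`
```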
## 1.1.0
@@ -4,0 +14,0 @@
@@ -417,9 +417,9 @@ import { emptyMetadata, compile, readMetadata } from '@latitude-data/sql-compiler';
}
async batchQuery(compiledQuery, options) {
return Promise.reject(new Error(`
batchQuery not implemented for TestConnector
Mock it in your tests
CompiledQuery: ${JSON.stringify(compiledQuery)}
batchOptions: ${JSON.stringify(options)}
`));
async batchQuery(compiledQuery, { onBatch }) {
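// The test connector now resolves batched queries by running the full query once and emitting it as a single batch.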
const result = await this.runQuery(compiledQuery);
onBatch({
rows: result.toArray(),
fields: result.fields,
lastBatch: true,
});
}
@@ -650,4 +650,3 @@ async runQuery(compiledQuery) {
class MaterializedFileNotFoundError extends Error {
}
const ROW_GROUP_SIZE = 4096; // How many rows are in the ParquetWriter file buffer at a time
function mapDataTypeToParquet(dataType) {
@@ -674,8 +673,2 @@ switch (dataType) {
}
const ROW_GROUP_SIZE = 4096; // PARQUET BATCH WRITE
/**
 * In order to hash a SQL query, we need to know the source path
 * it came from. This way we ensure the path is unique even
 * if two sources share the same query.
 */
class StorageDriver {
@@ -686,16 +679,29 @@ manager;
}
async writeParquet({ queryPath, params, batchSize, onDebug, }) {
let writer;
const source = await this.manager.loadFromQuery(queryPath);
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath);
if (!config.materialize) {
async getParquetFilepath(queryPath) {
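// Resolves the query's local parquet filename to a driver-specific location (e.g. a path under the materialize dir).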
const { localFilepath } = await this.read(queryPath);
return this.resolveUrl(localFilepath);
}
async isMaterialized(queryPath) {
const { queryConfig, localFilepath } = await this.read(queryPath);
if (!(await this.exists(localFilepath)))
return false;
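// A query without a ttl (ttl ?? 0) is never considered fresh, so it is rematerialized on every run.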
const { ttl } = queryConfig;
const maxLifeTime = (ttl ?? 0) * 1000; // ttl is in seconds
const lifeTime = Date.now() - (await this.parquetFileTime(localFilepath)); // In milliseconds
return lifeTime < maxLifeTime;
}
async materialize({ queryPath, batchSize, onDebug, }) {
const { source, queryConfig, localFilepath } = await this.read(queryPath);
if (!queryConfig.materialize) {
throw new Error('Query is not configured as materialized');
}
const compiled = await source.compileQuery({ queryPath, params });
const globalFilepath = await this.getParquetFilepath(queryPath);
const compiled = await source.compileQuery({ queryPath, params: {} });
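// Materialized queries are compiled with empty params: the cached parquet file is shared, so results cannot depend on runtime parameters.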
let writer;
let currentHeap = 0;
return new Promise((resolve) => {
let filePath;
let queryRows = 0;
return new Promise((resolve, reject) => {
let rows = 0;
const size = batchSize ?? 1000;
source.batchQuery(compiled, {
source
.batchQuery(compiled, {
batchSize: size,
@@ -705,9 +711,3 @@ onBatch: async (batch) => {
const schema = this.buildParquetSchema(batch.fields);
filePath = await this.getUrl({
sqlHash: sqlHash,
queryName: queryPath,
sourcePath: source.path,
ignoreMissingFile: true,
});
writer = await ParquetWriter.openFile(schema, filePath, {
writer = await ParquetWriter.openFile(schema, globalFilepath, {
rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
@@ -734,22 +734,25 @@ });
}
queryRows += batch.rows.length;
rows += batch.rows.length;
if (batch.lastBatch) {
await writer.close();
resolve({ filePath, queryRows });
const fileSize = await this.parquetFileSize(localFilepath);
resolve({ fileSize, rows });
}
},
})
.catch((error) => {
reject(error);
});
});
}
getUrl(args) {
const name = StorageDriver.hashName(args);
const filename = `${name}.parquet`;
return this.resolveUrl({ ...args, filename });
async read(queryPath) {
const source = await this.manager.loadFromQuery(queryPath);
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath);
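// Hash the SQL together with the source path so two sources sharing the same query still get distinct parquet files.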
const uniqueHash = createHash('sha256')
.update(sqlHash)
.update(source.path)
.digest('hex');
const localFilepath = `${uniqueHash}.parquet`;
return { source, queryConfig: config, localFilepath };
}
static hashName({ sqlHash, sourcePath }) {
const hash = createHash('sha256');
hash.update(sqlHash);
hash.update(sourcePath);
return hash.digest('hex');
}
buildParquetSchema(fields) {
@@ -773,14 +776,14 @@ const columns = fields.reduce((schema, field) => {
}
get basePath() {
return '/dummy-base-path';
async resolveUrl(localFilepath) {
return localFilepath;
}
getUrl(args) {
return this.resolveUrl({
...args,
filename: `ENCRYPTED[${args.sqlHash}].parquet`,
});
async exists() {
return false;
}
resolveUrl({ filename }) {
return Promise.resolve(filename);
async parquetFileTime() {
return Date.now();
}
async parquetFileSize() {
return 0;
}
}
@@ -869,67 +872,24 @@
materializeDir;
constructor({ path, manager }) {
constructor({ path: materializeDir, manager }) {
super({ manager });
this.materializeDir = path;
this.materializeDir = materializeDir;
if (!fs__default.existsSync(this.materializeDir)) {
fs__default.mkdirSync(this.materializeDir, { recursive: true });
}
}
get basePath() {
return this.materializeDir;
async resolveUrl(localFilepath) {
const filepath = path.join(this.materializeDir, localFilepath);
return filepath;
}
resolveUrl({ queryName, filename, ignoreMissingFile = false, }) {
const filepath = path.join(this.materializeDir, filename);
if (ignoreMissingFile)
return Promise.resolve(filepath);
if (fs__default.existsSync(filepath))
return Promise.resolve(filepath);
return Promise.reject(new MaterializedFileNotFoundError(`materialize query not found for: '${queryName}'`));
async exists(localFilepath) {
return fs__default.existsSync(await this.resolveUrl(localFilepath));
}
}
class NoDiskDriverError extends Error {
constructor() {
super('Disk driver is required for materializing queries');
async parquetFileTime(localFilepath) {
return fs__default.statSync(await this.resolveUrl(localFilepath)).mtimeMs;
}
}
class NonMaterializableQueryError extends Error {
constructor(query) {
super(`Query ${query} is not materializable`);
async parquetFileSize(localFilepath) {
return fs__default.statSync(await this.resolveUrl(localFilepath)).size;
}
}
function ensureMaterializeDirExists(storage) {
if (!(storage instanceof DiskDriver)) {
throw new NoDiskDriverError();
}
const basePath = storage.basePath;
if (!fs__default.existsSync(basePath)) {
fs__default.mkdirSync(basePath, { recursive: true });
}
}
function parseTime({ start, end }) {
const minuteTime = Math.floor((end - start) / 60000);
const min = minuteTime.toString().padStart(2, '0');
const secondsTime = ((end - start) % 60000) / 1000;
const seconds = minuteTime
? secondsTime.toFixed(0).padStart(2, '0')
: secondsTime.toFixed(2).padStart(5, '0');
return minuteTime ? `${min}:${seconds} minutes` : `${seconds} seconds`;
}
function humanizeFileSize(bytes) {
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'];
if (bytes === 0)
return '0 Byte';
const i = Math.floor(Math.log(bytes) / Math.log(1024));
return parseFloat((bytes / Math.pow(1024, i)).toFixed(2)) + ' ' + sizes[i];
}
function buildInfo({ startTime, endTime, query, filePath, queryRows, }) {
const file = path.basename(filePath);
return {
query,
queryRows: `${queryRows.toLocaleString('de-DE', {
style: 'decimal',
useGrouping: true,
})} rows`,
time: parseTime({ start: startTime, end: endTime }),
fileSize: humanizeFileSize(fs__default.statSync(filePath).size),
file,
};
}
function recursiveFindQueriesInDir(rootDir, nextDir) {
@@ -947,86 +907,74 @@ const dir = nextDir ?? rootDir;
}
async function isMaterializableQuery({ sourceManager, query, }) {
const source = await sourceManager.loadFromQuery(query);
const { config } = await source.getMetadataFromQuery(query);
return config.materialize === true;
}
async function findQueries({ sourceManager, selected, }) {
const storage = sourceManager.materializeStorage;
ensureMaterializeDirExists(storage);
async function findMaterializableQueries(sourceManager) {
const queriesDir = sourceManager.queriesDir;
const allQueries = recursiveFindQueriesInDir(queriesDir);
// We don't filter queries if the user passes a specific list
// If one of them is not materializable we fail
if (selected.length > 0) {
return Promise.all(allQueries
.filter((query) => selected.some((selectedQuery) => query === selectedQuery || query === `${selectedQuery}.sql`))
.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
});
if (!canMaterialize) {
throw new NonMaterializableQueryError(query);
}
return query;
}));
const materializableQueries = await Promise.all(allQueries.map(async (queryPath) => {
const source = await sourceManager.loadFromQuery(queryPath);
const { config } = await source.getMetadataFromQuery(queryPath);
return config.materialize === true ? queryPath : null;
}));
return materializableQueries.filter(Boolean);
}
async function materializeQuery({ storage, queryPath, onDebug, force, }) {
if (!force && (await storage.isMaterialized(queryPath))) {
return {
queryPath,
cached: true,
};
}
const queries = await Promise.all(allQueries.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
try {
const startTime = performance.now();
const materialized = await storage.materialize({
onDebug,
queryPath,
batchSize: BATCH_SIZE,
});
return { query, canMaterialize };
}));
return queries.filter((q) => q.canMaterialize).map((q) => q.query);
const endTime = performance.now();
return {
queryPath,
cached: false,
success: true,
rows: materialized.rows,
fileSize: materialized.fileSize,
time: endTime - startTime,
};
}
catch (error) {
return {
queryPath,
cached: false,
success: false,
error: error,
};
}
}
const BATCH_SIZE = 4096;
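// Rows requested per batch during materialization; batches smaller than ROW_GROUP_SIZE still produce full-size parquet row groups.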
async function findAndMaterializeQueries({ sourceManager, onStartQuery, onDebug, selectedQueries = [], }) {
async function findAndMaterializeQueries({ sourceManager, onStartQuery, onMaterialized, onDebug, selectedQueries = [], force = false, }) {
const startTotalTime = performance.now();
const info = [];
const result = {
batchSize: BATCH_SIZE,
successful: false,
totalTime: '',
queriesInfo: [],
};
const storage = sourceManager.materializeStorage;
try {
const queries = await findQueries({
sourceManager,
selected: selectedQueries,
const queries = selectedQueries.length
? selectedQueries
: await findMaterializableQueries(sourceManager);
const materializations = [];
for (let index = 0; index < queries.length; index++) {
const queryPath = queries[index];
onStartQuery?.({
index,
count: queries.length,
query: queryPath,
});
for (const [index, query] of queries.entries()) {
const startTime = performance.now();
onStartQuery?.({ count: queries.length, index: index + 1, query });
const materialize = await storage.writeParquet({
onDebug,
queryPath: query,
params: {},
batchSize: BATCH_SIZE,
});
const endTime = performance.now();
info.push(buildInfo({
startTime,
endTime,
query,
filePath: materialize.filePath,
queryRows: materialize.queryRows,
}));
}
result.successful = true;
const materializationInfo = await materializeQuery({
storage,
queryPath,
onDebug,
force,
});
materializations.push(materializationInfo);
onMaterialized?.(materializationInfo);
}
catch (error) {
if (process.env.NODE_ENV !== 'test') {
console.error(error);
}
result.successful = false;
}
finally {
const endTotalTime = performance.now();
const totalTime = parseTime({ start: startTotalTime, end: endTotalTime });
result.totalTime = totalTime;
result.queriesInfo = info;
}
return result;
const endTotalTime = performance.now();
return {
totalTime: endTotalTime - startTotalTime,
materializations,
};
}
@@ -1033,0 +981,0 @@
@@ -1,9 +0,11 @@
import { ResolveUrlParams, StorageDriver } from '@/materialize/drivers/StorageDriver';
import { StorageDriver } from '@/materialize/drivers/StorageDriver';
import { FullDriverConfig } from '@/materialize';
export default class DiskDriver extends StorageDriver {
private materializeDir;
constructor({ path, manager }: FullDriverConfig<'disk'>);
get basePath(): string;
resolveUrl({ queryName, filename, ignoreMissingFile, }: ResolveUrlParams): Promise<string>;
constructor({ path: materializeDir, manager }: FullDriverConfig<'disk'>);
protected resolveUrl(localFilepath: string): Promise<string>;
protected exists(localFilepath: string): Promise<boolean>;
protected parquetFileTime(localFilepath: string): Promise<number>;
protected parquetFileSize(localFilepath: string): Promise<number>;
}
//# sourceMappingURL=DiskDriver.d.ts.map
import SourceManager from '@/manager';
import { GetUrlParams, ResolveUrlParams, StorageDriver } from '@/materialize/drivers/StorageDriver';
import { StorageDriver } from '@/materialize/drivers/StorageDriver';
export default class DummyDriver extends StorageDriver {
@@ -7,6 +7,7 @@ constructor({ manager }: {
});
get basePath(): string;
getUrl(args: GetUrlParams): Promise<string>;
resolveUrl({ filename }: ResolveUrlParams): Promise<string>;
protected resolveUrl(localFilepath: string): Promise<string>;
protected exists(): Promise<boolean>;
protected parquetFileTime(): Promise<number>;
protected parquetFileSize(): Promise<number>;
}
//# sourceMappingURL=DummyDriver.d.ts.map
import SourceManager from '@/manager';
import { QueryRequest } from '@/types';
export type GetUrlParams = {
sqlHash: string;
queryName: string;
sourcePath: string;
ignoreMissingFile?: boolean;
};
export type ResolveUrlParams = GetUrlParams & {
filename: string;
ignoreMissingFile?: boolean;
};
export declare class MaterializedFileNotFoundError extends Error {
}
type Result = {
filePath: string;
queryRows: number;
};
export type WriteParquetParams = QueryRequest & {
export type MaterializeProps = Omit<QueryRequest, 'params'> & {
batchSize?: number;
@@ -25,7 +9,6 @@ onDebug?: (_p: {
};
/**
 * In order to hash a SQL query, we need to know the source path
 * it came from. This way we ensure the path is unique even
 * if two sources share the same query.
 */
type Result = {
fileSize: number;
rows: number;
};
export declare abstract class StorageDriver {
@@ -36,10 +19,10 @@ private manager;
});
abstract get basePath(): string;
writeParquet({ queryPath, params, batchSize, onDebug, }: WriteParquetParams): Promise<Result>;
getUrl(args: GetUrlParams): Promise<string>;
/**
 * It's a Promise because other adapters can be async
 */
abstract resolveUrl({ filename }: ResolveUrlParams): Promise<string>;
static hashName({ sqlHash, sourcePath }: GetUrlParams): string;
protected abstract resolveUrl(localFilepath: string): Promise<string>;
protected abstract exists(localFilepath: string): Promise<boolean>;
protected abstract parquetFileTime(localFilepath: string): Promise<number>;
protected abstract parquetFileSize(localFilepath: string): Promise<number>;
getParquetFilepath(queryPath: string): Promise<string>;
isMaterialized(queryPath: string): Promise<boolean>;
materialize({ queryPath, batchSize, onDebug, }: MaterializeProps): Promise<Result>;
private read;
private buildParquetSchema;
@@ -46,0 +29,0 @@ }
import SourceManager from '@/manager';
type MaterializeInfo = {
query: string;
file: string;
fileSize: string;
time: string;
queryRows: string;
};
interface IMaterializationInfo {
queryPath: string;
cached: boolean;
}
interface CachedMaterializationInfo extends IMaterializationInfo {
cached: true;
}
interface IMissMaterializationInfo extends IMaterializationInfo {
cached: false;
success: boolean;
}
interface SuccessMaterializationInfo extends IMissMaterializationInfo {
cached: false;
success: true;
rows: number;
fileSize: number;
time: number;
}
interface FailedMaterializationInfo extends IMissMaterializationInfo {
cached: false;
success: false;
error: Error;
}
export type MaterializationInfo = CachedMaterializationInfo | SuccessMaterializationInfo | FailedMaterializationInfo;
type Result = {
successful: boolean;
totalTime: string;
batchSize: number;
queriesInfo: MaterializeInfo[];
totalTime: number;
materializations: MaterializationInfo[];
};
export default function findAndMaterializeQueries({ sourceManager, onStartQuery, onDebug, selectedQueries, }: {
export default function findAndMaterializeQueries({ sourceManager, onStartQuery, onMaterialized, onDebug, selectedQueries, force, }: {
selectedQueries?: string[];
@@ -22,2 +37,3 @@ onStartQuery?: (_p: {
}) => void;
onMaterialized?: (info: MaterializationInfo) => void;
onDebug?: (_p: {
@@ -27,4 +43,5 @@ memoryUsageInMb: string;
sourceManager: SourceManager;
force?: boolean;
}): Promise<Result>;
export {};
//# sourceMappingURL=findAndMaterializeQueries.d.ts.map
import SourceManager from '@/manager';
import DiskDriver from '@/materialize/drivers/disk/DiskDriver';
export { default as findAndMaterializeQueries } from './findAndMaterializeQueries';
import DummyDriver from '@/materialize/drivers/dummy/DummyDriver';
export { default as findAndMaterializeQueries, type MaterializationInfo, } from './findAndMaterializeQueries';
export declare const STORAGE_TYPES: {
@@ -18,3 +19,3 @@ disk: string;
};
export type StorageKlass = typeof DiskDriver;
export type StorageKlass = typeof DiskDriver | typeof DummyDriver;
export declare function getDriverKlass({ type, }: {
@@ -21,0 +22,0 @@ type: StorageType;
@@ -28,5 +28,5 @@ import { BaseConnector, ConnectorOptions } from '@/baseConnector';
resolve(value: unknown): ResolvedParam;
batchQuery(compiledQuery: CompiledQuery, options: BatchedQueryOptions): Promise<void>;
batchQuery(compiledQuery: CompiledQuery, { onBatch }: BatchedQueryOptions): Promise<void>;
runQuery(compiledQuery: CompiledQuery): Promise<QueryResult>;
}
//# sourceMappingURL=index.d.ts.map
import { Field, QueryResultArray } from '@latitude-data/query_result';
import { Source } from './source';
export { CompileError } from '@latitude-data/sql-compiler';
export type { GetUrlParams } from './materialize/drivers/StorageDriver';
export declare enum ConnectorType {
@@ -6,0 +5,0 @@ Athena = "athena",
{
"name": "@latitude-data/source-manager",
"version": "1.1.0",
"version": "1.2.0-canary.0",
"license": "LGPL",
@@ -5,0 +5,0 @@ "description": "Manage Latitude sources",
import fs from 'fs'
import path from 'path'
import {
MaterializedFileNotFoundError,
ResolveUrlParams,
StorageDriver,
} from '@/materialize/drivers/StorageDriver'
import { StorageDriver } from '@/materialize/drivers/StorageDriver'
import { FullDriverConfig } from '@/materialize'
@@ -14,29 +10,27 @@
constructor({ path, manager }: FullDriverConfig<'disk'>) {
constructor({ path: materializeDir, manager }: FullDriverConfig<'disk'>) {
super({ manager })
this.materializeDir = materializeDir
this.materializeDir = path
if (!fs.existsSync(this.materializeDir)) {
fs.mkdirSync(this.materializeDir, { recursive: true })
}
}
get basePath(): string {
return this.materializeDir
protected async resolveUrl(localFilepath: string): Promise<string> {
const filepath = path.join(this.materializeDir, localFilepath)
return filepath
}
resolveUrl({
queryName,
filename,
ignoreMissingFile = false,
}: ResolveUrlParams): Promise<string> {
const filepath = path.join(this.materializeDir, filename)
protected async exists(localFilepath: string): Promise<boolean> {
return fs.existsSync(await this.resolveUrl(localFilepath))
}
if (ignoreMissingFile) return Promise.resolve(filepath)
protected async parquetFileTime(localFilepath: string): Promise<number> {
return fs.statSync(await this.resolveUrl(localFilepath)).mtimeMs
}
if (fs.existsSync(filepath)) return Promise.resolve(filepath)
return Promise.reject(
new MaterializedFileNotFoundError(
`materialize query not found for: '${queryName}'`,
),
)
protected async parquetFileSize(localFilepath: string): Promise<number> {
return fs.statSync(await this.resolveUrl(localFilepath)).size
}
}
import SourceManager from '@/manager'
import {
GetUrlParams,
ResolveUrlParams,
StorageDriver,
} from '@/materialize/drivers/StorageDriver'
import { StorageDriver } from '@/materialize/drivers/StorageDriver'
@@ -13,15 +9,17 @@ export default class DummyDriver extends StorageDriver {
get basePath(): string {
return '/dummy-base-path'
protected async resolveUrl(localFilepath: string): Promise<string> {
return localFilepath
}
getUrl(args: GetUrlParams): Promise<string> {
return this.resolveUrl({
...args,
filename: `ENCRYPTED[${args.sqlHash}].parquet`,
})
protected async exists(): Promise<boolean> {
return false
}
resolveUrl({ filename }: ResolveUrlParams): Promise<string> {
return Promise.resolve(filename)
protected async parquetFileTime(): Promise<number> {
return Date.now()
}
protected async parquetFileSize(): Promise<number> {
return 0
}
}
@@ -22,4 +22,4 @@ import { afterEach, describe, expect, it, vi } from 'vitest'
const FAKE_QUERIES_DIR = '/queries'
const FAKE_MATERIALIZED_DIR = '/materialized'
const FAKE_QUERIES_DIR = 'queries'
const FAKE_MATERIALIZED_DIR = 'materialized'
function buildFs(sql: string) {
@@ -73,3 +73,3 @@ mockFs({
describe('writeParquet', () => {
describe('materialize', () => {
afterEach(() => {
@@ -81,9 +81,9 @@ mockFs.restore()
const driver = getDriver(QUERIES_DIR, MATERIALIZED_DIR)
const result = await driver.writeParquet({
queryPath: 'materialize/query.sql',
params: {},
const queryPath = 'materialize/query.sql'
await driver.materialize({
queryPath,
batchSize: 5,
})
const filePath = result.filePath
const filePath = await driver.getParquetFilepath(queryPath)
expect(fs.existsSync(filePath)).toBe(true)
@@ -110,5 +110,4 @@
await expect(
driver.writeParquet({
driver.materialize({
queryPath: 'query.sql',
params: {},
batchSize: 10,
@@ -115,0 +114,0 @@ }),
import SourceManager from '@/manager'
import { ParquetLogicalType, QueryRequest } from '@/types'
import { ParquetLogicalType, QueryConfig, QueryRequest } from '@/types'
import { DataType, Field } from '@latitude-data/query_result'
@@ -8,14 +8,5 @@ import { createHash } from 'crypto'
import { FieldDefinition, ParquetType } from '@dsnp/parquetjs/dist/lib/declare'
import { Source } from '@/source'
export type GetUrlParams = {
sqlHash: string
queryName: string
sourcePath: string
ignoreMissingFile?: boolean
}
export type ResolveUrlParams = GetUrlParams & {
filename: string
ignoreMissingFile?: boolean
}
export class MaterializedFileNotFoundError extends Error {}
const ROW_GROUP_SIZE = 4096 // How many rows are in the ParquetWriter file buffer at a time
@@ -49,5 +40,3 @@ function mapDataTypeToParquet(dataType: DataType): ParquetType {
const ROW_GROUP_SIZE = 4096 // PARQUET BATCH WRITE
type Result = { filePath: string; queryRows: number }
export type WriteParquetParams = QueryRequest & {
export type MaterializeProps = Omit<QueryRequest, 'params'> & {
batchSize?: number
@@ -57,7 +46,4 @@ onDebug?: (_p: { memoryUsageInMb: string }) => void
/**
 * In order to hash a SQL query, we need to know the source path
 * it came from. This way we ensure the path is unique even
 * if two sources share the same query.
 */
type Result = { fileSize: number; rows: number }
export abstract class StorageDriver {
@@ -70,93 +56,105 @@ private manager: SourceManager
abstract get basePath(): string
protected abstract resolveUrl(localFilepath: string): Promise<string>
protected abstract exists(localFilepath: string): Promise<boolean>
protected abstract parquetFileTime(localFilepath: string): Promise<number>
protected abstract parquetFileSize(localFilepath: string): Promise<number>
async writeParquet({
async getParquetFilepath(queryPath: string): Promise<string> {
const { localFilepath } = await this.read(queryPath)
return this.resolveUrl(localFilepath)
}
async isMaterialized(queryPath: string): Promise<boolean> {
const { queryConfig, localFilepath } = await this.read(queryPath)
if (!(await this.exists(localFilepath))) return false
const { ttl } = queryConfig
const maxLifeTime = ((ttl as number) ?? 0) * 1000 // ttl is in seconds
const lifeTime = Date.now() - (await this.parquetFileTime(localFilepath)) // In milliseconds
return lifeTime < maxLifeTime
}
async materialize({
queryPath,
params,
batchSize,
onDebug,
}: WriteParquetParams) {
let writer: ParquetWriter
const source = await this.manager.loadFromQuery(queryPath)
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath)
}: MaterializeProps): Promise<Result> {
const { source, queryConfig, localFilepath } = await this.read(queryPath)
if (!config.materialize) {
if (!queryConfig.materialize) {
throw new Error('Query is not configured as materialized')
}
const compiled = await source.compileQuery({ queryPath, params })
const globalFilepath = await this.getParquetFilepath(queryPath)
const compiled = await source.compileQuery({ queryPath, params: {} })
let writer: ParquetWriter
let currentHeap = 0
return new Promise<Result>((resolve) => {
let filePath: string
let queryRows = 0
return new Promise<Result>((resolve, reject) => {
let rows = 0
const size = batchSize ?? 1000
source.batchQuery(compiled, {
batchSize: size,
onBatch: async (batch) => {
if (!writer) {
const schema = this.buildParquetSchema(batch.fields)
filePath = await this.getUrl({
sqlHash: sqlHash!,
queryName: queryPath,
sourcePath: source.path,
ignoreMissingFile: true,
})
source
.batchQuery(compiled, {
batchSize: size,
onBatch: async (batch) => {
if (!writer) {
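// The writer is created lazily on the first batch, once the result fields are known.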
const schema = this.buildParquetSchema(batch.fields)
writer = await ParquetWriter.openFile(schema, globalFilepath, {
rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
})
}
writer = await ParquetWriter.openFile(schema, filePath, {
rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
})
}
for (const row of batch.rows) {
if (onDebug) {
let heapUsed = process.memoryUsage().heapUsed
for (const row of batch.rows) {
if (onDebug) {
let heapUsed = process.memoryUsage().heapUsed
if (heapUsed < currentHeap) {
onDebug({
memoryUsageInMb: `${(currentHeap / 1024 / 1024).toFixed(
2,
)} MB`,
})
}
if (heapUsed < currentHeap) {
onDebug({
memoryUsageInMb: `${(currentHeap / 1024 / 1024).toFixed(
2,
)} MB`,
})
currentHeap = heapUsed
}
currentHeap = heapUsed
try {
await writer.appendRow(row)
} catch {
// If for some reason a row writing fails we don't want
// to break the process.
}
}
rows += batch.rows.length
try {
await writer.appendRow(row)
} catch {
// If for some reason a row writing fails we don't want
// to break the process.
if (batch.lastBatch) {
await writer.close()
const fileSize = await this.parquetFileSize(localFilepath)
resolve({ fileSize, rows })
}
}
queryRows += batch.rows.length
if (batch.lastBatch) {
await writer.close()
resolve({ filePath, queryRows })
}
},
})
},
})
.catch((error) => {
reject(error)
})
})
}
getUrl(args: GetUrlParams): Promise<string> {
const name = StorageDriver.hashName(args)
const filename = `${name}.parquet`
private async read(queryPath: string): Promise<{
source: Source
queryConfig: QueryConfig
localFilepath: string
}> {
const source = await this.manager.loadFromQuery(queryPath)
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath)
return this.resolveUrl({ ...args, filename })
}
const uniqueHash = createHash('sha256')
.update(sqlHash!)
.update(source.path)
.digest('hex')
const localFilepath = `${uniqueHash}.parquet`
/**
 * It's a Promise because other adapters can be async
 */
abstract resolveUrl({ filename }: ResolveUrlParams): Promise<string>
static hashName({ sqlHash, sourcePath }: GetUrlParams) {
const hash = createHash('sha256')
hash.update(sqlHash)
hash.update(sourcePath)
return hash.digest('hex')
return { source, queryConfig: config, localFilepath }
}
@@ -163,0 +161,0 @@
@@ -1,4 +0,2 @@
import { vi, expect, it, describe, afterEach } from 'vitest'
import fs from 'fs'
import path from 'path'
import { expect, it, describe, afterEach } from 'vitest'
import mockFs from 'mock-fs'
@@ -9,6 +7,5 @@ import SourceManager from '@/manager'
import findAndMaterializeQueries from './findAndMaterializeQueries'
import { WriteParquetParams } from '@/materialize/drivers/StorageDriver'
const QUERIES_DIR = '/queries'
const MATERIALIZED_DIR = '/materialized'
const QUERIES_DIR = 'queries'
const MATERIALIZED_DIR = 'materialized'
const MATERIALIZABLE_SQL = `
@@ -18,2 +15,11 @@ {@config materialize = true}
`
const CACHEABLE_MATERIALIZABLE_SQL = `
{@config materialize = true}
{@config ttl = 3600}
SELECT * FROM users
`
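// ttl is in seconds, so this fixture stays cached for one hour after materialization.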
const MATERIALIZABLE_FAILING_SQL = `
{@config materialize = true}
FAIL mocked error message
`
@@ -34,23 +40,2 @@ function buildManager(queriesDir: string, materializedDir: string) {
vi.mock('@/materialize/drivers/StorageDriver', async (importOriginal) => {
const original =
(await importOriginal()) as typeof import('@/materialize/drivers/StorageDriver')
return {
...original,
// @ts-expect-error - Mocking abstract class
StorageDriver: class extends original.StorageDriver {
async writeParquet({ queryPath, params }: WriteParquetParams) {
const source = await manager.loadFromQuery(queryPath)
const { sqlHash } = await source.getMetadataFromQuery(queryPath)
const compiled = await source.compileQuery({ queryPath, params })
// Simple mock parquet that writes the compiled query to a file
const filePath = path.join(MATERIALIZED_DIR, `${sqlHash}.parquet`)
fs.writeFileSync(filePath, JSON.stringify(compiled))
return Promise.resolve({ filePath, queryRows: 0 })
}
},
}
})
describe('findAndMaterializeQueries', () => {
@@ -61,3 +46,3 @@ afterEach(() => {
it('should run writeParquet', async () => {
it('should run materialize', async () => {
mockFs({
@@ -71,25 +56,23 @@ [QUERIES_DIR]: {
},
[MATERIALIZED_DIR]: {},
})
const result = await findAndMaterializeQueries({
sourceManager: manager,
selectedQueries: [],
})
const result = await findAndMaterializeQueries({ sourceManager: manager })
expect(result).toEqual({
batchSize: 4096,
totalTime: expect.any(String),
successful: true,
queriesInfo: [
totalTime: expect.any(Number),
materializations: [
{
file: expect.any(String),
fileSize: '69 Bytes',
query: 'query.sql',
queryRows: '0 rows',
time: expect.any(String),
queryPath: 'query.sql',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},
{
file: expect.any(String),
fileSize: '69 Bytes',
query: 'subdir/query2.sql',
queryRows: '0 rows',
time: expect.any(String),
queryPath: 'subdir/query2.sql',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},
@@ -109,2 +92,3 @@ ],
},
[MATERIALIZED_DIR]: {},
})
@@ -116,12 +100,11 @@ const result = await findAndMaterializeQueries({
expect(result).toEqual({
successful: true,
batchSize: 4096,
totalTime: expect.any(String),
queriesInfo: [
totalTime: expect.any(Number),
materializations: [
{
file: expect.any(String),
fileSize: '69 Bytes',
query: 'subdir/query2.sql',
queryRows: '0 rows',
time: expect.any(String),
queryPath: 'subdir/query2.sql',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},
@@ -141,2 +124,3 @@ ],
},
[MATERIALIZED_DIR]: {},
})
@@ -149,8 +133,75 @@ const result = await findAndMaterializeQueries({
expect(result).toEqual({
successful: false,
batchSize: 4096,
totalTime: expect.any(String),
queriesInfo: [],
totalTime: expect.any(Number),
materializations: [
{
queryPath: 'query',
cached: false,
success: false,
error: expect.objectContaining({
message: 'Query is not configured as materialized',
}),
},
{
queryPath: 'subdir/query2',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},
],
})
})
it('returns a failed materialization when the query fails', async () => {
mockFs({
[QUERIES_DIR]: {
'query.sql': MATERIALIZABLE_FAILING_SQL,
'source.yml': `type: ${ConnectorType.TestInternal}`,
},
[MATERIALIZED_DIR]: {},
})
const result = await findAndMaterializeQueries({
sourceManager: manager,
selectedQueries: ['query'],
})
expect(result).toEqual({
totalTime: expect.any(Number),
materializations: [
{
queryPath: 'query',
cached: false,
success: false,
error: expect.objectContaining({
message: 'mocked error message',
}),
},
],
})
})
it('Does not rematerialize cached queries', async () => {
mockFs({
[QUERIES_DIR]: {
'query.sql': CACHEABLE_MATERIALIZABLE_SQL,
'source.yml': `type: ${ConnectorType.TestInternal}`,
},
[MATERIALIZED_DIR]: {},
})
// Materialize the query
await findAndMaterializeQueries({ sourceManager: manager })
// Try to rematerialize the query while it's cached
const result = await findAndMaterializeQueries({ sourceManager: manager })
expect(result).toEqual({
totalTime: expect.any(Number),
materializations: [
{
queryPath: 'query.sql',
cached: true,
},
],
})
})
})
}) |
import fs from 'fs' | ||
import path from 'path' | ||
import { StorageDriver } from '@/materialize/drivers' | ||
import { DiskDriver } from '@/materialize/drivers' | ||
import SourceManager from '@/manager' | ||
import { StorageDriver } from './drivers' | ||
class NoDiskDriverError extends Error { | ||
constructor() { | ||
super('Disk driver is required for materializing queries') | ||
} | ||
} | ||
class NonMaterializableQueryError extends Error { | ||
constructor(query: string) { | ||
super(`Query ${query} is not materializable`) | ||
} | ||
} | ||
function ensureMaterializeDirExists(storage: StorageDriver) { | ||
if (!(storage instanceof DiskDriver)) { | ||
throw new NoDiskDriverError() | ||
} | ||
const basePath = storage.basePath | ||
if (!fs.existsSync(basePath)) { | ||
fs.mkdirSync(basePath, { recursive: true }) | ||
} | ||
} | ||
function parseTime({ start, end }: { start: number; end: number }) { | ||
const minuteTime = Math.floor((end - start) / 60000) | ||
const min = minuteTime.toString().padStart(2, '0') | ||
const secondsTime = ((end - start) % 60000) / 1000 | ||
const seconds = minuteTime | ||
? secondsTime.toFixed(0).padStart(2, '0') | ||
: secondsTime.toFixed(2).padStart(5, '0') | ||
return minuteTime ? `${min}:${seconds} minutes` : `${seconds} seconds` | ||
} | ||
function humanizeFileSize(bytes: number): string { | ||
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'] | ||
if (bytes === 0) return '0 Byte' | ||
const i = Math.floor(Math.log(bytes) / Math.log(1024)) | ||
return parseFloat((bytes / Math.pow(1024, i)).toFixed(2)) + ' ' + sizes[i] | ||
} | ||
function buildInfo({ | ||
startTime, | ||
endTime, | ||
query, | ||
filePath, | ||
queryRows, | ||
}: { | ||
startTime: number | ||
endTime: number | ||
query: string | ||
filePath: string | ||
queryRows: number | ||
}) { | ||
const file = path.basename(filePath) | ||
return { | ||
query, | ||
queryRows: `${queryRows.toLocaleString('de-DE', { | ||
style: 'decimal', | ||
useGrouping: true, | ||
})} rows`, | ||
time: parseTime({ start: startTime, end: endTime }), | ||
fileSize: humanizeFileSize(fs.statSync(filePath).size), | ||
file, | ||
} | ||
} | ||
function recursiveFindQueriesInDir( | ||
@@ -90,80 +23,98 @@ rootDir: string, | ||
async function isMaterializableQuery({ | ||
sourceManager, | ||
query, | ||
}: { | ||
sourceManager: SourceManager | ||
query: string | ||
}) { | ||
const source = await sourceManager.loadFromQuery(query) | ||
const { config } = await source.getMetadataFromQuery(query) | ||
return config.materialize === true | ||
} | ||
async function findQueries({ | ||
sourceManager, | ||
selected, | ||
}: { | ||
sourceManager: SourceManager | ||
selected: string[] | ||
}) { | ||
const storage = sourceManager.materializeStorage | ||
ensureMaterializeDirExists(storage) | ||
async function findMaterializableQueries(sourceManager: SourceManager) { | ||
const queriesDir = sourceManager.queriesDir | ||
const allQueries = recursiveFindQueriesInDir(queriesDir) | ||
// We don't filter queries if the user passes a specific list
// If one of them is not materializable we fail
if (selected.length > 0) {
return Promise.all(
allQueries
.filter((query) =>
selected.some(
(selectedQuery) =>
query === selectedQuery || query === `${selectedQuery}.sql`,
),
)
.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
})
const materializableQueries = await Promise.all(
allQueries.map(async (queryPath: string) => {
const source = await sourceManager.loadFromQuery(queryPath)
const { config } = await source.getMetadataFromQuery(queryPath)
return config.materialize === true ? queryPath : null
}),
)
if (!canMaterialize) {
throw new NonMaterializableQueryError(query)
}
return materializableQueries.filter(Boolean) as string[]
}
return query
}),
)
}
interface IMaterializationInfo {
queryPath: string
cached: boolean
}
const queries = await Promise.all(
allQueries.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
})
return { query, canMaterialize }
}),
)
interface CachedMaterializationInfo extends IMaterializationInfo {
cached: true
}
return queries.filter((q) => q.canMaterialize).map((q) => q.query)
interface IMissMaterializationInfo extends IMaterializationInfo {
cached: false
success: boolean
}
type MaterializeInfo = {
query: string
file: string
fileSize: string
time: string
queryRows: string
interface SuccessMaterializationInfo extends IMissMaterializationInfo {
cached: false
success: true
rows: number
fileSize: number
time: number
}
interface FailedMaterializationInfo extends IMissMaterializationInfo {
cached: false
success: false
error: Error
}
export type MaterializationInfo =
| CachedMaterializationInfo
| SuccessMaterializationInfo
| FailedMaterializationInfo
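// Sketch: consumers can narrow this union on its `cached` and `success`
// discriminants, e.g. inside an onMaterialized callback (`log` is hypothetical):
//   if (info.cached) log('skipped (fresh TTL)')
//   else if (info.success) log(`${info.rows} rows, ${info.fileSize} bytes`)
//   else log(info.error.message)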
async function materializeQuery({
storage,
queryPath,
onDebug,
force,
}: {
storage: StorageDriver
queryPath: string
onDebug?: (_p: { memoryUsageInMb: string }) => void
force?: boolean
}): Promise<MaterializationInfo> {
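// A parquet file still within its TTL is reported as cached and skipped, unless force is set.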
if (!force && (await storage.isMaterialized(queryPath))) {
return {
queryPath,
cached: true,
}
}
try {
const startTime = performance.now()
const materialized = await storage.materialize({
onDebug,
queryPath,
batchSize: BATCH_SIZE,
})
const endTime = performance.now()
return {
queryPath,
cached: false,
success: true,
rows: materialized.rows,
fileSize: materialized.fileSize,
time: endTime - startTime,
}
} catch (error) {
return {
queryPath,
cached: false,
success: false,
error: error as Error,
}
}
}
const BATCH_SIZE = 4096
type Result = {
successful: boolean
totalTime: string
batchSize: number
queriesInfo: MaterializeInfo[]
totalTime: number
materializations: MaterializationInfo[]
}
@@ -173,61 +124,44 @@ export default async function findAndMaterializeQueries({
onStartQuery,
onMaterialized,
onDebug,
selectedQueries = [],
force = false,
}: {
selectedQueries?: string[]
onStartQuery?: (_p: { count: number; index: number; query: string }) => void
onMaterialized?: (info: MaterializationInfo) => void
onDebug?: (_p: { memoryUsageInMb: string }) => void
sourceManager: SourceManager
force?: boolean
}): Promise<Result> {
const startTotalTime = performance.now()
const info: MaterializeInfo[] = []
const result: Result = {
batchSize: BATCH_SIZE,
successful: false,
totalTime: '',
queriesInfo: [],
}
const storage = sourceManager.materializeStorage
const queries = selectedQueries.length
? selectedQueries
: await findMaterializableQueries(sourceManager)
try {
const queries = await findQueries({
sourceManager,
selected: selectedQueries,
const materializations: MaterializationInfo[] = []
for (let index = 0; index < queries.length; index++) {
const queryPath = queries[index]!
onStartQuery?.({
index,
count: queries.length,
query: queryPath,
})
const materializationInfo = await materializeQuery({
storage,
queryPath,
onDebug,
force,
})
materializations.push(materializationInfo)
onMaterialized?.(materializationInfo)
}
for (const [index, query] of queries.entries()) {
const startTime = performance.now()
onStartQuery?.({ count: queries.length, index: index + 1, query })
const materialize = await storage.writeParquet({
onDebug,
queryPath: query,
params: {},
batchSize: BATCH_SIZE,
})
const endTime = performance.now()
info.push(
buildInfo({
startTime,
endTime,
query,
filePath: materialize.filePath,
queryRows: materialize.queryRows,
}),
)
}
result.successful = true
} catch (error) {
if (process.env.NODE_ENV !== 'test') {
console.error(error)
}
result.successful = false
} finally {
const endTotalTime = performance.now()
const totalTime = parseTime({ start: startTotalTime, end: endTotalTime })
result.totalTime = totalTime
result.queriesInfo = info
const endTotalTime = performance.now()
return {
totalTime: endTotalTime - startTotalTime,
materializations,
}
return result
}
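// Usage sketch (hypothetical wiring; the materialize CLI's --force flag
// presumably maps to `force: true`):
//   const result = await findAndMaterializeQueries({
//     sourceManager,
//     force: true,
//     onMaterialized: (info) => console.log(info),
//   })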
import SourceManager from '@/manager'
import DiskDriver from '@/materialize/drivers/disk/DiskDriver'
import DummyDriver from '@/materialize/drivers/dummy/DummyDriver'
export { default as findAndMaterializeQueries } from './findAndMaterializeQueries'
export {
default as findAndMaterializeQueries,
type MaterializationInfo,
} from './findAndMaterializeQueries'
export const STORAGE_TYPES = { disk: 'disk' }
@@ -17,3 +21,3 @@ export type StorageType = keyof typeof STORAGE_TYPES
}
export type StorageKlass = typeof DiskDriver
export type StorageKlass = typeof DiskDriver | typeof DummyDriver
@@ -20,0 +24,0 @@ export function getDriverKlass({
@@ -65,12 +65,10 @@ import { BaseConnector, ConnectorOptions } from '@/baseConnector'
compiledQuery: CompiledQuery,
options: BatchedQueryOptions,
{ onBatch }: BatchedQueryOptions,
): Promise<void> {
return Promise.reject(
new Error(`
batchQuery not implemented for TestConnector
Mock it in your tests
CompiledQuery: ${JSON.stringify(compiledQuery)}
batchOptions: ${JSON.stringify(options)}
`),
)
const result = await this.runQuery(compiledQuery)
onBatch({
rows: result.toArray(),
fields: result.fields,
lastBatch: true,
})
}
@@ -77,0 +75,0 @@
@@ -5,4 +5,2 @@ import { Field, QueryResultArray } from '@latitude-data/query_result'
export type { GetUrlParams } from './materialize/drivers/StorageDriver'
export enum ConnectorType {
@@ -9,0 +7,0 @@ Athena = 'athena',