@latitude-data/source-manager - npm package version comparison

Comparing version 1.1.0 to 1.2.0-canary.0

CHANGELOG.md
# @latitude-data/source-manager
## 1.2.0-canary.0
### Minor Changes
- d9d0326: Materialized queries now accept a TTL config. Running the materialize command will skip any previously materialized query that still has a valid TTL. To force rematerialization of valid cached queries, run the materialize command with the `--force` option.
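  For reference, this is roughly what a TTL-enabled materialized query looks like. The syntax matches the fixtures used in this package's tests; the 3600-second TTL is an arbitrary example:

  ```sql
  {@config materialize = true}
  {@config ttl = 3600}
  SELECT * FROM users
  ```

  While the materialized parquet file is younger than its TTL, the query is reported as `cached: true` and skipped unless `--force` is passed.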
### Patch Changes
- a692a3a: Fix: Loading the same source from the config file and from a query resulted in two different source instances.
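  A minimal sketch of the behavior this fix guarantees; `loadFromConfigFile` is a hypothetical name used only for illustration, since this diff only shows `loadFromQuery`:

  ```ts
  // Both lookups should now resolve to the same Source instance
  // instead of constructing it twice.
  const fromQuery = await sourceManager.loadFromQuery('query.sql')
  const fromConfig = await sourceManager.loadFromConfigFile('source.yml') // assumed API
  console.assert(fromQuery === fromConfig)
  ```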
## 1.1.0

@@ -4,0 +14,0 @@

dist/index.js

@@ -417,9 +417,9 @@ import { emptyMetadata, compile, readMetadata } from '@latitude-data/sql-compiler';

}
async batchQuery(compiledQuery, options) {
return Promise.reject(new Error(`
batchQuery not implemented for TestConnector
Mock it in your tests
CompiledQuery: ${JSON.stringify(compiledQuery)}
batchOptions: ${JSON.stringify(options)}
`));
async batchQuery(compiledQuery, { onBatch }) {
const result = await this.runQuery(compiledQuery);
onBatch({
rows: result.toArray(),
fields: result.fields,
lastBatch: true,
});
}

@@ -650,4 +650,3 @@ async runQuery(compiledQuery) {

class MaterializedFileNotFoundError extends Error {
}
const ROW_GROUP_SIZE = 4096; // How many rows are in the ParquetWriter file buffer at a time
function mapDataTypeToParquet(dataType) {

@@ -674,8 +673,2 @@ switch (dataType) {

}
const ROW_GROUP_SIZE = 4096; // PARQUET BATCH WRITE
/**
* In order to hash a SQL query, we need to know the source path
* it came from. This way we ensure the path is unique even
* if two sources share the same query.
*/
class StorageDriver {

@@ -686,16 +679,29 @@ manager;

}
async writeParquet({ queryPath, params, batchSize, onDebug, }) {
let writer;
const source = await this.manager.loadFromQuery(queryPath);
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath);
if (!config.materialize) {
async getParquetFilepath(queryPath) {
const { localFilepath } = await this.read(queryPath);
return this.resolveUrl(localFilepath);
}
async isMaterialized(queryPath) {
const { queryConfig, localFilepath } = await this.read(queryPath);
if (!(await this.exists(localFilepath)))
return false;
const { ttl } = queryConfig;
const maxLifeTime = (ttl ?? 0) * 1000; // ttl is in seconds
const lifeTime = Date.now() - (await this.parquetFileTime(localFilepath)); // In milliseconds
return lifeTime < maxLifeTime;
}
async materialize({ queryPath, batchSize, onDebug, }) {
const { source, queryConfig, localFilepath } = await this.read(queryPath);
if (!queryConfig.materialize) {
throw new Error('Query is not configured as materialized');
}
const compiled = await source.compileQuery({ queryPath, params });
const globalFilepath = await this.getParquetFilepath(queryPath);
const compiled = await source.compileQuery({ queryPath, params: {} });
let writer;
let currentHeap = 0;
return new Promise((resolve) => {
let filePath;
let queryRows = 0;
return new Promise((resolve, reject) => {
let rows = 0;
const size = batchSize ?? 1000;
source.batchQuery(compiled, {
source
.batchQuery(compiled, {
batchSize: size,

@@ -705,9 +711,3 @@ onBatch: async (batch) => {

const schema = this.buildParquetSchema(batch.fields);
filePath = await this.getUrl({
sqlHash: sqlHash,
queryName: queryPath,
sourcePath: source.path,
ignoreMissingFile: true,
});
writer = await ParquetWriter.openFile(schema, filePath, {
writer = await ParquetWriter.openFile(schema, globalFilepath, {
rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,

@@ -734,22 +734,25 @@ });

}
queryRows += batch.rows.length;
rows += batch.rows.length;
if (batch.lastBatch) {
await writer.close();
resolve({ filePath, queryRows });
const fileSize = await this.parquetFileSize(localFilepath);
resolve({ fileSize, rows });
}
},
})
.catch((error) => {
reject(error);
});
});
}
getUrl(args) {
const name = StorageDriver.hashName(args);
const filename = `${name}.parquet`;
return this.resolveUrl({ ...args, filename });
async read(queryPath) {
const source = await this.manager.loadFromQuery(queryPath);
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath);
const uniqueHash = createHash('sha256')
.update(sqlHash)
.update(source.path)
.digest('hex');
const localFilepath = `${uniqueHash}.parquet`;
return { source, queryConfig: config, localFilepath };
}
static hashName({ sqlHash, sourcePath }) {
const hash = createHash('sha256');
hash.update(sqlHash);
hash.update(sourcePath);
return hash.digest('hex');
}
buildParquetSchema(fields) {

@@ -773,14 +776,14 @@ const columns = fields.reduce((schema, field) => {

}
get basePath() {
return '/dummy-base-path';
async resolveUrl(localFilepath) {
return localFilepath;
}
getUrl(args) {
return this.resolveUrl({
...args,
filename: `ENCRYPTED[${args.sqlHash}].parquet`,
});
async exists() {
return false;
}
resolveUrl({ filename }) {
return Promise.resolve(filename);
async parquetFileTime() {
return Date.now();
}
async parquetFileSize() {
return 0;
}
}

@@ -869,67 +872,24 @@

materializeDir;
constructor({ path, manager }) {
constructor({ path: materializeDir, manager }) {
super({ manager });
this.materializeDir = path;
this.materializeDir = materializeDir;
if (!fs__default.existsSync(this.materializeDir)) {
fs__default.mkdirSync(this.materializeDir, { recursive: true });
}
}
get basePath() {
return this.materializeDir;
async resolveUrl(localFilepath) {
const filepath = path.join(this.materializeDir, localFilepath);
return filepath;
}
resolveUrl({ queryName, filename, ignoreMissingFile = false, }) {
const filepath = path.join(this.materializeDir, filename);
if (ignoreMissingFile)
return Promise.resolve(filepath);
if (fs__default.existsSync(filepath))
return Promise.resolve(filepath);
return Promise.reject(new MaterializedFileNotFoundError(`materialize query not found for: '${queryName}'`));
async exists(localFilepath) {
return fs__default.existsSync(await this.resolveUrl(localFilepath));
}
}
class NoDiskDriverError extends Error {
constructor() {
super('Disk driver is required for materializing queries');
async parquetFileTime(localFilepath) {
return fs__default.statSync(await this.resolveUrl(localFilepath)).mtimeMs;
}
}
class NonMaterializableQueryError extends Error {
constructor(query) {
super(`Query ${query} is not materializable`);
async parquetFileSize(localFilepath) {
return fs__default.statSync(await this.resolveUrl(localFilepath)).size;
}
}
function ensureMaterializeDirExists(storage) {
if (!(storage instanceof DiskDriver)) {
throw new NoDiskDriverError();
}
const basePath = storage.basePath;
if (!fs__default.existsSync(basePath)) {
fs__default.mkdirSync(basePath, { recursive: true });
}
}
function parseTime({ start, end }) {
const minuteTime = Math.floor((end - start) / 60000);
const min = minuteTime.toString().padStart(2, '0');
const secondsTime = ((end - start) % 60000) / 1000;
const seconds = minuteTime
? secondsTime.toFixed(0).padStart(2, '0')
: secondsTime.toFixed(2).padStart(5, '0');
return minuteTime ? `${min}:${seconds} minutes` : `${seconds} seconds`;
}
function humanizeFileSize(bytes) {
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'];
if (bytes === 0)
return '0 Byte';
const i = Math.floor(Math.log(bytes) / Math.log(1024));
return parseFloat((bytes / Math.pow(1024, i)).toFixed(2)) + ' ' + sizes[i];
}
function buildInfo({ startTime, endTime, query, filePath, queryRows, }) {
const file = path.basename(filePath);
return {
query,
queryRows: `${queryRows.toLocaleString('de-DE', {
style: 'decimal',
useGrouping: true,
})} rows`,
time: parseTime({ start: startTime, end: endTime }),
fileSize: humanizeFileSize(fs__default.statSync(filePath).size),
file,
};
}
function recursiveFindQueriesInDir(rootDir, nextDir) {

@@ -947,86 +907,74 @@ const dir = nextDir ?? rootDir;

}
async function isMaterializableQuery({ sourceManager, query, }) {
const source = await sourceManager.loadFromQuery(query);
const { config } = await source.getMetadataFromQuery(query);
return config.materialize === true;
}
async function findQueries({ sourceManager, selected, }) {
const storage = sourceManager.materializeStorage;
ensureMaterializeDirExists(storage);
async function findMaterializableQueries(sourceManager) {
const queriesDir = sourceManager.queriesDir;
const allQueries = recursiveFindQueriesInDir(queriesDir);
// We don't filter queries if the user passes a specific list
// If one of them is not materializable we fail
if (selected.length > 0) {
return Promise.all(allQueries
.filter((query) => selected.some((selectedQuery) => query === selectedQuery || query === `${selectedQuery}.sql`))
.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
});
if (!canMaterialize) {
throw new NonMaterializableQueryError(query);
}
return query;
}));
const materializableQueries = await Promise.all(allQueries.map(async (queryPath) => {
const source = await sourceManager.loadFromQuery(queryPath);
const { config } = await source.getMetadataFromQuery(queryPath);
return config.materialize === true ? queryPath : null;
}));
return materializableQueries.filter(Boolean);
}
async function materializeQuery({ storage, queryPath, onDebug, force, }) {
if (!force && (await storage.isMaterialized(queryPath))) {
return {
queryPath,
cached: true,
};
}
const queries = await Promise.all(allQueries.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
try {
const startTime = performance.now();
const materialized = await storage.materialize({
onDebug,
queryPath,
batchSize: BATCH_SIZE,
});
return { query, canMaterialize };
}));
return queries.filter((q) => q.canMaterialize).map((q) => q.query);
const endTime = performance.now();
return {
queryPath,
cached: false,
success: true,
rows: materialized.rows,
fileSize: materialized.fileSize,
time: endTime - startTime,
};
}
catch (error) {
return {
queryPath,
cached: false,
success: false,
error: error,
};
}
}
const BATCH_SIZE = 4096;
async function findAndMaterializeQueries({ sourceManager, onStartQuery, onDebug, selectedQueries = [], }) {
async function findAndMaterializeQueries({ sourceManager, onStartQuery, onMaterialized, onDebug, selectedQueries = [], force = false, }) {
const startTotalTime = performance.now();
const info = [];
const result = {
batchSize: BATCH_SIZE,
successful: false,
totalTime: '',
queriesInfo: [],
};
const storage = sourceManager.materializeStorage;
try {
const queries = await findQueries({
sourceManager,
selected: selectedQueries,
const queries = selectedQueries.length
? selectedQueries
: await findMaterializableQueries(sourceManager);
const materializations = [];
for (let index = 0; index < queries.length; index++) {
const queryPath = queries[index];
onStartQuery?.({
index,
count: queries.length,
query: queryPath,
});
for (const [index, query] of queries.entries()) {
const startTime = performance.now();
onStartQuery?.({ count: queries.length, index: index + 1, query });
const materialize = await storage.writeParquet({
onDebug,
queryPath: query,
params: {},
batchSize: BATCH_SIZE,
});
const endTime = performance.now();
info.push(buildInfo({
startTime,
endTime,
query,
filePath: materialize.filePath,
queryRows: materialize.queryRows,
}));
}
result.successful = true;
const materializationInfo = await materializeQuery({
storage,
queryPath,
onDebug,
force,
});
materializations.push(materializationInfo);
onMaterialized?.(materializationInfo);
}
catch (error) {
if (process.env.NODE_ENV !== 'test') {
console.error(error);
}
result.successful = false;
}
finally {
const endTotalTime = performance.now();
const totalTime = parseTime({ start: startTotalTime, end: endTotalTime });
result.totalTime = totalTime;
result.queriesInfo = info;
}
return result;
const endTotalTime = performance.now();
return {
totalTime: endTotalTime - startTotalTime,
materializations,
};
}

@@ -1033,0 +981,0 @@
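As an aside, the `read()` helper above derives the local parquet filename by hashing the compiled SQL together with the source path. A standalone sketch of that derivation, with invented example values:

```ts
import { createHash } from 'crypto'

// Invented values; in read() these come from
// source.getMetadataFromQuery() and source.path.
const sqlHash = 'abc123'
const sourcePath = 'sources/postgres.yml'

const uniqueHash = createHash('sha256')
  .update(sqlHash) // hash of the compiled SQL text
  .update(sourcePath) // mixed in so two sources sharing a query don't collide
  .digest('hex')
const localFilepath = `${uniqueHash}.parquet`
```

This is why the JSDoc above insists on knowing the source path: identical SQL under two different sources must not map to the same parquet file.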

@@ -1,9 +0,11 @@

import { ResolveUrlParams, StorageDriver } from '@/materialize/drivers/StorageDriver';
import { StorageDriver } from '@/materialize/drivers/StorageDriver';
import { FullDriverConfig } from '@/materialize';
export default class DiskDriver extends StorageDriver {
private materializeDir;
constructor({ path, manager }: FullDriverConfig<'disk'>);
get basePath(): string;
resolveUrl({ queryName, filename, ignoreMissingFile, }: ResolveUrlParams): Promise<string>;
constructor({ path: materializeDir, manager }: FullDriverConfig<'disk'>);
protected resolveUrl(localFilepath: string): Promise<string>;
protected exists(localFilepath: string): Promise<boolean>;
protected parquetFileTime(localFilepath: string): Promise<number>;
protected parquetFileSize(localFilepath: string): Promise<number>;
}
//# sourceMappingURL=DiskDriver.d.ts.map
import SourceManager from '@/manager';
import { GetUrlParams, ResolveUrlParams, StorageDriver } from '@/materialize/drivers/StorageDriver';
import { StorageDriver } from '@/materialize/drivers/StorageDriver';
export default class DummyDriver extends StorageDriver {

@@ -7,6 +7,7 @@ constructor({ manager }: {

});
get basePath(): string;
getUrl(args: GetUrlParams): Promise<string>;
resolveUrl({ filename }: ResolveUrlParams): Promise<string>;
protected resolveUrl(localFilepath: string): Promise<string>;
protected exists(): Promise<boolean>;
protected parquetFileTime(): Promise<number>;
protected parquetFileSize(): Promise<number>;
}
//# sourceMappingURL=DummyDriver.d.ts.map
import SourceManager from '@/manager';
import { QueryRequest } from '@/types';
export type GetUrlParams = {
sqlHash: string;
queryName: string;
sourcePath: string;
ignoreMissingFile?: boolean;
};
export type ResolveUrlParams = GetUrlParams & {
filename: string;
ignoreMissingFile?: boolean;
};
export declare class MaterializedFileNotFoundError extends Error {
}
type Result = {
filePath: string;
queryRows: number;
};
export type WriteParquetParams = QueryRequest & {
export type MaterializeProps = Omit<QueryRequest, 'params'> & {
batchSize?: number;

@@ -25,7 +9,6 @@ onDebug?: (_p: {

};
/**
* In order to hash a SQL query, we need to know the source path
* it came from. This way we ensure the path is unique even
* if two sources share the same query.
*/
type Result = {
fileSize: number;
rows: number;
};
export declare abstract class StorageDriver {

@@ -36,10 +19,10 @@ private manager;

});
abstract get basePath(): string;
writeParquet({ queryPath, params, batchSize, onDebug, }: WriteParquetParams): Promise<Result>;
getUrl(args: GetUrlParams): Promise<string>;
/**
* It's a Promise because other adapters can be async
*/
abstract resolveUrl({ filename }: ResolveUrlParams): Promise<string>;
static hashName({ sqlHash, sourcePath }: GetUrlParams): string;
protected abstract resolveUrl(localFilepath: string): Promise<string>;
protected abstract exists(localFilepath: string): Promise<boolean>;
protected abstract parquetFileTime(localFilepath: string): Promise<number>;
protected abstract parquetFileSize(localFilepath: string): Promise<number>;
getParquetFilepath(queryPath: string): Promise<string>;
isMaterialized(queryPath: string): Promise<boolean>;
materialize({ queryPath, batchSize, onDebug, }: MaterializeProps): Promise<Result>;
private read;
private buildParquetSchema;

@@ -46,0 +29,0 @@ }

import SourceManager from '@/manager';
type MaterializeInfo = {
query: string;
file: string;
fileSize: string;
time: string;
queryRows: string;
};
interface IMaterializationInfo {
queryPath: string;
cached: boolean;
}
interface CachedMaterializationInfo extends IMaterializationInfo {
cached: true;
}
interface IMissMaterializationInfo extends IMaterializationInfo {
cached: false;
success: boolean;
}
interface SuccessMaterializationInfo extends IMissMaterializationInfo {
cached: false;
success: true;
rows: number;
fileSize: number;
time: number;
}
interface FailedMaterializationInfo extends IMissMaterializationInfo {
cached: false;
success: false;
error: Error;
}
export type MaterializationInfo = CachedMaterializationInfo | SuccessMaterializationInfo | FailedMaterializationInfo;
type Result = {
successful: boolean;
totalTime: string;
batchSize: number;
queriesInfo: MaterializeInfo[];
totalTime: number;
materializations: MaterializationInfo[];
};
export default function findAndMaterializeQueries({ sourceManager, onStartQuery, onDebug, selectedQueries, }: {
export default function findAndMaterializeQueries({ sourceManager, onStartQuery, onMaterialized, onDebug, selectedQueries, force, }: {
selectedQueries?: string[];

@@ -22,2 +37,3 @@ onStartQuery?: (_p: {

}) => void;
onMaterialized?: (info: MaterializationInfo) => void;
onDebug?: (_p: {

@@ -27,4 +43,5 @@ memoryUsageInMb: string;

sourceManager: SourceManager;
force?: boolean;
}): Promise<Result>;
export {};
//# sourceMappingURL=findAndMaterializeQueries.d.ts.map
import SourceManager from '@/manager';
import DiskDriver from '@/materialize/drivers/disk/DiskDriver';
export { default as findAndMaterializeQueries } from './findAndMaterializeQueries';
import DummyDriver from '@/materialize/drivers/dummy/DummyDriver';
export { default as findAndMaterializeQueries, type MaterializationInfo, } from './findAndMaterializeQueries';
export declare const STORAGE_TYPES: {

@@ -18,3 +19,3 @@ disk: string;

};
export type StorageKlass = typeof DiskDriver;
export type StorageKlass = typeof DiskDriver | typeof DummyDriver;
export declare function getDriverKlass({ type, }: {

@@ -21,0 +22,0 @@ type: StorageType;

@@ -28,5 +28,5 @@ import { BaseConnector, ConnectorOptions } from '@/baseConnector';

resolve(value: unknown): ResolvedParam;
batchQuery(compiledQuery: CompiledQuery, options: BatchedQueryOptions): Promise<void>;
batchQuery(compiledQuery: CompiledQuery, { onBatch }: BatchedQueryOptions): Promise<void>;
runQuery(compiledQuery: CompiledQuery): Promise<QueryResult>;
}
//# sourceMappingURL=index.d.ts.map
import { Field, QueryResultArray } from '@latitude-data/query_result';
import { Source } from './source';
export { CompileError } from '@latitude-data/sql-compiler';
export type { GetUrlParams } from './materialize/drivers/StorageDriver';
export declare enum ConnectorType {

@@ -6,0 +5,0 @@ Athena = "athena",

{
"name": "@latitude-data/source-manager",
"version": "1.1.0",
"version": "1.2.0-canary.0",
"license": "LGPL",

@@ -5,0 +5,0 @@ "description": "Manage Latitude sources",

import fs from 'fs'
import path from 'path'
import {
MaterializedFileNotFoundError,
ResolveUrlParams,
StorageDriver,
} from '@/materialize/drivers/StorageDriver'
import { StorageDriver } from '@/materialize/drivers/StorageDriver'
import { FullDriverConfig } from '@/materialize'

@@ -14,29 +10,27 @@

constructor({ path, manager }: FullDriverConfig<'disk'>) {
constructor({ path: materializeDir, manager }: FullDriverConfig<'disk'>) {
super({ manager })
this.materializeDir = materializeDir
this.materializeDir = path
if (!fs.existsSync(this.materializeDir)) {
fs.mkdirSync(this.materializeDir, { recursive: true })
}
}
get basePath(): string {
return this.materializeDir
protected async resolveUrl(localFilepath: string): Promise<string> {
const filepath = path.join(this.materializeDir, localFilepath)
return filepath
}
resolveUrl({
queryName,
filename,
ignoreMissingFile = false,
}: ResolveUrlParams): Promise<string> {
const filepath = path.join(this.materializeDir, filename)
protected async exists(localFilepath: string): Promise<boolean> {
return fs.existsSync(await this.resolveUrl(localFilepath))
}
if (ignoreMissingFile) return Promise.resolve(filepath)
protected async parquetFileTime(localFilepath: string): Promise<number> {
return fs.statSync(await this.resolveUrl(localFilepath)).mtimeMs
}
if (fs.existsSync(filepath)) return Promise.resolve(filepath)
return Promise.reject(
new MaterializedFileNotFoundError(
`materialize query not found for: '${queryName}'`,
),
)
protected async parquetFileSize(localFilepath: string): Promise<number> {
return fs.statSync(await this.resolveUrl(localFilepath)).size
}
}
import SourceManager from '@/manager'
import {
GetUrlParams,
ResolveUrlParams,
StorageDriver,
} from '@/materialize/drivers/StorageDriver'
import { StorageDriver } from '@/materialize/drivers/StorageDriver'

@@ -13,15 +9,17 @@ export default class DummyDriver extends StorageDriver {

get basePath(): string {
return '/dummy-base-path'
protected async resolveUrl(localFilepath: string): Promise<string> {
return localFilepath
}
getUrl(args: GetUrlParams): Promise<string> {
return this.resolveUrl({
...args,
filename: `ENCRYPTED[${args.sqlHash}].parquet`,
})
protected async exists(): Promise<boolean> {
return false
}
resolveUrl({ filename }: ResolveUrlParams): Promise<string> {
return Promise.resolve(filename)
protected async parquetFileTime(): Promise<number> {
return Date.now()
}
protected async parquetFileSize(): Promise<number> {
return 0
}
}

@@ -22,4 +22,4 @@ import { afterEach, describe, expect, it, vi } from 'vitest'

const FAKE_QUERIES_DIR = '/queries'
const FAKE_MATERIALIZED_DIR = '/materialized'
const FAKE_QUERIES_DIR = 'queries'
const FAKE_MATERIALIZED_DIR = 'materialized'
function buildFs(sql: string) {

@@ -73,3 +73,3 @@ mockFs({

describe('writeParquet', () => {
describe('materialize', () => {
afterEach(() => {

@@ -81,9 +81,9 @@ mockFs.restore()

const driver = getDriver(QUERIES_DIR, MATERIALIZED_DIR)
const result = await driver.writeParquet({
queryPath: 'materialize/query.sql',
params: {},
const queryPath = 'materialize/query.sql'
await driver.materialize({
queryPath,
batchSize: 5,
})
const filePath = result.filePath
const filePath = await driver.getParquetFilepath(queryPath)
expect(fs.existsSync(filePath)).toBe(true)

@@ -110,5 +110,4 @@

await expect(
driver.writeParquet({
driver.materialize({
queryPath: 'query.sql',
params: {},
batchSize: 10,

@@ -115,0 +114,0 @@ }),

import SourceManager from '@/manager'
import { ParquetLogicalType, QueryRequest } from '@/types'
import { ParquetLogicalType, QueryConfig, QueryRequest } from '@/types'
import { DataType, Field } from '@latitude-data/query_result'

@@ -8,14 +8,5 @@ import { createHash } from 'crypto'

import { FieldDefinition, ParquetType } from '@dsnp/parquetjs/dist/lib/declare'
import { Source } from '@/source'
export type GetUrlParams = {
sqlHash: string
queryName: string
sourcePath: string
ignoreMissingFile?: boolean
}
export type ResolveUrlParams = GetUrlParams & {
filename: string
ignoreMissingFile?: boolean
}
export class MaterializedFileNotFoundError extends Error {}
const ROW_GROUP_SIZE = 4096 // How many rows are in the ParquetWriter file buffer at a time

@@ -49,5 +40,3 @@ function mapDataTypeToParquet(dataType: DataType): ParquetType {

const ROW_GROUP_SIZE = 4096 // PARQUET BATCH WRITE
type Result = { filePath: string; queryRows: number }
export type WriteParquetParams = QueryRequest & {
export type MaterializeProps = Omit<QueryRequest, 'params'> & {
batchSize?: number

@@ -57,7 +46,4 @@ onDebug?: (_p: { memoryUsageInMb: string }) => void

/**
* In order to hash a SQL query, we need to know the source path
* it came from. This way we ensure the path is unique even
* if two sources share the same query.
*/
type Result = { fileSize: number; rows: number }
export abstract class StorageDriver {

@@ -70,93 +56,105 @@ private manager: SourceManager

abstract get basePath(): string
protected abstract resolveUrl(localFilepath: string): Promise<string>
protected abstract exists(localFilepath: string): Promise<boolean>
protected abstract parquetFileTime(localFilepath: string): Promise<number>
protected abstract parquetFileSize(localFilepath: string): Promise<number>
async writeParquet({
async getParquetFilepath(queryPath: string): Promise<string> {
const { localFilepath } = await this.read(queryPath)
return this.resolveUrl(localFilepath)
}
async isMaterialized(queryPath: string): Promise<boolean> {
const { queryConfig, localFilepath } = await this.read(queryPath)
if (!(await this.exists(localFilepath))) return false
const { ttl } = queryConfig
const maxLifeTime = ((ttl as number) ?? 0) * 1000 // ttl is in seconds
const lifeTime = Date.now() - (await this.parquetFileTime(localFilepath)) // In milliseconds
return lifeTime < maxLifeTime
}
async materialize({
queryPath,
params,
batchSize,
onDebug,
}: WriteParquetParams) {
let writer: ParquetWriter
const source = await this.manager.loadFromQuery(queryPath)
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath)
}: MaterializeProps): Promise<Result> {
const { source, queryConfig, localFilepath } = await this.read(queryPath)
if (!config.materialize) {
if (!queryConfig.materialize) {
throw new Error('Query is not configured as materialized')
}
const compiled = await source.compileQuery({ queryPath, params })
const globalFilepath = await this.getParquetFilepath(queryPath)
const compiled = await source.compileQuery({ queryPath, params: {} })
let writer: ParquetWriter
let currentHeap = 0
return new Promise<Result>((resolve) => {
let filePath: string
let queryRows = 0
return new Promise<Result>((resolve, reject) => {
let rows = 0
const size = batchSize ?? 1000
source.batchQuery(compiled, {
batchSize: size,
onBatch: async (batch) => {
if (!writer) {
const schema = this.buildParquetSchema(batch.fields)
filePath = await this.getUrl({
sqlHash: sqlHash!,
queryName: queryPath,
sourcePath: source.path,
ignoreMissingFile: true,
})
source
.batchQuery(compiled, {
batchSize: size,
onBatch: async (batch) => {
if (!writer) {
const schema = this.buildParquetSchema(batch.fields)
writer = await ParquetWriter.openFile(schema, globalFilepath, {
rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
})
}
writer = await ParquetWriter.openFile(schema, filePath, {
rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
})
}
for (const row of batch.rows) {
if (onDebug) {
let heapUsed = process.memoryUsage().heapUsed
for (const row of batch.rows) {
if (onDebug) {
let heapUsed = process.memoryUsage().heapUsed
if (heapUsed < currentHeap) {
onDebug({
memoryUsageInMb: `${(currentHeap / 1024 / 1024).toFixed(
2,
)} MB`,
})
}
if (heapUsed < currentHeap) {
onDebug({
memoryUsageInMb: `${(currentHeap / 1024 / 1024).toFixed(
2,
)} MB`,
})
currentHeap = heapUsed
}
currentHeap = heapUsed
try {
await writer.appendRow(row)
} catch {
// If writing a row fails for some reason we don't want
// to break the process.
}
}
rows += batch.rows.length
try {
await writer.appendRow(row)
} catch {
// If writing a row fails for some reason we don't want
// to break the process.
if (batch.lastBatch) {
await writer.close()
const fileSize = await this.parquetFileSize(localFilepath)
resolve({ fileSize, rows })
}
}
queryRows += batch.rows.length
if (batch.lastBatch) {
await writer.close()
resolve({ filePath, queryRows })
}
},
})
},
})
.catch((error) => {
reject(error)
})
})
}
getUrl(args: GetUrlParams): Promise<string> {
const name = StorageDriver.hashName(args)
const filename = `${name}.parquet`
private async read(queryPath: string): Promise<{
source: Source
queryConfig: QueryConfig
localFilepath: string
}> {
const source = await this.manager.loadFromQuery(queryPath)
const { config, sqlHash } = await source.getMetadataFromQuery(queryPath)
return this.resolveUrl({ ...args, filename })
}
const uniqueHash = createHash('sha256')
.update(sqlHash!)
.update(source.path)
.digest('hex')
const localFilepath = `${uniqueHash}.parquet`
/**
* It's a Promise because other adapters can be async
*/
abstract resolveUrl({ filename }: ResolveUrlParams): Promise<string>
static hashName({ sqlHash, sourcePath }: GetUrlParams) {
const hash = createHash('sha256')
hash.update(sqlHash)
hash.update(sourcePath)
return hash.digest('hex')
return { source, queryConfig: config, localFilepath }
}

@@ -163,0 +161,0 @@
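To make the new contract concrete, here is a hedged sketch of a custom driver against the abstract methods declared above; the in-memory map is invented purely for illustration:

```ts
import { StorageDriver } from '@/materialize/drivers/StorageDriver'

// Hypothetical in-memory driver. Only the four protected method
// signatures come from this diff; the backing Map is made up.
export default class MemoryDriver extends StorageDriver {
  private files = new Map<string, { bytes: number; mtimeMs: number }>()

  protected async resolveUrl(localFilepath: string): Promise<string> {
    return `memory://${localFilepath}`
  }

  protected async exists(localFilepath: string): Promise<boolean> {
    return this.files.has(localFilepath)
  }

  protected async parquetFileTime(localFilepath: string): Promise<number> {
    // Backs the TTL check in StorageDriver.isMaterialized
    return this.files.get(localFilepath)?.mtimeMs ?? 0
  }

  protected async parquetFileSize(localFilepath: string): Promise<number> {
    return this.files.get(localFilepath)?.bytes ?? 0
  }
}
```

Note that `isMaterialized` needs nothing beyond `exists` and `parquetFileTime`: it compares the file's age against `ttl * 1000` milliseconds, so drivers stay TTL-agnostic.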

@@ -1,4 +0,2 @@

import { vi, expect, it, describe, afterEach } from 'vitest'
import fs from 'fs'
import path from 'path'
import { expect, it, describe, afterEach } from 'vitest'
import mockFs from 'mock-fs'

@@ -9,6 +7,5 @@ import SourceManager from '@/manager'

import findAndMaterializeQueries from './findAndMaterializeQueries'
import { WriteParquetParams } from '@/materialize/drivers/StorageDriver'
const QUERIES_DIR = '/queries'
const MATERIALIZED_DIR = '/materialized'
const QUERIES_DIR = 'queries'
const MATERIALIZED_DIR = 'materialized'
const MATERIALIZABLE_SQL = `

@@ -18,2 +15,11 @@ {@config materialize = true}

`
const CACHEABLE_MATERIALIZABLE_SQL = `
{@config materialize = true}
{@config ttl = 3600}
SELECT * FROM users
`
const MATERIALIZABLE_FAILING_SQL = `
{@config materialize = true}
FAIL mocked error message
`

@@ -34,23 +40,2 @@ function buildManager(queriesDir: string, materializedDir: string) {

vi.mock('@/materialize/drivers/StorageDriver', async (importOriginal) => {
const original =
(await importOriginal()) as typeof import('@/materialize/drivers/StorageDriver')
return {
...original,
// @ts-expect-error - Mocking abstract class
StorageDriver: class extends original.StorageDriver {
async writeParquet({ queryPath, params }: WriteParquetParams) {
const source = await manager.loadFromQuery(queryPath)
const { sqlHash } = await source.getMetadataFromQuery(queryPath)
const compiled = await source.compileQuery({ queryPath, params })
// Stupid mock parquet that writes the compiled query to a file
const filePath = path.join(MATERIALIZED_DIR, `${sqlHash}.parquet`)
fs.writeFileSync(filePath, JSON.stringify(compiled))
return Promise.resolve({ filePath, queryRows: 0 })
}
},
}
})
describe('findAndMaterializeQueries', () => {

@@ -61,3 +46,3 @@ afterEach(() => {

it('should run writeParquet', async () => {
it('should run materialize', async () => {
mockFs({

@@ -71,25 +56,23 @@ [QUERIES_DIR]: {

},
[MATERIALIZED_DIR]: {},
})
const result = await findAndMaterializeQueries({
sourceManager: manager,
selectedQueries: [],
})
const result = await findAndMaterializeQueries({ sourceManager: manager })
expect(result).toEqual({
batchSize: 4096,
totalTime: expect.any(String),
successful: true,
queriesInfo: [
totalTime: expect.any(Number),
materializations: [
{
file: expect.any(String),
fileSize: '69 Bytes',
query: 'query.sql',
queryRows: '0 rows',
time: expect.any(String),
queryPath: 'query.sql',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},
{
file: expect.any(String),
fileSize: '69 Bytes',
query: 'subdir/query2.sql',
queryRows: '0 rows',
time: expect.any(String),
queryPath: 'subdir/query2.sql',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},

@@ -109,2 +92,3 @@ ],

},
[MATERIALIZED_DIR]: {},
})

@@ -116,12 +100,11 @@ const result = await findAndMaterializeQueries({

expect(result).toEqual({
successful: true,
batchSize: 4096,
totalTime: expect.any(String),
queriesInfo: [
totalTime: expect.any(Number),
materializations: [
{
file: expect.any(String),
fileSize: '69 Bytes',
query: 'subdir/query2.sql',
queryRows: '0 rows',
time: expect.any(String),
queryPath: 'subdir/query2.sql',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},

@@ -141,2 +124,3 @@ ],

},
[MATERIALIZED_DIR]: {},
})

@@ -149,8 +133,75 @@ const result = await findAndMaterializeQueries({

expect(result).toEqual({
successful: false,
batchSize: 4096,
totalTime: expect.any(String),
queriesInfo: [],
totalTime: expect.any(Number),
materializations: [
{
queryPath: 'query',
cached: false,
success: false,
error: expect.objectContaining({
message: 'Query is not configured as materialized',
}),
},
{
queryPath: 'subdir/query2',
cached: false,
success: true,
fileSize: expect.any(Number),
rows: expect.any(Number),
time: expect.any(Number),
},
],
})
})
it('returns a failed materialization when the query fails', async () => {
mockFs({
[QUERIES_DIR]: {
'query.sql': MATERIALIZABLE_FAILING_SQL,
'source.yml': `type: ${ConnectorType.TestInternal}`,
},
[MATERIALIZED_DIR]: {},
})
const result = await findAndMaterializeQueries({
sourceManager: manager,
selectedQueries: ['query'],
})
expect(result).toEqual({
totalTime: expect.any(Number),
materializations: [
{
queryPath: 'query',
cached: false,
success: false,
error: expect.objectContaining({
message: 'mocked error message',
}),
},
],
})
})
it('Does not rematerialize cached queries', async () => {
mockFs({
[QUERIES_DIR]: {
'query.sql': CACHEABLE_MATERIALIZABLE_SQL,
'source.yml': `type: ${ConnectorType.TestInternal}`,
},
[MATERIALIZED_DIR]: {},
})
// Materialize the query
await findAndMaterializeQueries({ sourceManager: manager })
// Try to rematerialize the query while it's cached
const result = await findAndMaterializeQueries({ sourceManager: manager })
expect(result).toEqual({
totalTime: expect.any(Number),
materializations: [
{
queryPath: 'query.sql',
cached: true,
},
],
})
})
})
import fs from 'fs'
import path from 'path'
import { StorageDriver } from '@/materialize/drivers'
import { DiskDriver } from '@/materialize/drivers'
import SourceManager from '@/manager'
import { StorageDriver } from './drivers'
class NoDiskDriverError extends Error {
constructor() {
super('Disk driver is required for materializing queries')
}
}
class NonMaterializableQueryError extends Error {
constructor(query: string) {
super(`Query ${query} is not materializable`)
}
}
function ensureMaterializeDirExists(storage: StorageDriver) {
if (!(storage instanceof DiskDriver)) {
throw new NoDiskDriverError()
}
const basePath = storage.basePath
if (!fs.existsSync(basePath)) {
fs.mkdirSync(basePath, { recursive: true })
}
}
function parseTime({ start, end }: { start: number; end: number }) {
const minuteTime = Math.floor((end - start) / 60000)
const min = minuteTime.toString().padStart(2, '0')
const secondsTime = ((end - start) % 60000) / 1000
const seconds = minuteTime
? secondsTime.toFixed(0).padStart(2, '0')
: secondsTime.toFixed(2).padStart(5, '0')
return minuteTime ? `${min}:${seconds} minutes` : `${seconds} seconds`
}
function humanizeFileSize(bytes: number): string {
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB']
if (bytes === 0) return '0 Byte'
const i = Math.floor(Math.log(bytes) / Math.log(1024))
return parseFloat((bytes / Math.pow(1024, i)).toFixed(2)) + ' ' + sizes[i]
}
function buildInfo({
startTime,
endTime,
query,
filePath,
queryRows,
}: {
startTime: number
endTime: number
query: string
filePath: string
queryRows: number
}) {
const file = path.basename(filePath)
return {
query,
queryRows: `${queryRows.toLocaleString('de-DE', {
style: 'decimal',
useGrouping: true,
})} rows`,
time: parseTime({ start: startTime, end: endTime }),
fileSize: humanizeFileSize(fs.statSync(filePath).size),
file,
}
}
function recursiveFindQueriesInDir(

@@ -90,80 +23,98 @@ rootDir: string,

async function isMaterializableQuery({
sourceManager,
query,
}: {
sourceManager: SourceManager
query: string
}) {
const source = await sourceManager.loadFromQuery(query)
const { config } = await source.getMetadataFromQuery(query)
return config.materialize === true
}
async function findQueries({
sourceManager,
selected,
}: {
sourceManager: SourceManager
selected: string[]
}) {
const storage = sourceManager.materializeStorage
ensureMaterializeDirExists(storage)
async function findMaterializableQueries(sourceManager: SourceManager) {
const queriesDir = sourceManager.queriesDir
const allQueries = recursiveFindQueriesInDir(queriesDir)
// We don't filter queries if the user passes a specific list
// If one of them is not materializable we fail
if (selected.length > 0) {
return Promise.all(
allQueries
.filter((query) =>
selected.some(
(selectedQuery) =>
query === selectedQuery || query === `${selectedQuery}.sql`,
),
)
.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
})
const materializableQueries = await Promise.all(
allQueries.map(async (queryPath: string) => {
const source = await sourceManager.loadFromQuery(queryPath)
const { config } = await source.getMetadataFromQuery(queryPath)
return config.materialize === true ? queryPath : null
}),
)
if (!canMaterialize) {
throw new NonMaterializableQueryError(query)
}
return materializableQueries.filter(Boolean) as string[]
}
return query
}),
)
}
interface IMaterializationInfo {
queryPath: string
cached: boolean
}
const queries = await Promise.all(
allQueries.map(async (query) => {
const canMaterialize = await isMaterializableQuery({
sourceManager,
query,
})
return { query, canMaterialize }
}),
)
interface CachedMaterializationInfo extends IMaterializationInfo {
cached: true
}
return queries.filter((q) => q.canMaterialize).map((q) => q.query)
interface IMissMaterializationInfo extends IMaterializationInfo {
cached: false
success: boolean
}
type MaterializeInfo = {
query: string
file: string
fileSize: string
time: string
queryRows: string
interface SuccessMaterializationInfo extends IMissMaterializationInfo {
cached: false
success: true
rows: number
fileSize: number
time: number
}
interface FailedMaterializationInfo extends IMissMaterializationInfo {
cached: false
success: false
error: Error
}
export type MaterializationInfo =
| CachedMaterializationInfo
| SuccessMaterializationInfo
| FailedMaterializationInfo
async function materializeQuery({
storage,
queryPath,
onDebug,
force,
}: {
storage: StorageDriver
queryPath: string
onDebug?: (_p: { memoryUsageInMb: string }) => void
force?: boolean
}): Promise<MaterializationInfo> {
if (!force && (await storage.isMaterialized(queryPath))) {
return {
queryPath,
cached: true,
}
}
try {
const startTime = performance.now()
const materialized = await storage.materialize({
onDebug,
queryPath,
batchSize: BATCH_SIZE,
})
const endTime = performance.now()
return {
queryPath,
cached: false,
success: true,
rows: materialized.rows,
fileSize: materialized.fileSize,
time: endTime - startTime,
}
} catch (error) {
return {
queryPath,
cached: false,
success: false,
error: error as Error,
}
}
}
const BATCH_SIZE = 4096
type Result = {
successful: boolean
totalTime: string
batchSize: number
queriesInfo: MaterializeInfo[]
totalTime: number
materializations: MaterializationInfo[]
}

@@ -173,61 +124,44 @@ export default async function findAndMaterializeQueries({

onStartQuery,
onMaterialized,
onDebug,
selectedQueries = [],
force = false,
}: {
selectedQueries?: string[]
onStartQuery?: (_p: { count: number; index: number; query: string }) => void
onMaterialized?: (info: MaterializationInfo) => void
onDebug?: (_p: { memoryUsageInMb: string }) => void
sourceManager: SourceManager
force?: boolean
}): Promise<Result> {
const startTotalTime = performance.now()
const info: MaterializeInfo[] = []
const result: Result = {
batchSize: BATCH_SIZE,
successful: false,
totalTime: '',
queriesInfo: [],
}
const storage = sourceManager.materializeStorage
const queries = selectedQueries.length
? selectedQueries
: await findMaterializableQueries(sourceManager)
try {
const queries = await findQueries({
sourceManager,
selected: selectedQueries,
const materializations: MaterializationInfo[] = []
for (let index = 0; index < queries.length; index++) {
const queryPath = queries[index]!
onStartQuery?.({
index,
count: queries.length,
query: queryPath,
})
const materializationInfo = await materializeQuery({
storage,
queryPath,
onDebug,
force,
})
materializations.push(materializationInfo)
onMaterialized?.(materializationInfo)
}
for (const [index, query] of queries.entries()) {
const startTime = performance.now()
onStartQuery?.({ count: queries.length, index: index + 1, query })
const materialize = await storage.writeParquet({
onDebug,
queryPath: query,
params: {},
batchSize: BATCH_SIZE,
})
const endTime = performance.now()
info.push(
buildInfo({
startTime,
endTime,
query,
filePath: materialize.filePath,
queryRows: materialize.queryRows,
}),
)
}
result.successful = true
} catch (error) {
if (process.env.NODE_ENV !== 'test') {
console.error(error)
}
result.successful = false
} finally {
const endTotalTime = performance.now()
const totalTime = parseTime({ start: startTotalTime, end: endTotalTime })
result.totalTime = totalTime
result.queriesInfo = info
const endTotalTime = performance.now()
return {
totalTime: endTotalTime - startTotalTime,
materializations,
}
return result
}
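Putting the new pieces together, a hedged usage sketch of the reworked entry point (internal import aliases as used in this diff; construction of the manager is elided):

```ts
import SourceManager from '@/manager'
import findAndMaterializeQueries from './findAndMaterializeQueries'

declare const manager: SourceManager // assume an already-configured manager

const result = await findAndMaterializeQueries({
  sourceManager: manager,
  selectedQueries: [], // empty list: materialize everything flagged materialize = true
  force: false, // true would rematerialize even queries with a valid TTL
  onStartQuery: ({ index, count, query }) =>
    console.log(`[${index + 1}/${count}] ${query}`),
  onMaterialized: (info) => {
    if (info.cached) console.log(`${info.queryPath}: cached, skipped`)
    else if (info.success)
      console.log(`${info.queryPath}: ${info.rows} rows, ${info.fileSize} bytes`)
    else console.error(`${info.queryPath}: ${info.error.message}`)
  },
})

console.log(`done in ${result.totalTime.toFixed(0)} ms`)
```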
import SourceManager from '@/manager'
import DiskDriver from '@/materialize/drivers/disk/DiskDriver'
import DummyDriver from '@/materialize/drivers/dummy/DummyDriver'
export { default as findAndMaterializeQueries } from './findAndMaterializeQueries'
export {
default as findAndMaterializeQueries,
type MaterializationInfo,
} from './findAndMaterializeQueries'
export const STORAGE_TYPES = { disk: 'disk' }

@@ -17,3 +21,3 @@ export type StorageType = keyof typeof STORAGE_TYPES

}
export type StorageKlass = typeof DiskDriver
export type StorageKlass = typeof DiskDriver | typeof DummyDriver

@@ -20,0 +24,0 @@ export function getDriverKlass({

@@ -65,12 +65,10 @@ import { BaseConnector, ConnectorOptions } from '@/baseConnector'

compiledQuery: CompiledQuery,
options: BatchedQueryOptions,
{ onBatch }: BatchedQueryOptions,
): Promise<void> {
return Promise.reject(
new Error(`
batchQuery not implemented for TestConnector
Mock it in your tests
CompiledQuery: ${JSON.stringify(compiledQuery)}
batchOptions: ${JSON.stringify(options)}
`),
)
const result = await this.runQuery(compiledQuery)
onBatch({
rows: result.toArray(),
fields: result.fields,
lastBatch: true,
})
}

@@ -77,0 +75,0 @@

@@ -5,4 +5,2 @@ import { Field, QueryResultArray } from '@latitude-data/query_result'

export type { GetUrlParams } from './materialize/drivers/StorageDriver'
export enum ConnectorType {

@@ -9,0 +7,0 @@ Athena = 'athena',
