athena-client
Advanced tools
Comparing version 2.0.2 to 2.1.0
@@ -0,4 +1,5 @@ | ||
/// <reference types="node" /> | ||
import { Athena } from 'aws-sdk'; | ||
import { Transform } from 'stream'; | ||
import { AthenaRequest, AthenaRequestConfig } from './request'; | ||
import { AthenaStream, AthenaStreamConfig } from './stream'; | ||
export interface AthenaExecutionResult<T> { | ||
@@ -10,5 +11,5 @@ records: T[]; | ||
toPromise: () => Promise<AthenaExecutionResult<T>>; | ||
toStream: () => AthenaStream<T>; | ||
toStream: () => Transform; | ||
} | ||
export interface AthenaClientConfig extends AthenaRequestConfig, AthenaStreamConfig { | ||
export interface AthenaClientConfig extends AthenaRequestConfig { | ||
pollingInterval?: number; | ||
@@ -26,3 +27,3 @@ queryTimeout?: number; | ||
execute<T>(query: string, callback: (err?: Error, result?: AthenaExecutionResult<T>) => void): void; | ||
private _execute<T>(query, athenaStream, config); | ||
private _execute(query, csvTransform, config); | ||
private canStartQuery(); | ||
@@ -29,0 +30,0 @@ private startQuery(); |
@@ -11,5 +11,4 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const byline_1 = require("byline"); | ||
const csv = require("csv-parser"); | ||
const timers_1 = require("timers"); | ||
const stream_1 = require("./stream"); | ||
const util = require("./util"); | ||
@@ -28,4 +27,4 @@ const defaultPollingInterval = 1000; | ||
const currentConfig = Object.assign({}, this.config); | ||
const athenaStream = new stream_1.AthenaStream(currentConfig); | ||
this._execute(query, athenaStream, currentConfig); | ||
const csvTransform = new csv(); | ||
this._execute(query, csvTransform, currentConfig); | ||
if (callback !== undefined) { | ||
@@ -35,9 +34,9 @@ let isEnd = false; | ||
let queryExecution; | ||
athenaStream.on('data', (record) => { | ||
csvTransform.on('data', (record) => { | ||
records.push(record); | ||
}); | ||
athenaStream.on('query_end', (q) => { | ||
csvTransform.on('query_end', (q) => { | ||
queryExecution = q; | ||
}); | ||
athenaStream.on('end', (record) => { | ||
csvTransform.on('end', (record) => { | ||
if (isEnd) { | ||
@@ -52,3 +51,3 @@ return; | ||
}); | ||
athenaStream.on('error', (err) => { | ||
csvTransform.on('error', (err) => { | ||
isEnd = true; | ||
@@ -65,9 +64,9 @@ callback(err); | ||
let queryExecution; | ||
athenaStream.on('data', (record) => { | ||
csvTransform.on('data', (record) => { | ||
records.push(record); | ||
}); | ||
athenaStream.on('query_end', (q) => { | ||
csvTransform.on('query_end', (q) => { | ||
queryExecution = q; | ||
}); | ||
athenaStream.on('end', (record) => { | ||
csvTransform.on('end', (record) => { | ||
const result = { | ||
@@ -79,3 +78,3 @@ records, | ||
}); | ||
athenaStream.on('error', (err) => { | ||
csvTransform.on('error', (err) => { | ||
return reject(err); | ||
@@ -86,3 +85,3 @@ }); | ||
toStream: () => { | ||
return athenaStream; | ||
return csvTransform; | ||
}, | ||
@@ -92,3 +91,3 @@ }; | ||
} | ||
_execute(query, athenaStream, config) { | ||
_execute(query, csvTransform, config) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
@@ -116,3 +115,3 @@ while (!this.canStartQuery()) { | ||
queryExecution = yield this.request.getQueryExecution(queryId, config); | ||
athenaStream.emit('query_end', queryExecution); | ||
csvTransform.emit('query_end', queryExecution); | ||
this.endQuery(); | ||
@@ -122,4 +121,3 @@ } | ||
this.endQuery(); | ||
athenaStream.emit('error', err); | ||
athenaStream.end(new Buffer('')); | ||
csvTransform.emit('error', err); | ||
return; | ||
@@ -133,7 +131,6 @@ } | ||
const resultsStream = this.request.getResultsStream(queryExecution.ResultConfiguration.OutputLocation); | ||
resultsStream.pipe(new byline_1.LineStream()).pipe(athenaStream); | ||
resultsStream.pipe(csvTransform); | ||
} | ||
catch (err) { | ||
athenaStream.emit('error', err); | ||
athenaStream.end(new Buffer('')); | ||
csvTransform.emit('error', err); | ||
return; | ||
@@ -140,0 +137,0 @@ } |
export declare function sleep(ms: number): Promise<void>; | ||
export declare function getBytes(text: string): number; |
@@ -12,21 +12,2 @@ "use strict"; | ||
exports.sleep = sleep; | ||
function getBytes(text) { | ||
text = text.toUpperCase(); | ||
const arr = text.match(/\d+/); | ||
if (!arr || !arr[0]) { | ||
throw new Error(`invalid input ${text}`); | ||
} | ||
let num = parseInt(arr[0], 10); | ||
if (text.indexOf('G') !== -1) { | ||
num = num * 1024 * 1024 * 1024; | ||
} | ||
else if (text.indexOf('M') !== -1) { | ||
num = num * 1024 * 1024; | ||
} | ||
else if (text.indexOf('K') !== -1) { | ||
num = num * 1024; | ||
} | ||
return num; | ||
} | ||
exports.getBytes = getBytes; | ||
//# sourceMappingURL=util.js.map |
{ | ||
"name": "athena-client", | ||
"version": "2.0.2", | ||
"version": "2.1.0", | ||
"description": "a nodejs simple aws athena client", | ||
@@ -63,4 +63,5 @@ "homepage": "https://github.com/KoteiIto/node-athena", | ||
"byline": "^5.0.0", | ||
"csv-parser": "^1.12.0", | ||
"typescript-collections": "^1.2.5" | ||
} | ||
} |
@@ -79,3 +79,2 @@ [![Build Status](https://travis-ci.org/KoteiIto/node-athena.svg?branch=master)](https://travis-ci.org/KoteiIto/node-athena) | ||
| concurrentExecMax | 5 | The number of cuncurrent execution of query max. It should be set `smaller than AWS Service limit`(default is 5) | | ||
| maxBufferSize | '128M' | Maximum buffer when retrieving query results | | ||
@@ -82,0 +81,0 @@ #### `awsConfig` object properties |
import { Athena } from 'aws-sdk' | ||
import { LineStream } from 'byline' | ||
import { Readable } from 'stream' | ||
import * as csv from 'csv-parser' | ||
import { Transform } from 'stream' | ||
import { setTimeout } from 'timers' | ||
import { AthenaRequest, AthenaRequestConfig } from './request' | ||
import { AthenaStream, AthenaStreamConfig } from './stream' | ||
import * as util from './util' | ||
@@ -16,8 +16,6 @@ | ||
toPromise: () => Promise<AthenaExecutionResult<T>> | ||
toStream: () => AthenaStream<T> | ||
toStream: () => Transform | ||
} | ||
export interface AthenaClientConfig | ||
extends AthenaRequestConfig, | ||
AthenaStreamConfig { | ||
export interface AthenaClientConfig extends AthenaRequestConfig { | ||
pollingInterval?: number | ||
@@ -56,4 +54,5 @@ queryTimeout?: number | ||
const currentConfig = { ...this.config } | ||
const athenaStream = new AthenaStream<T>(currentConfig) | ||
this._execute(query, athenaStream, currentConfig) | ||
const csvTransform = new csv() | ||
// const athenaStream = new AthenaStream<T>(currentConfig) | ||
this._execute(query, csvTransform, currentConfig) | ||
@@ -67,9 +66,9 @@ // Add event listener | ||
// Callback | ||
athenaStream.on('data', (record: T) => { | ||
csvTransform.on('data', (record: T) => { | ||
records.push(record) | ||
}) | ||
athenaStream.on('query_end', (q: Athena.QueryExecution) => { | ||
csvTransform.on('query_end', (q: Athena.QueryExecution) => { | ||
queryExecution = q | ||
}) | ||
athenaStream.on('end', (record: T) => { | ||
csvTransform.on('end', (record: T) => { | ||
if (isEnd) { | ||
@@ -84,3 +83,3 @@ return | ||
}) | ||
athenaStream.on('error', (err: Error) => { | ||
csvTransform.on('error', (err: Error) => { | ||
isEnd = true | ||
@@ -99,9 +98,9 @@ callback(err) | ||
// Add event listener for promise | ||
athenaStream.on('data', (record: T) => { | ||
csvTransform.on('data', (record: T) => { | ||
records.push(record) | ||
}) | ||
athenaStream.on('query_end', (q: Athena.QueryExecution) => { | ||
csvTransform.on('query_end', (q: Athena.QueryExecution) => { | ||
queryExecution = q | ||
}) | ||
athenaStream.on('end', (record: T) => { | ||
csvTransform.on('end', (record: T) => { | ||
const result: AthenaExecutionResult<T> = { | ||
@@ -113,3 +112,3 @@ records, | ||
}) | ||
athenaStream.on('error', (err: Error) => { | ||
csvTransform.on('error', (err: Error) => { | ||
return reject(err) | ||
@@ -120,4 +119,4 @@ }) | ||
// Stream | ||
toStream: (): AthenaStream<T> => { | ||
return athenaStream | ||
toStream: (): Transform => { | ||
return csvTransform | ||
}, | ||
@@ -128,5 +127,5 @@ } | ||
private async _execute<T>( | ||
private async _execute( | ||
query: string, | ||
athenaStream: AthenaStream<T>, | ||
csvTransform: Transform, | ||
config: AthenaClientConfig, | ||
@@ -170,3 +169,3 @@ ) { | ||
queryExecution = await this.request.getQueryExecution(queryId, config) | ||
athenaStream.emit('query_end', queryExecution) | ||
csvTransform.emit('query_end', queryExecution) | ||
@@ -176,4 +175,3 @@ this.endQuery() | ||
this.endQuery() | ||
athenaStream.emit('error', err) | ||
athenaStream.end(new Buffer('')) | ||
csvTransform.emit('error', err) | ||
return | ||
@@ -191,9 +189,8 @@ } | ||
} | ||
const resultsStream: Readable = this.request.getResultsStream( | ||
const resultsStream = this.request.getResultsStream( | ||
queryExecution.ResultConfiguration.OutputLocation, | ||
) | ||
resultsStream.pipe(new LineStream()).pipe(athenaStream) | ||
resultsStream.pipe(csvTransform) | ||
} catch (err) { | ||
athenaStream.emit('error', err) | ||
athenaStream.end(new Buffer('')) | ||
csvTransform.emit('error', err) | ||
return | ||
@@ -200,0 +197,0 @@ } |
@@ -10,18 +10,1 @@ import { setTimeout } from 'timers' | ||
} | ||
export function getBytes(text: string): number { | ||
text = text.toUpperCase() | ||
const arr = text.match(/\d+/) | ||
if (!arr || !arr[0]) { | ||
throw new Error(`invalid input ${text}`) | ||
} | ||
let num = parseInt(arr[0], 10) | ||
if (text.indexOf('G') !== -1) { | ||
num = num * 1024 * 1024 * 1024 | ||
} else if (text.indexOf('M') !== -1) { | ||
num = num * 1024 * 1024 | ||
} else if (text.indexOf('K') !== -1) { | ||
num = num * 1024 | ||
} | ||
return num | ||
} |
import * as assert from 'assert' | ||
import * as fs from 'fs' | ||
import { AthenaRequest } from '../lib/request' | ||
import { AthenaStream } from '../lib/stream' | ||
@@ -6,0 +5,0 @@ const config = { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
114306
4
46
2003
138
+ Addedcsv-parser@^1.12.0
+ Addedbuffer-alloc@1.2.0(transitive)
+ Addedbuffer-alloc-unsafe@1.1.0(transitive)
+ Addedbuffer-fill@1.0.0(transitive)
+ Addedbuffer-from@1.1.2(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addedcsv-parser@1.12.1(transitive)
+ Addedgenerate-function@1.1.0(transitive)
+ Addedgenerate-object-property@1.2.0(transitive)
+ Addedis-property@1.0.2(transitive)
+ Addedjson-stringify-safe@5.0.1(transitive)
+ Addedminimist@1.2.8(transitive)
+ Addedndjson@1.5.0(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedsplit2@2.2.0(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedthrough2@2.0.5(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addedxtend@4.0.2(transitive)