Streaming Node.js client for Presto, the "Distributed SQL Query Engine for Big Data".
stream.destroy()
Convert a Presto table to CSV:
const lento = require('lento')
const csvWriter = require('csv-write-stream')
const stdout = require('stdout-stream')
const pipeline = require('readable-stream').pipeline
const client = lento({
  hostname: 'example',
  port: 8080,
  catalog: 'hive',
  schema: 'test',
  user: 'test'
})
const source = client.createRowStream('SELECT * FROM events')
const transform = csvWriter()
pipeline(source, transform, stdout, (err) => {
  if (err) throw err
})
If the destination streams close or error, the source stream is destroyed (courtesy of pipeline) and the Presto query is canceled.
lento([options])
createPageStream(sql[, options])
createRowStream(sql[, options])
query(sql[, options][, callback])
setTimeout(duration[, options][, callback])
resetTimeout([options][, callback])
set(key, value[, options][, callback])
reset(key[, options][, callback])
session([options][, callback])
lento([options])
Options:
hostname: string, default is localhost
port: number, default 8080
protocol: string, one of http: (default) or https:
user: string, default none. Sent as X-Presto-User header.
timezone: string, for example UTC, default none. Sent as X-Presto-Time-Zone header.
headers: object containing custom headers to set on every request, such as Authorization (case-insensitive). Headers specified here take precedence over other options that set headers.
parametricDatetime: boolean, default false. Opt in to datetime types with variable precision, for example timestamp(6). When not set, datetime types are returned with a precision of 3.
You can specify a catalog and schema to avoid writing fully-qualified table names in queries:
catalog: string, for example hive, default none. Sent as X-Presto-Catalog header.
schema: string, for example logs, default none. Sent as X-Presto-Schema header.
Control delays and retries:
pollInterval: number (milliseconds) or string with unit (e.g. 1s, 500ms). How long to wait for server-side state changes before sending another HTTP request. Default is 1 second.
maxRetries: number of retries if Presto responds with HTTP 503 or other failures. Default is 10.
socketTimeout: number (milliseconds) or string with unit. When to time out after inactivity on the socket. Default is 2 minutes.
createPageStream(sql[, options])
Execute a query. Takes sql as a string or Buffer and returns a readable stream that yields pages of rows.
const through2 = require('through2')
client
  .createPageStream('SELECT * FROM events')
  .pipe(through2.obj((page, enc, next) => {
    for (const row of page) {
      // ..
    }
    // Process next page
    next()
  }))
Options:
pageSize: number, default 500
highWaterMark: number, default 0
rowFormat: string, one of object (default) or array
deserialize: boolean, default true
headers: custom request headers. Merged with headers that were set in the constructor, if any.
The pageSize specifies the maximum number of rows per page. Presto may return fewer per page. If Presto returns more rows than pageSize, the surplus is buffered and the stream will not make another HTTP request to Presto until fully drained. Note that if the (remainder of) rows fit in Presto's buffers, Presto will not block (until another HTTP request is made) but instead go into the FINISHED state, after which you have 15 minutes (by default) to fetch the remaining results. If pageSize is <= 0 the stream emits pages as returned by Presto, without slicing them up.
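The slicing rules above can be pictured with a small sketch. This is not lento's actual implementation: slicePages is a hypothetical helper that only mirrors the documented behavior, namely that pages never exceed pageSize, that the last page may be smaller, and that a pageSize of 0 or less passes the rows through unsliced.

```javascript
// Hypothetical helper, not part of lento: split the rows of one HTTP
// response into pages of at most `pageSize` rows.
function slicePages (rows, pageSize) {
  // pageSize <= 0 means "emit pages as returned by Presto"
  if (pageSize <= 0) return rows.length > 0 ? [rows] : []

  const pages = []
  for (let i = 0; i < rows.length; i += pageSize) {
    pages.push(rows.slice(i, i + pageSize))
  }
  return pages
}

// 1200 rows in one response, sliced with the default pageSize of 500
const exampleRows = Array.from({ length: 1200 }, (_, i) => ({ id: i }))
console.log(slicePages(exampleRows, 500).map((page) => page.length)) // [ 500, 500, 200 ]
console.log(slicePages(exampleRows, 0).map((page) => page.length)) // [ 1200 ]
```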
The highWaterMark specifies how many pages to fetch preemptively. The maximum number of rows held in memory is approximately (highWaterMark || 1) * pageSize, plus any surplus from the last HTTP request. Because Presto can return thousands of rows per request, the default highWaterMark is 0 so that we don't preemptively fetch and only hold the number of rows contained in the last HTTP request.
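As a worked example of that formula, the following sketch computes the approximate bound for a few settings. The function name approxBufferedRows is made up for illustration, and the result excludes the surplus from the last HTTP request.

```javascript
// Approximate upper bound on buffered rows, per the formula above.
// Surplus rows from the last HTTP response come on top of this.
function approxBufferedRows (highWaterMark, pageSize) {
  return (highWaterMark || 1) * pageSize
}

console.log(approxBufferedRows(0, 500)) // 500 (the defaults)
console.log(approxBufferedRows(4, 500)) // 2000
console.log(approxBufferedRows(4, 2000)) // 8000
```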
If you care more about throughput, you can opt to increase highWaterMark. Additionally, you can increase pageSize if processing time is minimal or if you don't mind blocking the rest of your app while processing a page.
For tuning the Presto side of things, use set().
Destroying the stream will cancel the query with a DELETE request to Presto, unless no requests were made yet. If the initial request is in flight, the stream will wait for a response, which contains the query id that can then be canceled. If cancelation fails, the stream will emit an error (open an issue if you think it shouldn't). Regardless of success, the stream emits close as the last event.
Besides the usual Node.js stream events, the stream emits:
id: emitted with query id once known
info: emitted with fully qualified info URL
columns: emitted with an array of column metadata as returned by Presto
stats: emitted for each HTTP response, with raw data
state_change: emitted with a string state (e.g. RUNNING, FINISHED) when Presto state changes. Should not be relied upon, only meant for debugging.
raw_page_size: emitted with a count of rows (e.g. 21829) for each HTTP response that has rows. For debugging.
Note: The id, info and columns events may be emitted more than once due to retries. Subsequent stats and state_change events pertain to that retried query as well.
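Because id, info and columns can fire more than once, a listener should keep the latest value rather than assume a single emission. Here is a minimal sketch of that idea. trackQuery is a hypothetical helper, not part of lento, and it works with any EventEmitter-like stream, so it can be tried without a Presto server.

```javascript
// Hypothetical helper, not part of lento: collect query metadata from
// the events listed above, always keeping the most recent values.
function trackQuery (stream) {
  const query = { id: null, info: null, columns: null, state: null, idEvents: 0 }

  stream.on('id', (id) => {
    // May be emitted again after a retry, so overwrite and count
    query.id = id
    query.idEvents++
  })
  stream.on('info', (url) => { query.info = url })
  stream.on('columns', (columns) => { query.columns = columns })
  // For debugging only; the text above advises against relying on it
  stream.on('state_change', (state) => { query.state = state })

  return query
}
```

With a real client this would be used as trackQuery(client.createPageStream(sql)), with the returned object inspected while the stream is being consumed.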
createRowStream(sql[, options])
Execute a query. Takes sql as a string or Buffer and returns a readable stream that yields rows. Options:
highWaterMark: number, default 16
rowFormat: string, one of object (default) or array
deserialize: boolean, default true
headers: custom request headers. Merged with headers that were set in the constructor, if any.
query(sql[, options][, callback])
Same as above but non-streaming, meant for simple queries. The callback function will receive an error, if any, and an array of rows. If no callback is provided, a promise is returned.
client.query('DESCRIBE events', (err, rows) => {
  // ..
})
With async/await:
const rows = await client.query('DESCRIBE events')
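The "callback or promise" contract used by query() and the methods below is a common Node.js pattern. The following is a minimal sketch of how such a contract can be built, assuming nothing about lento's internals. Both callbackOrPromise and fakeQuery are hypothetical names, and fakeQuery is a stand-in that never contacts Presto.

```javascript
// Run `operation` with the given callback, or wrap it in a promise
// when no callback was passed.
function callbackOrPromise (operation, callback) {
  if (typeof callback === 'function') {
    operation(callback)
    return undefined
  }
  return new Promise((resolve, reject) => {
    operation((err, result) => {
      if (err) reject(err)
      else resolve(result)
    })
  })
}

// Stand-in for a query that always yields a single row
function fakeQuery (sql, callback) {
  return callbackOrPromise((done) => done(null, [{ sql }]), callback)
}

// Callback style
fakeQuery('DESCRIBE events', (err, rows) => {
  if (err) throw err
  console.log(rows.length) // 1
})

// Promise style
fakeQuery('DESCRIBE events').then((rows) => console.log(rows.length)) // 1
```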
setTimeout(duration[, options][, callback])
Set query_max_run_time for subsequent queries. If those take longer than duration, Presto will return an error with code EXCEEDED_TIME_LIMIT (see errors). The duration can be a number (in milliseconds) or a string parsed by Presto with the format <value><unit>, for example 5d or 100ms. Options are passed to query() via set(), as this method is a shortcut for:
client.set('query_max_run_time', duration[, options][, callback])
If no callback is provided, a promise is returned.
resetTimeout([options][, callback])
Reset query_max_run_time to Presto's default. Options are passed to query() via reset(). If no callback is provided, a promise is returned.
set(key, value[, options][, callback])
Set a session property. Executes SET SESSION .. to prevalidate input, then sets the X-Presto-Session header on subsequent queries. Value can be a boolean, number or string. Options are passed to query(). If no callback is provided, a promise is returned.
client.set('redistribute_writes', false, (err) => {
  if (err) return console.error('failed to set', err)
  // Subsequent queries now use redistribute_writes=false
})
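To give an idea of what ends up in the X-Presto-Session header, here is a rough sketch that serializes properties as comma-separated name=value pairs, which is the general shape the Presto client protocol uses for this header. How lento itself builds the header (escaping and ordering, for instance) is not covered by this document, so treat sessionHeader as a hypothetical illustration.

```javascript
// Hypothetical sketch, not lento's code: serialize session properties
// as comma-separated name=value pairs.
function sessionHeader (properties) {
  return Object.entries(properties)
    .map(([key, value]) => `${key}=${value}`)
    .join(',')
}

console.log(sessionHeader({ redistribute_writes: false, query_max_run_time: '5m' }))
// redistribute_writes=false,query_max_run_time=5m
```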
reset(key[, options][, callback])
Reset a session property to its default value. Options are passed to query(). If no callback is provided, a promise is returned.
client.reset('redistribute_writes', (err) => {
  if (err) return console.error('failed to reset', err)
  // Subsequent queries now use the default value of redistribute_writes
})
session([options][, callback])
Converts the result of SHOW SESSION into a tree and coerces boolean and integer values to JSON types. Options are passed to query(). Callback signature is (err, session). If no callback is provided, a promise is returned.
Partial example of a session:
{
  "execution_policy": {
    "key": "execution_policy",
    "value": "all-at-once",
    "default": "all-at-once",
    "type": "varchar",
    "description": "Policy used for scheduling query tasks"
  },
  "hash_partition_count": {
    "key": "hash_partition_count",
    "value": 100,
    "default": 100,
    "type": "integer",
    "description": "Number of partitions for distributed joins and aggregations"
  },
  "hive": {
    "bucket_execution_enabled": {
      "key": "hive.bucket_execution_enabled",
      "value": true,
      "default": true,
      "type": "boolean",
      "description": "Enable bucket-aware execution: only use a single worker per bucket"
    }
  }
}
See Presto Properties for a detailed description of (some of) the properties.
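The conversion from flat SHOW SESSION rows to the nested structure above could look roughly like the sketch below. It is not lento's code. It assumes each row arrives as an array of [name, value, default, type, description] with string values, and the helper names coerce and sessionTree are invented for this example.

```javascript
// Coerce the string values of boolean and integer properties
function coerce (value, type) {
  if (type === 'boolean') return value === 'true'
  if (type === 'integer') return parseInt(value, 10)
  return value
}

// Hypothetical sketch: build a tree keyed by the dot-separated name
function sessionTree (rows) {
  const tree = {}
  for (const [key, value, def, type, description] of rows) {
    const path = key.split('.')
    const name = path.pop()
    // Walk (and create) intermediate nodes such as "hive"
    let node = tree
    for (const part of path) {
      if (!node[part]) node[part] = {}
      node = node[part]
    }
    node[name] = {
      key,
      value: coerce(value, type),
      default: coerce(def, type),
      type,
      description
    }
  }
  return tree
}

const exampleTree = sessionTree([
  ['hash_partition_count', '100', '100', 'integer', 'Number of partitions for distributed joins and aggregations'],
  ['hive.bucket_execution_enabled', 'true', 'true', 'boolean', 'Enable bucket-aware execution: only use a single worker per bucket']
])
console.log(exampleTree.hash_partition_count.value) // 100
console.log(exampleTree.hive.bucket_execution_enabled.value) // true
```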
Errors are enriched with a code and type (string) and optionally info. For example:
client.setTimeout('1ms', (err) => {
  if (err) throw err
  client.query('SELECT * FROM big_table', (err) => {
    console.error(err.message) // 'EXCEEDED_TIME_LIMIT: Query exceeded maximum time limit of 1.00ms'
    console.error(err.code) // 'EXCEEDED_TIME_LIMIT'
    console.error(err.type) // 'INSUFFICIENT_RESOURCES'
  })
})
If Presto responds to an HTTP request with 503 or if the TCP connection is refused, lento retries the request with an exponential delay between 1 and 10 seconds.
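An exponential delay bounded to that range can be sketched as follows. Only the 1 and 10 second bounds come from the text above. The doubling factor and the name retryDelay are assumptions made for illustration, and lento's actual schedule may differ.

```javascript
// Hypothetical sketch: delay in milliseconds before retry number
// `attempt` (0-based), doubling from 1 second and capped at 10 seconds.
function retryDelay (attempt) {
  const min = 1000
  const max = 10000
  return Math.min(max, min * Math.pow(2, attempt))
}

console.log([0, 1, 2, 3, 4].map((attempt) => retryDelay(attempt)))
// [ 1000, 2000, 4000, 8000, 10000 ]
```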
In addition, a query (consisting of one or more HTTP requests) will be retried if Presto returns an error like SERVER_STARTING_UP or HIVE_METASTORE_ERROR, but only if no data was received yet, to avoid emitting duplicates.
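The rule for retrying a whole query can be expressed as a small predicate. This is a sketch of the documented rule rather than lento's code: shouldRetryQuery is a hypothetical name, and the set of codes contains only the two examples mentioned above, so the real list is likely longer.

```javascript
// Error codes named in the text; the full list is an unknown here
const RETRYABLE_CODES = new Set(['SERVER_STARTING_UP', 'HIVE_METASTORE_ERROR'])

// Hypothetical sketch: retry only retryable errors, and only while no
// rows have been received, so that consumers never see duplicates.
function shouldRetryQuery (err, rowsReceived) {
  if (rowsReceived > 0) return false
  return RETRYABLE_CODES.has(err.code)
}

console.log(shouldRetryQuery({ code: 'SERVER_STARTING_UP' }, 0)) // true
console.log(shouldRetryQuery({ code: 'SERVER_STARTING_UP' }, 500)) // false
console.log(shouldRetryQuery({ code: 'EXCEEDED_TIME_LIMIT' }, 0)) // false
```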
I wish retries could be handled at a higher level, but as it stands, lento is both a low-level HTTP client and a streaming client, so retries have to be handled here. This may change in the future.
Enable debug output with DEBUG=lento. It mostly logs HTTP requests and retries, and does not log usernames, SQL or other potentially sensitive data. Beware, it can log hundreds of lines per query.
With npm do:
npm install lento
MIT © 2018-present Vincent Weevers
1 Because streams are about slowing down!