lento

Streaming client for Presto HTTP protocol v1

lento 1

Streaming Node.js client for Presto, the "Distributed SQL Query Engine for Big Data".


features

example

Convert a Presto table to CSV:

const lento = require('lento')
const csvWriter = require('csv-write-stream')
const stdout = require('stdout-stream')
const pump = require('pump')

const client = lento({
  hostname: 'example',
  port: 8080,
  catalog: 'hive',
  schema: 'test'
})

const source = client.createRowStream('SELECT * FROM events')
const transform = csvWriter()

pump(source, transform, stdout, (err) => {
  if (err) throw err
})

If the destination streams close or error, the source stream is destroyed (courtesy of pump) and the Presto query is canceled.

API

lento([options])

Options:

  • hostname: string, default is localhost
  • port: number, default 8080
  • protocol: string, one of http: (default) or https:
  • user: string, default none. Sent as X-Presto-User header.
  • timezone: string, for example UTC, default none. Sent as X-Presto-Time-Zone header.

You can specify a catalog and schema to avoid writing fully-qualified table names in queries:

  • catalog: string, for example hive, default none. Sent as X-Presto-Catalog header.
  • schema: string, for example logs, default none. Sent as X-Presto-Schema header.
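
To picture how the options above relate to request headers, here is a minimal sketch. Note that buildPrestoHeaders is a hypothetical helper written for illustration; it is not part of lento's API and lento's internal code may look different.

```javascript
// Hypothetical helper (not part of lento): shows which documented
// option is sent as which X-Presto-* request header.
function buildPrestoHeaders (options = {}) {
  const mapping = {
    user: 'X-Presto-User',
    timezone: 'X-Presto-Time-Zone',
    catalog: 'X-Presto-Catalog',
    schema: 'X-Presto-Schema'
  }

  const headers = {}

  for (const [option, header] of Object.entries(mapping)) {
    // These options default to none, so skip any that were not given
    if (options[option] != null) headers[header] = String(options[option])
  }

  return headers
}

console.log(buildPrestoHeaders({ catalog: 'hive', schema: 'logs' }))
```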

Control delays and retries:

  • pollInterval: number (milliseconds) or string with unit (e.g. 1s, 500ms). How long to wait for server-side state changes, before sending another HTTP request. Default is 1 second.
  • maxRetries: number of retries if Presto responds with HTTP 503. Default is 10 (with delays between 50 and 100 ms).

createPageStream(sql[, options])

Execute a query. Returns a readable stream that yields pages of rows.

const through2 = require('through2')

client
  .createPageStream('SELECT * FROM events')
  .pipe(through2.obj((page, enc, next) => {
    for (let row of page) {
      // ..
    }

    // Process next page
    next()
  }))

Options:

  • pageSize: number, default 500
  • highWaterMark: number, default 0
  • rowFormat: string, one of object (default) or array.

The pageSize specifies the maximum number of rows per page. Presto may return fewer rows per page. If Presto returns more rows than pageSize, the surplus is buffered and the stream will not make another HTTP request to Presto until the buffer is fully drained. Note that Presto retains finished queries for 15 minutes. If pageSize is <= 0, the stream emits pages as returned by Presto, without slicing them up.
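
The slicing behavior can be modeled roughly as follows. This is an illustration of the documented behavior only, not lento's actual implementation; slicePages is a made-up name and the rows of one HTTP response are assumed to arrive as a plain array.

```javascript
// Illustrative only: models the documented pageSize behavior for the
// rows of a single HTTP response.
function slicePages (rows, pageSize) {
  // pageSize <= 0 means: emit the rows as returned, without slicing
  if (pageSize <= 0) return [rows]

  const pages = []

  for (let i = 0; i < rows.length; i += pageSize) {
    pages.push(rows.slice(i, i + pageSize))
  }

  return pages
}

// A response of 5 rows with pageSize 2 yields pages of 2, 2 and 1 rows
console.log(slicePages(['a', 'b', 'c', 'd', 'e'], 2))
```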

The highWaterMark specifies how many pages to fetch preemptively. The maximum number of rows held in memory is approximately (highWaterMark || 1) * pageSize, plus any surplus from the last HTTP request. Because Presto can return thousands of rows per request, the default highWaterMark is 0 so that we don't preemptively fetch and only hold the number of rows contained in the last HTTP request.
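
As a quick worked example of the formula above (approxMaxRows is a hypothetical name, and the surplus is passed in by hand because it depends on what Presto happens to return):

```javascript
// Rough estimate of rows held in memory, following the formula
// (highWaterMark || 1) * pageSize plus surplus.
function approxMaxRows (highWaterMark, pageSize, surplus = 0) {
  return (highWaterMark || 1) * pageSize + surplus
}

console.log(approxMaxRows(0, 500)) // 500, with the default options
console.log(approxMaxRows(4, 500)) // 2000
```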

If you care more about throughput, you can opt to increase highWaterMark. This can also help if you notice Presto going into BLOCKING state because you're not reading fast enough. Additionally you can increase pageSize if processing time is minimal or if you don't mind blocking the rest of your app while processing a page.

For tuning the Presto side of things, use set().

cancelation

Destroying the stream will cancel the query with a DELETE request to Presto, unless no requests were made yet. If the initial request is in flight, the stream will wait for a response, which contains the query id needed to cancel the query. If cancelation fails, the stream will emit an error (open an issue if you think it shouldn't).
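
A common reason to destroy a stream is that you have read enough rows. Below is a minimal sketch of that pattern. takeRows is a hypothetical helper, and a plain Node.js Readable stands in for a lento stream so that the example runs without a Presto server (Readable.from requires a reasonably recent Node.js version).

```javascript
const { Readable } = require('stream')

// Hypothetical helper: read at most `limit` rows, then destroy the stream.
// With a lento stream, destroying is what triggers cancelation of the query.
function takeRows (stream, limit, callback) {
  const rows = []
  let done = false

  function finish (err) {
    // Only report once. Errors that arrive after we finished (such as
    // a failed cancelation) are ignored in this sketch.
    if (done) return
    done = true
    callback(err, rows)
  }

  stream.on('data', (row) => {
    if (done) return
    rows.push(row)

    if (rows.length >= limit) {
      finish(null)
      stream.destroy()
    }
  })

  stream.on('error', finish)
  stream.on('end', () => finish(null))
}

// Stand-in for client.createRowStream(..)
const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }])

takeRows(source, 2, (err, rows) => {
  if (err) throw err
  console.log(rows)
})
```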

events

Besides the usual Node.js stream events, the stream emits:

  • id: emitted once, with query id once known
  • info: emitted once, with fully qualified info URL
  • columns: emitted once, with raw data as returned by Presto
  • stats: emitted for each HTTP response, with raw data.
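
A sketch of listening for these events. collectQueryEvents is a hypothetical helper, and a bare EventEmitter with made-up payloads stands in for a lento stream; the real payloads are whatever Presto returns.

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper: record the extra events listed above.
function collectQueryEvents (stream) {
  const seen = { id: null, info: null, columns: null, stats: [] }

  // Emitted once each
  stream.once('id', (id) => { seen.id = id })
  stream.once('info', (url) => { seen.info = url })
  stream.once('columns', (columns) => { seen.columns = columns })

  // Emitted for each HTTP response
  stream.on('stats', (stats) => { seen.stats.push(stats) })

  return seen
}

// Stand-in for client.createPageStream(..), with invented payloads
const fake = new EventEmitter()
const seen = collectQueryEvents(fake)

fake.emit('id', 'example_query_id')
fake.emit('stats', { state: 'RUNNING' })
fake.emit('stats', { state: 'FINISHED' })

console.log(seen.id, seen.stats.length)
```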

createRowStream(sql[, options])

Execute a query. Returns a readable stream that yields rows. Options:

  • highWaterMark: number, default 16
  • rowFormat: string, one of object (default) or array.

query(sql[, options], callback)

Same as above but non-streaming, meant for simple queries. The callback function will receive an error if any and an array of rows.

client.query('DESCRIBE events', (err, rows) => {
  // ..
})

I'll take a PR for Promise support.
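
Until then, a thin wrapper in your own code can do the job. The following is a sketch: queryAsync is a hypothetical function, not part of lento, and the fake client only exists so that the example runs without a Presto server.

```javascript
// Hypothetical wrapper (not part of lento): adapts the callback-style
// query(sql[, options], callback) to a Promise.
function queryAsync (client, sql, options = {}) {
  return new Promise((resolve, reject) => {
    client.query(sql, options, (err, rows) => {
      if (err) reject(err)
      else resolve(rows)
    })
  })
}

// Stand-in client that echoes the SQL back as a single row
const fakeClient = {
  query (sql, options, callback) {
    process.nextTick(callback, null, [{ sql }])
  }
}

queryAsync(fakeClient, 'DESCRIBE events').then((rows) => {
  console.log(rows)
})
```

Because query() takes an error-first callback as its last argument, Node's built-in util.promisify(client.query.bind(client)) should give a similar result.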

setTimeout(duration[, options], callback)

Set query_max_run_time for subsequent queries. If those take longer than duration, Presto will return an error with code EXCEEDED_TIME_LIMIT (see errors). The duration can be a number (in milliseconds) or a string parsed by Presto with the format <value><unit> - for example 5d or 100ms. Options are passed to query() via set(), as this method is a shortcut for:

client.set('query_max_run_time', duration[, options], callback)

resetTimeout([options, ]callback)

Reset query_max_run_time to Presto's default. Options are passed to query() via reset().
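
Because the time limit applies to all subsequent queries on the client, you may want to restore the default after a one-off query. A minimal sketch of that flow, where queryWithTimeout is a hypothetical helper and the fake client merely records calls so the example runs without a Presto server:

```javascript
// Hypothetical helper: run one query under a time limit, then restore
// Presto's default so that later queries are unaffected.
function queryWithTimeout (client, duration, sql, callback) {
  client.setTimeout(duration, (err) => {
    if (err) return callback(err)

    client.query(sql, (queryErr, rows) => {
      // Reset regardless of the query outcome
      client.resetTimeout((resetErr) => {
        callback(queryErr || resetErr || null, rows)
      })
    })
  })
}

// Stand-in client that records the order of calls
const calls = []
const fakeClient = {
  setTimeout (duration, cb) { calls.push(['setTimeout', duration]); cb(null) },
  query (sql, cb) { calls.push(['query', sql]); cb(null, []) },
  resetTimeout (cb) { calls.push(['resetTimeout']); cb(null) }
}

queryWithTimeout(fakeClient, '5s', 'SELECT 1', (err, rows) => {
  if (err) throw err
  console.log(calls.map((call) => call[0]).join(' > '))
})
```

Keep in mind that other queries started on the same client while the limit is set would presumably be subject to it as well.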

set(key, value[, options], callback)

Set a session property. Executes SET SESSION .. to prevalidate input, then sets X-Presto-Session header on subsequent queries. Value can be a boolean, number or string. Options are passed to query().

client.set('redistribute_writes', false, (err) => {
  if (err) return console.error('failed to set', err)

  // Subsequent queries now use redistribute_writes=false
})

reset(key[, options], callback)

Reset a session property to its default value. Options are passed to query().

client.reset('redistribute_writes', (err) => {
  if (err) return console.error('failed to reset', err)

  // Subsequent queries now use the default value of redistribute_writes
})

session([options, ]callback)

Converts the result of SHOW SESSION into a tree and coerces boolean and integer values to JSON types. Options are passed to query(). Callback signature is (err, session). Partial example of a session:

{
  "execution_policy": {
    "key": "execution_policy",
    "value": "all-at-once",
    "default": "all-at-once",
    "type": "varchar",
    "description": "Policy used for scheduling query tasks"
  },
  "hash_partition_count": {
    "key": "hash_partition_count",
    "value": 100,
    "default": 100,
    "type": "integer",
    "description": "Number of partitions for distributed joins and aggregations"
  },
  "hive": {
    "bucket_execution_enabled": {
      "key": "hive.bucket_execution_enabled",
      "value": true,
      "default": true,
      "type": "boolean",
      "description": "Enable bucket-aware execution: only use a single worker per bucket"
    }
  }
}
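
To give an idea of the conversion, here is a rough sketch that builds such a tree from flat rows. It is not lento's implementation: toSessionTree and coerce are hypothetical names, and the input rows are assumed to already have the shape { key, value, default, type, description } with string values.

```javascript
// Illustrative only: coerce string values of boolean and integer
// properties to JSON types, leave everything else as a string.
function coerce (value, type) {
  if (type === 'boolean') return value === 'true'
  if (type === 'integer') return parseInt(value, 10)
  return value
}

function toSessionTree (rows) {
  const tree = {}

  for (const row of rows) {
    const path = row.key.split('.')
    const leaf = path.pop()
    let node = tree

    // Create intermediate nodes, like "hive" for "hive.bucket_execution_enabled"
    for (const segment of path) {
      if (!node[segment]) node[segment] = {}
      node = node[segment]
    }

    node[leaf] = Object.assign({}, row, {
      value: coerce(row.value, row.type),
      default: coerce(row.default, row.type)
    })
  }

  return tree
}

const session = toSessionTree([
  { key: 'hash_partition_count', value: '100', default: '100', type: 'integer', description: '..' },
  { key: 'hive.bucket_execution_enabled', value: 'true', default: 'true', type: 'boolean', description: '..' }
])

console.log(session.hive.bucket_execution_enabled.value === true)
```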

See Presto Properties for a detailed description of (some of) the properties.

errors

Errors are enriched with a code and type (string) and optionally info. For example:

client.setTimeout('1ms', (err) => {
  if (err) return console.error(err.code, err)

  client.query('SELECT * FROM big_table', (err) => {
    console.error(err.message) // 'EXCEEDED_TIME_LIMIT: Query exceeded maximum time limit of 1.00ms'
    console.error(err.code) // 'EXCEEDED_TIME_LIMIT'
    console.error(err.type) // 'INSUFFICIENT_RESOURCES'
  })
})
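
In application code it is usually more robust to branch on code than to parse the message. A small sketch, in which describeQueryError is a hypothetical helper and the error object is constructed by hand to mimic the example above:

```javascript
// Hypothetical helper: map an enriched error to a short description.
function describeQueryError (err) {
  if (!err) return 'ok'
  if (err.code === 'EXCEEDED_TIME_LIMIT') return 'query took too long'

  return `query failed (${err.code || 'unknown code'}, ${err.type || 'unknown type'})`
}

// Hand-made error with the properties shown in the example above
const timeLimitError = Object.assign(
  new Error('EXCEEDED_TIME_LIMIT: Query exceeded maximum time limit of 1.00ms'),
  { code: 'EXCEEDED_TIME_LIMIT', type: 'INSUFFICIENT_RESOURCES' }
)

console.log(describeQueryError(timeLimitError))
```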

install

With npm do:

npm install lento

license

MIT © Vincent Weevers


1 Because streams are about slowing down!


Package last updated on 05 Feb 2018
