Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

presto-client

Package Overview
Dependencies
Maintainers
1
Versions
29
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

presto-client - npm Package Compare versions

Comparing version 0.13.0 to 1.0.0

docker-compose.yml

356

lib/presto-client/index.js

@@ -63,2 +63,4 @@ const { URL } = require('url') ;

this.timeout = typeof args.timeout !== 'undefined' ? args.timeout : 60;
if (args.ssl) {

@@ -128,9 +130,20 @@ this.protocol = 'https:';

var data = response_data;
if (response_code < 300 && (data[0] === '{' || data[0] === '[')) {
try { data = parser.parse(data); }
catch (x) {
/* ignore json parse error (and don't parse) for non-json content body */
var error = null;
// Only a 200 response code should be considered successful
if (response_code === 200) {
try {
if (data[0] !== '{' && data[0] !== '[') {
throw new Error();
}
data = parser.parse(data);
} catch (_) {
/* ignore actual error, and give a generic message */
error = new Error("execution error:could not parse response");
}
}
callback(null, response_code, data);
else
{
error = new Error("execution error:" + (data ? data : "invalid response code (" + response_code + ")"));
}
callback(error, response_code, data);
});

@@ -147,2 +160,4 @@ });

req.end();
return req;
};

@@ -202,2 +217,4 @@

throw {message: "callback function 'success' (or 'callback') not specified"};
if (!opts.error && !opts.callback)
throw {message: "callback function 'error' (or 'callback') not specified"};

@@ -227,165 +244,210 @@ var header = Object.assign({}, opts.headers);

var data_callback = opts.data;
var retry_callback = opts.retry;
var success_callback = opts.success || opts.callback;
var error_callback = opts.error || opts.callback;
var timeout_value = typeof opts.timeout !== 'undefined' ? opts.timeout : this.timeout;
var enable_verbose_state_callback = this.enableVerboseStateCallback || false;
var current_req = null;
var last_state = null;
var query_id = null;
var timed_out = false;
var req = { method: 'POST', path: '/v1/statement', headers: header, body: opts.query, user: opts.user };
client.request(req, function(err, code, data){
if (err || code !== 200 || (data && data.error)) {
if (error_callback) {
var message = "execution error" + (data && data.length > 0 ? ":" + data : "");
if (data && data.error && data.error.message)
message = data.error.message;
error_callback({message:message, error: (err || data.error), code: code});
}
var timeout = !timeout_value ? null : setTimeout(() => {
timed_out = true;
if (query_id) {
client.kill(query_id, function() {
// don't worry if this fails
});
}
if (current_req) {
current_req.destroy();
}
error_callback({message: "execution error:query timed out"});
}, timeout_value * 1000);
var clear_timeout = function(){
if (timeout) {
clearTimeout(timeout);
}
timeout = null;
}
/*
* 1st call:
{
"stats": {
"processedBytes": 0,
"processedRows": 0,
"wallTimeMillis": 0,
"cpuTimeMillis": 0,
"userTimeMillis": 0,
"state": "QUEUED",
"scheduled": false,
"nodes": 0,
"totalSplits": 0,
"queuedSplits": 0,
"runningSplits": 0,
"completedSplits": 0,
},
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/1",
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g",
"id": "20140120_032523_00000_32v8g"
};
* 2+ time
{
"stats": {
"rootStage": {
"subStages": [
{
"subStages": [],
"processedBytes": 83103149,
"processedRows": 2532704,
"wallTimeMillis": 20502,
"cpuTimeMillis": 3431,
"userTimeMillis": 3210,
"stageId": "1",
"state": "FINISHED",
"done": true,
"nodes": 3,
"totalSplits": 420,
"queuedSplits": 0,
"runningSplits": 0,
"completedSplits": 420
}
],
// same as substage
},
// same as substage
"state": "RUNNING",
},
"data": [ [ 1266352 ] ],
"columns": [ { "type": "bigint", "name": "cnt" } ],
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/2",
"partialCancelUri": "http://10.0.0.0:8080/v1/stage/20140120_032523_00000_32v8g.0",
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g",
"id": "20140120_032523_00000_32v8g"
}
* final state
{
"stats": {
// ....
"state": "FINISHED",
},
"columns": [ { "type": "bigint", "name": "cnt" } ],
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g",
"id": "20140120_032523_00000_32v8g"
}
*/
var first_request = true;
var fetch = function(uri_obj){
// we have already timed out and shown an error, so abort out
if (timed_out) {
return;
}
/*
var data = {
"stats": {
"processedBytes": 0,
"processedRows": 0,
"wallTimeMillis": 0,
"cpuTimeMillis": 0,
"userTimeMillis": 0,
"state": "QUEUED",
"scheduled": false,
"nodes": 0,
"totalSplits": 0,
"queuedSplits": 0,
"runningSplits": 0,
"completedSplits": 0,
},
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/1",
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g",
"id": "20140120_032523_00000_32v8g"
};
*/
if (!data.id || !data.nextUri || !data.infoUri) {
var error_message = null;
if (!data.id)
error_message = "query id missing in response for POST /v1/statement";
else if (!data.nextUri)
error_message = "nextUri missing in response for POST /v1/statement";
else if (!data.infoUri)
error_message = "infoUri missing in response for POST /v1/statement";
error_callback({message: error_message, data: data});
if (!first_request && cancel_checker && cancel_checker()) {
clear_timeout();
client.request({ method: 'DELETE', path: uri_obj }, function(error, code, data){
if (error || code !== 204) {
error_callback({message: "query fetch canceled, but Presto query cancel may fail", error: error, code: code});
} else {
error_callback({message: "query fetch canceled by operation"});
}
});
return;
}
var last_state = null;
var firstNextUri = data.nextUri; // TODO: check the cases without nextUri for /statement ?
var fetch_next = function(next_uri){
/*
* 1st time
{
"stats": {
"rootStage": {
"subStages": [
{
"subStages": [],
"processedBytes": 83103149,
"processedRows": 2532704,
"wallTimeMillis": 20502,
"cpuTimeMillis": 3431,
"userTimeMillis": 3210,
"stageId": "1",
"state": "FINISHED",
"done": true,
"nodes": 3,
"totalSplits": 420,
"queuedSplits": 0,
"runningSplits": 0,
"completedSplits": 420
}
],
// same as substage
},
// same as substage
"state": "RUNNING",
},
"data": [ [ 1266352 ] ],
"columns": [ { "type": "bigint", "name": "cnt" } ],
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/2",
"partialCancelUri": "http://10.0.0.0:8080/v1/stage/20140120_032523_00000_32v8g.0",
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g",
"id": "20140120_032523_00000_32v8g"
}
*/
/*
* 2nd time
{
"stats": {
// ....
"state": "FINISHED",
},
"columns": [ { "type": "bigint", "name": "cnt" } ],
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g",
"id": "20140120_032523_00000_32v8g"
}
*/
if (cancel_checker && cancel_checker()) {
client.request({ method: 'DELETE', path: next_uri }, function(error, code, data){
if (error || code !== 204) {
error_callback({message: "query fetch canceled, but Presto query cancel may fail", error: error, code: code});
} else {
error_callback({message: "query fetch canceled by operation"});
}
});
current_req = client.request(uri_obj, function(error, code, response){
// same as above, we have already timed out and shown an error, so abort out
if (timed_out) {
return;
}
client.request(next_uri, function(error, code, response){
if (error || response.error) {
error_callback(error || response.error);
return;
if ([502, 503, 504].includes(code)) {
setTimeout(function(){
fetch(uri_obj);
}, Math.floor(Math.random() * 51) + 50); // random in 50-100ms
if (retry_callback) {
retry_callback();
}
return;
}
if (state_callback && (last_state !== response.stats.state || enable_verbose_state_callback)) {
state_callback(null, response.id, response.stats);
last_state = response.stats.state;
if (error || (response && response.error)) {
clear_timeout();
if (first_request) {
var message = "execution error" + (response && response.length > 0 ? ":" + response : "");
if (response && response.error && response.error.message)
message = response.error.message;
error_callback({message, error: (error || response.error), code});
} else {
error_callback(error || response.error);
}
return;
}
if (columns_callback && response.columns && !columns) {
columns = response.columns;
columns_callback(null, columns);
}
if (first_request && (!response.id || !response.nextUri || !response.infoUri)) {
clear_timeout();
var error_message = null;
if (!response.id)
error_message = "query id missing in response for POST /v1/statement";
else if (!response.nextUri)
error_message = "nextUri missing in response for POST /v1/statement";
else if (!response.infoUri)
error_message = "infoUri missing in response for POST /v1/statement";
error_callback({message: error_message, data: response});
return;
}
var fetchNextWithTimeout = function(uri, checkInterval) {
setTimeout(function(){ fetch_next(uri); }, checkInterval);
};
first_request = false;
query_id = response.id || query_id;
/* presto-main/src/main/java/com/facebook/presto/execution/QueryState.java
* QUEUED, PLANNING, STARTING, RUNNING, FINISHED, CANCELED, FAILED
*/
if (response.stats.state === 'QUEUED'
|| response.stats.state === 'PLANNING'
|| response.stats.state === 'STARTING'
|| response.stats.state === 'RUNNING' && !response.data) {
fetchNextWithTimeout(response.nextUri, client.checkInterval);
return;
}
if (state_callback && (last_state !== response.stats.state || enable_verbose_state_callback)) {
state_callback(null, response.id, response.stats);
last_state = response.stats.state;
}
if (data_callback && response.data) {
data_callback(null, response.data, response.columns, response.stats);
}
if (columns_callback && response.columns && !columns) {
columns = response.columns;
columns_callback(null, columns);
}
if (response.nextUri) {
fetchNextWithTimeout(response.nextUri, client.checkInterval);
return;
}
var fetchNextWithTimeout = function(uri, checkInterval) {
setTimeout(function(){ fetch(uri); }, checkInterval);
};
var finishedStats = response.stats;
/* presto-main/src/main/java/com/facebook/presto/execution/QueryState.java
* QUEUED, PLANNING, STARTING, RUNNING, FINISHED, CANCELED, FAILED
*/
if (response.stats.state === 'QUEUED'
|| response.stats.state === 'PLANNING'
|| response.stats.state === 'STARTING'
|| response.stats.state === 'RUNNING' && !response.data) {
fetchNextWithTimeout(response.nextUri, client.checkInterval);
return;
}
if (fetch_info && response.infoUri) {
client.request(response.infoUri, function(error, code, response){
success_callback(null, finishedStats, response);
});
}
else {
success_callback(null, finishedStats);
}
});
};
fetch_next(firstNextUri);
});
if (data_callback && response.data) {
data_callback(null, response.data, response.columns, response.stats);
}
if (response.nextUri) {
fetchNextWithTimeout(response.nextUri, client.checkInterval);
return;
}
clear_timeout();
var finishedStats = response.stats;
if (fetch_info && response.infoUri) {
client.request(response.infoUri, function(error, code, response){
success_callback(null, finishedStats, response);
});
}
else {
success_callback(null, finishedStats);
}
});
};
fetch({ method: 'POST', path: '/v1/statement', headers: header, body: opts.query, user: opts.user });
};
{
"name": "presto-client",
"version": "0.13.0",
"version": "1.0.0",
"description": "Distributed query engine Presto/Trino client library for node.js",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "jest"
},

@@ -20,3 +20,6 @@ "repository": {

"url": "https://github.com/tagomoris/presto-client-node/issues"
},
"devDependencies": {
"jest": "29.5.0"
}
}

@@ -78,2 +78,4 @@ # presto-client-node

* Available options: presto, trino (default: presto)
* timeout [integer :optional]
* The seconds that a query is allowed to run before it starts returning results, defaults to 60 seconds. Set to `null` or `0` to disable.

@@ -105,2 +107,3 @@ return value: client instance object

* additional headers to be included in the request, check the full list for [Trino](https://trino.io/docs/current/develop/client-protocol.html#client-request-headers) and [Presto](https://prestodb.io/docs/current/develop/client-protocol.html#client-request-headers) engines
* timeout [integer :optional]
* cancel [function() :optional]

@@ -129,2 +132,4 @@ * client stops fetch of query results if this callback returns `true`

* runtime statistics object of query
* retry [function() :optional]
* called if a request was retried due to server returning `502`, `503`, or `504`
* success [function(error, stats, info) :optional]

@@ -137,2 +142,3 @@ * called once when all results are fetched (default: value of `callback`)

* one of `callback` or `success` must be specified
* one of `callback` or `error` must be specified

@@ -187,4 +193,29 @@ Callbacks order (success query) is: columns -> data (-> data xN) -> success (or callback)

## Development
When working on this library, you can use the included docker-compose.yml file to spin up a Presto and Trino DBs, which can be done with:
```
docker compose up
```
Once you see the following messages, you'll be able connect to Presto at `http://localhost:18080` and Trino at `http://localhost:18081`, without username/password:
```
presto-client-node-trino-1 | 2023-06-02T08:12:37.760Z INFO main io.trino.server.Server ======== SERVER STARTED ========
presto-client-node-presto-1 | 2023-06-02T08:13:29.760Z INFO main com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========
```
After making a change, you can run the available test suite by doing:
```
npm run test
```
## Versions
* 1.0.0:
* add test cases and CI setting, new options and others, thanks to the many contributions from Matthew Peveler (@MasterOdin)
* add "timeout" option to retry requests for server errors
* change "error" callback (or "callback" as fallback path) to be specified when "success" is used
* 0.13.0:

@@ -191,0 +222,0 @@ * add "headers" option on execute() to specify any request headers

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc