presto-client
Advanced tools
Comparing version 0.13.0 to 1.0.0
@@ -63,2 +63,4 @@ const { URL } = require('url') ; | ||
this.timeout = typeof args.timeout !== 'undefined' ? args.timeout : 60; | ||
if (args.ssl) { | ||
@@ -128,9 +130,20 @@ this.protocol = 'https:'; | ||
var data = response_data; | ||
if (response_code < 300 && (data[0] === '{' || data[0] === '[')) { | ||
try { data = parser.parse(data); } | ||
catch (x) { | ||
/* ignore json parse error (and don't parse) for non-json content body */ | ||
var error = null; | ||
// Only a 200 response code should be considered successful | ||
if (response_code === 200) { | ||
try { | ||
if (data[0] !== '{' && data[0] !== '[') { | ||
throw new Error(); | ||
} | ||
data = parser.parse(data); | ||
} catch (_) { | ||
/* ignore actual error, and give a generic message */ | ||
error = new Error("execution error:could not parse response"); | ||
} | ||
} | ||
callback(null, response_code, data); | ||
else | ||
{ | ||
error = new Error("execution error:" + (data ? data : "invalid response code (" + response_code + ")")); | ||
} | ||
callback(error, response_code, data); | ||
}); | ||
@@ -147,2 +160,4 @@ }); | ||
req.end(); | ||
return req; | ||
}; | ||
@@ -202,2 +217,4 @@ | ||
throw {message: "callback function 'success' (or 'callback') not specified"}; | ||
if (!opts.error && !opts.callback) | ||
throw {message: "callback function 'error' (or 'callback') not specified"}; | ||
@@ -227,165 +244,210 @@ var header = Object.assign({}, opts.headers); | ||
var data_callback = opts.data; | ||
var retry_callback = opts.retry; | ||
var success_callback = opts.success || opts.callback; | ||
var error_callback = opts.error || opts.callback; | ||
var timeout_value = typeof opts.timeout !== 'undefined' ? opts.timeout : this.timeout; | ||
var enable_verbose_state_callback = this.enableVerboseStateCallback || false; | ||
var current_req = null; | ||
var last_state = null; | ||
var query_id = null; | ||
var timed_out = false; | ||
var req = { method: 'POST', path: '/v1/statement', headers: header, body: opts.query, user: opts.user }; | ||
client.request(req, function(err, code, data){ | ||
if (err || code !== 200 || (data && data.error)) { | ||
if (error_callback) { | ||
var message = "execution error" + (data && data.length > 0 ? ":" + data : ""); | ||
if (data && data.error && data.error.message) | ||
message = data.error.message; | ||
error_callback({message:message, error: (err || data.error), code: code}); | ||
} | ||
var timeout = !timeout_value ? null : setTimeout(() => { | ||
timed_out = true; | ||
if (query_id) { | ||
client.kill(query_id, function() { | ||
// don't worry if this fails | ||
}); | ||
} | ||
if (current_req) { | ||
current_req.destroy(); | ||
} | ||
error_callback({message: "execution error:query timed out"}); | ||
}, timeout_value * 1000); | ||
var clear_timeout = function(){ | ||
if (timeout) { | ||
clearTimeout(timeout); | ||
} | ||
timeout = null; | ||
} | ||
/* | ||
* 1st call: | ||
{ | ||
"stats": { | ||
"processedBytes": 0, | ||
"processedRows": 0, | ||
"wallTimeMillis": 0, | ||
"cpuTimeMillis": 0, | ||
"userTimeMillis": 0, | ||
"state": "QUEUED", | ||
"scheduled": false, | ||
"nodes": 0, | ||
"totalSplits": 0, | ||
"queuedSplits": 0, | ||
"runningSplits": 0, | ||
"completedSplits": 0, | ||
}, | ||
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/1", | ||
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g", | ||
"id": "20140120_032523_00000_32v8g" | ||
}; | ||
* 2+ time | ||
{ | ||
"stats": { | ||
"rootStage": { | ||
"subStages": [ | ||
{ | ||
"subStages": [], | ||
"processedBytes": 83103149, | ||
"processedRows": 2532704, | ||
"wallTimeMillis": 20502, | ||
"cpuTimeMillis": 3431, | ||
"userTimeMillis": 3210, | ||
"stageId": "1", | ||
"state": "FINISHED", | ||
"done": true, | ||
"nodes": 3, | ||
"totalSplits": 420, | ||
"queuedSplits": 0, | ||
"runningSplits": 0, | ||
"completedSplits": 420 | ||
} | ||
], | ||
// same as substage | ||
}, | ||
// same as substage | ||
"state": "RUNNING", | ||
}, | ||
"data": [ [ 1266352 ] ], | ||
"columns": [ { "type": "bigint", "name": "cnt" } ], | ||
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/2", | ||
"partialCancelUri": "http://10.0.0.0:8080/v1/stage/20140120_032523_00000_32v8g.0", | ||
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g", | ||
"id": "20140120_032523_00000_32v8g" | ||
} | ||
* final state | ||
{ | ||
"stats": { | ||
// .... | ||
"state": "FINISHED", | ||
}, | ||
"columns": [ { "type": "bigint", "name": "cnt" } ], | ||
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g", | ||
"id": "20140120_032523_00000_32v8g" | ||
} | ||
*/ | ||
var first_request = true; | ||
var fetch = function(uri_obj){ | ||
// we have already timed out and shown an error, so abort out | ||
if (timed_out) { | ||
return; | ||
} | ||
/* | ||
var data = { | ||
"stats": { | ||
"processedBytes": 0, | ||
"processedRows": 0, | ||
"wallTimeMillis": 0, | ||
"cpuTimeMillis": 0, | ||
"userTimeMillis": 0, | ||
"state": "QUEUED", | ||
"scheduled": false, | ||
"nodes": 0, | ||
"totalSplits": 0, | ||
"queuedSplits": 0, | ||
"runningSplits": 0, | ||
"completedSplits": 0, | ||
}, | ||
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/1", | ||
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g", | ||
"id": "20140120_032523_00000_32v8g" | ||
}; | ||
*/ | ||
if (!data.id || !data.nextUri || !data.infoUri) { | ||
var error_message = null; | ||
if (!data.id) | ||
error_message = "query id missing in response for POST /v1/statement"; | ||
else if (!data.nextUri) | ||
error_message = "nextUri missing in response for POST /v1/statement"; | ||
else if (!data.infoUri) | ||
error_message = "infoUri missing in response for POST /v1/statement"; | ||
error_callback({message: error_message, data: data}); | ||
if (!first_request && cancel_checker && cancel_checker()) { | ||
clear_timeout(); | ||
client.request({ method: 'DELETE', path: uri_obj }, function(error, code, data){ | ||
if (error || code !== 204) { | ||
error_callback({message: "query fetch canceled, but Presto query cancel may fail", error: error, code: code}); | ||
} else { | ||
error_callback({message: "query fetch canceled by operation"}); | ||
} | ||
}); | ||
return; | ||
} | ||
var last_state = null; | ||
var firstNextUri = data.nextUri; // TODO: check the cases without nextUri for /statement ? | ||
var fetch_next = function(next_uri){ | ||
/* | ||
* 1st time | ||
{ | ||
"stats": { | ||
"rootStage": { | ||
"subStages": [ | ||
{ | ||
"subStages": [], | ||
"processedBytes": 83103149, | ||
"processedRows": 2532704, | ||
"wallTimeMillis": 20502, | ||
"cpuTimeMillis": 3431, | ||
"userTimeMillis": 3210, | ||
"stageId": "1", | ||
"state": "FINISHED", | ||
"done": true, | ||
"nodes": 3, | ||
"totalSplits": 420, | ||
"queuedSplits": 0, | ||
"runningSplits": 0, | ||
"completedSplits": 420 | ||
} | ||
], | ||
// same as substage | ||
}, | ||
// same as substage | ||
"state": "RUNNING", | ||
}, | ||
"data": [ [ 1266352 ] ], | ||
"columns": [ { "type": "bigint", "name": "cnt" } ], | ||
"nextUri": "http://localhost:8080/v1/statement/20140120_032523_00000_32v8g/2", | ||
"partialCancelUri": "http://10.0.0.0:8080/v1/stage/20140120_032523_00000_32v8g.0", | ||
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g", | ||
"id": "20140120_032523_00000_32v8g" | ||
} | ||
*/ | ||
/* | ||
* 2nd time | ||
{ | ||
"stats": { | ||
// .... | ||
"state": "FINISHED", | ||
}, | ||
"columns": [ { "type": "bigint", "name": "cnt" } ], | ||
"infoUri": "http://localhost:8080/v1/query/20140120_032523_00000_32v8g", | ||
"id": "20140120_032523_00000_32v8g" | ||
} | ||
*/ | ||
if (cancel_checker && cancel_checker()) { | ||
client.request({ method: 'DELETE', path: next_uri }, function(error, code, data){ | ||
if (error || code !== 204) { | ||
error_callback({message: "query fetch canceled, but Presto query cancel may fail", error: error, code: code}); | ||
} else { | ||
error_callback({message: "query fetch canceled by operation"}); | ||
} | ||
}); | ||
current_req = client.request(uri_obj, function(error, code, response){ | ||
// same as above, we have already timed out and shown an error, so abort out | ||
if (timed_out) { | ||
return; | ||
} | ||
client.request(next_uri, function(error, code, response){ | ||
if (error || response.error) { | ||
error_callback(error || response.error); | ||
return; | ||
if ([502, 503, 504].includes(code)) { | ||
setTimeout(function(){ | ||
fetch(uri_obj); | ||
}, Math.floor(Math.random() * 51) + 50); // random in 50-100ms | ||
if (retry_callback) { | ||
retry_callback(); | ||
} | ||
return; | ||
} | ||
if (state_callback && (last_state !== response.stats.state || enable_verbose_state_callback)) { | ||
state_callback(null, response.id, response.stats); | ||
last_state = response.stats.state; | ||
if (error || (response && response.error)) { | ||
clear_timeout(); | ||
if (first_request) { | ||
var message = "execution error" + (response && response.length > 0 ? ":" + response : ""); | ||
if (response && response.error && response.error.message) | ||
message = response.error.message; | ||
error_callback({message, error: (error || response.error), code}); | ||
} else { | ||
error_callback(error || response.error); | ||
} | ||
return; | ||
} | ||
if (columns_callback && response.columns && !columns) { | ||
columns = response.columns; | ||
columns_callback(null, columns); | ||
} | ||
if (first_request && (!response.id || !response.nextUri || !response.infoUri)) { | ||
clear_timeout(); | ||
var error_message = null; | ||
if (!response.id) | ||
error_message = "query id missing in response for POST /v1/statement"; | ||
else if (!response.nextUri) | ||
error_message = "nextUri missing in response for POST /v1/statement"; | ||
else if (!response.infoUri) | ||
error_message = "infoUri missing in response for POST /v1/statement"; | ||
error_callback({message: error_message, data: response}); | ||
return; | ||
} | ||
var fetchNextWithTimeout = function(uri, checkInterval) { | ||
setTimeout(function(){ fetch_next(uri); }, checkInterval); | ||
}; | ||
first_request = false; | ||
query_id = response.id || query_id; | ||
/* presto-main/src/main/java/com/facebook/presto/execution/QueryState.java | ||
* QUEUED, PLANNING, STARTING, RUNNING, FINISHED, CANCELED, FAILED | ||
*/ | ||
if (response.stats.state === 'QUEUED' | ||
|| response.stats.state === 'PLANNING' | ||
|| response.stats.state === 'STARTING' | ||
|| response.stats.state === 'RUNNING' && !response.data) { | ||
fetchNextWithTimeout(response.nextUri, client.checkInterval); | ||
return; | ||
} | ||
if (state_callback && (last_state !== response.stats.state || enable_verbose_state_callback)) { | ||
state_callback(null, response.id, response.stats); | ||
last_state = response.stats.state; | ||
} | ||
if (data_callback && response.data) { | ||
data_callback(null, response.data, response.columns, response.stats); | ||
} | ||
if (columns_callback && response.columns && !columns) { | ||
columns = response.columns; | ||
columns_callback(null, columns); | ||
} | ||
if (response.nextUri) { | ||
fetchNextWithTimeout(response.nextUri, client.checkInterval); | ||
return; | ||
} | ||
var fetchNextWithTimeout = function(uri, checkInterval) { | ||
setTimeout(function(){ fetch(uri); }, checkInterval); | ||
}; | ||
var finishedStats = response.stats; | ||
/* presto-main/src/main/java/com/facebook/presto/execution/QueryState.java | ||
* QUEUED, PLANNING, STARTING, RUNNING, FINISHED, CANCELED, FAILED | ||
*/ | ||
if (response.stats.state === 'QUEUED' | ||
|| response.stats.state === 'PLANNING' | ||
|| response.stats.state === 'STARTING' | ||
|| response.stats.state === 'RUNNING' && !response.data) { | ||
fetchNextWithTimeout(response.nextUri, client.checkInterval); | ||
return; | ||
} | ||
if (fetch_info && response.infoUri) { | ||
client.request(response.infoUri, function(error, code, response){ | ||
success_callback(null, finishedStats, response); | ||
}); | ||
} | ||
else { | ||
success_callback(null, finishedStats); | ||
} | ||
}); | ||
}; | ||
fetch_next(firstNextUri); | ||
}); | ||
if (data_callback && response.data) { | ||
data_callback(null, response.data, response.columns, response.stats); | ||
} | ||
if (response.nextUri) { | ||
fetchNextWithTimeout(response.nextUri, client.checkInterval); | ||
return; | ||
} | ||
clear_timeout(); | ||
var finishedStats = response.stats; | ||
if (fetch_info && response.infoUri) { | ||
client.request(response.infoUri, function(error, code, response){ | ||
success_callback(null, finishedStats, response); | ||
}); | ||
} | ||
else { | ||
success_callback(null, finishedStats); | ||
} | ||
}); | ||
}; | ||
fetch({ method: 'POST', path: '/v1/statement', headers: header, body: opts.query, user: opts.user }); | ||
}; |
{ | ||
"name": "presto-client", | ||
"version": "0.13.0", | ||
"version": "1.0.0", | ||
"description": "Distributed query engine Presto/Trino client library for node.js", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "jest" | ||
}, | ||
@@ -20,3 +20,6 @@ "repository": { | ||
"url": "https://github.com/tagomoris/presto-client-node/issues" | ||
}, | ||
"devDependencies": { | ||
"jest": "29.5.0" | ||
} | ||
} |
@@ -78,2 +78,4 @@ # presto-client-node | ||
* Available options: presto, trino (default: presto) | ||
* timeout [integer :optional] | ||
* The number of seconds a query is allowed to run before it starts returning results. Defaults to 60 seconds. Set to `null` or `0` to disable. | |
@@ -105,2 +107,3 @@ return value: client instance object | ||
* additional headers to be included in the request, check the full list for [Trino](https://trino.io/docs/current/develop/client-protocol.html#client-request-headers) and [Presto](https://prestodb.io/docs/current/develop/client-protocol.html#client-request-headers) engines | ||
* timeout [integer :optional] | ||
* cancel [function() :optional] | ||
@@ -129,2 +132,4 @@ * client stops fetch of query results if this callback returns `true` | ||
* runtime statistics object of query | ||
* retry [function() :optional] | ||
* called if a request was retried due to server returning `502`, `503`, or `504` | ||
* success [function(error, stats, info) :optional] | ||
@@ -137,2 +142,3 @@ * called once when all results are fetched (default: value of `callback`) | ||
* one of `callback` or `success` must be specified | ||
* one of `callback` or `error` must be specified | ||
@@ -187,4 +193,29 @@ Callbacks order (success query) is: columns -> data (-> data xN) -> success (or callback) | ||
## Development | ||
When working on this library, you can use the included docker-compose.yml file to spin up Presto and Trino DBs, which can be done with: | |
``` | ||
docker compose up | ||
``` | ||
Once you see the following messages, you'll be able to connect to Presto at `http://localhost:18080` and Trino at `http://localhost:18081`, without username/password: | |
``` | ||
presto-client-node-trino-1 | 2023-06-02T08:12:37.760Z INFO main io.trino.server.Server ======== SERVER STARTED ======== | ||
presto-client-node-presto-1 | 2023-06-02T08:13:29.760Z INFO main com.facebook.presto.server.PrestoServer ======== SERVER STARTED ======== | ||
``` | ||
After making a change, you can run the available test suite by doing: | ||
``` | ||
npm run test | ||
``` | ||
## Versions | ||
* 1.0.0: | ||
* add test cases and CI setting, new options and others, thanks to the many contributions from Matthew Peveler (@MasterOdin) | ||
* add "timeout" option to retry requests for server errors | ||
* change "error" callback (or "callback" as fallback path) to be specified when "success" is used | ||
* 0.13.0: | ||
@@ -191,0 +222,0 @@ * add "headers" option on execute() to specify any request headers |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain risk: This module accesses the network.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No tests
Quality: Package does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
No v1
Quality: Package is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
29695
7
421
1
283
1
5