QuestDB Node.js Client
Requirements
The client requires Node.js v16 or newer version.
Installation
npm i -s @questdb/nodejs-client
Configuration options
Detailed description of the client's configuration options can be found in
the SenderOptions documentation.
Examples
The examples below demonstrate how to use the client.
For more details, please, check the Sender's documentation.
Basic API usage
const { Sender } = require('@questdb/nodejs-client');
async function run() {
const sender = Sender.fromConfig('http::addr=localhost:9000');
await sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0195).floatColumn('ask', 1.0221)
.at(Date.now(), 'ms');
await sender.table('prices').symbol('instrument', 'GBPUSD')
.floatColumn('bid', 1.2076).floatColumn('ask', 1.2082)
.at(Date.now(), 'ms');
await sender.flush();
await sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();
await sender.close();
}
run()
.then(console.log)
.catch(console.error);
Authentication and secure connection
const { Sender } = require('@questdb/nodejs-client');
async function run() {
const sender = Sender.fromConfig('https::addr=localhost:9000;username=user1;password=pwd');
await sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();
await sender.close();
}
run().catch(console.error);
TypeScript example
import { Sender } from '@questdb/nodejs-client';
async function run(): Promise<number> {
const sender: Sender = Sender.fromConfig('https::addr=localhost:9000;token=Xyvd3er6GF87ysaHk');
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224).at(Date.now(), 'ms');
await sender.flush();
await sender.close();
}
run().catch(console.error);
Worker threads example
const { Sender } = require('@questdb/nodejs-client');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
function* venue(ticker) {
let end = false;
setTimeout(() => { end = true; }, rndInt(5000));
while (!end) {
yield {'ticker': ticker, 'price': Math.random()};
}
}
async function subscribe(ticker, onTick) {
const feed = venue(workerData.ticker);
let tick;
while (tick = feed.next().value) {
await onTick(tick);
await sleep(rndInt(30));
}
}
async function run() {
if (isMainThread) {
const tickers = ['t1', 't2', 't3', 't4'];
for (let ticker in tickers) {
const worker = new Worker(__filename, { workerData: { ticker: ticker } })
.on('error', (err) => { throw err; })
.on('exit', () => { console.log(`${ticker} thread exiting...`); })
.on('message', (msg) => {
console.log(`Ingested ${msg.count} prices for ticker ${msg.ticker}`);
});
}
} else {
const sender = Sender.fromConfig('http::addr=localhost:9000');
let count = 0;
await subscribe(workerData.ticker, async (tick) => {
await sender
.table('prices')
.symbol('ticker', tick.ticker)
.floatColumn('price', tick.price)
.at(Date.now(), 'ms');
await sender.flush();
count++;
});
parentPort.postMessage({'ticker': workerData.ticker, 'count': count});
await sender.close();
}
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
function rndInt(limit) {
return Math.floor((Math.random() * limit) + 1);
}
run()
.then(console.log)
.catch(console.error);