@elastic/elasticsearch-canary - npm Package Compare versions

Comparing version 8.0.0-canary.21 to 8.0.0-canary.22

index.js

@@ -22,342 +22,5 @@ /*

const { EventEmitter } = require('events')
const { URL } = require('url')
const buffer = require('buffer')
const debug = require('debug')('elasticsearch')
const Transport = require('./lib/Transport')
const Connection = require('./lib/Connection')
const { ConnectionPool, CloudConnectionPool } = require('./lib/pool')
const Helpers = require('./lib/Helpers')
const Serializer = require('./lib/Serializer')
const errors = require('./lib/errors')
const { ConfigurationError } = errors
const { prepareHeaders } = Connection.internals
let clientVersion = require('./package.json').version
/* istanbul ignore next */
if (clientVersion.includes('-')) {
// clean prerelease
clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p'
}
const nodeVersion = process.versions.node
const { errors } = require('@elastic/transport')
const { default: Client } = require('./lib/Client')
const kInitialOptions = Symbol('elasticsearchjs-initial-options')
const kChild = Symbol('elasticsearchjs-child')
const kExtensions = Symbol('elasticsearchjs-extensions')
const kEventEmitter = Symbol('elasticsearchjs-event-emitter')
const ESAPI = require('./api')
class Client extends ESAPI {
constructor (opts = {}) {
super({ ConfigurationError })
if (opts.cloud && opts[kChild] === undefined) {
const { id, username, password } = opts.cloud
// the cloud id is `cluster-name:base64encodedurl`
// the decoded url is a single string split into three parts by two '$':
// the first is the cloud url, the second the elasticsearch instance, the third the kibana instance
const cloudUrls = Buffer.from(id.split(':')[1], 'base64').toString().split('$')
// TODO: remove username and password here in 8
if (username && password) {
opts.auth = Object.assign({}, opts.auth, { username, password })
}
opts.node = `https://${cloudUrls[1]}.${cloudUrls[0]}`
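// e.g. an id whose base64 part decodes to 'eu-west-1.aws.found.io$abc123$def456'
// (example values) yields opts.node === 'https://abc123.eu-west-1.aws.found.io'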
// Cloud has better performance with compression enabled
// see https://github.com/elastic/elasticsearch-py/pull/704.
// So unless the user specifies otherwise, we enable compression.
if (opts.compression == null) opts.compression = 'gzip'
if (opts.suggestCompression == null) opts.suggestCompression = true
if (opts.ssl == null ||
(opts.ssl && opts.ssl.secureProtocol == null)) {
opts.ssl = opts.ssl || {}
opts.ssl.secureProtocol = 'TLSv1_2_method'
}
}
if (!opts.node && !opts.nodes) {
throw new ConfigurationError('Missing node(s) option')
}
if (opts[kChild] === undefined) {
const checkAuth = getAuth(opts.node || opts.nodes)
if (checkAuth && checkAuth.username && checkAuth.password) {
opts.auth = Object.assign({}, opts.auth, { username: checkAuth.username, password: checkAuth.password })
}
}
const options = opts[kChild] !== undefined
? opts[kChild].initialOptions
: Object.assign({}, {
Connection,
Transport,
Serializer,
ConnectionPool: opts.cloud ? CloudConnectionPool : ConnectionPool,
maxRetries: 3,
requestTimeout: 30000,
pingTimeout: 3000,
sniffInterval: false,
sniffOnStart: false,
sniffEndpoint: '_nodes/_all/http',
sniffOnConnectionFault: false,
resurrectStrategy: 'ping',
suggestCompression: false,
compression: false,
ssl: null,
caFingerprint: null,
agent: null,
headers: {},
nodeFilter: null,
nodeSelector: 'round-robin',
generateRequestId: null,
name: 'elasticsearch-js',
auth: null,
opaqueIdPrefix: null,
context: null,
proxy: null,
enableMetaHeader: true,
disablePrototypePoisoningProtection: false,
maxResponseSize: null,
maxCompressedResponseSize: null
}, opts)
if (options.maxResponseSize !== null && options.maxResponseSize > buffer.constants.MAX_STRING_LENGTH) {
throw new ConfigurationError(`The maxResponseSize cannot be bigger than ${buffer.constants.MAX_STRING_LENGTH}`)
}
if (options.maxCompressedResponseSize !== null && options.maxCompressedResponseSize > buffer.constants.MAX_LENGTH) {
throw new ConfigurationError(`The maxCompressedResponseSize cannot be bigger than ${buffer.constants.MAX_LENGTH}`)
}
if (options.caFingerprint !== null && isHttpConnection(opts.node || opts.nodes)) {
throw new ConfigurationError('You can\'t configure the caFingerprint with a http connection')
}
if (process.env.ELASTIC_CLIENT_APIVERSIONING === 'true') {
options.headers = Object.assign({ accept: 'application/vnd.elasticsearch+json; compatible-with=7' }, options.headers)
}
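// e.g. running with ELASTIC_CLIENT_APIVERSIONING=true makes every request send
// 'accept: application/vnd.elasticsearch+json; compatible-with=7' (unless overridden in options.headers)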
this[kInitialOptions] = options
this[kExtensions] = []
this.name = options.name
if (options.enableMetaHeader) {
options.headers['x-elastic-client-meta'] = `es=${clientVersion},js=${nodeVersion},t=${clientVersion},hc=${nodeVersion}`
}
if (opts[kChild] !== undefined) {
this.serializer = options[kChild].serializer
this.connectionPool = options[kChild].connectionPool
this[kEventEmitter] = options[kChild].eventEmitter
} else {
this[kEventEmitter] = new EventEmitter()
this.serializer = new options.Serializer({
disablePrototypePoisoningProtection: options.disablePrototypePoisoningProtection
})
this.connectionPool = new options.ConnectionPool({
pingTimeout: options.pingTimeout,
resurrectStrategy: options.resurrectStrategy,
ssl: options.ssl,
agent: options.agent,
proxy: options.proxy,
Connection: options.Connection,
auth: options.auth,
emit: this[kEventEmitter].emit.bind(this[kEventEmitter]),
caFingerprint: options.caFingerprint,
sniffEnabled: options.sniffInterval !== false ||
options.sniffOnStart !== false ||
options.sniffOnConnectionFault !== false
})
// Add the connections before initializing the Transport
this.connectionPool.addConnection(options.node || options.nodes)
}
this.transport = new options.Transport({
emit: this[kEventEmitter].emit.bind(this[kEventEmitter]),
connectionPool: this.connectionPool,
serializer: this.serializer,
maxRetries: options.maxRetries,
requestTimeout: options.requestTimeout,
sniffInterval: options.sniffInterval,
sniffOnStart: options.sniffOnStart,
sniffOnConnectionFault: options.sniffOnConnectionFault,
sniffEndpoint: options.sniffEndpoint,
suggestCompression: options.suggestCompression,
compression: options.compression,
headers: options.headers,
nodeFilter: options.nodeFilter,
nodeSelector: options.nodeSelector,
generateRequestId: options.generateRequestId,
name: options.name,
opaqueIdPrefix: options.opaqueIdPrefix,
context: options.context,
maxResponseSize: options.maxResponseSize,
maxCompressedResponseSize: options.maxCompressedResponseSize
})
this.helpers = new Helpers({
client: this,
maxRetries: options.maxRetries,
metaHeader: options.enableMetaHeader
? `es=${clientVersion},js=${nodeVersion},t=${clientVersion},hc=${nodeVersion}`
: null
})
}
get emit () {
return this[kEventEmitter].emit.bind(this[kEventEmitter])
}
get on () {
return this[kEventEmitter].on.bind(this[kEventEmitter])
}
get once () {
return this[kEventEmitter].once.bind(this[kEventEmitter])
}
get off () {
return this[kEventEmitter].off.bind(this[kEventEmitter])
}
extend (name, opts, fn) {
if (typeof opts === 'function') {
fn = opts
opts = {}
}
let [namespace, method] = name.split('.')
if (method == null) {
method = namespace
namespace = null
}
if (namespace != null) {
if (this[namespace] != null && this[namespace][method] != null && opts.force !== true) {
throw new Error(`The method "${method}" already exists on namespace "${namespace}"`)
}
if (this[namespace] == null) this[namespace] = {}
this[namespace][method] = fn({
makeRequest: this.transport.request.bind(this.transport),
result: { body: null, statusCode: null, headers: null, warnings: null },
ConfigurationError
})
} else {
if (this[method] != null && opts.force !== true) {
throw new Error(`The method "${method}" already exists`)
}
this[method] = fn({
makeRequest: this.transport.request.bind(this.transport),
result: { body: null, statusCode: null, headers: null, warnings: null },
ConfigurationError
})
}
this[kExtensions].push({ name, opts, fn })
}
child (opts) {
// Merge the new options with the initial ones
const options = Object.assign({}, this[kInitialOptions], opts)
// Pass to the child client the parent instances that cannot be overridden
options[kChild] = {
connectionPool: this.connectionPool,
serializer: this.serializer,
eventEmitter: this[kEventEmitter],
initialOptions: options
}
/* istanbul ignore else */
if (options.auth !== undefined) {
options.headers = prepareHeaders(options.headers, options.auth)
}
const client = new Client(options)
// sync product check
const tSymbol = Object.getOwnPropertySymbols(this.transport)
.filter(symbol => symbol.description === 'product check')[0]
client.transport[tSymbol] = this.transport[tSymbol]
// Add parent extensions
if (this[kExtensions].length > 0) {
this[kExtensions].forEach(({ name, opts, fn }) => {
client.extend(name, opts, fn)
})
}
return client
}
close (callback) {
if (callback == null) {
return new Promise((resolve, reject) => {
this.close(resolve)
})
}
debug('Closing the client')
this.connectionPool.empty(callback)
}
}
function getAuth (node) {
if (Array.isArray(node)) {
for (const url of node) {
const auth = getUsernameAndPassword(url)
if (auth.username !== '' && auth.password !== '') {
return auth
}
}
return null
}
const auth = getUsernameAndPassword(node)
if (auth.username !== '' && auth.password !== '') {
return auth
}
return null
function getUsernameAndPassword (node) {
/* istanbul ignore else */
if (typeof node === 'string') {
const { username, password } = new URL(node)
return {
username: decodeURIComponent(username),
password: decodeURIComponent(password)
}
} else if (node.url instanceof URL) {
return {
username: decodeURIComponent(node.url.username),
password: decodeURIComponent(node.url.password)
}
}
}
}
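// e.g. getAuth('http://foo:bar@localhost:9200') returns
// { username: 'foo', password: 'bar' } (illustrative values)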
function isHttpConnection (node) {
if (Array.isArray(node)) {
return node.some((n) => (typeof n === 'string' ? new URL(n).protocol : n.url.protocol) === 'http:')
} else {
return (typeof node === 'string' ? new URL(node).protocol : node.url.protocol) === 'http:'
}
}
const events = {
RESPONSE: 'response',
REQUEST: 'request',
SNIFF: 'sniff',
RESURRECT: 'resurrect',
SERIALIZATION: 'serialization',
DESERIALIZATION: 'deserialization'
}
module.exports = {
Client,
Transport,
ConnectionPool,
Connection,
Serializer,
events,
errors
}
module.exports = { Client, errors }
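As the two export statements above show, canary.21 exposes the transport building blocks (Transport, ConnectionPool, Connection, Serializer, events) alongside Client, while canary.22 narrows the entry point to Client and errors re-exported from the compiled lib/Client. A minimal connection sketch against the surface both versions share; the node URL and credentials are placeholders, not part of the diff:

'use strict'
const { Client, errors } = require('@elastic/elasticsearch-canary')

// placeholder endpoint and credentials, for illustration only
const client = new Client({
  node: 'https://localhost:9200',
  auth: { username: 'elastic', password: 'changeme' }
})

client.ping()
  .then(() => console.log('cluster is reachable'))
  .catch(err => console.log(err instanceof errors.ConnectionError ? 'cluster is down' : err))
  .finally(() => client.close())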

@@ -1,124 +1,148 @@

/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable as ReadableStream } from 'stream'
import { TransportRequestOptions, ApiError, ApiResponse, RequestBody, Context } from './Transport'
import { Search, Msearch, Bulk } from '../api/requestParams'
export default class Helpers {
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>, reqOptions?: TransportRequestOptions): BulkHelper<BulkStats>
/// <reference types="node" />
import { Readable } from 'stream';
import { TransportResult, TransportRequestOptions } from '@elastic/transport';
import Client from './Client';
import * as T from './api/types';
export interface HelpersOptions {
client: Client;
metaHeader: string | null;
maxRetries: number;
}
export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> {
clear: () => Promise<void>
documents: TDocument[]
export interface ScrollSearchOptions extends TransportRequestOptions {
wait?: number;
}
export interface BulkHelper<T> extends Promise<T> {
abort: () => BulkHelper<T>
readonly stats: BulkStats
export interface ScrollSearchResponse<TDocument> extends TransportResult<T.SearchResponse<TDocument>, unknown> {
clear: () => Promise<void>;
documents: TDocument[];
}
export interface MsearchHelperOptions extends T.MsearchRequest {
operations?: number;
flushInterval?: number;
concurrency?: number;
retries?: number;
wait?: number;
}
export interface MsearchHelper extends Promise<void> {
stop: (error?: Error | null) => void;
search: <TDocument = unknown>(header: T.MsearchHeader, body: T.MsearchBody) => Promise<MsearchHelperResponse<TDocument>>;
}
export interface MsearchHelperResponse<TDocument> {
body: T.SearchResponse<TDocument>;
documents: TDocument[];
status: number;
responses: T.MsearchResponse;
}
export interface BulkStats {
total: number
failed: number
retry: number
successful: number
noop: number
time: number
bytes: number
aborted: boolean
total: number;
failed: number;
retry: number;
successful: number;
noop: number;
time: number;
bytes: number;
aborted: boolean;
}
interface IndexAction {
index: {
_index: string
[key: string]: any
}
index: T.BulkIndexOperation;
}
interface CreateAction {
create: {
_index: string
[key: string]: any
}
create: T.BulkCreateOperation;
}
interface UpdateActionOperation {
update: {
_index: string
[key: string]: any
}
update: T.BulkUpdateOperation;
}
interface DeleteAction {
delete: {
_index: string
[key: string]: any
}
delete: T.BulkDeleteOperation;
}
type UpdateAction = [UpdateActionOperation, Record<string, any>]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>
export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body'> {
datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
flushBytes?: number
flushInterval?: number
concurrency?: number
retries?: number
wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void
refreshOnCompletion?: boolean | string
}
declare type UpdateAction = [UpdateActionOperation, Record<string, any>];
declare type Action = IndexAction | CreateAction | UpdateAction | DeleteAction;
export interface OnDropDocument<TDocument = unknown> {
status: number
error: {
type: string,
reason: string,
caused_by: {
type: string,
reason: string
}
}
document: TDocument
retried: boolean
status: number;
operation: Action;
error: T.ErrorCause | null;
document: TDocument;
retried: boolean;
}
export interface MsearchHelperOptions extends Omit<Msearch, 'body'> {
operations?: number
flushInterval?: number
concurrency?: number
retries?: number
wait?: number
export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>;
onDocument: (doc: TDocument) => Action;
flushBytes?: number;
flushInterval?: number;
concurrency?: number;
retries?: number;
wait?: number;
onDrop?: (doc: OnDropDocument<TDocument>) => void;
refreshOnCompletion?: boolean | string;
}
declare type callbackFn<Response, Context> = (err: ApiError, result: ApiResponse<Response, Context>) => void;
export interface MsearchHelper extends Promise<void> {
stop(error?: Error): void
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>>
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void
export interface BulkHelper<T> extends Promise<BulkStats> {
abort: () => BulkHelper<T>;
readonly stats: BulkStats;
}
declare const kClient: unique symbol;
declare const kMetaHeader: unique symbol;
declare const kMaxRetries: unique symbol;
export default class Helpers {
[kClient]: Client;
[kMetaHeader]: string | null;
[kMaxRetries]: number;
constructor(opts: HelpersOptions);
/**
* Runs a search operation. The only difference between client.search and this utility
* is that we only return the hits to the user, not the full ES response.
* This helper automatically adds `filter_path=hits.hits._source` to the querystring,
* as it only needs the documents' source.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {array} The documents that matched the request.
*/
search<TDocument = unknown>(params: T.SearchRequest, options?: TransportRequestOptions): Promise<TDocument[]>;
/**
* Runs a scroll search operation. This function returns an async iterator, allowing
* the user to use a for await loop to get all the results of a given search.
* ```js
* for await (const result of client.helpers.scrollSearch({ params })) {
* console.log(result)
* }
* ```
* Each result represents the entire body of a single scroll search request;
* if you just need to scroll the results, use scrollDocuments.
* This function automatically handles retries on 429 status codes.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {iterator} the async iterator
*/
scrollSearch<TDocument = unknown>(params: T.SearchRequest, options?: ScrollSearchOptions): AsyncIterable<ScrollSearchResponse<TDocument>>;
/**
* Runs a scroll search operation. This function returns an async iterator, allowing
* the user to use a for await loop to get all the documents of a given search.
* ```js
* for await (const document of client.helpers.scrollDocuments({ params })) {
* console.log(document)
* }
* ```
* Each document is what you will find by running a scrollSearch and iterating on the hits array.
* This helper automatically adds `filter_path=hits.hits._source` to the querystring,
* as it only needs the documents' source.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {iterator} the async iterator
*/
scrollDocuments<TDocument = unknown>(params: T.SearchRequest, options?: ScrollSearchOptions): AsyncIterable<TDocument>;
/**
* Creates a msearch helper instance. Once you configure it, you can use the provided
* `search` method to add new searches in the queue.
* @param {object} options - The configuration of the msearch operations.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run.
*/
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper;
/**
* Creates a bulk helper instance. Once you configure it, you can pick which operation
* to execute with the given dataset, index, create, update, and delete.
* @param {object} options - The configuration of the bulk operation.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run with the datasource.
*/
bulk<TDocument = unknown>(options: BulkHelperOptions, reqOptions?: TransportRequestOptions): BulkHelper<TDocument>;
}
export {};
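The rewritten declaration file types the same helper surface that the compiled helpers implementation below provides. A minimal sketch of the bulk helper contract it describes; the client setup, index name, and documents are illustrative, not part of the diff:

'use strict'
const { Client } = require('@elastic/elasticsearch-canary')

async function run () {
  const client = new Client({ node: 'https://localhost:9200' }) // placeholder node
  const stats = await client.helpers.bulk({
    datasource: [{ id: 1, text: 'hello' }, { id: 2, text: 'world' }], // array, Buffer, Readable or async iterator
    onDocument (doc) {
      return { index: { _index: 'my-index' } } // one of: index, create, update, delete
    },
    onDrop (doc) {
      console.log('dropped', doc.status, doc.document) // see OnDropDocument above
    },
    refreshOnCompletion: true
  })
  console.log(stats) // BulkStats: total, failed, retry, successful, noop, time, bytes, aborted
  await client.close()
}

run().catch(console.log)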

@@ -1,1 +1,2 @@

"use strict";
/*

@@ -6,3 +7,3 @@ * Licensed to Elasticsearch B.V. under one or more contributor

* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* the Apache License, Version 2.0 (the "License") you may
* not use this file except in compliance with the License.

@@ -20,753 +21,753 @@ * You may obtain a copy of the License at

*/
'use strict'
/* eslint camelcase: 0 */
const { Readable } = require('stream')
const { promisify } = require('util')
const { ResponseError, ConfigurationError } = require('./errors')
const pImmediate = promisify(setImmediate)
const sleep = promisify(setTimeout)
const kClient = Symbol('elasticsearch-client')
const kMetaHeader = Symbol('meta header')
var _a, _b, _c;
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
/* eslint-disable @typescript-eslint/naming-convention */
/* eslint-disable @typescript-eslint/promise-function-async */
const assert_1 = (0, tslib_1.__importDefault)(require("assert"));
const util_1 = require("util");
const stream_1 = require("stream");
const transport_1 = require("@elastic/transport");
const { ResponseError, ConfigurationError } = transport_1.errors;
const sleep = (0, util_1.promisify)(setTimeout);
const pImmediate = (0, util_1.promisify)(setImmediate);
/* istanbul ignore next */
const noop = () => {}
const noop = () => { };
const kClient = Symbol('elasticsearch-client');
const kMetaHeader = Symbol('meta header');
const kMaxRetries = Symbol('max retries');
class Helpers {
constructor (opts) {
this[kClient] = opts.client
this[kMetaHeader] = opts.metaHeader
this.maxRetries = opts.maxRetries
}
/**
* Runs a search operation. The only difference between client.search and this utility
* is that we only return the hits to the user, not the full ES response.
* This helper automatically adds `filter_path=hits.hits._source` to the querystring,
* as it only needs the documents' source.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {array} The documents that matched the request.
*/
async search (params, options) {
appendFilterPath('hits.hits._source', params, true)
const { body } = await this[kClient].search(params, options)
if (body.hits && body.hits.hits) {
return body.hits.hits.map(d => d._source)
constructor(opts) {
Object.defineProperty(this, _a, {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, _b, {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, _c, {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
this[kClient] = opts.client;
this[kMetaHeader] = opts.metaHeader;
this[kMaxRetries] = opts.maxRetries;
}
return []
}
/**
* Runs a scroll search operation. This function returns an async iterator, allowing
* the user to use a for await loop to get all the results of a given search.
* ```js
* for await (const result of client.helpers.scrollSearch({ params })) {
* console.log(result)
* }
* ```
* Each result represents the entire body of a single scroll search request;
* if you just need to scroll the results, use scrollDocuments.
* This function automatically handles retries on 429 status codes.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {iterator} the async iterator
*/
async * scrollSearch (params, options = {}) {
if (this[kMetaHeader] !== null) {
options.headers = options.headers || {}
options.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=s'
/**
* Runs a search operation. The only difference between client.search and this utility
* is that we only return the hits to the user, not the full ES response.
* This helper automatically adds `filter_path=hits.hits._source` to the querystring,
* as it only needs the documents' source.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {array} The documents that matched the request.
*/
async search(params, options = {}) {
var _d;
appendFilterPath('hits.hits._source', params, true);
options.meta = true;
const { body: result } = await this[kClient].search(params, options);
if (((_d = result.hits) === null || _d === void 0 ? void 0 : _d.hits) != null) {
return result.hits.hits.map(d => d._source);
}
return [];
}
// TODO: study scroll search slices
const wait = options.wait || 5000
const maxRetries = options.maxRetries || this.maxRetries
if (Array.isArray(options.ignore)) {
options.ignore.push(429)
} else {
options.ignore = [429]
}
params.scroll = params.scroll || '1m'
appendFilterPath('_scroll_id', params, false)
const { method, body, index, ...querystring } = params
let response = null
for (let i = 0; i <= maxRetries; i++) {
response = await this[kClient].search(params, options)
if (response.statusCode !== 429) break
await sleep(wait)
}
if (response.statusCode === 429) {
throw new ResponseError(response)
}
let scroll_id = response.body._scroll_id
let stop = false
const clear = async () => {
stop = true
await this[kClient].clearScroll(
{ body: { scroll_id } },
{ ignore: [400], ...options }
)
}
while (response.body.hits && response.body.hits.hits.length > 0) {
// scroll id is always present in the response, but it might
// change over time based on the number of shards
scroll_id = response.body._scroll_id
response.clear = clear
addDocumentsGetter(response)
yield response
if (stop === true) {
break
}
for (let i = 0; i <= maxRetries; i++) {
response = await this[kClient].scroll({
scroll: querystring.scroll,
rest_total_hits_as_int: querystring.rest_total_hits_as_int || querystring.restTotalHitsAsInt,
body: { scroll_id }
}, options)
if (response.statusCode !== 429) break
await sleep(wait)
}
if (response.statusCode === 429) {
throw new ResponseError(response)
}
}
if (stop === false) {
await clear()
}
}
/**
* Runs a scroll search operation. This function returns an async iterator, allowing
* the user to use a for await loop to get all the documents of a given search.
* ```js
* for await (const document of client.helpers.scrollDocuments({ params })) {
* console.log(document)
* }
* ```
* Each document is what you will find by running a scrollSearch and iterating on the hits array.
* This helper automatically adds `filter_path=hits.hits._source` to the querystring,
* as it only needs the documents' source.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {iterator} the async iterator
*/
async * scrollDocuments (params, options) {
appendFilterPath('hits.hits._source', params, true)
for await (const { documents } of this.scrollSearch(params, options)) {
for (const document of documents) {
yield document
}
}
}
/**
* Creates a msearch helper instance. Once you configure it, you can use the provided
* `search` method to add new searches in the queue.
* @param {object} options - The configuration of the msearch operations.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run.
*/
msearch (options = {}, reqOptions = {}) {
const client = this[kClient]
const {
operations = 5,
concurrency = 5,
flushInterval = 500,
retries = this.maxRetries,
wait = 5000,
...msearchOptions
} = options
let stopReading = false
let stopError = null
let timeoutRef = null
const operationsStream = new Readable({
objectMode: true,
read (size) {}
})
const p = iterate()
const helper = {
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
catch (onRejected) {
return p.catch(onRejected)
},
stop (error = null) {
if (stopReading === true) return
stopReading = true
stopError = error
operationsStream.push(null)
},
// TODO: support aborting a single search?
// NOTE: the validation checks are synchronous and the callback/promise will
// be resolved in the same tick. We might want to fix this in the future.
search (header, body, callback) {
if (stopReading === true) {
const error = stopError === null
? new ConfigurationError('The msearch processor has been stopped')
: stopError
return callback ? callback(error, {}) : Promise.reject(error)
/**
* Runs a scroll search operation. This function returns an async iterator, allowing
* the user to use a for await loop to get all the results of a given search.
* ```js
* for await (const result of client.helpers.scrollSearch({ params })) {
* console.log(result)
* }
* ```
* Each result represents the entire body of a single scroll search request;
* if you just need to scroll the results, use scrollDocuments.
* This function automatically handles retries on 429 status codes.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {iterator} the async iterator
*/
async *scrollSearch(params, options = {}) {
var _d, _e, _f, _g;
options.meta = true;
if (this[kMetaHeader] !== null) {
options.headers = (_d = options.headers) !== null && _d !== void 0 ? _d : {};
options.headers['x-elastic-client-meta'] = `${this[kMetaHeader]},h=s`;
}
if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) {
const error = new ConfigurationError('The header should be an object')
return callback ? callback(error, {}) : Promise.reject(error)
const wait = (_e = options.wait) !== null && _e !== void 0 ? _e : 5000;
const maxRetries = (_f = options.maxRetries) !== null && _f !== void 0 ? _f : this[kMaxRetries];
if (Array.isArray(options.ignore)) {
options.ignore.push(429);
}
if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) {
const error = new ConfigurationError('The body should be an object')
return callback ? callback(error, {}) : Promise.reject(error)
else {
options.ignore = [429];
}
let promise = null
if (callback === undefined) {
let onFulfilled = null
let onRejected = null
promise = new Promise((resolve, reject) => {
onFulfilled = resolve
onRejected = reject
})
callback = function callback (err, result) {
err ? onRejected(err) : onFulfilled(result)
}
params.scroll = (_g = params.scroll) !== null && _g !== void 0 ? _g : '1m';
appendFilterPath('_scroll_id', params, false);
let response;
for (let i = 0; i <= maxRetries; i++) {
response = await this[kClient].search(params, options);
if (response.statusCode !== 429)
break;
await sleep(wait);
}
operationsStream.push([header, body, callback])
if (promise !== null) {
return promise
(0, assert_1.default)(response !== undefined, 'The response is undefined, please file a bug report');
if (response.statusCode === 429) {
// @ts-expect-error
throw new ResponseError(response);
}
}
}
return helper
async function iterate () {
const { semaphore, finish } = buildSemaphore()
const msearchBody = []
const callbacks = []
let loadedOperations = 0
timeoutRef = setTimeout(onFlushTimeout, flushInterval)
for await (const operation of operationsStream) {
timeoutRef.refresh()
loadedOperations += 1
msearchBody.push(operation[0], operation[1])
callbacks.push(operation[2])
if (loadedOperations >= operations) {
const send = await semaphore()
send(msearchBody.slice(), callbacks.slice())
msearchBody.length = 0
callbacks.length = 0
loadedOperations = 0
let scroll_id = response.body._scroll_id;
let stop = false;
const clear = async () => {
stop = true;
await this[kClient].clearScroll({ scroll_id }, { ignore: [400], ...options });
};
while (response.body.hits != null && response.body.hits.hits.length > 0) {
// scroll id is always present in the response, but it might
// change over time based on the number of shards
scroll_id = response.body._scroll_id;
// @ts-expect-error
response.clear = clear;
addDocumentsGetter(response);
// @ts-expect-error
yield response;
if (stop) {
break;
}
for (let i = 0; i <= maxRetries; i++) {
const r = await this[kClient].scroll({
scroll: params.scroll,
rest_total_hits_as_int: params.rest_total_hits_as_int,
scroll_id
}, options);
response = r;
(0, assert_1.default)(response !== undefined, 'The response is undefined, please file a bug report');
if (response.statusCode !== 429)
break;
await sleep(wait);
}
if (response.statusCode === 429) {
// @ts-expect-error
throw new ResponseError(response);
}
}
}
clearTimeout(timeoutRef)
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (loadedOperations > 0) {
const send = await semaphore()
send(msearchBody, callbacks)
}
await finish()
if (stopError !== null) {
throw stopError
}
async function onFlushTimeout () {
if (loadedOperations === 0) return
const msearchBodyCopy = msearchBody.slice()
const callbacksCopy = callbacks.slice()
msearchBody.length = 0
callbacks.length = 0
loadedOperations = 0
try {
const send = await semaphore()
send(msearchBodyCopy, callbacksCopy)
} catch (err) {
/* istanbul ignore next */
helper.stop(err)
if (!stop) {
await clear();
}
}
}
// This function builds a semaphore using the concurrency
// options of the msearch helper. It is used inside the iterator
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual msearch request.
// It also returns a finish function, which returns a promise that is resolved
// when there are no longer requests running.
function buildSemaphore () {
let resolveSemaphore = null
let resolveFinish = null
let running = 0
return { semaphore, finish }
function finish () {
return new Promise((resolve, reject) => {
if (running === 0) {
resolve()
} else {
resolveFinish = resolve
}
})
}
function semaphore () {
if (running < concurrency) {
running += 1
return pImmediate(send)
} else {
return new Promise((resolve, reject) => {
resolveSemaphore = resolve
})
/**
* Runs a scroll search operation. This function returns an async iterator, allowing
* the user to use a for await loop to get all the documents of a given search.
* ```js
* for await (const document of client.helpers.scrollDocuments({ params })) {
* console.log(document)
* }
* ```
* Each document is what you will find by running a scrollSearch and iterating on the hits array.
* This helper automatically adds `filter_path=hits.hits._source` to the querystring,
* as it only needs the documents' source.
* @param {object} params - The Elasticsearch search parameters.
* @param {object} options - The client optional configuration for this request.
* @return {iterator} the async iterator
*/
async *scrollDocuments(params, options = {}) {
appendFilterPath('hits.hits._source', params, true);
for await (const { documents } of this.scrollSearch(params, options)) {
for (const document of documents) {
yield document;
}
}
}
function send (msearchBody, callbacks) {
/* istanbul ignore if */
if (running > concurrency) {
throw new Error('Max concurrency reached')
}
msearchOperation(msearchBody, callbacks, () => {
running -= 1
if (resolveSemaphore) {
running += 1
resolveSemaphore(send)
resolveSemaphore = null
} else if (resolveFinish && running === 0) {
resolveFinish()
}
})
}
}
function msearchOperation (msearchBody, callbacks, done) {
let retryCount = retries
// Instead of going full on async-await, which would make the code easier to read,
// we have decided to use callback style instead.
// This is because every time we use async await, V8 will create multiple promises
// behind the scenes, making the code slightly slower.
tryMsearch(msearchBody, callbacks, retrySearch)
function retrySearch (msearchBody, callbacks) {
if (msearchBody.length > 0 && retryCount > 0) {
retryCount -= 1
setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch)
return
/**
* Creates a msearch helper instance. Once you configure it, you can use the provided
* `search` method to add new searches in the queue.
* @param {object} options - The configuration of the msearch operations.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run.
*/
msearch(options = {}, reqOptions = {}) {
const client = this[kClient];
const { operations = 5, concurrency = 5, flushInterval = 500, retries = this[kMaxRetries], wait = 5000, ...msearchOptions } = options;
reqOptions.meta = true;
let stopReading = false;
let stopError = null;
let timeoutRef = null;
const operationsStream = new stream_1.Readable({
objectMode: true,
read(size) { }
});
const p = iterate();
const helper = {
[Symbol.toStringTag]: 'Promise',
then(onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected);
},
catch(onRejected) {
return p.catch(onRejected);
},
finally(onFinally) {
return p.finally(onFinally);
},
stop(error = null) {
if (stopReading)
return;
stopReading = true;
stopError = error;
operationsStream.push(null);
},
// TODO: support aborting a single search?
// NOTE: the validation checks are synchronous and the callback/promise will
// be resolved in the same tick. We might want to fix this in the future.
search(header, body) {
if (stopReading) {
const error = stopError === null
? new ConfigurationError('The msearch processor has been stopped')
: stopError;
return Promise.reject(error);
}
if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) {
return Promise.reject(new ConfigurationError('The header should be an object'));
}
if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) {
return Promise.reject(new ConfigurationError('The body should be an object'));
}
let onFulfilled = null;
let onRejected = null;
const promise = new Promise((resolve, reject) => {
onFulfilled = resolve;
onRejected = reject;
});
const callback = function callback(err, result) {
err !== null ? onRejected(err) : onFulfilled(result);
};
operationsStream.push([header, body, callback]);
return promise;
}
};
return helper;
async function iterate() {
const { semaphore, finish } = buildSemaphore();
const msearchBody = [];
const callbacks = [];
let loadedOperations = 0;
timeoutRef = setTimeout(onFlushTimeout, flushInterval); // eslint-disable-line
for await (const operation of operationsStream) {
timeoutRef.refresh();
loadedOperations += 1;
msearchBody.push(operation[0], operation[1]);
callbacks.push(operation[2]);
if (loadedOperations >= operations) {
const send = await semaphore();
send(msearchBody.slice(), callbacks.slice());
msearchBody.length = 0;
callbacks.length = 0;
loadedOperations = 0;
}
}
clearTimeout(timeoutRef);
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (loadedOperations > 0) {
const send = await semaphore();
send(msearchBody, callbacks);
}
await finish();
if (stopError !== null) {
throw stopError;
}
async function onFlushTimeout() {
if (loadedOperations === 0)
return;
const msearchBodyCopy = msearchBody.slice();
const callbacksCopy = callbacks.slice();
msearchBody.length = 0;
callbacks.length = 0;
loadedOperations = 0;
try {
const send = await semaphore();
send(msearchBodyCopy, callbacksCopy);
}
catch (err) {
/* istanbul ignore next */
// @ts-expect-error
helper.stop(err);
}
}
}
done()
}
// This function never returns an error, if the msearch operation fails,
// the error is dispatched to all search executors.
function tryMsearch (msearchBody, callbacks, done) {
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions, (err, results) => {
const retryBody = []
const retryCallbacks = []
if (err) {
addDocumentsGetter(results)
for (const callback of callbacks) {
callback(err, results)
// This function builds a semaphore using the concurrency
// options of the msearch helper. It is used inside the iterator
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual msearch request.
// It also returns a finish function, which returns a promise that is resolved
// when there are no longer requests running.
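// For example (illustrative): with concurrency = 2, the first two semaphore()
// calls resolve on the next tick with the send function; a third caller is
// parked until one in-flight msearch request completes and hands send over.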
function buildSemaphore() {
let resolveSemaphore = null;
let resolveFinish = null;
let running = 0;
return { semaphore, finish };
function finish() {
return new Promise((resolve, reject) => {
if (running === 0) {
resolve();
}
else {
resolveFinish = resolve;
}
});
}
return done(retryBody, retryCallbacks)
}
const { responses } = results.body
for (let i = 0, len = responses.length; i < len; i++) {
const response = responses[i]
if (response.status === 429 && retryCount > 0) {
retryBody.push(msearchBody[i * 2])
retryBody.push(msearchBody[(i * 2) + 1])
retryCallbacks.push(callbacks[i])
continue
function semaphore() {
if (running < concurrency) {
running += 1;
return pImmediate(send);
}
else {
return new Promise((resolve, reject) => {
resolveSemaphore = resolve;
});
}
}
const result = { ...results, body: response }
addDocumentsGetter(result)
if (response.status >= 400) {
callbacks[i](new ResponseError(result), result)
} else {
callbacks[i](null, result)
function send(msearchBody, callbacks) {
/* istanbul ignore if */
if (running > concurrency) {
throw new Error('Max concurrency reached');
}
msearchOperation(msearchBody, callbacks, () => {
running -= 1;
if (resolveSemaphore !== null) {
running += 1;
resolveSemaphore(send);
resolveSemaphore = null;
}
else if (resolveFinish != null && running === 0) {
resolveFinish();
}
});
}
}
done(retryBody, retryCallbacks)
})
}
}
function msearchOperation(msearchBody, callbacks, done) {
let retryCount = retries;
// Instead of going full on async-await, which would make the code easier to read,
// we have decided to use callback style instead.
// This is because every time we use async await, V8 will create multiple promises
// behind the scenes, making the code slightly slower.
tryMsearch(msearchBody, callbacks, retrySearch);
function retrySearch(msearchBody, callbacks) {
if (msearchBody.length > 0 && retryCount > 0) {
retryCount -= 1;
setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch);
return;
}
done();
}
// This function never returns an error, if the msearch operation fails,
// the error is dispatched to all search executors.
function tryMsearch(msearchBody, callbacks, done) {
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions)
.then(results => {
const retryBody = [];
const retryCallbacks = [];
const { responses } = results.body;
for (let i = 0, len = responses.length; i < len; i++) {
const response = responses[i];
if (response.status === 429 && retryCount > 0) {
retryBody.push(msearchBody[i * 2]);
retryBody.push(msearchBody[(i * 2) + 1]);
retryCallbacks.push(callbacks[i]);
continue;
}
const result = { ...results, body: response };
// @ts-expect-error
addDocumentsGetter(result);
if (response.status != null && response.status >= 400) {
// @ts-expect-error
callbacks[i](new ResponseError(result), result);
}
else {
callbacks[i](null, result);
}
}
done(retryBody, retryCallbacks);
})
.catch(err => {
for (const callback of callbacks) {
callback(err, null);
}
return done([], []);
});
}
}
}
}
/**
* Creates a bulk helper instance. Once you configure it, you can pick which operation
* to execute with the given dataset, index, create, update, and delete.
* @param {object} options - The configuration of the bulk operation.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run with the datasource.
*/
bulk (options, reqOptions = {}) {
const client = this[kClient]
const { serializer } = client
if (this[kMetaHeader] !== null) {
reqOptions.headers = reqOptions.headers || {}
reqOptions.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=bp'
}
const {
datasource,
onDocument,
flushBytes = 5000000,
flushInterval = 30000,
concurrency = 5,
retries = this.maxRetries,
wait = 5000,
onDrop = noop,
refreshOnCompletion = false,
...bulkOptions
} = options
if (datasource === undefined) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'))
}
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'))
}
if (onDocument === undefined) {
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required'))
}
let shouldAbort = false
let timeoutRef = null
const stats = {
total: 0,
failed: 0,
retry: 0,
successful: 0,
noop: 0,
time: 0,
bytes: 0,
aborted: false
}
const p = iterate()
const helper = {
get stats () {
return stats
},
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
catch (onRejected) {
return p.catch(onRejected)
},
abort () {
clearTimeout(timeoutRef)
shouldAbort = true
stats.aborted = true
return this
}
}
return helper
/**
* Function that iterates over the given datasource and start a bulk operation as soon
* as it reaches the configured bulk size. It's designed to use the Node.js asynchronous
* model at this maximum capacity, as it will collect the next body to send while there is
* a running http call. In this way, the CPU time will be used carefully.
* The objects will be serialized right away, to approximate the byte length of the body.
* It creates an array of strings instead of an ndjson string because the bulkOperation
* will navigate the body for matching failed operations with the original document.
*/
/**
* Creates a bulk helper instance. Once you configure it, you can pick which operation
* to execute with the given dataset, index, create, update, and delete.
* @param {object} options - The configuration of the bulk operation.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run with the datasource.
*/
async function iterate () {
const { semaphore, finish } = buildSemaphore()
const startTime = Date.now()
const bulkBody = []
let actionBody = ''
let payloadBody = ''
let chunkBytes = 0
timeoutRef = setTimeout(onFlushTimeout, flushInterval)
for await (const chunk of datasource) {
if (shouldAbort === true) break
timeoutRef.refresh()
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0]
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
actionBody = serializer.serialize(action[0])
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
: serializer.serialize({ doc: chunk, ...action[1] })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
actionBody = serializer.serialize(action)
chunkBytes += Buffer.byteLength(actionBody)
bulkBody.push(actionBody)
} else {
clearTimeout(timeoutRef)
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
bulk(options, reqOptions = {}) {
var _d;
const client = this[kClient];
const { serializer } = client;
if (this[kMetaHeader] !== null) {
reqOptions.headers = (_d = reqOptions.headers) !== null && _d !== void 0 ? _d : {};
reqOptions.headers['x-elastic-client-meta'] = `${this[kMetaHeader]},h=bp`;
}
if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes
const send = await semaphore()
send(bulkBody.slice())
bulkBody.length = 0
chunkBytes = 0
reqOptions.meta = true;
const { datasource, onDocument, flushBytes = 5000000, flushInterval = 30000, concurrency = 5, retries = this[kMaxRetries], wait = 5000, onDrop = noop, refreshOnCompletion = false, ...bulkOptions } = options;
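// defaults above: flush once the serialized payload reaches 5 MB (flushBytes = 5000000)
// or after 30 s without a flush (flushInterval = 30000); at most 5 concurrent requests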
if (datasource === undefined) {
// @ts-expect-error
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'));
}
}
clearTimeout(timeoutRef)
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (shouldAbort === false && chunkBytes > 0) {
const send = await semaphore()
stats.bytes += chunkBytes
send(bulkBody)
}
await finish()
if (refreshOnCompletion) {
await client.indices.refresh({
index: typeof refreshOnCompletion === 'string'
? refreshOnCompletion
: '_all'
}, reqOptions)
}
stats.time = Date.now() - startTime
stats.total = stats.successful + stats.failed
return stats
async function onFlushTimeout () {
if (chunkBytes === 0) return
stats.bytes += chunkBytes
const bulkBodyCopy = bulkBody.slice()
bulkBody.length = 0
chunkBytes = 0
try {
const send = await semaphore()
send(bulkBodyCopy)
} catch (err) {
/* istanbul ignore next */
helper.abort()
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || isReadableStream(datasource) || isAsyncIterator(datasource))) {
// @ts-expect-error
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'));
}
}
}
// This function builds a semaphore using the concurrency
// options of the bulk helper. It is used inside the iterator
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual bulk request.
// It also returns a finish function, which returns a promise that is resolved
// when there are no longer requests running. It rejects with an error if one
// of the requests has failed for some reason.
function buildSemaphore () {
let resolveSemaphore = null
let resolveFinish = null
let rejectFinish = null
let error = null
let running = 0
return { semaphore, finish }
function finish () {
return new Promise((resolve, reject) => {
if (running === 0) {
if (error) {
reject(error)
} else {
resolve()
if (onDocument === undefined) {
// @ts-expect-error
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required'));
}
let shouldAbort = false;
let timeoutRef = null;
const stats = {
total: 0,
failed: 0,
retry: 0,
successful: 0,
noop: 0,
time: 0,
bytes: 0,
aborted: false
};
const p = iterate();
const helper = {
[Symbol.toStringTag]: 'Promise',
then(onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected);
},
catch(onRejected) {
return p.catch(onRejected);
},
finally(onFinally) {
return p.finally(onFinally);
},
get stats() {
return stats;
},
abort() {
clearTimeout(timeoutRef);
shouldAbort = true;
stats.aborted = true;
return this;
}
} else {
resolveFinish = resolve
rejectFinish = reject
}
})
}
function semaphore () {
if (running < concurrency) {
running += 1
return pImmediate(send)
} else {
return new Promise((resolve, reject) => {
resolveSemaphore = resolve
})
};
return helper;
/**
* Function that iterates over the given datasource and start a bulk operation as soon
* as it reaches the configured bulk size. It's designed to use the Node.js asynchronous
* model at this maximum capacity, as it will collect the next body to send while there is
* a running http call. In this way, the CPU time will be used carefully.
* The objects will be serialized right away, to approximate the byte length of the body.
* It creates an array of strings instead of an ndjson string because the bulkOperation
* will navigate the body for matching failed operations with the original document.
*/
async function iterate() {
const { semaphore, finish } = buildSemaphore();
const startTime = Date.now();
const bulkBody = [];
let actionBody = '';
let payloadBody = '';
let chunkBytes = 0;
timeoutRef = setTimeout(onFlushTimeout, flushInterval); // eslint-disable-line
// @ts-expect-error datasource is an iterable
for await (const chunk of datasource) {
if (shouldAbort)
break;
timeoutRef.refresh();
const action = onDocument(chunk);
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0];
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action);
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk);
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody);
bulkBody.push(actionBody, payloadBody);
}
else if (operation === 'update') {
// @ts-expect-error in case of update action is an array
actionBody = serializer.serialize(action[0]);
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
// @ts-expect-error in case of update action is an array
: serializer.serialize({ doc: chunk, ...action[1] });
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody);
bulkBody.push(actionBody, payloadBody);
}
else if (operation === 'delete') {
actionBody = serializer.serialize(action);
chunkBytes += Buffer.byteLength(actionBody);
bulkBody.push(actionBody);
}
else {
clearTimeout(timeoutRef);
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`);
}
if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes;
const send = await semaphore();
send(bulkBody.slice());
bulkBody.length = 0;
chunkBytes = 0;
}
}
clearTimeout(timeoutRef);
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (!shouldAbort && chunkBytes > 0) {
const send = await semaphore();
stats.bytes += chunkBytes;
send(bulkBody);
}
await finish();
if (refreshOnCompletion !== false) {
await client.indices.refresh({
index: typeof refreshOnCompletion === 'string'
? refreshOnCompletion
: '_all'
}, reqOptions);
}
stats.time = Date.now() - startTime;
stats.total = stats.successful + stats.failed;
return stats;
async function onFlushTimeout() {
if (chunkBytes === 0)
return;
stats.bytes += chunkBytes;
const bulkBodyCopy = bulkBody.slice();
bulkBody.length = 0;
chunkBytes = 0;
try {
const send = await semaphore();
send(bulkBodyCopy);
}
catch (err) {
/* istanbul ignore next */
helper.abort(); // eslint-disable-line
}
}
}
}
function send (bulkBody) {
/* istanbul ignore if */
if (running > concurrency) {
throw new Error('Max concurrency reached')
}
bulkOperation(bulkBody, err => {
running -= 1
if (err) {
shouldAbort = true
error = err
}
if (resolveSemaphore) {
running += 1
resolveSemaphore(send)
resolveSemaphore = null
} else if (resolveFinish && running === 0) {
if (error) {
rejectFinish(error)
} else {
resolveFinish()
// This function builds a semaphore using the concurrency
// options of the bulk helper. It is used inside the iterator
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual bulk request.
// It also returns a finish function, which returns a promise that is resolved
// when there are no longer requests running. It rejects with an error if one
// of the requests has failed for some reason.
function buildSemaphore() {
let resolveSemaphore = null;
let resolveFinish = null;
let rejectFinish = null;
let error = null;
let running = 0;
return { semaphore, finish };
function finish() {
return new Promise((resolve, reject) => {
if (running === 0) {
if (error !== null) {
reject(error);
}
else {
resolve();
}
}
else {
resolveFinish = resolve;
rejectFinish = reject;
}
});
}
}
})
}
}
function semaphore() {
if (running < concurrency) {
running += 1;
return pImmediate(send);
}
else {
return new Promise((resolve, reject) => {
resolveSemaphore = resolve;
});
}
}
function send(bulkBody) {
/* istanbul ignore if */
if (running > concurrency) {
throw new Error('Max concurrency reached');
}
bulkOperation(bulkBody, err => {
running -= 1;
if (err != null) {
shouldAbort = true;
error = err;
}
if (resolveSemaphore !== null) {
running += 1;
resolveSemaphore(send);
resolveSemaphore = null;
}
else if (resolveFinish != null && rejectFinish != null && running === 0) {
if (error != null) {
rejectFinish(error);
}
else {
resolveFinish();
}
}
});
}
}
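        // For illustration (hypothetical flow): with concurrency = 2, the first two
        // `await semaphore()` calls resolve on the next tick and `running` reaches 2;
        // a third call parks its resolver in `resolveSemaphore` until one in-flight
        // bulkOperation completes and hands its `send` slot over.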
function bulkOperation(bulkBody, callback) {
let retryCount = retries;
let isRetrying = false;
        // Instead of going full on async-await, which would make the code easier to read,
        // we have decided to use callback style instead.
        // This is because every time we use async await, V8 will create multiple promises
        // behind the scenes, making the code slightly slower.
tryBulk(bulkBody, retryDocuments);
function retryDocuments(err, bulkBody) {
if (err != null)
return callback(err);
if (shouldAbort)
return callback();
if (bulkBody.length > 0) {
if (retryCount > 0) {
isRetrying = true;
retryCount -= 1;
stats.retry += bulkBody.length;
setTimeout(tryBulk, wait, bulkBody, retryDocuments);
return;
}
for (let i = 0, len = bulkBody.length; i < len; i = i + 2) {
const operation = Object.keys(serializer.deserialize(bulkBody[i]))[0];
onDrop({
status: 429,
error: null,
operation: serializer.deserialize(bulkBody[i]),
document: operation !== 'delete'
? serializer.deserialize(bulkBody[i + 1])
/* istanbul ignore next */
: null,
retried: isRetrying
});
stats.failed += 1;
}
}
callback();
}
function tryBulk(bulkBody, callback) {
if (shouldAbort)
return callback(null, []);
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions)
.then(response => {
var _d, _e;
const result = response.body;
if (!result.errors) {
stats.successful += result.items.length;
for (const item of result.items) {
if (((_d = item.update) === null || _d === void 0 ? void 0 : _d.result) === 'noop') {
stats.noop++;
}
}
return callback(null, []);
}
const retry = [];
const { items } = result;
for (let i = 0, len = items.length; i < len; i++) {
const action = items[i];
const operation = Object.keys(action)[0];
const responseItem = action[operation];
(0, assert_1.default)(responseItem !== undefined, 'The responseItem is undefined, please file a bug report');
const indexSlice = operation !== 'delete' ? i * 2 : i;
if (responseItem.status >= 400) {
                    // 429 is the only status code where we might want to retry
                    // a document, because it was not an error in the document itself,
                    // but the ES node was handling too many operations.
if (responseItem.status === 429) {
retry.push(bulkBody[indexSlice]);
/* istanbul ignore next */
if (operation !== 'delete') {
retry.push(bulkBody[indexSlice + 1]);
}
}
else {
onDrop({
status: responseItem.status,
error: (_e = responseItem.error) !== null && _e !== void 0 ? _e : null,
operation: serializer.deserialize(bulkBody[indexSlice]),
document: operation !== 'delete'
? serializer.deserialize(bulkBody[indexSlice + 1])
: null,
retried: isRetrying
});
stats.failed += 1;
}
}
else {
stats.successful += 1;
}
}
callback(null, retry);
})
.catch(err => {
callback(err, []);
});
            }
        }
    }
}
exports.default = Helpers;
_a = kClient, _b = kMetaHeader, _c = kMaxRetries;
// Using a getter will improve the overall performance of the code,
// as we will read the documents only if needed.
function addDocumentsGetter(result) {
Object.defineProperty(result, 'documents', {
get() {
var _d;
if (((_d = this.body.hits) === null || _d === void 0 ? void 0 : _d.hits) != null) {
// @ts-expect-error
return this.body.hits.hits.map(d => d._source);
}
return [];
}
});
}
function appendFilterPath(filter, params, force) {
if (params.filter_path !== undefined) {
params.filter_path += ',' + filter; // eslint-disable-line
}
else if (force) {
params.filter_path = filter;
}
}
function isReadableStream(obj) {
return obj != null && typeof obj.pipe === 'function';
}
function isAsyncIterator(obj) {
return (obj === null || obj === void 0 ? void 0 : obj[Symbol.asyncIterator]) != null;
}
//# sourceMappingURL=Helpers.js.map
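
For context, the machinery above is what backs the client's documented bulk helper: each document is serialized into an action/payload pair, buffered until flushBytes (or the flush timeout) is reached, throttled by the semaphore so that at most `concurrency` requests are in flight, and any document rejected with a 429 is retried up to `retries` times before being handed to onDrop. A minimal usage sketch, where the node URL, index name, and sample documents are hypothetical placeholders:

const { Client } = require('@elastic/elasticsearch-canary')

const client = new Client({ node: 'http://localhost:9200' })

async function run () {
  const stats = await client.helpers.bulk({
    // any array, stream, or async iterator of documents
    datasource: [{ user: 'arya' }, { user: 'jon' }],
    // maps each document to the action line that precedes it in the bulk body
    onDocument (doc) {
      return { index: { _index: 'my-index' } }
    },
    // called once per document that failed and exhausted its retries,
    // e.g. a 429 from an overloaded node
    onDrop (item) {
      console.error(item.status, item.error)
    },
    flushBytes: 5000000, // flush once the serialized body reaches ~5 MB
    concurrency: 5, // at most five bulk requests in flight
    retries: 3 // how many times a 429'd document is retried before onDrop
  })
  console.log(stats) // { total, successful, failed, retry, noop, time, bytes, aborted }
}

run().catch(console.error)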
 {
   "name": "@elastic/elasticsearch-canary",
+  "version": "8.0.0-canary.22",
+  "versionCanary": "8.0.0-canary.22",
   "description": "The official Elasticsearch client for Node.js",
   "main": "index.js",
-  "types": "./api/new.d.ts",
-  "exports": {
-    ".": {
-      "require": "./index.js",
-      "import": "./index.mjs"
-    },
-    "./": "./"
-  },
+  "types": "index.d.ts",
+  "scripts": {
+    "test": "npm run build && npm run lint && tap test/{unit,acceptance}/{*,**/*}.test.ts",
+    "test:unit": "npm run build && tap test/unit/{*,**/*}.test.ts",
+    "test:acceptance": "npm run build && tap test/acceptance/*.test.ts",
+    "test:coverage-100": "npm run build && tap test/{unit,acceptance}/{*,**/*}.test.ts --coverage --100",
+    "test:coverage-report": "npm run build && tap test/{unit,acceptance}/{*,**/*}.test.ts --coverage && nyc report --reporter=text-lcov > coverage.lcov",
+    "test:coverage-ui": "npm run build && tap test/{unit,acceptance}/{*,**/*}.test.ts --coverage --coverage-report=html",
+    "test:integration": "tsc && node test/integration/index.js",
+    "lint": "ts-standard src",
+    "lint:fix": "ts-standard --fix src",
+    "license-checker": "license-checker --production --onlyAllow='MIT;Apache-2.0;Apache1.1;ISC;BSD-3-Clause;BSD-2-Clause;0BSD'",
+    "prebuild": "npm run clean-build && npm run lint",
+    "build": "tsc",
+    "clean-build": "rimraf ./lib && mkdir lib",
+    "prepublishOnly": "npm run build"
+  },
-  "homepage": "http://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/index.html",
-  "version": "8.0.0-canary.21",
-  "versionCanary": "8.0.0-canary.21",
   "keywords": [

@@ -26,17 +34,2 @@ "elasticsearch",

   ],
-  "scripts": {
-    "test": "npm run lint && tap test/{unit,acceptance}/{*,**/*}.test.js && npm run test:types",
-    "test:unit": "tap test/unit/{*,**/*}.test.js",
-    "test:acceptance": "tap test/acceptance/*.test.js",
-    "test:integration": "node test/integration/index.js",
-    "test:integration:helpers": "tap test/integration/helpers/*.test.js",
-    "test:types": "tsd",
-    "test:coverage-100": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --100 --nyc-arg=\"--exclude=api\"",
-    "test:coverage-report": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --nyc-arg=\"--exclude=api\" && nyc report --reporter=text-lcov > coverage.lcov",
-    "test:coverage-ui": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --coverage-report=html --nyc-arg=\"--exclude=api\"",
-    "lint": "standard",
-    "lint:fix": "standard --fix",
-    "license-checker": "license-checker --production --onlyAllow='MIT;Apache-2.0;Apache1.1;ISC;BSD-3-Clause;BSD-2-Clause'",
-    "build-esm": "npx gen-esm-wrapper . index.mjs && standard --fix index.mjs"
-  },
   "author": {

@@ -46,14 +39,24 @@ "name": "Tomas Della Vedova",

   },
-  "original-author": {
-    "name": "Spencer Alger",
-    "company": "Elasticsearch BV"
-  },
+  "license": "Apache-2.0",
+  "repository": {
+    "type": "git",
+    "url": "https://github.com/elastic/elasticsearch-js.git"
+  },
+  "bugs": {
+    "url": "https://github.com/elastic/elasticsearch-js/issues"
+  },
+  "homepage": "http://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/index.html",
+  "engines": {
+    "node": ">=12"
+  },
   "devDependencies": {
     "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
-    "@types/node": "^15.3.1",
-    "convert-hrtime": "^5.0.0",
+    "@types/debug": "^4.1.6",
+    "@types/ms": "^0.7.31",
+    "@types/node": "^16.4.1",
+    "@types/sinonjs__fake-timers": "^6.0.3",
+    "@types/split2": "^3.2.1",
+    "@types/stoppable": "^1.1.1",
+    "@types/tap": "^15.0.5",
     "cross-zip": "^4.0.0",
     "dedent": "^0.7.0",
     "deepmerge": "^4.2.2",
     "dezalgo": "^1.0.3",
     "fast-deep-equal": "^3.1.3",

@@ -64,10 +67,9 @@ "into-stream": "^6.0.0",

     "minimist": "^1.2.5",
-    "node-fetch": "^2.6.1",
-    "ora": "^5.4.0",
-    "pretty-hrtime": "^1.0.3",
+    "ms": "^2.1.3",
+    "node-abort-controller": "^2.0.0",
+    "node-fetch": "^2.6.2",
+    "ora": "^5.4.1",
     "proxy": "^1.0.2",
     "rimraf": "^3.0.2",
     "semver": "^7.3.5",
     "simple-git": "^2.39.0",
     "simple-statistics": "^7.7.0",
     "split2": "^3.2.2",

@@ -77,35 +79,20 @@ "standard": "^16.0.3",

     "tap": "^15.0.9",
-    "tsd": "^0.15.1",
+    "ts-node": "^10.1.0",
+    "ts-standard": "^10.0.0",
+    "typescript": "^4.3.5",
     "workq": "^3.0.0",
-    "xmlbuilder2": "^2.4.1"
+    "xmlbuilder2": "^3.0.2"
   },
   "dependencies": {
-    "debug": "^4.3.1",
-    "hpagent": "^0.1.1",
-    "ms": "^2.1.3",
-    "secure-json-parse": "^2.4.0"
+    "@elastic/transport": "^0.0.6",
+    "tslib": "^2.3.0"
   },
-  "license": "Apache-2.0",
-  "repository": {
-    "type": "git",
-    "url": "https://github.com/elastic/elasticsearch-js.git"
-  },
-  "bugs": {
-    "url": "https://github.com/elastic/elasticsearch-js/issues"
-  },
-  "engines": {
-    "node": ">=12"
-  },
-  "tsd": {
-    "directory": "test/types"
-  },
   "tap": {
-    "ts": false,
+    "ts": true,
     "jsx": false,
     "flow": false,
     "coverage": false,
     "jobs-auto": true,
     "check-coverage": false
   },
-  "commitHash": "3feda5d9"
+  "commitHash": "1a227459"
 }
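
The dependency swap in this diff is the headline change of the 8.0 rewrite: debug, hpagent, ms, and secure-json-parse disappear because the whole HTTP layer now lives in the new @elastic/transport package, leaving tslib as the only other runtime dependency. A rough sketch of what that unlocks, assuming the 8.x-style Transport override option and the Transport class exported by @elastic/transport (the logging subclass and node URL are hypothetical examples):

const { Client } = require('@elastic/elasticsearch-canary')
const { Transport } = require('@elastic/transport')

// Subclass the transport to observe every request the client makes.
class LoggingTransport extends Transport {
  async request (params, options) {
    console.log('->', params.method, params.path)
    return await super.request(params, options)
  }
}

const client = new Client({
  node: 'http://localhost:9200',
  Transport: LoggingTransport
})

client.info()
  .then(result => console.log(result))
  .catch(console.error)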

Sorry, the diff of this file is too big to display
