@elastic/elasticsearch-canary
Advanced tools
Comparing version 8.0.0-canary.21 to 8.0.0-canary.22
343
index.js
@@ -22,342 +22,5 @@ /* | ||
const { EventEmitter } = require('events') | ||
const { URL } = require('url') | ||
const buffer = require('buffer') | ||
const debug = require('debug')('elasticsearch') | ||
const Transport = require('./lib/Transport') | ||
const Connection = require('./lib/Connection') | ||
const { ConnectionPool, CloudConnectionPool } = require('./lib/pool') | ||
const Helpers = require('./lib/Helpers') | ||
const Serializer = require('./lib/Serializer') | ||
const errors = require('./lib/errors') | ||
const { ConfigurationError } = errors | ||
const { prepareHeaders } = Connection.internals | ||
let clientVersion = require('./package.json').version | ||
/* istanbul ignore next */ | ||
if (clientVersion.includes('-')) { | ||
// clean prerelease | ||
clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p' | ||
} | ||
const nodeVersion = process.versions.node | ||
const { errors } = require('@elastic/transport') | ||
const { default: Client } = require('./lib/Client') | ||
const kInitialOptions = Symbol('elasticsearchjs-initial-options') | ||
const kChild = Symbol('elasticsearchjs-child') | ||
const kExtensions = Symbol('elasticsearchjs-extensions') | ||
const kEventEmitter = Symbol('elasticsearchjs-event-emitter') | ||
const ESAPI = require('./api') | ||
class Client extends ESAPI { | ||
constructor (opts = {}) { | ||
super({ ConfigurationError }) | ||
if (opts.cloud && opts[kChild] === undefined) { | ||
const { id, username, password } = opts.cloud | ||
// the cloud id is `cluster-name:base64encodedurl` | ||
// the url is a string divided by two '$', the first is the cloud url | ||
// the second the elasticsearch instance, the third the kibana instance | ||
const cloudUrls = Buffer.from(id.split(':')[1], 'base64').toString().split('$') | ||
// TODO: remove username and password here in 8 | ||
if (username && password) { | ||
opts.auth = Object.assign({}, opts.auth, { username, password }) | ||
} | ||
opts.node = `https://${cloudUrls[1]}.${cloudUrls[0]}` | ||
// Cloud has better performances with compression enabled | ||
// see https://github.com/elastic/elasticsearch-py/pull/704. | ||
// So unless the user specifies otherwise, we enable compression. | ||
if (opts.compression == null) opts.compression = 'gzip' | ||
if (opts.suggestCompression == null) opts.suggestCompression = true | ||
if (opts.ssl == null || | ||
(opts.ssl && opts.ssl.secureProtocol == null)) { | ||
opts.ssl = opts.ssl || {} | ||
opts.ssl.secureProtocol = 'TLSv1_2_method' | ||
} | ||
} | ||
if (!opts.node && !opts.nodes) { | ||
throw new ConfigurationError('Missing node(s) option') | ||
} | ||
if (opts[kChild] === undefined) { | ||
const checkAuth = getAuth(opts.node || opts.nodes) | ||
if (checkAuth && checkAuth.username && checkAuth.password) { | ||
opts.auth = Object.assign({}, opts.auth, { username: checkAuth.username, password: checkAuth.password }) | ||
} | ||
} | ||
const options = opts[kChild] !== undefined | ||
? opts[kChild].initialOptions | ||
: Object.assign({}, { | ||
Connection, | ||
Transport, | ||
Serializer, | ||
ConnectionPool: opts.cloud ? CloudConnectionPool : ConnectionPool, | ||
maxRetries: 3, | ||
requestTimeout: 30000, | ||
pingTimeout: 3000, | ||
sniffInterval: false, | ||
sniffOnStart: false, | ||
sniffEndpoint: '_nodes/_all/http', | ||
sniffOnConnectionFault: false, | ||
resurrectStrategy: 'ping', | ||
suggestCompression: false, | ||
compression: false, | ||
ssl: null, | ||
caFingerprint: null, | ||
agent: null, | ||
headers: {}, | ||
nodeFilter: null, | ||
nodeSelector: 'round-robin', | ||
generateRequestId: null, | ||
name: 'elasticsearch-js', | ||
auth: null, | ||
opaqueIdPrefix: null, | ||
context: null, | ||
proxy: null, | ||
enableMetaHeader: true, | ||
disablePrototypePoisoningProtection: false, | ||
maxResponseSize: null, | ||
maxCompressedResponseSize: null | ||
}, opts) | ||
if (options.maxResponseSize !== null && options.maxResponseSize > buffer.constants.MAX_STRING_LENGTH) { | ||
throw new ConfigurationError(`The maxResponseSize cannot be bigger than ${buffer.constants.MAX_STRING_LENGTH}`) | ||
} | ||
if (options.maxCompressedResponseSize !== null && options.maxCompressedResponseSize > buffer.constants.MAX_LENGTH) { | ||
throw new ConfigurationError(`The maxCompressedResponseSize cannot be bigger than ${buffer.constants.MAX_LENGTH}`) | ||
} | ||
if (options.caFingerprint !== null && isHttpConnection(opts.node || opts.nodes)) { | ||
throw new ConfigurationError('You can\'t configure the caFingerprint with a http connection') | ||
} | ||
if (process.env.ELASTIC_CLIENT_APIVERSIONING === 'true') { | ||
options.headers = Object.assign({ accept: 'application/vnd.elasticsearch+json; compatible-with=7' }, options.headers) | ||
} | ||
this[kInitialOptions] = options | ||
this[kExtensions] = [] | ||
this.name = options.name | ||
if (options.enableMetaHeader) { | ||
options.headers['x-elastic-client-meta'] = `es=${clientVersion},js=${nodeVersion},t=${clientVersion},hc=${nodeVersion}` | ||
} | ||
if (opts[kChild] !== undefined) { | ||
this.serializer = options[kChild].serializer | ||
this.connectionPool = options[kChild].connectionPool | ||
this[kEventEmitter] = options[kChild].eventEmitter | ||
} else { | ||
this[kEventEmitter] = new EventEmitter() | ||
this.serializer = new options.Serializer({ | ||
disablePrototypePoisoningProtection: options.disablePrototypePoisoningProtection | ||
}) | ||
this.connectionPool = new options.ConnectionPool({ | ||
pingTimeout: options.pingTimeout, | ||
resurrectStrategy: options.resurrectStrategy, | ||
ssl: options.ssl, | ||
agent: options.agent, | ||
proxy: options.proxy, | ||
Connection: options.Connection, | ||
auth: options.auth, | ||
emit: this[kEventEmitter].emit.bind(this[kEventEmitter]), | ||
caFingerprint: options.caFingerprint, | ||
sniffEnabled: options.sniffInterval !== false || | ||
options.sniffOnStart !== false || | ||
options.sniffOnConnectionFault !== false | ||
}) | ||
// Add the connections before initialize the Transport | ||
this.connectionPool.addConnection(options.node || options.nodes) | ||
} | ||
this.transport = new options.Transport({ | ||
emit: this[kEventEmitter].emit.bind(this[kEventEmitter]), | ||
connectionPool: this.connectionPool, | ||
serializer: this.serializer, | ||
maxRetries: options.maxRetries, | ||
requestTimeout: options.requestTimeout, | ||
sniffInterval: options.sniffInterval, | ||
sniffOnStart: options.sniffOnStart, | ||
sniffOnConnectionFault: options.sniffOnConnectionFault, | ||
sniffEndpoint: options.sniffEndpoint, | ||
suggestCompression: options.suggestCompression, | ||
compression: options.compression, | ||
headers: options.headers, | ||
nodeFilter: options.nodeFilter, | ||
nodeSelector: options.nodeSelector, | ||
generateRequestId: options.generateRequestId, | ||
name: options.name, | ||
opaqueIdPrefix: options.opaqueIdPrefix, | ||
context: options.context, | ||
maxResponseSize: options.maxResponseSize, | ||
maxCompressedResponseSize: options.maxCompressedResponseSize | ||
}) | ||
this.helpers = new Helpers({ | ||
client: this, | ||
maxRetries: options.maxRetries, | ||
metaHeader: options.enableMetaHeader | ||
? `es=${clientVersion},js=${nodeVersion},t=${clientVersion},hc=${nodeVersion}` | ||
: null | ||
}) | ||
} | ||
get emit () { | ||
return this[kEventEmitter].emit.bind(this[kEventEmitter]) | ||
} | ||
get on () { | ||
return this[kEventEmitter].on.bind(this[kEventEmitter]) | ||
} | ||
get once () { | ||
return this[kEventEmitter].once.bind(this[kEventEmitter]) | ||
} | ||
get off () { | ||
return this[kEventEmitter].off.bind(this[kEventEmitter]) | ||
} | ||
extend (name, opts, fn) { | ||
if (typeof opts === 'function') { | ||
fn = opts | ||
opts = {} | ||
} | ||
let [namespace, method] = name.split('.') | ||
if (method == null) { | ||
method = namespace | ||
namespace = null | ||
} | ||
if (namespace != null) { | ||
if (this[namespace] != null && this[namespace][method] != null && opts.force !== true) { | ||
throw new Error(`The method "${method}" already exists on namespace "${namespace}"`) | ||
} | ||
if (this[namespace] == null) this[namespace] = {} | ||
this[namespace][method] = fn({ | ||
makeRequest: this.transport.request.bind(this.transport), | ||
result: { body: null, statusCode: null, headers: null, warnings: null }, | ||
ConfigurationError | ||
}) | ||
} else { | ||
if (this[method] != null && opts.force !== true) { | ||
throw new Error(`The method "${method}" already exists`) | ||
} | ||
this[method] = fn({ | ||
makeRequest: this.transport.request.bind(this.transport), | ||
result: { body: null, statusCode: null, headers: null, warnings: null }, | ||
ConfigurationError | ||
}) | ||
} | ||
this[kExtensions].push({ name, opts, fn }) | ||
} | ||
child (opts) { | ||
// Merge the new options with the initial ones | ||
const options = Object.assign({}, this[kInitialOptions], opts) | ||
// Pass to the child client the parent instances that cannot be overriden | ||
options[kChild] = { | ||
connectionPool: this.connectionPool, | ||
serializer: this.serializer, | ||
eventEmitter: this[kEventEmitter], | ||
initialOptions: options | ||
} | ||
/* istanbul ignore else */ | ||
if (options.auth !== undefined) { | ||
options.headers = prepareHeaders(options.headers, options.auth) | ||
} | ||
const client = new Client(options) | ||
// sync product check | ||
const tSymbol = Object.getOwnPropertySymbols(this.transport) | ||
.filter(symbol => symbol.description === 'product check')[0] | ||
client.transport[tSymbol] = this.transport[tSymbol] | ||
// Add parent extensions | ||
if (this[kExtensions].length > 0) { | ||
this[kExtensions].forEach(({ name, opts, fn }) => { | ||
client.extend(name, opts, fn) | ||
}) | ||
} | ||
return client | ||
} | ||
close (callback) { | ||
if (callback == null) { | ||
return new Promise((resolve, reject) => { | ||
this.close(resolve) | ||
}) | ||
} | ||
debug('Closing the client') | ||
this.connectionPool.empty(callback) | ||
} | ||
} | ||
function getAuth (node) { | ||
if (Array.isArray(node)) { | ||
for (const url of node) { | ||
const auth = getUsernameAndPassword(url) | ||
if (auth.username !== '' && auth.password !== '') { | ||
return auth | ||
} | ||
} | ||
return null | ||
} | ||
const auth = getUsernameAndPassword(node) | ||
if (auth.username !== '' && auth.password !== '') { | ||
return auth | ||
} | ||
return null | ||
function getUsernameAndPassword (node) { | ||
/* istanbul ignore else */ | ||
if (typeof node === 'string') { | ||
const { username, password } = new URL(node) | ||
return { | ||
username: decodeURIComponent(username), | ||
password: decodeURIComponent(password) | ||
} | ||
} else if (node.url instanceof URL) { | ||
return { | ||
username: decodeURIComponent(node.url.username), | ||
password: decodeURIComponent(node.url.password) | ||
} | ||
} | ||
} | ||
} | ||
function isHttpConnection (node) { | ||
if (Array.isArray(node)) { | ||
return node.some((n) => (typeof n === 'string' ? new URL(n).protocol : n.url.protocol) === 'http:') | ||
} else { | ||
return (typeof node === 'string' ? new URL(node).protocol : node.url.protocol) === 'http:' | ||
} | ||
} | ||
const events = { | ||
RESPONSE: 'response', | ||
REQUEST: 'request', | ||
SNIFF: 'sniff', | ||
RESURRECT: 'resurrect', | ||
SERIALIZATION: 'serialization', | ||
DESERIALIZATION: 'deserialization' | ||
} | ||
module.exports = { | ||
Client, | ||
Transport, | ||
ConnectionPool, | ||
Connection, | ||
Serializer, | ||
events, | ||
errors | ||
} | ||
module.exports = { Client, errors } |
@@ -1,124 +0,148 @@ | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
import { Readable as ReadableStream } from 'stream' | ||
import { TransportRequestOptions, ApiError, ApiResponse, RequestBody, Context } from './Transport' | ||
import { Search, Msearch, Bulk } from '../api/requestParams' | ||
export default class Helpers { | ||
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]> | ||
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>> | ||
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument> | ||
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper | ||
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>, reqOptions?: TransportRequestOptions): BulkHelper<BulkStats> | ||
/// <reference types="node" /> | ||
import { Readable } from 'stream'; | ||
import { TransportResult, TransportRequestOptions } from '@elastic/transport'; | ||
import Client from './Client'; | ||
import * as T from './api/types'; | ||
export interface HelpersOptions { | ||
client: Client; | ||
metaHeader: string | null; | ||
maxRetries: number; | ||
} | ||
export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> { | ||
clear: () => Promise<void> | ||
documents: TDocument[] | ||
export interface ScrollSearchOptions extends TransportRequestOptions { | ||
wait?: number; | ||
} | ||
export interface BulkHelper<T> extends Promise<T> { | ||
abort: () => BulkHelper<T> | ||
readonly stats: BulkStats | ||
export interface ScrollSearchResponse<TDocument> extends TransportResult<T.SearchResponse<TDocument>, unknown> { | ||
clear: () => Promise<void>; | ||
documents: TDocument[]; | ||
} | ||
export interface MsearchHelperOptions extends T.MsearchRequest { | ||
operations?: number; | ||
flushInterval?: number; | ||
concurrency?: number; | ||
retries?: number; | ||
wait?: number; | ||
} | ||
export interface MsearchHelper extends Promise<void> { | ||
stop: (error?: Error | null) => void; | ||
search: <TDocument = unknown>(header: T.MsearchHeader, body: T.MsearchBody) => Promise<MsearchHelperResponse<TDocument>>; | ||
} | ||
export interface MsearchHelperResponse<TDocument> { | ||
body: T.SearchResponse<TDocument>; | ||
documents: TDocument[]; | ||
status: number; | ||
responses: T.MsearchResponse; | ||
} | ||
export interface BulkStats { | ||
total: number | ||
failed: number | ||
retry: number | ||
successful: number | ||
noop: number | ||
time: number | ||
bytes: number | ||
aborted: boolean | ||
total: number; | ||
failed: number; | ||
retry: number; | ||
successful: number; | ||
noop: number; | ||
time: number; | ||
bytes: number; | ||
aborted: boolean; | ||
} | ||
interface IndexAction { | ||
index: { | ||
_index: string | ||
[key: string]: any | ||
} | ||
index: T.BulkIndexOperation; | ||
} | ||
interface CreateAction { | ||
create: { | ||
_index: string | ||
[key: string]: any | ||
} | ||
create: T.BulkCreateOperation; | ||
} | ||
interface UpdateActionOperation { | ||
update: { | ||
_index: string | ||
[key: string]: any | ||
} | ||
update: T.BulkUpdateOperation; | ||
} | ||
interface DeleteAction { | ||
delete: { | ||
_index: string | ||
[key: string]: any | ||
} | ||
delete: T.BulkDeleteOperation; | ||
} | ||
type UpdateAction = [UpdateActionOperation, Record<string, any>] | ||
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction | ||
type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>> | ||
export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body'> { | ||
datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument> | ||
onDocument: (doc: TDocument) => Action | ||
flushBytes?: number | ||
flushInterval?: number | ||
concurrency?: number | ||
retries?: number | ||
wait?: number | ||
onDrop?: (doc: OnDropDocument<TDocument>) => void | ||
refreshOnCompletion?: boolean | string | ||
} | ||
declare type UpdateAction = [UpdateActionOperation, Record<string, any>]; | ||
declare type Action = IndexAction | CreateAction | UpdateAction | DeleteAction; | ||
export interface OnDropDocument<TDocument = unknown> { | ||
status: number | ||
error: { | ||
type: string, | ||
reason: string, | ||
caused_by: { | ||
type: string, | ||
reason: string | ||
} | ||
} | ||
document: TDocument | ||
retried: boolean | ||
status: number; | ||
operation: Action; | ||
error: T.ErrorCause | null; | ||
document: TDocument; | ||
retried: boolean; | ||
} | ||
export interface MsearchHelperOptions extends Omit<Msearch, 'body'> { | ||
operations?: number | ||
flushInterval?: number | ||
concurrency?: number | ||
retries?: number | ||
wait?: number | ||
export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest { | ||
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>; | ||
onDocument: (doc: TDocument) => Action; | ||
flushBytes?: number; | ||
flushInterval?: number; | ||
concurrency?: number; | ||
retries?: number; | ||
wait?: number; | ||
onDrop?: (doc: OnDropDocument<TDocument>) => void; | ||
refreshOnCompletion?: boolean | string; | ||
} | ||
declare type callbackFn<Response, Context> = (err: ApiError, result: ApiResponse<Response, Context>) => void; | ||
export interface MsearchHelper extends Promise<void> { | ||
stop(error?: Error): void | ||
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>> | ||
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void | ||
export interface BulkHelper<T> extends Promise<BulkStats> { | ||
abort: () => BulkHelper<T>; | ||
readonly stats: BulkStats; | ||
} | ||
declare const kClient: unique symbol; | ||
declare const kMetaHeader: unique symbol; | ||
declare const kMaxRetries: unique symbol; | ||
export default class Helpers { | ||
[kClient]: Client; | ||
[kMetaHeader]: string | null; | ||
[kMaxRetries]: number; | ||
constructor(opts: HelpersOptions); | ||
/** | ||
* Runs a search operation. The only difference between client.search and this utility, | ||
* is that we are only returning the hits to the user and not the full ES response. | ||
* This helper automatically adds `filter_path=hits.hits._source` to the querystring, | ||
* as it will only need the documents source. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {array} The documents that matched the request. | ||
*/ | ||
search<TDocument = unknown>(params: T.SearchRequest, options?: TransportRequestOptions): Promise<TDocument[]>; | ||
/** | ||
* Runs a scroll search operation. This function returns an async iterator, allowing | ||
* the user to use a for await loop to get all the results of a given search. | ||
* ```js | ||
* for await (const result of client.helpers.scrollSearch({ params })) { | ||
* console.log(result) | ||
* } | ||
* ``` | ||
* Each result represents the entire body of a single scroll search request, | ||
* if you just need to scroll the results, use scrollDocuments. | ||
* This function handles automatically retries on 429 status code. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {iterator} the async iterator | ||
*/ | ||
scrollSearch<TDocument = unknown>(params: T.SearchRequest, options?: ScrollSearchOptions): AsyncIterable<ScrollSearchResponse<TDocument>>; | ||
/** | ||
* Runs a scroll search operation. This function returns an async iterator, allowing | ||
* the user to use a for await loop to get all the documents of a given search. | ||
* ```js | ||
* for await (const document of client.helpers.scrollSearch({ params })) { | ||
* console.log(document) | ||
* } | ||
* ``` | ||
* Each document is what you will find by running a scrollSearch and iterating on the hits array. | ||
* This helper automatically adds `filter_path=hits.hits._source` to the querystring, | ||
* as it will only need the documents source. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {iterator} the async iterator | ||
*/ | ||
scrollDocuments<TDocument = unknown>(params: T.SearchRequest, options?: ScrollSearchOptions): AsyncIterable<TDocument>; | ||
/** | ||
* Creates a msearch helper instance. Once you configure it, you can use the provided | ||
* `search` method to add new searches in the queue. | ||
* @param {object} options - The configuration of the msearch operations. | ||
* @param {object} reqOptions - The client optional configuration for this request. | ||
* @return {object} The possible operations to run. | ||
*/ | ||
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper; | ||
/** | ||
* Creates a bulk helper instance. Once you configure it, you can pick which operation | ||
* to execute with the given dataset, index, create, update, and delete. | ||
* @param {object} options - The configuration of the bulk operation. | ||
* @param {object} reqOptions - The client optional configuration for this request. | ||
* @return {object} The possible operations to run with the datasource. | ||
*/ | ||
bulk<TDocument = unknown>(options: BulkHelperOptions, reqOptions?: TransportRequestOptions): BulkHelper<TDocument>; | ||
} | ||
export {}; |
1431
lib/Helpers.js
@@ -0,1 +1,2 @@ | ||
"use strict"; | ||
/* | ||
@@ -6,3 +7,3 @@ * Licensed to Elasticsearch B.V. under one or more contributor | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* the Apache License, Version 2.0 (the "License") you may | ||
* not use this file except in compliance with the License. | ||
@@ -20,753 +21,753 @@ * You may obtain a copy of the License at | ||
*/ | ||
'use strict' | ||
/* eslint camelcase: 0 */ | ||
const { Readable } = require('stream') | ||
const { promisify } = require('util') | ||
const { ResponseError, ConfigurationError } = require('./errors') | ||
const pImmediate = promisify(setImmediate) | ||
const sleep = promisify(setTimeout) | ||
const kClient = Symbol('elasticsearch-client') | ||
const kMetaHeader = Symbol('meta header') | ||
var _a, _b, _c; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
/* eslint-disable @typescript-eslint/naming-convention */ | ||
/* eslint-disable @typescript-eslint/promise-function-async */ | ||
const assert_1 = (0, tslib_1.__importDefault)(require("assert")); | ||
const util_1 = require("util"); | ||
const stream_1 = require("stream"); | ||
const transport_1 = require("@elastic/transport"); | ||
const { ResponseError, ConfigurationError } = transport_1.errors; | ||
const sleep = (0, util_1.promisify)(setTimeout); | ||
const pImmediate = (0, util_1.promisify)(setImmediate); | ||
/* istanbul ignore next */ | ||
const noop = () => {} | ||
const noop = () => { }; | ||
const kClient = Symbol('elasticsearch-client'); | ||
const kMetaHeader = Symbol('meta header'); | ||
const kMaxRetries = Symbol('max retries'); | ||
class Helpers { | ||
constructor (opts) { | ||
this[kClient] = opts.client | ||
this[kMetaHeader] = opts.metaHeader | ||
this.maxRetries = opts.maxRetries | ||
} | ||
/** | ||
* Runs a search operation. The only difference between client.search and this utility, | ||
* is that we are only returning the hits to the user and not the full ES response. | ||
* This helper automatically adds `filter_path=hits.hits._source` to the querystring, | ||
* as it will only need the documents source. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {array} The documents that matched the request. | ||
*/ | ||
async search (params, options) { | ||
appendFilterPath('hits.hits._source', params, true) | ||
const { body } = await this[kClient].search(params, options) | ||
if (body.hits && body.hits.hits) { | ||
return body.hits.hits.map(d => d._source) | ||
constructor(opts) { | ||
Object.defineProperty(this, _a, { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: void 0 | ||
}); | ||
Object.defineProperty(this, _b, { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: void 0 | ||
}); | ||
Object.defineProperty(this, _c, { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: void 0 | ||
}); | ||
this[kClient] = opts.client; | ||
this[kMetaHeader] = opts.metaHeader; | ||
this[kMaxRetries] = opts.maxRetries; | ||
} | ||
return [] | ||
} | ||
/** | ||
* Runs a scroll search operation. This function returns an async iterator, allowing | ||
* the user to use a for await loop to get all the results of a given search. | ||
* ```js | ||
* for await (const result of client.helpers.scrollSearch({ params })) { | ||
* console.log(result) | ||
* } | ||
* ``` | ||
* Each result represents the entire body of a single scroll search request, | ||
* if you just need to scroll the results, use scrollDocuments. | ||
* This function handles automatically retries on 429 status code. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {iterator} the async iterator | ||
*/ | ||
async * scrollSearch (params, options = {}) { | ||
if (this[kMetaHeader] !== null) { | ||
options.headers = options.headers || {} | ||
options.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=s' | ||
/** | ||
* Runs a search operation. The only difference between client.search and this utility, | ||
* is that we are only returning the hits to the user and not the full ES response. | ||
* This helper automatically adds `filter_path=hits.hits._source` to the querystring, | ||
* as it will only need the documents source. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {array} The documents that matched the request. | ||
*/ | ||
async search(params, options = {}) { | ||
var _d; | ||
appendFilterPath('hits.hits._source', params, true); | ||
options.meta = true; | ||
const { body: result } = await this[kClient].search(params, options); | ||
if (((_d = result.hits) === null || _d === void 0 ? void 0 : _d.hits) != null) { | ||
return result.hits.hits.map(d => d._source); | ||
} | ||
return []; | ||
} | ||
// TODO: study scroll search slices | ||
const wait = options.wait || 5000 | ||
const maxRetries = options.maxRetries || this.maxRetries | ||
if (Array.isArray(options.ignore)) { | ||
options.ignore.push(429) | ||
} else { | ||
options.ignore = [429] | ||
} | ||
params.scroll = params.scroll || '1m' | ||
appendFilterPath('_scroll_id', params, false) | ||
const { method, body, index, ...querystring } = params | ||
let response = null | ||
for (let i = 0; i <= maxRetries; i++) { | ||
response = await this[kClient].search(params, options) | ||
if (response.statusCode !== 429) break | ||
await sleep(wait) | ||
} | ||
if (response.statusCode === 429) { | ||
throw new ResponseError(response) | ||
} | ||
let scroll_id = response.body._scroll_id | ||
let stop = false | ||
const clear = async () => { | ||
stop = true | ||
await this[kClient].clearScroll( | ||
{ body: { scroll_id } }, | ||
{ ignore: [400], ...options } | ||
) | ||
} | ||
while (response.body.hits && response.body.hits.hits.length > 0) { | ||
// scroll id is always present in the response, but it might | ||
// change over time based on the number of shards | ||
scroll_id = response.body._scroll_id | ||
response.clear = clear | ||
addDocumentsGetter(response) | ||
yield response | ||
if (stop === true) { | ||
break | ||
} | ||
for (let i = 0; i <= maxRetries; i++) { | ||
response = await this[kClient].scroll({ | ||
scroll: querystring.scroll, | ||
rest_total_hits_as_int: querystring.rest_total_hits_as_int || querystring.restTotalHitsAsInt, | ||
body: { scroll_id } | ||
}, options) | ||
if (response.statusCode !== 429) break | ||
await sleep(wait) | ||
} | ||
if (response.statusCode === 429) { | ||
throw new ResponseError(response) | ||
} | ||
} | ||
if (stop === false) { | ||
await clear() | ||
} | ||
} | ||
/** | ||
* Runs a scroll search operation. This function returns an async iterator, allowing | ||
* the user to use a for await loop to get all the documents of a given search. | ||
* ```js | ||
* for await (const document of client.helpers.scrollSearch({ params })) { | ||
* console.log(document) | ||
* } | ||
* ``` | ||
* Each document is what you will find by running a scrollSearch and iterating on the hits array. | ||
* This helper automatically adds `filter_path=hits.hits._source` to the querystring, | ||
* as it will only need the documents source. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {iterator} the async iterator | ||
*/ | ||
async * scrollDocuments (params, options) { | ||
appendFilterPath('hits.hits._source', params, true) | ||
for await (const { documents } of this.scrollSearch(params, options)) { | ||
for (const document of documents) { | ||
yield document | ||
} | ||
} | ||
} | ||
/** | ||
* Creates a msearch helper instance. Once you configure it, you can use the provided | ||
* `search` method to add new searches in the queue. | ||
* @param {object} options - The configuration of the msearch operations. | ||
* @param {object} reqOptions - The client optional configuration for this request. | ||
* @return {object} The possible operations to run. | ||
*/ | ||
msearch (options = {}, reqOptions = {}) { | ||
const client = this[kClient] | ||
const { | ||
operations = 5, | ||
concurrency = 5, | ||
flushInterval = 500, | ||
retries = this.maxRetries, | ||
wait = 5000, | ||
...msearchOptions | ||
} = options | ||
let stopReading = false | ||
let stopError = null | ||
let timeoutRef = null | ||
const operationsStream = new Readable({ | ||
objectMode: true, | ||
read (size) {} | ||
}) | ||
const p = iterate() | ||
const helper = { | ||
then (onFulfilled, onRejected) { | ||
return p.then(onFulfilled, onRejected) | ||
}, | ||
catch (onRejected) { | ||
return p.catch(onRejected) | ||
}, | ||
stop (error = null) { | ||
if (stopReading === true) return | ||
stopReading = true | ||
stopError = error | ||
operationsStream.push(null) | ||
}, | ||
// TODO: support abort a single search? | ||
// NOTE: the validation checks are synchronous and the callback/promise will | ||
// be resolved in the same tick. We might want to fix this in the future. | ||
search (header, body, callback) { | ||
if (stopReading === true) { | ||
const error = stopError === null | ||
? new ConfigurationError('The msearch processor has been stopped') | ||
: stopError | ||
return callback ? callback(error, {}) : Promise.reject(error) | ||
/** | ||
* Runs a scroll search operation. This function returns an async iterator, allowing | ||
* the user to use a for await loop to get all the results of a given search. | ||
* ```js | ||
* for await (const result of client.helpers.scrollSearch({ params })) { | ||
* console.log(result) | ||
* } | ||
* ``` | ||
* Each result represents the entire body of a single scroll search request, | ||
* if you just need to scroll the results, use scrollDocuments. | ||
* This function handles automatically retries on 429 status code. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {iterator} the async iterator | ||
*/ | ||
async *scrollSearch(params, options = {}) { | ||
var _d, _e, _f, _g; | ||
options.meta = true; | ||
if (this[kMetaHeader] !== null) { | ||
options.headers = (_d = options.headers) !== null && _d !== void 0 ? _d : {}; | ||
options.headers['x-elastic-client-meta'] = `${this[kMetaHeader]},h=s`; | ||
} | ||
if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) { | ||
const error = new ConfigurationError('The header should be an object') | ||
return callback ? callback(error, {}) : Promise.reject(error) | ||
const wait = (_e = options.wait) !== null && _e !== void 0 ? _e : 5000; | ||
const maxRetries = (_f = options.maxRetries) !== null && _f !== void 0 ? _f : this[kMaxRetries]; | ||
if (Array.isArray(options.ignore)) { | ||
options.ignore.push(429); | ||
} | ||
if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) { | ||
const error = new ConfigurationError('The body should be an object') | ||
return callback ? callback(error, {}) : Promise.reject(error) | ||
else { | ||
options.ignore = [429]; | ||
} | ||
let promise = null | ||
if (callback === undefined) { | ||
let onFulfilled = null | ||
let onRejected = null | ||
promise = new Promise((resolve, reject) => { | ||
onFulfilled = resolve | ||
onRejected = reject | ||
}) | ||
callback = function callback (err, result) { | ||
err ? onRejected(err) : onFulfilled(result) | ||
} | ||
params.scroll = (_g = params.scroll) !== null && _g !== void 0 ? _g : '1m'; | ||
appendFilterPath('_scroll_id', params, false); | ||
let response; | ||
for (let i = 0; i <= maxRetries; i++) { | ||
response = await this[kClient].search(params, options); | ||
if (response.statusCode !== 429) | ||
break; | ||
await sleep(wait); | ||
} | ||
operationsStream.push([header, body, callback]) | ||
if (promise !== null) { | ||
return promise | ||
(0, assert_1.default)(response !== undefined, 'The response is undefined, please file a bug report'); | ||
if (response.statusCode === 429) { | ||
// @ts-expect-error | ||
throw new ResponseError(response); | ||
} | ||
} | ||
} | ||
return helper | ||
async function iterate () { | ||
const { semaphore, finish } = buildSemaphore() | ||
const msearchBody = [] | ||
const callbacks = [] | ||
let loadedOperations = 0 | ||
timeoutRef = setTimeout(onFlushTimeout, flushInterval) | ||
for await (const operation of operationsStream) { | ||
timeoutRef.refresh() | ||
loadedOperations += 1 | ||
msearchBody.push(operation[0], operation[1]) | ||
callbacks.push(operation[2]) | ||
if (loadedOperations >= operations) { | ||
const send = await semaphore() | ||
send(msearchBody.slice(), callbacks.slice()) | ||
msearchBody.length = 0 | ||
callbacks.length = 0 | ||
loadedOperations = 0 | ||
let scroll_id = response.body._scroll_id; | ||
let stop = false; | ||
const clear = async () => { | ||
stop = true; | ||
await this[kClient].clearScroll({ scroll_id }, { ignore: [400], ...options }); | ||
}; | ||
while (response.body.hits != null && response.body.hits.hits.length > 0) { | ||
// scroll id is always present in the response, but it might | ||
// change over time based on the number of shards | ||
scroll_id = response.body._scroll_id; | ||
// @ts-expect-error | ||
response.clear = clear; | ||
addDocumentsGetter(response); | ||
// @ts-expect-error | ||
yield response; | ||
if (stop) { | ||
break; | ||
} | ||
for (let i = 0; i <= maxRetries; i++) { | ||
const r = await this[kClient].scroll({ | ||
scroll: params.scroll, | ||
rest_total_hits_as_int: params.rest_total_hits_as_int, | ||
scroll_id | ||
}, options); | ||
response = r; | ||
(0, assert_1.default)(response !== undefined, 'The response is undefined, please file a bug report'); | ||
if (response.statusCode !== 429) | ||
break; | ||
await sleep(wait); | ||
} | ||
if (response.statusCode === 429) { | ||
// @ts-expect-error | ||
throw new ResponseError(response); | ||
} | ||
} | ||
} | ||
clearTimeout(timeoutRef) | ||
// In some cases the previos http call does not have finished, | ||
// or we didn't reach the flush bytes threshold, so we force one last operation. | ||
if (loadedOperations > 0) { | ||
const send = await semaphore() | ||
send(msearchBody, callbacks) | ||
} | ||
await finish() | ||
if (stopError !== null) { | ||
throw stopError | ||
} | ||
async function onFlushTimeout () { | ||
if (loadedOperations === 0) return | ||
const msearchBodyCopy = msearchBody.slice() | ||
const callbacksCopy = callbacks.slice() | ||
msearchBody.length = 0 | ||
callbacks.length = 0 | ||
loadedOperations = 0 | ||
try { | ||
const send = await semaphore() | ||
send(msearchBodyCopy, callbacksCopy) | ||
} catch (err) { | ||
/* istanbul ignore next */ | ||
helper.stop(err) | ||
if (!stop) { | ||
await clear(); | ||
} | ||
} | ||
} | ||
// This function builds a semaphore using the concurrency | ||
// options of the msearch helper. It is used inside the iterator | ||
// to guarantee that no more than the number of operations | ||
// allowed to run at the same time are executed. | ||
// It returns a semaphore function which resolves in the next tick | ||
// if we didn't reach the maximim concurrency yet, otherwise it returns | ||
// a promise that resolves as soon as one of the running request has finshed. | ||
// The semaphore function resolves a send function, which will be used | ||
// to send the actual msearch request. | ||
// It also returns a finish function, which returns a promise that is resolved | ||
// when there are no longer request running. | ||
function buildSemaphore () { | ||
let resolveSemaphore = null | ||
let resolveFinish = null | ||
let running = 0 | ||
return { semaphore, finish } | ||
function finish () { | ||
return new Promise((resolve, reject) => { | ||
if (running === 0) { | ||
resolve() | ||
} else { | ||
resolveFinish = resolve | ||
} | ||
}) | ||
} | ||
function semaphore () { | ||
if (running < concurrency) { | ||
running += 1 | ||
return pImmediate(send) | ||
} else { | ||
return new Promise((resolve, reject) => { | ||
resolveSemaphore = resolve | ||
}) | ||
/** | ||
* Runs a scroll search operation. This function returns an async iterator, allowing | ||
* the user to use a for await loop to get all the documents of a given search. | ||
* ```js | ||
* for await (const document of client.helpers.scrollSearch({ params })) { | ||
* console.log(document) | ||
* } | ||
* ``` | ||
* Each document is what you will find by running a scrollSearch and iterating on the hits array. | ||
* This helper automatically adds `filter_path=hits.hits._source` to the querystring, | ||
* as it will only need the documents source. | ||
* @param {object} params - The Elasticsearch's search parameters. | ||
* @param {object} options - The client optional configuration for this request. | ||
* @return {iterator} the async iterator | ||
*/ | ||
async *scrollDocuments(params, options = {}) { | ||
appendFilterPath('hits.hits._source', params, true); | ||
for await (const { documents } of this.scrollSearch(params, options)) { | ||
for (const document of documents) { | ||
yield document; | ||
} | ||
} | ||
} | ||
function send (msearchBody, callbacks) { | ||
/* istanbul ignore if */ | ||
if (running > concurrency) { | ||
throw new Error('Max concurrency reached') | ||
} | ||
msearchOperation(msearchBody, callbacks, () => { | ||
running -= 1 | ||
if (resolveSemaphore) { | ||
running += 1 | ||
resolveSemaphore(send) | ||
resolveSemaphore = null | ||
} else if (resolveFinish && running === 0) { | ||
resolveFinish() | ||
} | ||
}) | ||
} | ||
} | ||
function msearchOperation (msearchBody, callbacks, done) { | ||
let retryCount = retries | ||
// Instead of going full on async-await, which would make the code easier to read, | ||
// we have decided to use callback style instead. | ||
// This because every time we use async await, V8 will create multiple promises | ||
// behind the scenes, making the code slightly slower. | ||
tryMsearch(msearchBody, callbacks, retrySearch) | ||
function retrySearch (msearchBody, callbacks) { | ||
if (msearchBody.length > 0 && retryCount > 0) { | ||
retryCount -= 1 | ||
setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch) | ||
return | ||
/** | ||
* Creates a msearch helper instance. Once you configure it, you can use the provided | ||
* `search` method to add new searches in the queue. | ||
* @param {object} options - The configuration of the msearch operations. | ||
* @param {object} reqOptions - The client optional configuration for this request. | ||
* @return {object} The possible operations to run. | ||
*/ | ||
msearch(options = {}, reqOptions = {}) { | ||
const client = this[kClient]; | ||
const { operations = 5, concurrency = 5, flushInterval = 500, retries = this[kMaxRetries], wait = 5000, ...msearchOptions } = options; | ||
reqOptions.meta = true; | ||
let stopReading = false; | ||
let stopError = null; | ||
let timeoutRef = null; | ||
const operationsStream = new stream_1.Readable({ | ||
objectMode: true, | ||
read(size) { } | ||
}); | ||
const p = iterate(); | ||
const helper = { | ||
[Symbol.toStringTag]: 'Promise', | ||
then(onFulfilled, onRejected) { | ||
return p.then(onFulfilled, onRejected); | ||
}, | ||
catch(onRejected) { | ||
return p.catch(onRejected); | ||
}, | ||
finally(onFinally) { | ||
return p.finally(onFinally); | ||
}, | ||
stop(error = null) { | ||
if (stopReading) | ||
return; | ||
stopReading = true; | ||
stopError = error; | ||
operationsStream.push(null); | ||
}, | ||
// TODO: support abort a single search? | ||
// NOTE: the validation checks are synchronous and the callback/promise will | ||
// be resolved in the same tick. We might want to fix this in the future. | ||
search(header, body) { | ||
if (stopReading) { | ||
const error = stopError === null | ||
? new ConfigurationError('The msearch processor has been stopped') | ||
: stopError; | ||
return Promise.reject(error); | ||
} | ||
if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) { | ||
return Promise.reject(new ConfigurationError('The header should be an object')); | ||
} | ||
if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) { | ||
return Promise.reject(new ConfigurationError('The body should be an object')); | ||
} | ||
let onFulfilled = null; | ||
let onRejected = null; | ||
const promise = new Promise((resolve, reject) => { | ||
onFulfilled = resolve; | ||
onRejected = reject; | ||
}); | ||
const callback = function callback(err, result) { | ||
err !== null ? onRejected(err) : onFulfilled(result); | ||
}; | ||
operationsStream.push([header, body, callback]); | ||
return promise; | ||
} | ||
}; | ||
return helper; | ||
async function iterate() { | ||
const { semaphore, finish } = buildSemaphore(); | ||
const msearchBody = []; | ||
const callbacks = []; | ||
let loadedOperations = 0; | ||
timeoutRef = setTimeout(onFlushTimeout, flushInterval); // eslint-disable-line | ||
for await (const operation of operationsStream) { | ||
timeoutRef.refresh(); | ||
loadedOperations += 1; | ||
msearchBody.push(operation[0], operation[1]); | ||
callbacks.push(operation[2]); | ||
if (loadedOperations >= operations) { | ||
const send = await semaphore(); | ||
send(msearchBody.slice(), callbacks.slice()); | ||
msearchBody.length = 0; | ||
callbacks.length = 0; | ||
loadedOperations = 0; | ||
} | ||
} | ||
clearTimeout(timeoutRef); | ||
// In some cases the previos http call does not have finished, | ||
// or we didn't reach the flush bytes threshold, so we force one last operation. | ||
if (loadedOperations > 0) { | ||
const send = await semaphore(); | ||
send(msearchBody, callbacks); | ||
} | ||
await finish(); | ||
if (stopError !== null) { | ||
throw stopError; | ||
} | ||
async function onFlushTimeout() { | ||
if (loadedOperations === 0) | ||
return; | ||
const msearchBodyCopy = msearchBody.slice(); | ||
const callbacksCopy = callbacks.slice(); | ||
msearchBody.length = 0; | ||
callbacks.length = 0; | ||
loadedOperations = 0; | ||
try { | ||
const send = await semaphore(); | ||
send(msearchBodyCopy, callbacksCopy); | ||
} | ||
catch (err) { | ||
/* istanbul ignore next */ | ||
// @ts-expect-error | ||
helper.stop(err); | ||
} | ||
} | ||
} | ||
done() | ||
} | ||
// This function never returns an error, if the msearch operation fails, | ||
// the error is dispatched to all search executors. | ||
function tryMsearch (msearchBody, callbacks, done) { | ||
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions, (err, results) => { | ||
const retryBody = [] | ||
const retryCallbacks = [] | ||
if (err) { | ||
addDocumentsGetter(results) | ||
for (const callback of callbacks) { | ||
callback(err, results) | ||
// This function builds a semaphore using the concurrency | ||
// options of the msearch helper. It is used inside the iterator | ||
// to guarantee that no more than the number of operations | ||
// allowed to run at the same time are executed. | ||
// It returns a semaphore function which resolves in the next tick | ||
// if we didn't reach the maximim concurrency yet, otherwise it returns | ||
// a promise that resolves as soon as one of the running request has finshed. | ||
// The semaphore function resolves a send function, which will be used | ||
// to send the actual msearch request. | ||
// It also returns a finish function, which returns a promise that is resolved | ||
// when there are no longer request running. | ||
function buildSemaphore() { | ||
let resolveSemaphore = null; | ||
let resolveFinish = null; | ||
let running = 0; | ||
return { semaphore, finish }; | ||
function finish() { | ||
return new Promise((resolve, reject) => { | ||
if (running === 0) { | ||
resolve(); | ||
} | ||
else { | ||
resolveFinish = resolve; | ||
} | ||
}); | ||
} | ||
return done(retryBody, retryCallbacks) | ||
} | ||
const { responses } = results.body | ||
for (let i = 0, len = responses.length; i < len; i++) { | ||
const response = responses[i] | ||
if (response.status === 429 && retryCount > 0) { | ||
retryBody.push(msearchBody[i * 2]) | ||
retryBody.push(msearchBody[(i * 2) + 1]) | ||
retryCallbacks.push(callbacks[i]) | ||
continue | ||
function semaphore() { | ||
if (running < concurrency) { | ||
running += 1; | ||
return pImmediate(send); | ||
} | ||
else { | ||
return new Promise((resolve, reject) => { | ||
resolveSemaphore = resolve; | ||
}); | ||
} | ||
} | ||
const result = { ...results, body: response } | ||
addDocumentsGetter(result) | ||
if (response.status >= 400) { | ||
callbacks[i](new ResponseError(result), result) | ||
} else { | ||
callbacks[i](null, result) | ||
function send(msearchBody, callbacks) { | ||
/* istanbul ignore if */ | ||
if (running > concurrency) { | ||
throw new Error('Max concurrency reached'); | ||
} | ||
msearchOperation(msearchBody, callbacks, () => { | ||
running -= 1; | ||
if (resolveSemaphore !== null) { | ||
running += 1; | ||
resolveSemaphore(send); | ||
resolveSemaphore = null; | ||
} | ||
else if (resolveFinish != null && running === 0) { | ||
resolveFinish(); | ||
} | ||
}); | ||
} | ||
} | ||
done(retryBody, retryCallbacks) | ||
}) | ||
} | ||
} | ||
function msearchOperation(msearchBody, callbacks, done) { | ||
let retryCount = retries; | ||
// Instead of going full on async-await, which would make the code easier to read, | ||
// we have decided to use callback style instead. | ||
// This because every time we use async await, V8 will create multiple promises | ||
// behind the scenes, making the code slightly slower. | ||
tryMsearch(msearchBody, callbacks, retrySearch); | ||
function retrySearch(msearchBody, callbacks) { | ||
if (msearchBody.length > 0 && retryCount > 0) { | ||
retryCount -= 1; | ||
setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch); | ||
return; | ||
} | ||
done(); | ||
} | ||
// This function never returns an error, if the msearch operation fails, | ||
// the error is dispatched to all search executors. | ||
function tryMsearch(msearchBody, callbacks, done) { | ||
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions) | ||
.then(results => { | ||
const retryBody = []; | ||
const retryCallbacks = []; | ||
const { responses } = results.body; | ||
for (let i = 0, len = responses.length; i < len; i++) { | ||
const response = responses[i]; | ||
if (response.status === 429 && retryCount > 0) { | ||
retryBody.push(msearchBody[i * 2]); | ||
retryBody.push(msearchBody[(i * 2) + 1]); | ||
retryCallbacks.push(callbacks[i]); | ||
continue; | ||
} | ||
const result = { ...results, body: response }; | ||
// @ts-expect-error | ||
addDocumentsGetter(result); | ||
if (response.status != null && response.status >= 400) { | ||
// @ts-expect-error | ||
callbacks[i](new ResponseError(result), result); | ||
} | ||
else { | ||
callbacks[i](null, result); | ||
} | ||
} | ||
done(retryBody, retryCallbacks); | ||
}) | ||
.catch(err => { | ||
for (const callback of callbacks) { | ||
callback(err, null); | ||
} | ||
return done([], []); | ||
}); | ||
} | ||
} | ||
} | ||
} | ||
/** | ||
* Creates a bulk helper instance. Once you configure it, you can pick which operation | ||
* to execute with the given dataset, index, create, update, and delete. | ||
* @param {object} options - The configuration of the bulk operation. | ||
* @param {object} reqOptions - The client optional configuration for this request. | ||
* @return {object} The possible operations to run with the datasource. | ||
*/ | ||
bulk (options, reqOptions = {}) { | ||
const client = this[kClient] | ||
const { serializer } = client | ||
if (this[kMetaHeader] !== null) { | ||
reqOptions.headers = reqOptions.headers || {} | ||
reqOptions.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=bp' | ||
} | ||
const { | ||
datasource, | ||
onDocument, | ||
flushBytes = 5000000, | ||
flushInterval = 30000, | ||
concurrency = 5, | ||
retries = this.maxRetries, | ||
wait = 5000, | ||
onDrop = noop, | ||
refreshOnCompletion = false, | ||
...bulkOptions | ||
} = options | ||
if (datasource === undefined) { | ||
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required')) | ||
} | ||
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) { | ||
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')) | ||
} | ||
if (onDocument === undefined) { | ||
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required')) | ||
} | ||
let shouldAbort = false | ||
let timeoutRef = null | ||
const stats = { | ||
total: 0, | ||
failed: 0, | ||
retry: 0, | ||
successful: 0, | ||
noop: 0, | ||
time: 0, | ||
bytes: 0, | ||
aborted: false | ||
} | ||
const p = iterate() | ||
const helper = { | ||
get stats () { | ||
return stats | ||
}, | ||
then (onFulfilled, onRejected) { | ||
return p.then(onFulfilled, onRejected) | ||
}, | ||
catch (onRejected) { | ||
return p.catch(onRejected) | ||
}, | ||
abort () { | ||
clearTimeout(timeoutRef) | ||
shouldAbort = true | ||
stats.aborted = true | ||
return this | ||
} | ||
} | ||
return helper | ||
/** | ||
* Function that iterates over the given datasource and start a bulk operation as soon | ||
* as it reaches the configured bulk size. It's designed to use the Node.js asynchronous | ||
* model at this maximum capacity, as it will collect the next body to send while there is | ||
* a running http call. In this way, the CPU time will be used carefully. | ||
* The objects will be serialized right away, to approximate the byte length of the body. | ||
* It creates an array of strings instead of a ndjson string because the bulkOperation | ||
* will navigate the body for matching failed operations with the original document. | ||
* Creates a bulk helper instance. Once you configure it, you can pick which operation | ||
* to execute with the given dataset, index, create, update, and delete. | ||
* @param {object} options - The configuration of the bulk operation. | ||
* @param {object} reqOptions - The client optional configuration for this request. | ||
* @return {object} The possible operations to run with the datasource. | ||
*/ | ||
async function iterate () { | ||
const { semaphore, finish } = buildSemaphore() | ||
const startTime = Date.now() | ||
const bulkBody = [] | ||
let actionBody = '' | ||
let payloadBody = '' | ||
let chunkBytes = 0 | ||
timeoutRef = setTimeout(onFlushTimeout, flushInterval) | ||
for await (const chunk of datasource) { | ||
if (shouldAbort === true) break | ||
timeoutRef.refresh() | ||
const action = onDocument(chunk) | ||
const operation = Array.isArray(action) | ||
? Object.keys(action[0])[0] | ||
: Object.keys(action)[0] | ||
if (operation === 'index' || operation === 'create') { | ||
actionBody = serializer.serialize(action) | ||
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk) | ||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) | ||
bulkBody.push(actionBody, payloadBody) | ||
} else if (operation === 'update') { | ||
actionBody = serializer.serialize(action[0]) | ||
payloadBody = typeof chunk === 'string' | ||
? `{"doc":${chunk}}` | ||
: serializer.serialize({ doc: chunk, ...action[1] }) | ||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) | ||
bulkBody.push(actionBody, payloadBody) | ||
} else if (operation === 'delete') { | ||
actionBody = serializer.serialize(action) | ||
chunkBytes += Buffer.byteLength(actionBody) | ||
bulkBody.push(actionBody) | ||
} else { | ||
clearTimeout(timeoutRef) | ||
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`) | ||
bulk(options, reqOptions = {}) { | ||
var _d; | ||
const client = this[kClient]; | ||
const { serializer } = client; | ||
if (this[kMetaHeader] !== null) { | ||
reqOptions.headers = (_d = reqOptions.headers) !== null && _d !== void 0 ? _d : {}; | ||
reqOptions.headers['x-elastic-client-meta'] = `${this[kMetaHeader]},h=bp`; | ||
} | ||
if (chunkBytes >= flushBytes) { | ||
stats.bytes += chunkBytes | ||
const send = await semaphore() | ||
send(bulkBody.slice()) | ||
bulkBody.length = 0 | ||
chunkBytes = 0 | ||
reqOptions.meta = true; | ||
const { datasource, onDocument, flushBytes = 5000000, flushInterval = 30000, concurrency = 5, retries = this[kMaxRetries], wait = 5000, onDrop = noop, refreshOnCompletion = false, ...bulkOptions } = options; | ||
if (datasource === undefined) { | ||
// @ts-expect-error | ||
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required')); | ||
} | ||
} | ||
clearTimeout(timeoutRef) | ||
// In some cases the previos http call does not have finished, | ||
// or we didn't reach the flush bytes threshold, so we force one last operation. | ||
if (shouldAbort === false && chunkBytes > 0) { | ||
const send = await semaphore() | ||
stats.bytes += chunkBytes | ||
send(bulkBody) | ||
} | ||
await finish() | ||
if (refreshOnCompletion) { | ||
await client.indices.refresh({ | ||
index: typeof refreshOnCompletion === 'string' | ||
? refreshOnCompletion | ||
: '_all' | ||
}, reqOptions) | ||
} | ||
stats.time = Date.now() - startTime | ||
stats.total = stats.successful + stats.failed | ||
return stats | ||
async function onFlushTimeout () { | ||
if (chunkBytes === 0) return | ||
stats.bytes += chunkBytes | ||
const bulkBodyCopy = bulkBody.slice() | ||
bulkBody.length = 0 | ||
chunkBytes = 0 | ||
try { | ||
const send = await semaphore() | ||
send(bulkBodyCopy) | ||
} catch (err) { | ||
/* istanbul ignore next */ | ||
helper.abort() | ||
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || isReadableStream(datasource) || isAsyncIterator(datasource))) { | ||
// @ts-expect-error | ||
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')); | ||
} | ||
} | ||
} | ||
// This function builds a semaphore using the concurrency | ||
// options of the bulk helper. It is used inside the iterator | ||
// to guarantee that no more than the number of operations | ||
// allowed to run at the same time are executed. | ||
// It returns a semaphore function which resolves in the next tick | ||
// if we didn't reach the maximim concurrency yet, otherwise it returns | ||
// a promise that resolves as soon as one of the running request has finshed. | ||
// The semaphore function resolves a send function, which will be used | ||
// to send the actual bulk request. | ||
// It also returns a finish function, which returns a promise that is resolved | ||
// when there are no longer request running. It rejects an error if one | ||
// of the request has failed for some reason. | ||
function buildSemaphore () { | ||
let resolveSemaphore = null | ||
let resolveFinish = null | ||
let rejectFinish = null | ||
let error = null | ||
let running = 0 | ||
return { semaphore, finish } | ||
function finish () { | ||
return new Promise((resolve, reject) => { | ||
if (running === 0) { | ||
if (error) { | ||
reject(error) | ||
} else { | ||
resolve() | ||
if (onDocument === undefined) { | ||
// @ts-expect-error | ||
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required')); | ||
} | ||
let shouldAbort = false; | ||
let timeoutRef = null; | ||
const stats = { | ||
total: 0, | ||
failed: 0, | ||
retry: 0, | ||
successful: 0, | ||
noop: 0, | ||
time: 0, | ||
bytes: 0, | ||
aborted: false | ||
}; | ||
const p = iterate(); | ||
const helper = { | ||
[Symbol.toStringTag]: 'Promise', | ||
then(onFulfilled, onRejected) { | ||
return p.then(onFulfilled, onRejected); | ||
}, | ||
catch(onRejected) { | ||
return p.catch(onRejected); | ||
}, | ||
finally(onFinally) { | ||
return p.finally(onFinally); | ||
}, | ||
get stats() { | ||
return stats; | ||
}, | ||
abort() { | ||
clearTimeout(timeoutRef); | ||
shouldAbort = true; | ||
stats.aborted = true; | ||
return this; | ||
} | ||
} else { | ||
resolveFinish = resolve | ||
rejectFinish = reject | ||
} | ||
}) | ||
} | ||
function semaphore () { | ||
if (running < concurrency) { | ||
running += 1 | ||
return pImmediate(send) | ||
} else { | ||
return new Promise((resolve, reject) => { | ||
resolveSemaphore = resolve | ||
}) | ||
}; | ||
return helper; | ||
/** | ||
* Function that iterates over the given datasource and start a bulk operation as soon | ||
* as it reaches the configured bulk size. It's designed to use the Node.js asynchronous | ||
* model at this maximum capacity, as it will collect the next body to send while there is | ||
* a running http call. In this way, the CPU time will be used carefully. | ||
* The objects will be serialized right away, to approximate the byte length of the body. | ||
* It creates an array of strings instead of a ndjson string because the bulkOperation | ||
* will navigate the body for matching failed operations with the original document. | ||
*/ | ||
async function iterate() { | ||
const { semaphore, finish } = buildSemaphore(); | ||
const startTime = Date.now(); | ||
const bulkBody = []; | ||
let actionBody = ''; | ||
let payloadBody = ''; | ||
let chunkBytes = 0; | ||
timeoutRef = setTimeout(onFlushTimeout, flushInterval); // eslint-disable-line | ||
// @ts-expect-error datasoruce is an iterable | ||
for await (const chunk of datasource) { | ||
if (shouldAbort) | ||
break; | ||
timeoutRef.refresh(); | ||
const action = onDocument(chunk); | ||
const operation = Array.isArray(action) | ||
? Object.keys(action[0])[0] | ||
: Object.keys(action)[0]; | ||
if (operation === 'index' || operation === 'create') { | ||
actionBody = serializer.serialize(action); | ||
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk); | ||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody); | ||
bulkBody.push(actionBody, payloadBody); | ||
} | ||
else if (operation === 'update') { | ||
// @ts-expect-error in case of update action is an array | ||
actionBody = serializer.serialize(action[0]); | ||
payloadBody = typeof chunk === 'string' | ||
? `{"doc":${chunk}}` | ||
// @ts-expect-error in case of update action is an array | ||
: serializer.serialize({ doc: chunk, ...action[1] }); | ||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody); | ||
bulkBody.push(actionBody, payloadBody); | ||
} | ||
else if (operation === 'delete') { | ||
actionBody = serializer.serialize(action); | ||
chunkBytes += Buffer.byteLength(actionBody); | ||
bulkBody.push(actionBody); | ||
} | ||
else { | ||
clearTimeout(timeoutRef); | ||
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`); | ||
} | ||
if (chunkBytes >= flushBytes) { | ||
stats.bytes += chunkBytes; | ||
const send = await semaphore(); | ||
send(bulkBody.slice()); | ||
bulkBody.length = 0; | ||
chunkBytes = 0; | ||
} | ||
} | ||
clearTimeout(timeoutRef); | ||
// In some cases the previos http call does not have finished, | ||
// or we didn't reach the flush bytes threshold, so we force one last operation. | ||
if (!shouldAbort && chunkBytes > 0) { | ||
const send = await semaphore(); | ||
stats.bytes += chunkBytes; | ||
send(bulkBody); | ||
} | ||
await finish(); | ||
if (refreshOnCompletion !== false) { | ||
await client.indices.refresh({ | ||
index: typeof refreshOnCompletion === 'string' | ||
? refreshOnCompletion | ||
: '_all' | ||
}, reqOptions); | ||
} | ||
stats.time = Date.now() - startTime; | ||
stats.total = stats.successful + stats.failed; | ||
return stats; | ||
async function onFlushTimeout() { | ||
if (chunkBytes === 0) | ||
return; | ||
stats.bytes += chunkBytes; | ||
const bulkBodyCopy = bulkBody.slice(); | ||
bulkBody.length = 0; | ||
chunkBytes = 0; | ||
try { | ||
const send = await semaphore(); | ||
send(bulkBodyCopy); | ||
} | ||
catch (err) { | ||
/* istanbul ignore next */ | ||
helper.abort(); // eslint-disable-line | ||
} | ||
} | ||
} | ||
} | ||
function send (bulkBody) { | ||
/* istanbul ignore if */ | ||
if (running > concurrency) { | ||
throw new Error('Max concurrency reached') | ||
} | ||
bulkOperation(bulkBody, err => { | ||
running -= 1 | ||
if (err) { | ||
shouldAbort = true | ||
error = err | ||
} | ||
if (resolveSemaphore) { | ||
running += 1 | ||
resolveSemaphore(send) | ||
resolveSemaphore = null | ||
} else if (resolveFinish && running === 0) { | ||
if (error) { | ||
rejectFinish(error) | ||
} else { | ||
resolveFinish() | ||
// This function builds a semaphore using the concurrency | ||
// options of the bulk helper. It is used inside the iterator | ||
// to guarantee that no more than the number of operations | ||
// allowed to run at the same time are executed. | ||
// It returns a semaphore function which resolves in the next tick | ||
// if we didn't reach the maximim concurrency yet, otherwise it returns | ||
// a promise that resolves as soon as one of the running request has finshed. | ||
// The semaphore function resolves a send function, which will be used | ||
// to send the actual bulk request. | ||
// It also returns a finish function, which returns a promise that is resolved | ||
// when there are no longer request running. It rejects an error if one | ||
// of the request has failed for some reason. | ||
function buildSemaphore() { | ||
let resolveSemaphore = null; | ||
let resolveFinish = null; | ||
let rejectFinish = null; | ||
let error = null; | ||
let running = 0; | ||
return { semaphore, finish }; | ||
function finish() { | ||
return new Promise((resolve, reject) => { | ||
if (running === 0) { | ||
if (error !== null) { | ||
reject(error); | ||
} | ||
else { | ||
resolve(); | ||
} | ||
} | ||
else { | ||
resolveFinish = resolve; | ||
rejectFinish = reject; | ||
} | ||
}); | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
function bulkOperation (bulkBody, callback) { | ||
let retryCount = retries | ||
let isRetrying = false | ||
// Instead of going full on async-await, which would make the code easier to read, | ||
// we have decided to use callback style instead. | ||
// This because every time we use async await, V8 will create multiple promises | ||
// behind the scenes, making the code slightly slower. | ||
tryBulk(bulkBody, retryDocuments) | ||
function retryDocuments (err, bulkBody) { | ||
if (err) return callback(err) | ||
if (shouldAbort === true) return callback() | ||
if (bulkBody.length > 0) { | ||
if (retryCount > 0) { | ||
isRetrying = true | ||
retryCount -= 1 | ||
stats.retry += bulkBody.length | ||
setTimeout(tryBulk, wait, bulkBody, retryDocuments) | ||
return | ||
} | ||
for (let i = 0, len = bulkBody.length; i < len; i = i + 2) { | ||
const operation = Object.keys(serializer.deserialize(bulkBody[i]))[0] | ||
onDrop({ | ||
status: 429, | ||
error: null, | ||
operation: serializer.deserialize(bulkBody[i]), | ||
document: operation !== 'delete' | ||
? serializer.deserialize(bulkBody[i + 1]) | ||
/* istanbul ignore next */ | ||
: null, | ||
retried: isRetrying | ||
}) | ||
stats.failed += 1 | ||
} | ||
function semaphore() { | ||
if (running < concurrency) { | ||
running += 1; | ||
return pImmediate(send); | ||
} | ||
else { | ||
return new Promise((resolve, reject) => { | ||
resolveSemaphore = resolve; | ||
}); | ||
} | ||
} | ||
function send(bulkBody) { | ||
/* istanbul ignore if */ | ||
if (running > concurrency) { | ||
throw new Error('Max concurrency reached'); | ||
} | ||
bulkOperation(bulkBody, err => { | ||
running -= 1; | ||
if (err != null) { | ||
shouldAbort = true; | ||
error = err; | ||
} | ||
if (resolveSemaphore !== null) { | ||
running += 1; | ||
resolveSemaphore(send); | ||
resolveSemaphore = null; | ||
} | ||
else if (resolveFinish != null && rejectFinish != null && running === 0) { | ||
if (error != null) { | ||
rejectFinish(error); | ||
} | ||
else { | ||
resolveFinish(); | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
callback() | ||
} | ||
function tryBulk (bulkBody, callback) { | ||
if (shouldAbort === true) return callback(null, []) | ||
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions, (err, { body }) => { | ||
if (err) return callback(err, null) | ||
if (body.errors === false) { | ||
stats.successful += body.items.length | ||
for (const item of body.items) { | ||
if (item.update && item.update.result === 'noop') { | ||
stats.noop++ | ||
} | ||
function bulkOperation(bulkBody, callback) { | ||
let retryCount = retries; | ||
let isRetrying = false; | ||
// Instead of going full on async-await, which would make the code easier to read, | ||
// we have decided to use callback style instead. | ||
// This because every time we use async await, V8 will create multiple promises | ||
// behind the scenes, making the code slightly slower. | ||
tryBulk(bulkBody, retryDocuments); | ||
function retryDocuments(err, bulkBody) { | ||
if (err != null) | ||
return callback(err); | ||
if (shouldAbort) | ||
return callback(); | ||
if (bulkBody.length > 0) { | ||
if (retryCount > 0) { | ||
isRetrying = true; | ||
retryCount -= 1; | ||
stats.retry += bulkBody.length; | ||
setTimeout(tryBulk, wait, bulkBody, retryDocuments); | ||
return; | ||
} | ||
for (let i = 0, len = bulkBody.length; i < len; i = i + 2) { | ||
const operation = Object.keys(serializer.deserialize(bulkBody[i]))[0]; | ||
onDrop({ | ||
status: 429, | ||
error: null, | ||
operation: serializer.deserialize(bulkBody[i]), | ||
document: operation !== 'delete' | ||
? serializer.deserialize(bulkBody[i + 1]) | ||
/* istanbul ignore next */ | ||
: null, | ||
retried: isRetrying | ||
}); | ||
stats.failed += 1; | ||
} | ||
} | ||
callback(); | ||
} | ||
return callback(null, []) | ||
} | ||
const retry = [] | ||
const { items } = body | ||
for (let i = 0, len = items.length; i < len; i++) { | ||
const action = items[i] | ||
const operation = Object.keys(action)[0] | ||
const { status } = action[operation] | ||
const indexSlice = operation !== 'delete' ? i * 2 : i | ||
if (status >= 400) { | ||
// 429 is the only staus code where we might want to retry | ||
// a document, because it was not an error in the document itself, | ||
// but the ES node were handling too many operations. | ||
if (status === 429) { | ||
retry.push(bulkBody[indexSlice]) | ||
/* istanbul ignore next */ | ||
if (operation !== 'delete') { | ||
retry.push(bulkBody[indexSlice + 1]) | ||
} | ||
} else { | ||
onDrop({ | ||
status: status, | ||
error: action[operation].error, | ||
operation: serializer.deserialize(bulkBody[indexSlice]), | ||
document: operation !== 'delete' | ||
? serializer.deserialize(bulkBody[indexSlice + 1]) | ||
: null, | ||
retried: isRetrying | ||
function tryBulk(bulkBody, callback) { | ||
if (shouldAbort) | ||
return callback(null, []); | ||
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions) | ||
.then(response => { | ||
var _d, _e; | ||
const result = response.body; | ||
if (!result.errors) { | ||
stats.successful += result.items.length; | ||
for (const item of result.items) { | ||
if (((_d = item.update) === null || _d === void 0 ? void 0 : _d.result) === 'noop') { | ||
stats.noop++; | ||
} | ||
} | ||
return callback(null, []); | ||
} | ||
const retry = []; | ||
const { items } = result; | ||
for (let i = 0, len = items.length; i < len; i++) { | ||
const action = items[i]; | ||
const operation = Object.keys(action)[0]; | ||
const responseItem = action[operation]; | ||
(0, assert_1.default)(responseItem !== undefined, 'The responseItem is undefined, please file a bug report'); | ||
const indexSlice = operation !== 'delete' ? i * 2 : i; | ||
if (responseItem.status >= 400) { | ||
// 429 is the only staus code where we might want to retry | ||
// a document, because it was not an error in the document itself, | ||
// but the ES node were handling too many operations. | ||
if (responseItem.status === 429) { | ||
retry.push(bulkBody[indexSlice]); | ||
/* istanbul ignore next */ | ||
if (operation !== 'delete') { | ||
retry.push(bulkBody[indexSlice + 1]); | ||
} | ||
} | ||
else { | ||
onDrop({ | ||
status: responseItem.status, | ||
error: (_e = responseItem.error) !== null && _e !== void 0 ? _e : null, | ||
operation: serializer.deserialize(bulkBody[indexSlice]), | ||
document: operation !== 'delete' | ||
? serializer.deserialize(bulkBody[indexSlice + 1]) | ||
: null, | ||
retried: isRetrying | ||
}); | ||
stats.failed += 1; | ||
} | ||
} | ||
else { | ||
stats.successful += 1; | ||
} | ||
} | ||
callback(null, retry); | ||
}) | ||
stats.failed += 1 | ||
} | ||
} else { | ||
stats.successful += 1 | ||
.catch(err => { | ||
callback(err, []); | ||
}); | ||
} | ||
} | ||
callback(null, retry) | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
exports.default = Helpers; | ||
_a = kClient, _b = kMetaHeader, _c = kMaxRetries; | ||
// Using a getter will improve the overall performances of the code, | ||
// as we will reed the documents only if needed. | ||
function addDocumentsGetter (result) { | ||
Object.defineProperty(result, 'documents', { | ||
get () { | ||
if (this.body.hits && this.body.hits.hits) { | ||
return this.body.hits.hits.map(d => d._source) | ||
} | ||
return [] | ||
function addDocumentsGetter(result) { | ||
Object.defineProperty(result, 'documents', { | ||
get() { | ||
var _d; | ||
if (((_d = this.body.hits) === null || _d === void 0 ? void 0 : _d.hits) != null) { | ||
// @ts-expect-error | ||
return this.body.hits.hits.map(d => d._source); | ||
} | ||
return []; | ||
} | ||
}); | ||
} | ||
function appendFilterPath(filter, params, force) { | ||
if (params.filter_path !== undefined) { | ||
params.filter_path += ',' + filter; // eslint-disable-line | ||
} | ||
}) | ||
else if (force) { | ||
params.filter_path = filter; | ||
} | ||
} | ||
function appendFilterPath (filter, params, force) { | ||
if (params.filter_path !== undefined) { | ||
params.filter_path += ',' + filter | ||
} else if (params.filterPath !== undefined) { | ||
params.filterPath += ',' + filter | ||
} else if (force === true) { | ||
params.filter_path = filter | ||
} | ||
function isReadableStream(obj) { | ||
return obj != null && typeof obj.pipe === 'function'; | ||
} | ||
module.exports = Helpers | ||
function isAsyncIterator(obj) { | ||
return (obj === null || obj === void 0 ? void 0 : obj[Symbol.asyncIterator]) != null; | ||
} | ||
//# sourceMappingURL=Helpers.js.map |
109
package.json
{ | ||
"name": "@elastic/elasticsearch-canary", | ||
"version": "8.0.0-canary.22", | ||
"versionCanary": "8.0.0-canary.22", | ||
"description": "The official Elasticsearch client for Node.js", | ||
"main": "index.js", | ||
"types": "./api/new.d.ts", | ||
"exports": { | ||
".": { | ||
"require": "./index.js", | ||
"import": "./index.mjs" | ||
}, | ||
"./": "./" | ||
"types": "index.d.ts", | ||
"scripts": { | ||
"test": "npm run build && npm run lint && tap test/{unit,acceptance}/{*,**/*}.test.ts", | ||
"test:unit": "npm run build && tap test/unit/{*,**/*}.test.ts", | ||
"test:acceptance": "npm run build && tap test/acceptance/*.test.ts", | ||
"test:coverage-100": "npm run build && tap test/{unit,acceptance}/{*,**/*}.test.ts --coverage --100", | ||
"test:coverage-report": "npm run build && tap test/{unit,acceptance}/{*,**/*}.test.ts --coverage && nyc report --reporter=text-lcov > coverage.lcov", | ||
"test:coverage-ui": "npm run build && tap test/{unit,acceptance}/{*,**/*}.test.ts --coverage --coverage-report=html", | ||
"test:integration": "tsc && node test/integration/index.js", | ||
"lint": "ts-standard src", | ||
"lint:fix": "ts-standard --fix src", | ||
"license-checker": "license-checker --production --onlyAllow='MIT;Apache-2.0;Apache1.1;ISC;BSD-3-Clause;BSD-2-Clause;0BSD'", | ||
"prebuild": "npm run clean-build && npm run lint", | ||
"build": "tsc", | ||
"clean-build": "rimraf ./lib && mkdir lib", | ||
"prepublishOnly": "npm run build" | ||
}, | ||
"homepage": "http://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/index.html", | ||
"version": "8.0.0-canary.21", | ||
"versionCanary": "8.0.0-canary.21", | ||
"keywords": [ | ||
@@ -26,17 +34,2 @@ "elasticsearch", | ||
], | ||
"scripts": { | ||
"test": "npm run lint && tap test/{unit,acceptance}/{*,**/*}.test.js && npm run test:types", | ||
"test:unit": "tap test/unit/{*,**/*}.test.js", | ||
"test:acceptance": "tap test/acceptance/*.test.js", | ||
"test:integration": "node test/integration/index.js", | ||
"test:integration:helpers": "tap test/integration/helpers/*.test.js", | ||
"test:types": "tsd", | ||
"test:coverage-100": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --100 --nyc-arg=\"--exclude=api\"", | ||
"test:coverage-report": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --nyc-arg=\"--exclude=api\" && nyc report --reporter=text-lcov > coverage.lcov", | ||
"test:coverage-ui": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --coverage-report=html --nyc-arg=\"--exclude=api\"", | ||
"lint": "standard", | ||
"lint:fix": "standard --fix", | ||
"license-checker": "license-checker --production --onlyAllow='MIT;Apache-2.0;Apache1.1;ISC;BSD-3-Clause;BSD-2-Clause'", | ||
"build-esm": "npx gen-esm-wrapper . index.mjs && standard --fix index.mjs" | ||
}, | ||
"author": { | ||
@@ -46,14 +39,24 @@ "name": "Tomas Della Vedova", | ||
}, | ||
"original-author": { | ||
"name": "Spencer Alger", | ||
"company": "Elasticsearch BV" | ||
"license": "Apache-2.0", | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/elastic/elasticsearch-js.git" | ||
}, | ||
"bugs": { | ||
"url": "https://github.com/elastic/elasticsearch-js/issues" | ||
}, | ||
"homepage": "http://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/index.html", | ||
"engines": { | ||
"node": ">=12" | ||
}, | ||
"devDependencies": { | ||
"@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1", | ||
"@types/node": "^15.3.1", | ||
"convert-hrtime": "^5.0.0", | ||
"@types/debug": "^4.1.6", | ||
"@types/ms": "^0.7.31", | ||
"@types/node": "^16.4.1", | ||
"@types/sinonjs__fake-timers": "^6.0.3", | ||
"@types/split2": "^3.2.1", | ||
"@types/stoppable": "^1.1.1", | ||
"@types/tap": "^15.0.5", | ||
"cross-zip": "^4.0.0", | ||
"dedent": "^0.7.0", | ||
"deepmerge": "^4.2.2", | ||
"dezalgo": "^1.0.3", | ||
"fast-deep-equal": "^3.1.3", | ||
@@ -64,10 +67,9 @@ "into-stream": "^6.0.0", | ||
"minimist": "^1.2.5", | ||
"node-fetch": "^2.6.1", | ||
"ora": "^5.4.0", | ||
"pretty-hrtime": "^1.0.3", | ||
"ms": "^2.1.3", | ||
"node-abort-controller": "^2.0.0", | ||
"node-fetch": "^2.6.2", | ||
"ora": "^5.4.1", | ||
"proxy": "^1.0.2", | ||
"rimraf": "^3.0.2", | ||
"semver": "^7.3.5", | ||
"simple-git": "^2.39.0", | ||
"simple-statistics": "^7.7.0", | ||
"split2": "^3.2.2", | ||
@@ -77,35 +79,20 @@ "standard": "^16.0.3", | ||
"tap": "^15.0.9", | ||
"tsd": "^0.15.1", | ||
"ts-node": "^10.1.0", | ||
"ts-standard": "^10.0.0", | ||
"typescript": "^4.3.5", | ||
"workq": "^3.0.0", | ||
"xmlbuilder2": "^2.4.1" | ||
"xmlbuilder2": "^3.0.2" | ||
}, | ||
"dependencies": { | ||
"debug": "^4.3.1", | ||
"hpagent": "^0.1.1", | ||
"ms": "^2.1.3", | ||
"secure-json-parse": "^2.4.0" | ||
"@elastic/transport": "^0.0.6", | ||
"tslib": "^2.3.0" | ||
}, | ||
"license": "Apache-2.0", | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/elastic/elasticsearch-js.git" | ||
}, | ||
"bugs": { | ||
"url": "https://github.com/elastic/elasticsearch-js/issues" | ||
}, | ||
"engines": { | ||
"node": ">=12" | ||
}, | ||
"tsd": { | ||
"directory": "test/types" | ||
}, | ||
"tap": { | ||
"ts": false, | ||
"ts": true, | ||
"jsx": false, | ||
"flow": false, | ||
"coverage": false, | ||
"jobs-auto": true, | ||
"check-coverage": false | ||
}, | ||
"commitHash": "3feda5d9" | ||
"commitHash": "1a227459" | ||
} |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Unidentified License
License(Experimental) Something that seems like a license was found, but its contents could not be matched with a known license.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 2 instances in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Unidentified License
License(Experimental) Something that seems like a license was found, but its contents could not be matched with a known license.
Found 1 instance in 1 package
2298962
2
254
44159
0
1
30
+ Added@elastic/transport@^0.0.6
+ Addedtslib@^2.3.0
+ Added@elastic/transport@0.0.6(transitive)
+ Addedtslib@2.7.0(transitive)
+ Addedundici@4.16.0(transitive)
- Removeddebug@^4.3.1
- Removedhpagent@^0.1.1
- Removedms@^2.1.3
- Removedsecure-json-parse@^2.4.0