HEX
Server: nginx/1.24.0
System: Linux nowruzgan 6.8.0-57-generic #59-Ubuntu SMP PREEMPT_DYNAMIC Sat Mar 15 17:40:59 UTC 2025 x86_64
User: babak (1000)
PHP: 8.3.6
Disabled: NONE
Upload Files
File: //usr/share/opensearch-dashboards/node_modules/@opensearch-project/opensearch-next/lib/Helpers.js
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

/*
 * Licensed to Elasticsearch B.V. under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch B.V. licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

'use strict';

/* eslint camelcase: 0 */

const { Readable } = require('stream');
const { promisify } = require('util');
const { ResponseError, ConfigurationError } = require('./errors');

const pImmediate = promisify(setImmediate);
const sleep = promisify(setTimeout);
const kClient = Symbol('opensearch-client');
const kMetaHeader = Symbol('meta header');
/* istanbul ignore next */
const noop = () => {};

class Helpers {
  constructor(opts) {
    this[kClient] = opts.client;
    this[kMetaHeader] = opts.metaHeader;
    this.maxRetries = opts.maxRetries;
  }

  /**
   * Runs a search operation. The only difference between client.search and this utility,
   * is that we are only returning the hits to the user and not the full OpenSearch response.
   * This helper automatically adds `filter_path=hits.hits._source` to the querystring,
   * as it will only need the documents source.
   * @param {object} params - The OpenSearch's search parameters.
   * @param {object} options - The client optional configuration for this request.
   * @return {array} The documents that matched the request.
   */
  async search(params, options) {
    appendFilterPath('hits.hits._source', params, true);
    const { body } = await this[kClient].search(params, options);
    if (body.hits && body.hits.hits) {
      return body.hits.hits.map((d) => d._source);
    }
    return [];
  }

  /**
   * Runs a scroll search operation. This function returns an async iterator, allowing
   * the user to use a for await loop to get all the results of a given search.
   * ```js
   * for await (const result of client.helpers.scrollSearch({ params })) {
   *   console.log(result)
   * }
   * ```
   * Each result represents the entire body of a single scroll search request,
   * if you just need to scroll the results, use scrollDocuments.
   * This function handles automatically retries on 429 status code.
   * @param {object} params - The OpenSearch's search parameters.
   * @param {object} options - The client optional configuration for this request.
   * @return {iterator} the async iterator
   */
  async *scrollSearch(params, options = {}) {
    if (this[kMetaHeader] !== null) {
      options.headers = options.headers || {};
    }
    // TODO: study scroll search slices
    const wait = options.wait || 5000;
    const maxRetries = options.maxRetries || this.maxRetries;
    if (Array.isArray(options.ignore)) {
      options.ignore.push(429);
    } else {
      options.ignore = [429];
    }
    params.scroll = params.scroll || '1m';
    appendFilterPath('_scroll_id', params, false);

    let response = null;
    for (let i = 0; i <= maxRetries; i++) {
      response = await this[kClient].search(params, options);
      if (response.statusCode !== 429) break;
      await sleep(wait);
    }
    if (response.statusCode === 429) {
      throw new ResponseError(response);
    }

    let scroll_id = response.body._scroll_id;
    let stop = false;
    const clear = async () => {
      stop = true;
      await this[kClient].clearScroll({ body: { scroll_id } }, { ignore: [400], ...options });
    };

    while (response.body.hits && response.body.hits.hits.length > 0) {
      // scroll id is always present in the response, but it might
      // change over time based on the number of shards
      scroll_id = response.body._scroll_id;
      response.clear = clear;
      addDocumentsGetter(response);

      yield response;

      if (stop === true) {
        break;
      }

      for (let i = 0; i <= maxRetries; i++) {
        response = await this[kClient].scroll(
          {
            scroll: params.scroll,
            rest_total_hits_as_int: params.rest_total_hits_as_int || params.restTotalHitsAsInt,
            body: { scroll_id },
          },
          options
        );
        if (response.statusCode !== 429) break;
        await sleep(wait);
      }
      if (response.statusCode === 429) {
        throw new ResponseError(response);
      }
    }

    if (stop === false) {
      await clear();
    }
  }

  /**
   * Runs a scroll search operation. This function returns an async iterator, allowing
   * the user to use a for await loop to get all the documents of a given search.
   * ```js
   * for await (const document of client.helpers.scrollSearch({ params })) {
   *   console.log(document)
   * }
   * ```
   * Each document is what you will find by running a scrollSearch and iterating on the hits array.
   * This helper automatically adds `filter_path=hits.hits._source` to the querystring,
   * as it will only need the documents source.
   * @param {object} params - The OpenSearch's search parameters.
   * @param {object} options - The client optional configuration for this request.
   * @return {iterator} the async iterator
   */
  async *scrollDocuments(params, options) {
    appendFilterPath('hits.hits._source', params, true);
    for await (const { documents } of this.scrollSearch(params, options)) {
      for (const document of documents) {
        yield document;
      }
    }
  }

  /**
   * Creates a msearch helper instance. Once you configure it, you can use the provided
   * `search` method to add new searches in the queue.
   * @param {object} options - The configuration of the msearch operations.
   * @param {object} reqOptions - The client optional configuration for this request.
   * @return {object} The possible operations to run.
   */
  msearch(options = {}, reqOptions = {}) {
    const client = this[kClient];
    const {
      operations = 5,
      concurrency = 5,
      flushInterval = 500,
      retries = this.maxRetries,
      wait = 5000,
      ...msearchOptions
    } = options;

    let stopReading = false;
    let stopError = null;
    let timeoutRef = null;
    const operationsStream = new Readable({
      objectMode: true,
      read() {},
    });

    const p = iterate();
    const helper = {
      then(onFulfilled, onRejected) {
        return p.then(onFulfilled, onRejected);
      },
      catch(onRejected) {
        return p.catch(onRejected);
      },
      stop(error = null) {
        if (stopReading === true) return;
        stopReading = true;
        stopError = error;
        operationsStream.push(null);
      },
      // TODO: support abort a single search?
      // NOTE: the validation checks are synchronous and the callback/promise will
      //       be resolved in the same tick. We might want to fix this in the future.
      search(header, body, callback) {
        if (stopReading === true) {
          const error =
            stopError === null
              ? new ConfigurationError('The msearch processor has been stopped')
              : stopError;
          return callback ? callback(error, {}) : Promise.reject(error);
        }

        if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) {
          const error = new ConfigurationError('The header should be an object');
          return callback ? callback(error, {}) : Promise.reject(error);
        }

        if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) {
          const error = new ConfigurationError('The body should be an object');
          return callback ? callback(error, {}) : Promise.reject(error);
        }

        let promise = null;
        if (callback === undefined) {
          let onFulfilled = null;
          let onRejected = null;
          promise = new Promise((resolve, reject) => {
            onFulfilled = resolve;
            onRejected = reject;
          });
          callback = function callback(err, result) {
            err ? onRejected(err) : onFulfilled(result);
          };
        }

        operationsStream.push([header, body, callback]);

        if (promise !== null) {
          return promise;
        }
      },
    };

    return helper;

    async function iterate() {
      const { semaphore, finish } = buildSemaphore();
      const msearchBody = [];
      const callbacks = [];
      let loadedOperations = 0;
      timeoutRef = setTimeout(onFlushTimeout, flushInterval);

      for await (const operation of operationsStream) {
        timeoutRef.refresh();
        loadedOperations += 1;
        msearchBody.push(operation[0], operation[1]);
        callbacks.push(operation[2]);
        if (loadedOperations >= operations) {
          const send = await semaphore();
          send(msearchBody.slice(), callbacks.slice());
          msearchBody.length = 0;
          callbacks.length = 0;
          loadedOperations = 0;
        }
      }

      clearTimeout(timeoutRef);
      // In some cases the previos http call does not have finished,
      // or we didn't reach the flush bytes threshold, so we force one last operation.
      if (loadedOperations > 0) {
        const send = await semaphore();
        send(msearchBody, callbacks);
      }

      await finish();

      if (stopError !== null) {
        throw stopError;
      }

      async function onFlushTimeout() {
        if (loadedOperations === 0) return;
        const msearchBodyCopy = msearchBody.slice();
        const callbacksCopy = callbacks.slice();
        msearchBody.length = 0;
        callbacks.length = 0;
        loadedOperations = 0;
        try {
          const send = await semaphore();
          send(msearchBodyCopy, callbacksCopy);
        } catch (err) {
          /* istanbul ignore next */
          helper.stop(err);
        }
      }
    }

    // This function builds a semaphore using the concurrency
    // options of the msearch helper. It is used inside the iterator
    // to guarantee that no more than the number of operations
    // allowed to run at the same time are executed.
    // It returns a semaphore function which resolves in the next tick
    // if we didn't reach the maximim concurrency yet, otherwise it returns
    // a promise that resolves as soon as one of the running request has finshed.
    // The semaphore function resolves a send function, which will be used
    // to send the actual msearch request.
    // It also returns a finish function, which returns a promise that is resolved
    // when there are no longer request running.
    function buildSemaphore() {
      let resolveSemaphore = null;
      let resolveFinish = null;
      let running = 0;

      return { semaphore, finish };

      function finish() {
        return new Promise((resolve) => {
          if (running === 0) {
            resolve();
          } else {
            resolveFinish = resolve;
          }
        });
      }

      function semaphore() {
        if (running < concurrency) {
          running += 1;
          return pImmediate(send);
        } else {
          return new Promise((resolve) => {
            resolveSemaphore = resolve;
          });
        }
      }

      function send(msearchBody, callbacks) {
        /* istanbul ignore if */
        if (running > concurrency) {
          throw new Error('Max concurrency reached');
        }
        msearchOperation(msearchBody, callbacks, () => {
          running -= 1;
          if (resolveSemaphore) {
            running += 1;
            resolveSemaphore(send);
            resolveSemaphore = null;
          } else if (resolveFinish && running === 0) {
            resolveFinish();
          }
        });
      }
    }

    function msearchOperation(msearchBody, callbacks, done) {
      let retryCount = retries;

      // Instead of going full on async-await, which would make the code easier to read,
      // we have decided to use callback style instead.
      // This because every time we use async await, V8 will create multiple promises
      // behind the scenes, making the code slightly slower.
      tryMsearch(msearchBody, callbacks, retrySearch);
      function retrySearch(msearchBody, callbacks) {
        if (msearchBody.length > 0 && retryCount > 0) {
          retryCount -= 1;
          setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch);
          return;
        }

        done();
      }

      // This function never returns an error, if the msearch operation fails,
      // the error is dispatched to all search executors.
      function tryMsearch(msearchBody, callbacks, done) {
        client.msearch(
          Object.assign({}, msearchOptions, { body: msearchBody }),
          reqOptions,
          (err, results) => {
            const retryBody = [];
            const retryCallbacks = [];
            if (err) {
              addDocumentsGetter(results);
              for (const callback of callbacks) {
                callback(err, results);
              }
              return done(retryBody, retryCallbacks);
            }
            const { responses } = results.body;
            for (let i = 0, len = responses.length; i < len; i++) {
              const response = responses[i];
              if (response.status === 429 && retryCount > 0) {
                retryBody.push(msearchBody[i * 2]);
                retryBody.push(msearchBody[i * 2 + 1]);
                retryCallbacks.push(callbacks[i]);
                continue;
              }
              const result = { ...results, body: response };
              addDocumentsGetter(result);
              if (response.status >= 400) {
                callbacks[i](new ResponseError(result), result);
              } else {
                callbacks[i](null, result);
              }
            }
            done(retryBody, retryCallbacks);
          }
        );
      }
    }
  }

  /**
   * Creates a bulk helper instance. Once you configure it, you can pick which operation
   * to execute with the given dataset, index, create, update, and delete.
   * @param {object} options - The configuration of the bulk operation.
   * @param {object} reqOptions - The client optional configuration for this request.
   * @return {object} The possible operations to run with the datasource.
   */
  bulk(options, reqOptions = {}) {
    const client = this[kClient];
    const { serializer } = client;
    if (this[kMetaHeader] !== null) {
      reqOptions.headers = reqOptions.headers || {};
    }
    const {
      datasource,
      onDocument,
      flushBytes = 5000000,
      flushInterval = 30000,
      concurrency = 5,
      retries = this.maxRetries,
      wait = 5000,
      onDrop = noop,
      refreshOnCompletion = false,
      ...bulkOptions
    } = options;

    if (datasource === undefined) {
      return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'));
    }
    if (
      !(
        Array.isArray(datasource) ||
        Buffer.isBuffer(datasource) ||
        typeof datasource.pipe === 'function' ||
        datasource[Symbol.asyncIterator]
      )
    ) {
      return Promise.reject(
        new ConfigurationError(
          'bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'
        )
      );
    }
    if (onDocument === undefined) {
      return Promise.reject(
        new ConfigurationError('bulk helper: the onDocument callback is required')
      );
    }

    let shouldAbort = false;
    let timeoutRef = null;
    const stats = {
      total: 0,
      failed: 0,
      retry: 0,
      successful: 0,
      noop: 0,
      time: 0,
      bytes: 0,
      aborted: false,
    };

    const p = iterate();
    const helper = {
      get stats() {
        return stats;
      },
      then(onFulfilled, onRejected) {
        return p.then(onFulfilled, onRejected);
      },
      catch(onRejected) {
        return p.catch(onRejected);
      },
      abort() {
        clearTimeout(timeoutRef);
        shouldAbort = true;
        stats.aborted = true;
        return this;
      },
    };

    return helper;

    /**
     * Function that iterates over the given datasource and start a bulk operation as soon
     * as it reaches the configured bulk size. It's designed to use the Node.js asynchronous
     * model at this maximum capacity, as it will collect the next body to send while there is
     * a running http call. In this way, the CPU time will be used carefully.
     * The objects will be serialized right away, to approximate the byte length of the body.
     * It creates an array of strings instead of a ndjson string because the bulkOperation
     * will navigate the body for matching failed operations with the original document.
     */
    async function iterate() {
      const { semaphore, finish } = buildSemaphore();
      const startTime = Date.now();
      const bulkBody = [];
      let actionBody = '';
      let payloadBody = '';
      let chunkBytes = 0;
      timeoutRef = setTimeout(onFlushTimeout, flushInterval);

      for await (const chunk of datasource) {
        if (shouldAbort === true) break;
        timeoutRef.refresh();
        const result = onDocument(chunk);
        const [action, payload] = Array.isArray(result) ? result : [result, chunk];
        const operation = Object.keys(action)[0];
        if (operation === 'index' || operation === 'create') {
          actionBody = serializer.serialize(action);
          payloadBody = typeof payload === 'string' ? payload : serializer.serialize(payload);
          chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody);
          bulkBody.push(actionBody, payloadBody);
        } else if (operation === 'update') {
          actionBody = serializer.serialize(action);
          payloadBody =
            typeof chunk === 'string'
              ? `{"doc":${chunk}}`
              : serializer.serialize({ doc: chunk, ...payload });
          chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody);
          bulkBody.push(actionBody, payloadBody);
        } else if (operation === 'delete') {
          actionBody = serializer.serialize(action);
          chunkBytes += Buffer.byteLength(actionBody);
          bulkBody.push(actionBody);
        } else {
          clearTimeout(timeoutRef);
          throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`);
        }

        if (chunkBytes >= flushBytes) {
          stats.bytes += chunkBytes;
          const send = await semaphore();
          send(bulkBody.slice());
          bulkBody.length = 0;
          chunkBytes = 0;
        }
      }

      clearTimeout(timeoutRef);
      // In some cases the previos http call does not have finished,
      // or we didn't reach the flush bytes threshold, so we force one last operation.
      if (shouldAbort === false && chunkBytes > 0) {
        const send = await semaphore();
        stats.bytes += chunkBytes;
        send(bulkBody);
      }

      await finish();

      if (refreshOnCompletion) {
        await client.indices.refresh(
          {
            index: typeof refreshOnCompletion === 'string' ? refreshOnCompletion : '_all',
          },
          reqOptions
        );
      }

      stats.time = Date.now() - startTime;
      stats.total = stats.successful + stats.failed;

      return stats;

      async function onFlushTimeout() {
        if (chunkBytes === 0) return;
        stats.bytes += chunkBytes;
        const bulkBodyCopy = bulkBody.slice();
        bulkBody.length = 0;
        chunkBytes = 0;
        try {
          const send = await semaphore();
          send(bulkBodyCopy);
        } catch (err) {
          /* istanbul ignore next */
          helper.abort();
        }
      }
    }

    // This function builds a semaphore using the concurrency
    // options of the bulk helper. It is used inside the iterator
    // to guarantee that no more than the number of operations
    // allowed to run at the same time are executed.
    // It returns a semaphore function which resolves in the next tick
    // if we didn't reach the maximim concurrency yet, otherwise it returns
    // a promise that resolves as soon as one of the running request has finshed.
    // The semaphore function resolves a send function, which will be used
    // to send the actual bulk request.
    // It also returns a finish function, which returns a promise that is resolved
    // when there are no longer request running. It rejects an error if one
    // of the request has failed for some reason.
    function buildSemaphore() {
      let resolveSemaphore = null;
      let resolveFinish = null;
      let rejectFinish = null;
      let error = null;
      let running = 0;

      return { semaphore, finish };

      function finish() {
        return new Promise((resolve, reject) => {
          if (running === 0) {
            if (error) {
              reject(error);
            } else {
              resolve();
            }
          } else {
            resolveFinish = resolve;
            rejectFinish = reject;
          }
        });
      }

      function semaphore() {
        if (running < concurrency) {
          running += 1;
          return pImmediate(send);
        } else {
          return new Promise((resolve) => {
            resolveSemaphore = resolve;
          });
        }
      }

      function send(bulkBody) {
        /* istanbul ignore if */
        if (running > concurrency) {
          throw new Error('Max concurrency reached');
        }
        bulkOperation(bulkBody, (err) => {
          running -= 1;
          if (err) {
            shouldAbort = true;
            error = err;
          }
          if (resolveSemaphore) {
            running += 1;
            resolveSemaphore(send);
            resolveSemaphore = null;
          } else if (resolveFinish && running === 0) {
            if (error) {
              rejectFinish(error);
            } else {
              resolveFinish();
            }
          }
        });
      }
    }

    function bulkOperation(bulkBody, callback) {
      let retryCount = retries;
      let isRetrying = false;

      // Instead of going full on async-await, which would make the code easier to read,
      // we have decided to use callback style instead.
      // This because every time we use async await, V8 will create multiple promises
      // behind the scenes, making the code slightly slower.
      tryBulk(bulkBody, retryDocuments);
      function retryDocuments(err, bulkBody) {
        if (err) return callback(err);
        if (shouldAbort === true) return callback();

        if (bulkBody.length > 0) {
          if (retryCount > 0) {
            isRetrying = true;
            retryCount -= 1;
            stats.retry += bulkBody.length;
            setTimeout(tryBulk, wait, bulkBody, retryDocuments);
            return;
          }
          for (let i = 0, len = bulkBody.length; i < len; i = i + 2) {
            const operation = Object.keys(serializer.deserialize(bulkBody[i]))[0];
            onDrop({
              status: 429,
              error: null,
              operation: serializer.deserialize(bulkBody[i]),
              document:
                operation !== 'delete'
                  ? serializer.deserialize(bulkBody[i + 1])
                  : /* istanbul ignore next */
                    null,
              retried: isRetrying,
            });
            stats.failed += 1;
          }
        }
        callback();
      }

      function tryBulk(bulkBody, callback) {
        if (shouldAbort === true) return callback(null, []);
        client.bulk(
          Object.assign({}, bulkOptions, { body: bulkBody }),
          reqOptions,
          (err, { body }) => {
            if (err) return callback(err, null);
            if (body.errors === false) {
              stats.successful += body.items.length;
              for (const item of body.items) {
                if (item.update && item.update.result === 'noop') {
                  stats.noop++;
                }
              }
              return callback(null, []);
            }
            const retry = [];
            const { items } = body;
            for (let i = 0, len = items.length; i < len; i++) {
              const action = items[i];
              const operation = Object.keys(action)[0];
              const { status } = action[operation];
              const indexSlice = operation !== 'delete' ? i * 2 : i;

              if (status >= 400) {
                // 429 is the only staus code where we might want to retry
                // a document, because it was not an error in the document itself,
                // but the OpenSearch node were handling too many operations.
                if (status === 429) {
                  retry.push(bulkBody[indexSlice]);
                  /* istanbul ignore next */
                  if (operation !== 'delete') {
                    retry.push(bulkBody[indexSlice + 1]);
                  }
                } else {
                  onDrop({
                    status: status,
                    error: action[operation].error,
                    operation: serializer.deserialize(bulkBody[indexSlice]),
                    document:
                      operation !== 'delete'
                        ? serializer.deserialize(bulkBody[indexSlice + 1])
                        : null,
                    retried: isRetrying,
                  });
                  stats.failed += 1;
                }
              } else {
                stats.successful += 1;
              }
            }
            callback(null, retry);
          }
        );
      }
    }
  }
}

// Using a getter will improve the overall performances of the code,
// as we will reed the documents only if needed.
function addDocumentsGetter(result) {
  Object.defineProperty(result, 'documents', {
    get() {
      if (this.body.hits && this.body.hits.hits) {
        return this.body.hits.hits.map((d) => d._source);
      }
      return [];
    },
  });
}

function appendFilterPath(filter, params, force) {
  if (params.filter_path !== undefined) {
    params.filter_path += ',' + filter;
  } else if (params.filterPath !== undefined) {
    params.filterPath += ',' + filter;
  } else if (force === true) {
    params.filter_path = filter;
  }
}

module.exports = Helpers;