HEX
Server: nginx/1.24.0
System: Linux nowruzgan 6.8.0-57-generic #59-Ubuntu SMP PREEMPT_DYNAMIC Sat Mar 15 17:40:59 UTC 2025 x86_64
User: babak (1000)
PHP: 8.3.6
Disabled: NONE
Upload Files
File: //usr/share/opensearch-dashboards/node_modules/stream-chopper/index.js
'use strict'

const util = require('util')
const zlib = require('zlib')
const { Writable, PassThrough } = require('readable-stream')

module.exports = StreamChopper

util.inherits(StreamChopper, Writable)

StreamChopper.split = Symbol('split')
StreamChopper.overflow = Symbol('overflow')
StreamChopper.underflow = Symbol('underflow')

const types = [
  StreamChopper.split,
  StreamChopper.overflow,
  StreamChopper.underflow
]

function StreamChopper (opts) {
  if (!(this instanceof StreamChopper)) return new StreamChopper(opts)
  if (!opts) opts = {}

  Writable.call(this, opts)

  this.size = opts.size || Infinity
  this.time = opts.time || -1
  this.type = types.indexOf(opts.type) === -1
    ? StreamChopper.split
    : opts.type
  this._transform = opts.transform

  if (this._transform && this.type === StreamChopper.split) {
    throw new Error('stream-chopper cannot split a transform stream')
  }

  this._bytes = 0
  this._stream = null

  this._locked = false
  this._draining = false

  this._onunlock = null
  this._next = noop
  this._oneos = oneos
  this._ondrain = ondrain

  const self = this

  function oneos () {
    self._removeStream()
  }

  function ondrain () {
    self._draining = false
    const next = self._next
    self._next = noop
    next()
  }
}

StreamChopper.prototype.chop = function (cb) {
  if (this.destroyed) {
    if (cb) process.nextTick(cb)
  } else if (this._onunlock === null) {
    this._endStream(cb)
  } else {
    const write = this._onunlock
    this._onunlock = () => {
      write()
      this._endStream(cb)
    }
  }
}

StreamChopper.prototype._startStream = function (cb) {
  if (this.destroyed) return
  if (this._locked) {
    this._onunlock = cb
    return
  }

  this._bytes = 0

  if (this._transform) {
    this._stream = this._transform().once('resume', () => {
      // in case `_removeStream` have just been called
      if (this._stream === null) return

      // `resume` will be emitted before the first `data` event
      this._stream.on('data', chunk => {
        this._bytes += chunk.length
        this._maybeEndTransformSteam()
      })
    })
  } else {
    this._stream = new PassThrough()
  }

  this._stream
    .on('close', this._oneos)
    .on('error', this._oneos)
    .on('finish', this._oneos)
    .on('end', this._oneos)
    .on('drain', this._ondrain)

  this._locked = true
  this.emit('stream', this._stream, err => {
    this._locked = false
    if (err) return this.destroy(err)

    const cb = this._onunlock
    if (cb) {
      this._onunlock = null
      cb()
    }
  })

  this.resetTimer()

  // To ensure that the write that caused this stream to be started
  // is perfromed in the same tick, call the callback synchronously.
  // Note that we can't do this in case the chopper is locked.
  cb()
}

StreamChopper.prototype._maybeEndTransformSteam = function () {
  if (this._stream === null) return

  // in case of backpresure on the transform stream, count how many bytes are
  // buffered
  const bufferedSize = getBufferedSize(this._stream)

  const overflow = (this._bytes + bufferedSize) - this.size

  if (overflow >= 0) this._endStream()
}

StreamChopper.prototype.resetTimer = function (time) {
  if (arguments.length > 0) this.time = time
  if (this._timer) {
    clearTimeout(this._timer)
    this._timer = null
  }
  if (this.time !== -1 && !this.destroyed && this._stream) {
    this._timer = setTimeout(() => {
      this._timer = null
      this._endStream()
    }, this.time)
    this._timer.unref()
  }
}

StreamChopper.prototype._endStream = function (cb) {
  if (this.destroyed) return
  if (this._stream === null) {
    if (cb) process.nextTick(cb)
    return
  }

  const stream = this._stream

  // ensure all timers and event listeners related to the current stream is removed
  this._removeStream()

  // if stream hasn't yet ended, make sure to end it properly
  if (!stream._writableState.ending && !stream._writableState.finished) {
    stream.end(cb)
  } else if (cb) {
    process.nextTick(cb)
  }
}

StreamChopper.prototype._removeStream = function () {
  if (this._stream === null) return

  const stream = this._stream
  this._stream = null

  if (this._timer !== null) clearTimeout(this._timer)
  if (stream._writableState.needDrain) this._ondrain()
  stream.removeListener('error', this._oneos)
  stream.removeListener('close', this._oneos)
  stream.removeListener('finish', this._oneos)
  stream.removeListener('end', this._oneos)
  stream.removeListener('drain', this._ondrain)
}

StreamChopper.prototype._write = function (chunk, enc, cb) {
  if (this._stream === null) {
    this._startStream(() => {
      this._write(chunk, enc, cb)
    })
    return
  }

  // This guard is to protect against writes that happen in the same tick after
  // a user destroys the stream. If it wasn't here, we'd accidentally write to
  // the stream and it would emit an error
  if (isDestroyed(this._stream)) {
    this._startStream(() => {
      this._write(chunk, enc, cb)
    })
    return
  }

  if (this._transform) {
    // The size of a transform stream is counted post-transform and so the size
    // guard is located elsewhere. We can therefore just write to the stream
    // without any checks.
    this._unprotectedWrite(chunk, enc, cb)
  } else {
    this._protectedWrite(chunk, enc, cb)
  }
}

StreamChopper.prototype._protectedWrite = function (chunk, enc, cb) {
  this._bytes += chunk.length

  const overflow = this._bytes - this.size

  if (overflow > 0 && this.type !== StreamChopper.overflow) {
    if (this.type === StreamChopper.split) {
      const remaining = chunk.length - overflow
      this._stream.write(chunk.slice(0, remaining))
      chunk = chunk.slice(remaining)
    }

    if (this.type === StreamChopper.underflow && this._bytes - chunk.length === 0) {
      cb(new Error(`Cannot write ${chunk.length} byte chunk - only ${this.size} available`))
      return
    }

    this._endStream(() => {
      this._write(chunk, enc, cb)
    })
    return
  }

  if (overflow < 0) {
    this._unprotectedWrite(chunk, enc, cb)
  } else {
    // if we reached the size limit, just end the stream already
    this._stream.end(chunk)
    this._endStream(cb)
  }
}

StreamChopper.prototype._unprotectedWrite = function (chunk, enc, cb) {
  if (this._stream.write(chunk) === false) this._draining = true
  if (this._draining === false) cb()
  else this._next = cb
}

StreamChopper.prototype._destroy = function (err, cb) {
  const stream = this._stream
  this._removeStream()

  if (stream !== null) {
    if (stream.destroyed === true) return cb(err)
    destroyStream(stream, function () {
      cb(err)
    })
  } else {
    cb(err)
  }
}

StreamChopper.prototype._final = function (cb) {
  if (this._stream === null) return cb()
  this._stream.end(cb)
}

function noop () {}

function getBufferedSize (stream) {
  const buffer = stream.writableBuffer || stream._writableState.getBuffer()
  return buffer.reduce((total, b) => {
    return total + b.chunk.length
  }, 0)
}

// TODO: Make this work with all Node.js 6 streams. A Node.js 6 stream doesn't
// have a destroyed flag because it doesn't have a .destroy() function. If the
// stream is a zlib stream it will however have a _handle, which will be null
// if the stream has been closed. We can check for that, but that coveres only
// zlib streams
function isDestroyed (stream) {
  return stream.destroyed === true || stream._handle === null
}

function destroyStream (stream, cb) {
  const emitClose = stream._writableState.emitClose
  if (emitClose) stream.once('close', cb)

  if (stream instanceof zlib.Gzip ||
      stream instanceof zlib.Gunzip ||
      stream instanceof zlib.Deflate ||
      stream instanceof zlib.DeflateRaw ||
      stream instanceof zlib.Inflate ||
      stream instanceof zlib.InflateRaw ||
      stream instanceof zlib.Unzip) {
    // Zlib streams doesn't have a destroy function in Node.js 6. On top of
    // that simply calling destroy on a zlib stream in Node.js 8+ will result
    // in a memory leak as the handle isn't closed (an operation normally done
    // by calling close). So until that is fixed, we need to manually close the
    // handle after destroying the stream.
    //
    // PR: https://github.com/nodejs/node/pull/23734
    if (typeof stream.destroy === 'function') {
      // Manually close the stream instead of calling `close()` as that would
      // have emitted 'close' again when calling `destroy()`
      if (stream._handle && typeof stream._handle.close === 'function') {
        stream._handle.close()
        stream._handle = null
      }

      stream.destroy()
    } else if (typeof stream.close === 'function') {
      stream.close()
    }
  } else {
    // For other streams we assume calling destroy is enough
    if (typeof stream.destroy === 'function') stream.destroy()
    // Or if there's no destroy (which Node.js 6 will not have on regular
    // streams), emit `close` as that should trigger almost the same effect
    else if (typeof stream.emit === 'function') stream.emit('close')
  }

  // In case this stream doesn't emit 'close', just call the callback manually
  if (!emitClose) cb()
}