@google-cloud/storage
Advanced tools
Comparing version 6.10.0 to 6.10.1
@@ -107,2 +107,3 @@ /// <reference types="node" /> | ||
chunkSize?: number; | ||
highWaterMark?: number; | ||
metadata?: Metadata; | ||
@@ -109,0 +110,0 @@ origin?: string; |
@@ -5,3 +5,3 @@ /// <reference types="node" /> | ||
import { GoogleAuthOptions } from 'google-auth-library'; | ||
import { Writable } from 'stream'; | ||
import { Writable, WritableOptions } from 'stream'; | ||
import { RetryOptions, PreconditionOptions } from './storage'; | ||
@@ -25,3 +25,3 @@ export declare const PROTOCOL_REGEX: RegExp; | ||
} | ||
export interface UploadConfig { | ||
export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> { | ||
/** | ||
@@ -155,2 +155,3 @@ * The API endpoint used for the request. | ||
export declare class Upload extends Writable { | ||
#private; | ||
bucket: string; | ||
@@ -189,11 +190,14 @@ file: string; | ||
private currentInvocationId; | ||
private upstreamChunkBuffer; | ||
private chunkBufferEncoding?; | ||
/** | ||
* A cache of buffers written to this instance, ready for consuming | ||
*/ | ||
private writeBuffers; | ||
private numChunksReadInRequest; | ||
/** | ||
* A chunk used for caching the most recent upload chunk. | ||
* An array of buffers used for caching the most recent upload chunk. | ||
* We should not assume that the server received all bytes sent in the request. | ||
* - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload | ||
*/ | ||
private lastChunkSent; | ||
private localWriteCache; | ||
private localWriteCacheByteLength; | ||
private upstreamEnded; | ||
@@ -216,7 +220,7 @@ constructor(cfg: UploadConfig); | ||
/** | ||
* Prepends data back to the upstream chunk buffer. | ||
* Prepends the local buffer to write buffer and resets it. | ||
* | ||
* @param chunk The data to prepend | ||
* @param keepLastBytes number of bytes to keep from the end of the local buffer. | ||
*/ | ||
private unshiftChunkBuffer; | ||
private prependLocalBufferToUpstream; | ||
/** | ||
@@ -240,3 +244,2 @@ * Retrieves data from upstream's buffer. | ||
* @param limit The most amount of data this iterator should return. `Infinity` by default. | ||
* @param oneChunkMode Determines if one, exhaustive chunk is yielded for the iterator | ||
*/ | ||
@@ -243,0 +246,0 @@ private upstreamIterator; |
@@ -15,2 +15,8 @@ "use strict"; | ||
// limitations under the License. | ||
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) { | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it"); | ||
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver); | ||
}; | ||
var _Upload_instances, _Upload_resetLocalBuffersCache, _Upload_addLocalBufferCache; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -33,3 +39,4 @@ exports.createURI = exports.upload = exports.Upload = exports.PROTOCOL_REGEX = void 0; | ||
constructor(cfg) { | ||
super(); | ||
super(cfg); | ||
_Upload_instances.add(this); | ||
this.numBytesWritten = 0; | ||
@@ -42,11 +49,14 @@ this.numRetries = 0; | ||
}; | ||
this.upstreamChunkBuffer = Buffer.alloc(0); | ||
this.chunkBufferEncoding = undefined; | ||
/** | ||
* A cache of buffers written to this instance, ready for consuming | ||
*/ | ||
this.writeBuffers = []; | ||
this.numChunksReadInRequest = 0; | ||
/** | ||
* A chunk used for caching the most recent upload chunk. | ||
* An array of buffers used for caching the most recent upload chunk. | ||
* We should not assume that the server received all bytes sent in the request. | ||
* - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload | ||
*/ | ||
this.lastChunkSent = Buffer.alloc(0); | ||
this.localWriteCache = []; | ||
this.localWriteCacheByteLength = 0; | ||
this.upstreamEnded = false; | ||
@@ -151,7 +161,3 @@ cfg = cfg || {}; | ||
this.emit('writing'); | ||
this.upstreamChunkBuffer = Buffer.concat([ | ||
this.upstreamChunkBuffer, | ||
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk, | ||
]); | ||
this.chunkBufferEncoding = encoding; | ||
this.writeBuffers.push(typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk); | ||
this.once('readFromChunkBuffer', readCallback); | ||
@@ -161,8 +167,43 @@ process.nextTick(() => this.emit('wroteToChunkBuffer')); | ||
/** | ||
* Prepends data back to the upstream chunk buffer. | ||
* Prepends the local buffer to write buffer and resets it. | ||
* | ||
* @param chunk The data to prepend | ||
* @param keepLastBytes number of bytes to keep from the end of the local buffer. | ||
*/ | ||
unshiftChunkBuffer(chunk) { | ||
this.upstreamChunkBuffer = Buffer.concat([chunk, this.upstreamChunkBuffer]); | ||
prependLocalBufferToUpstream(keepLastBytes) { | ||
// Typically, the upstream write buffers should be smaller than the local | ||
// cache, so we can save time by setting the local cache as the new | ||
// upstream write buffer array and appending the old array to it | ||
let initialBuffers = []; | ||
if (keepLastBytes) { | ||
// we only want the last X bytes | ||
let bytesKept = 0; | ||
while (keepLastBytes > bytesKept) { | ||
// load backwards because we want the last X bytes | ||
// note: `localWriteCacheByteLength` is reset below | ||
let buf = this.localWriteCache.pop(); | ||
if (!buf) | ||
break; | ||
bytesKept += buf.byteLength; | ||
if (bytesKept > keepLastBytes) { | ||
// we have gone over the amount desired, let's keep the last X bytes | ||
// of this buffer | ||
const diff = bytesKept - keepLastBytes; | ||
buf = buf.subarray(diff); | ||
bytesKept -= diff; | ||
} | ||
initialBuffers.unshift(buf); | ||
} | ||
} | ||
else { | ||
// we're keeping all of the local cache, simply use it as the initial buffer | ||
initialBuffers = this.localWriteCache; | ||
} | ||
// Append the old upstream to the new | ||
const append = this.writeBuffers; | ||
this.writeBuffers = initialBuffers; | ||
for (const buf of append) { | ||
this.writeBuffers.push(buf); | ||
} | ||
// reset last buffers sent | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this); | ||
} | ||
@@ -175,9 +216,22 @@ /** | ||
*/ | ||
pullFromChunkBuffer(limit) { | ||
const chunk = this.upstreamChunkBuffer.slice(0, limit); | ||
this.upstreamChunkBuffer = this.upstreamChunkBuffer.slice(limit); | ||
// notify upstream we've read from the buffer so it can potentially | ||
// send more data down. | ||
process.nextTick(() => this.emit('readFromChunkBuffer')); | ||
return chunk; | ||
*pullFromChunkBuffer(limit) { | ||
while (limit) { | ||
const buf = this.writeBuffers.shift(); | ||
if (!buf) | ||
break; | ||
let bufToYield = buf; | ||
if (buf.byteLength > limit) { | ||
bufToYield = buf.subarray(0, limit); | ||
this.writeBuffers.unshift(buf.subarray(limit)); | ||
limit = 0; | ||
} | ||
else { | ||
limit -= buf.byteLength; | ||
} | ||
yield bufToYield; | ||
// Notify upstream we've read from the buffer and we're able to consume | ||
// more. It can also potentially send more data down as we're currently | ||
// iterating. | ||
this.emit('readFromChunkBuffer'); | ||
} | ||
} | ||
@@ -192,3 +246,3 @@ /** | ||
// There's data available - it should be digested | ||
if (this.upstreamChunkBuffer.byteLength) { | ||
if (this.writeBuffers.length) { | ||
return resolve(true); | ||
@@ -209,3 +263,3 @@ } | ||
// this should be the last chunk, if there's anything there | ||
if (this.upstreamChunkBuffer.length) | ||
if (this.writeBuffers.length) | ||
return resolve(true); | ||
@@ -231,29 +285,12 @@ return resolve(false); | ||
* @param limit The most amount of data this iterator should return. `Infinity` by default. | ||
* @param oneChunkMode Determines if one, exhaustive chunk is yielded for the iterator | ||
*/ | ||
async *upstreamIterator(limit = Infinity, oneChunkMode) { | ||
let completeChunk = Buffer.alloc(0); | ||
async *upstreamIterator(limit = Infinity) { | ||
// read from upstream chunk buffer | ||
while (limit && (await this.waitForNextChunk())) { | ||
// read until end or limit has been reached | ||
const chunk = this.pullFromChunkBuffer(limit); | ||
limit -= chunk.byteLength; | ||
if (oneChunkMode) { | ||
// return 1 chunk at the end of iteration | ||
completeChunk = Buffer.concat([completeChunk, chunk]); | ||
for (const chunk of this.pullFromChunkBuffer(limit)) { | ||
limit -= chunk.byteLength; | ||
yield chunk; | ||
} | ||
else { | ||
// return many chunks throughout iteration | ||
yield { | ||
chunk, | ||
encoding: this.chunkBufferEncoding, | ||
}; | ||
} | ||
} | ||
if (oneChunkMode) { | ||
yield { | ||
chunk: completeChunk, | ||
encoding: this.chunkBufferEncoding, | ||
}; | ||
} | ||
} | ||
@@ -396,4 +433,3 @@ createURI(callback) { | ||
// A queue for the upstream data | ||
const upstreamQueue = this.upstreamIterator(expectedUploadSize, multiChunkMode // multi-chunk mode should return 1 chunk per request | ||
); | ||
const upstreamQueue = this.upstreamIterator(expectedUploadSize); | ||
// The primary read stream for this request. This stream retrieves no more | ||
@@ -409,4 +445,11 @@ // than the exact requested amount from upstream. | ||
this.numChunksReadInRequest++; | ||
this.lastChunkSent = result.value.chunk; | ||
this.numBytesWritten += result.value.chunk.byteLength; | ||
if (multiChunkMode) { | ||
// save ever buffer used in the request in multi-chunk mode | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_addLocalBufferCache).call(this, result.value); | ||
} | ||
else { | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this); | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_addLocalBufferCache).call(this, result.value); | ||
} | ||
this.numBytesWritten += result.value.byteLength; | ||
this.emit('progress', { | ||
@@ -416,3 +459,3 @@ bytesWritten: this.numBytesWritten, | ||
}); | ||
requestStream.push(result.value.chunk, result.value.encoding); | ||
requestStream.push(result.value); | ||
} | ||
@@ -430,10 +473,14 @@ if (result.done) { | ||
// We need to know how much data is available upstream to set the `Content-Range` header. | ||
const oneChunkIterator = this.upstreamIterator(expectedUploadSize, true); | ||
const { value } = await oneChunkIterator.next(); | ||
const bytesToUpload = value.chunk.byteLength; | ||
// https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload | ||
for await (const chunk of this.upstreamIterator(expectedUploadSize)) { | ||
// This will conveniently track and keep the size of the buffers | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_addLocalBufferCache).call(this, chunk); | ||
} | ||
// We hit either the expected upload size or the remainder | ||
const bytesToUpload = this.localWriteCacheByteLength; | ||
// Important: we want to know if the upstream has ended and the queue is empty before | ||
// unshifting data back into the queue. This way we will know if this is the last request or not. | ||
const isLastChunkOfUpload = !(await this.waitForNextChunk()); | ||
// Important: put the data back in the queue for the actual upload iterator | ||
this.unshiftChunkBuffer(value.chunk); | ||
// Important: put the data back in the queue for the actual upload | ||
this.prependLocalBufferToUpstream(); | ||
let totalObjectSize = this.contentLength; | ||
@@ -502,10 +549,12 @@ if (typeof this.contentLength !== 'number' && isLastChunkOfUpload) { | ||
if (missingBytes) { | ||
const dataToPrependForResending = this.lastChunkSent.slice(-missingBytes); | ||
// As multi-chunk uploads send one chunk per request and pulls one | ||
// chunk into the pipeline, prepending the missing bytes back should | ||
// be fine for the next request. | ||
this.unshiftChunkBuffer(dataToPrependForResending); | ||
this.prependLocalBufferToUpstream(missingBytes); | ||
this.numBytesWritten -= missingBytes; | ||
this.lastChunkSent = Buffer.alloc(0); | ||
} | ||
else { | ||
// No bytes missing - no need to keep the local cache | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this); | ||
} | ||
// continue uploading next chunk | ||
@@ -524,4 +573,4 @@ this.continueUploading(); | ||
else { | ||
// remove the last chunk sent to free memory | ||
this.lastChunkSent = Buffer.alloc(0); | ||
// no need to keep the cache | ||
__classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this); | ||
if (resp && resp.data) { | ||
@@ -642,7 +691,5 @@ resp.data.size = Number(resp.data.size); | ||
} | ||
// Unshift the most recent chunk back in case it's needed for the next | ||
// request. | ||
this.numBytesWritten -= this.lastChunkSent.byteLength; | ||
this.unshiftChunkBuffer(this.lastChunkSent); | ||
this.lastChunkSent = Buffer.alloc(0); | ||
// Unshift the local cache back in case it's needed for the next request. | ||
this.numBytesWritten -= this.localWriteCacheByteLength; | ||
this.prependLocalBufferToUpstream(); | ||
// We don't know how much data has been received by the server. | ||
@@ -696,2 +743,9 @@ // `continueUploading` will recheck the offset via `getAndSetOffset`. | ||
exports.Upload = Upload; | ||
_Upload_instances = new WeakSet(), _Upload_resetLocalBuffersCache = function _Upload_resetLocalBuffersCache() { | ||
this.localWriteCache = []; | ||
this.localWriteCacheByteLength = 0; | ||
}, _Upload_addLocalBufferCache = function _Upload_addLocalBufferCache(buf) { | ||
this.localWriteCache.push(buf); | ||
this.localWriteCacheByteLength += buf.byteLength; | ||
}; | ||
function upload(cfg) { | ||
@@ -698,0 +752,0 @@ return new Upload(cfg); |
{ | ||
"name": "@google-cloud/storage", | ||
"description": "Cloud Storage Client Library for Node.js", | ||
"version": "6.10.0", | ||
"version": "6.10.1", | ||
"license": "Apache-2.0", | ||
@@ -6,0 +6,0 @@ "author": "Google Inc.", |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
841606
16955