@google-cloud/bigquery-storage
Advanced tools
Comparing version 4.5.0 to 4.6.0
@@ -54,4 +54,4 @@ "use strict"; | ||
INTERVAL: null, | ||
[TableFieldSchema.Type.RANGE]: null, | ||
RANGE: null, | ||
[TableFieldSchema.Type.RANGE]: FieldDescriptorProto.Type.TYPE_MESSAGE, | ||
RANGE: FieldDescriptorProto.Type.TYPE_MESSAGE, | ||
}; | ||
@@ -58,0 +58,0 @@ exports.bqModeToFieldLabelMapProto2 = { |
@@ -61,3 +61,3 @@ "use strict"; | ||
function convertStorageSchemaToFileDescriptorInternal(schema, scope, useProto3) { | ||
var _a; | ||
var _a, _b, _c; | ||
let fNumber = 0; | ||
@@ -69,6 +69,27 @@ const fields = []; | ||
const currentScope = `${scope}_${field.name}`; | ||
if (field.type === TableFieldSchema.Type.STRUCT) { | ||
const subSchema = { | ||
fields: field.fields, | ||
}; | ||
if (field.type === TableFieldSchema.Type.STRUCT || | ||
field.type === TableFieldSchema.Type.RANGE) { | ||
let subSchema = {}; | ||
switch (field.type) { | ||
case TableFieldSchema.Type.STRUCT: | ||
subSchema = { | ||
fields: field.fields, | ||
}; | ||
break; | ||
case TableFieldSchema.Type.RANGE: | ||
subSchema = { | ||
fields: [ | ||
{ | ||
name: 'start', | ||
type: (_b = field.rangeElementType) === null || _b === void 0 ? void 0 : _b.type, | ||
mode: 'NULLABLE', | ||
}, | ||
{ | ||
name: 'end', | ||
type: (_c = field.rangeElementType) === null || _c === void 0 ? void 0 : _c.type, | ||
mode: 'NULLABLE', | ||
}, | ||
], | ||
}; | ||
} | ||
const fd = convertStorageSchemaToFileDescriptorInternal(subSchema, currentScope, useProto3); | ||
@@ -178,3 +199,4 @@ for (const f of fd.file) { | ||
let fdp; | ||
if (type === TableFieldSchema.Type.STRUCT) { | ||
if (type === TableFieldSchema.Type.STRUCT || | ||
type === TableFieldSchema.Type.RANGE) { | ||
fdp = new FieldDescriptorProto({ | ||
@@ -181,0 +203,0 @@ name: name, |
@@ -36,2 +36,3 @@ "use strict"; | ||
INTERVAL: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INTERVAL, | ||
RANGE: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.RANGE, | ||
RECORD: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT, | ||
@@ -38,0 +39,0 @@ STRUCT: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT, |
@@ -29,2 +29,11 @@ import * as protos from '../../protos/protos'; | ||
type?: string; | ||
/** | ||
* Represents the type of a field element. | ||
*/ | ||
rangeElementType?: { | ||
/** | ||
* Required. The type of a field element. For more information, see TableFieldSchema.type. | ||
*/ | ||
type?: string; | ||
}; | ||
}; | ||
@@ -31,0 +40,0 @@ type StorageTableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema; |
@@ -70,4 +70,13 @@ "use strict"; | ||
} | ||
if (field.rangeElementType && field.rangeElementType.type) { | ||
const rtype = schema_mappings_1.fieldTypeMap[field.rangeElementType.type]; | ||
if (!rtype) { | ||
throw Error(`could not convert range field (${field.name}) due to unknown range element type: ${field.rangeElementType.type}`); | ||
} | ||
out.rangeElementType = { | ||
type: rtype, | ||
}; | ||
} | ||
return out; | ||
} | ||
//# sourceMappingURL=schema.js.map |
@@ -13,2 +13,3 @@ import * as protos from '../../protos/protos'; | ||
private response?; | ||
private attempts; | ||
private promise; | ||
@@ -18,2 +19,20 @@ private resolveFunc?; | ||
constructor(request: AppendRowRequest); | ||
/** | ||
* Increment the number of attempts and return the value prior to the increment. | ||
* | ||
* @private | ||
* @internal | ||
* @returns {number} current number of attempts | ||
*/ | ||
_increaseAttempts(): number; | ||
/** | ||
* Resolve pending write with error or AppendRowResponse. | ||
* This resolves the promise accessed via GetResult() | ||
* | ||
* @see GetResult | ||
* | ||
* @private | ||
* @internal | ||
*/ | ||
_markDone(err: Error | null, response?: AppendRowsResponse): void; | ||
@@ -20,0 +39,0 @@ /** |
@@ -26,2 +26,3 @@ "use strict"; | ||
this.request = request; | ||
this.attempts = 0; | ||
this.promise = new Promise((resolve, reject) => { | ||
@@ -32,2 +33,22 @@ this.resolveFunc = resolve; | ||
} | ||
/** | ||
* Increment the number of attempts and return the value prior to the increment. | ||
* | ||
* @private | ||
* @internal | ||
* @returns {number} current number of attempts | ||
*/ | ||
_increaseAttempts() { | ||
return this.attempts++; | ||
} | ||
/** | ||
* Resolve pending write with error or AppendRowResponse. | ||
* This resolves the promise accessed via GetResult() | ||
* | ||
* @see GetResult | ||
* | ||
* @private | ||
* @internal | ||
*/ | ||
_markDone(err, response) { | ||
@@ -34,0 +55,0 @@ if (err) { |
@@ -31,2 +31,3 @@ /// <reference types="node" /> | ||
private _connection?; | ||
private _lastConnectionError?; | ||
private _callOptions?; | ||
@@ -38,5 +39,5 @@ private _pendingWrites; | ||
private handleError; | ||
private shouldReconnect; | ||
private isPermanentError; | ||
private isRequestError; | ||
private handleRetry; | ||
private isRetryableError; | ||
private isConnectionClosed; | ||
private resolveCallOptions; | ||
@@ -62,3 +63,6 @@ private handleData; | ||
getStreamId: () => string; | ||
private hasPendingWrites; | ||
private getNextPendingWrite; | ||
private resendAllPendingWrites; | ||
private ackAllPendingWrites; | ||
private ackNextPendingWrite; | ||
@@ -83,3 +87,3 @@ /** | ||
/** | ||
* Reconnect and re send inflight requests. | ||
* Re open appendRows BiDi gRPC connection. | ||
*/ | ||
@@ -86,0 +90,0 @@ reconnect(): void; |
@@ -21,3 +21,2 @@ "use strict"; | ||
const logger_1 = require("./logger"); | ||
const error_1 = require("./error"); | ||
/** | ||
@@ -40,27 +39,16 @@ * StreamConnection is responsible for writing requests to a bidirectional | ||
this.trace('on error', err, JSON.stringify(err)); | ||
if (this.shouldReconnect(err)) { | ||
this.reconnect(); | ||
return; | ||
this._lastConnectionError = err; | ||
const nextPendingWrite = this.getNextPendingWrite(); | ||
if (nextPendingWrite) { | ||
this.trace('found request error with pending write', err, nextPendingWrite); | ||
this.handleRetry(err); | ||
} | ||
let nextPendingWrite = this.getNextPendingWrite(); | ||
if (this.isPermanentError(err)) { | ||
this.trace('found permanent error', err); | ||
while (nextPendingWrite) { | ||
this.ackNextPendingWrite(err); | ||
nextPendingWrite = this.getNextPendingWrite(); | ||
} | ||
this.emit('error', err); | ||
if (this.listenerCount('error') === 0 && this.isRetryableError(err)) { | ||
return; | ||
} | ||
if (this.isRequestError(err) && nextPendingWrite) { | ||
this.trace('found request error with pending write', err, nextPendingWrite); | ||
this.ackNextPendingWrite(err); | ||
return; | ||
} | ||
this.emit('error', err); | ||
}; | ||
this.handleData = (response) => { | ||
this.trace('data arrived', response); | ||
const pw = this.getNextPendingWrite(); | ||
if (!pw) { | ||
this.trace('data arrived', response, this._pendingWrites.length); | ||
if (!this.hasPendingWrites()) { | ||
this.trace('data arrived with no pending write available', response); | ||
@@ -72,2 +60,11 @@ return; | ||
} | ||
const responseErr = response.error; | ||
if (responseErr) { | ||
const gerr = new gax.GoogleError(responseErr.message); | ||
gerr.code = responseErr.code; | ||
if (this.isRetryableError(gerr)) { | ||
this.handleRetry(gerr); | ||
return; | ||
} | ||
} | ||
this.ackNextPendingWrite(null, response); | ||
@@ -94,2 +91,3 @@ }; | ||
} | ||
this._lastConnectionError = null; | ||
const callOptions = this.resolveCallOptions(this._streamId, this._callOptions); | ||
@@ -102,3 +100,16 @@ const client = this._writeClient.getClient(); | ||
this._connection.on('close', () => { | ||
this.trace('connection closed'); | ||
this.trace('connection closed', this._lastConnectionError); | ||
if (this.hasPendingWrites()) { | ||
const retrySettings = this._writeClient._retrySettings; | ||
if (retrySettings.enableWriteRetries && | ||
this.isRetryableError(this._lastConnectionError)) { | ||
this.reconnect(); | ||
this.resendAllPendingWrites(); | ||
} | ||
else { | ||
const err = new gax.GoogleError('Connection failure, please retry the request'); | ||
err.code = gax.Status.UNAVAILABLE; | ||
this.ackAllPendingWrites(err); | ||
} | ||
} | ||
}); | ||
@@ -119,28 +130,33 @@ this._connection.on('pause', () => { | ||
} | ||
shouldReconnect(err) { | ||
const reconnectionErrorCodes = [ | ||
handleRetry(err) { | ||
const retrySettings = this._writeClient._retrySettings; | ||
if (retrySettings.enableWriteRetries && this.isRetryableError(err)) { | ||
if (!this.isConnectionClosed()) { | ||
const pw = this._pendingWrites.pop(); | ||
this.send(pw); | ||
} | ||
} | ||
else { | ||
this.ackNextPendingWrite(err); | ||
} | ||
} | ||
isRetryableError(err) { | ||
if (!err) { | ||
return false; | ||
} | ||
const errorCodes = [ | ||
gax.Status.ABORTED, | ||
gax.Status.UNAVAILABLE, | ||
gax.Status.RESOURCE_EXHAUSTED, | ||
gax.Status.ABORTED, | ||
gax.Status.CANCELLED, | ||
gax.Status.INTERNAL, | ||
gax.Status.DEADLINE_EXCEEDED, | ||
gax.Status.INTERNAL, | ||
]; | ||
return !!err.code && reconnectionErrorCodes.includes(err.code); | ||
return !!err.code && errorCodes.includes(err.code); | ||
} | ||
isPermanentError(err) { | ||
var _a; | ||
if (err.code === gax.Status.INVALID_ARGUMENT) { | ||
const storageErrors = (0, error_1.parseStorageErrors)(err); | ||
for (const storageError of storageErrors) { | ||
if ((_a = storageError.errorMessage) === null || _a === void 0 ? void 0 : _a.includes('Schema mismatch due to extra fields in user schema')) { | ||
return true; | ||
} | ||
} | ||
isConnectionClosed() { | ||
if (this._connection) { | ||
return this._connection.destroyed || this._connection.closed; | ||
} | ||
return false; | ||
return true; | ||
} | ||
isRequestError(err) { | ||
return err.code === gax.Status.INVALID_ARGUMENT; | ||
} | ||
resolveCallOptions(streamId, options) { | ||
@@ -187,11 +203,29 @@ const callOptions = options || {}; | ||
} | ||
hasPendingWrites() { | ||
return this._pendingWrites.length > 0; | ||
} | ||
getNextPendingWrite() { | ||
if (this._pendingWrites.length > 0) { | ||
return this._pendingWrites[0]; | ||
return this._pendingWrites[this._pendingWrites.length - 1]; | ||
} | ||
return null; | ||
} | ||
resendAllPendingWrites() { | ||
const pendingWritesToRetry = [...this._pendingWrites]; // copy array; | ||
let pw = pendingWritesToRetry.pop(); | ||
while (pw) { | ||
this._pendingWrites.pop(); // remove from real queue | ||
this.send(pw); // .send immediately adds to the queue | ||
pw = pendingWritesToRetry.pop(); | ||
} | ||
} | ||
ackAllPendingWrites(err, result) { | ||
while (this.hasPendingWrites()) { | ||
this.ackNextPendingWrite(err, result); | ||
} | ||
} | ||
ackNextPendingWrite(err, result) { | ||
const pw = this._pendingWrites.pop(); | ||
if (pw) { | ||
this.trace('ack pending write:', pw, err, result); | ||
pw._markDone(err, result); | ||
@@ -220,8 +254,10 @@ } | ||
send(pw) { | ||
const request = pw.getRequest(); | ||
if (!this._connection) { | ||
pw._markDone(new Error('connection closed')); | ||
var _a; | ||
const retrySettings = this._writeClient._retrySettings; | ||
const tries = pw._increaseAttempts(); | ||
if (tries > retrySettings.maxRetryAttempts) { | ||
pw._markDone(new Error(`pending write max retries reached: ${tries} attempts`)); | ||
return; | ||
} | ||
if (this._connection.destroyed || this._connection.closed) { | ||
if (this.isConnectionClosed()) { | ||
this.reconnect(); | ||
@@ -231,3 +267,5 @@ } | ||
try { | ||
this._connection.write(request, err => { | ||
const request = pw.getRequest(); | ||
this._pendingWrites.unshift(pw); | ||
(_a = this._connection) === null || _a === void 0 ? void 0 : _a.write(request, err => { | ||
this.trace('wrote pending write', err, this._pendingWrites.length); | ||
@@ -238,3 +276,2 @@ if (err) { | ||
} | ||
this._pendingWrites.unshift(pw); | ||
}); | ||
@@ -250,9 +287,9 @@ } | ||
isOpen() { | ||
return !!this._connection; | ||
return !this.isConnectionClosed(); | ||
} | ||
/** | ||
* Reconnect and re send inflight requests. | ||
* Re open appendRows BiDi gRPC connection. | ||
*/ | ||
reconnect() { | ||
this.trace('reconnect called'); | ||
this.trace(`reconnect called with ${this._pendingWrites.length} pending writes`); | ||
this.close(); | ||
@@ -287,3 +324,2 @@ this.open(); | ||
async flushRows(request) { | ||
this.close(); | ||
if (this.isDefaultStream()) { | ||
@@ -290,0 +326,0 @@ return null; |
@@ -6,2 +6,6 @@ import type { CallOptions, ClientOptions } from 'google-gax'; | ||
import { StreamConnection } from './stream_connection'; | ||
type RetrySettings = { | ||
enableWriteRetries: boolean; | ||
maxRetryAttempts: number; | ||
}; | ||
type BatchCommitWriteStreamsRequest = protos.google.cloud.bigquery.storage.v1.IBatchCommitWriteStreamsRequest; | ||
@@ -30,2 +34,8 @@ type BatchCommitWriteStreamsResponse = protos.google.cloud.bigquery.storage.v1.IBatchCommitWriteStreamsResponse; | ||
private _open; | ||
/** | ||
* Retry settings, only internal for now. | ||
* @private | ||
* @internal | ||
*/ | ||
_retrySettings: RetrySettings; | ||
constructor(opts?: ClientOptions); | ||
@@ -51,2 +61,19 @@ /** | ||
/** | ||
* Enables StreamConnections to automatically retry failed appends. | ||
* | ||
* Enabling retries is best suited for cases where users want to achieve at-least-once | ||
* append semantics. Use of automatic retries may complicate patterns where the user | ||
* is designing for exactly-once append semantics. | ||
*/ | ||
enableWriteRetries(enable: boolean): void; | ||
/** | ||
* Change the maximum number of retry attempts on child StreamConnections. | ||
* | ||
* The default value is to retry 4 times. | ||
* | ||
* Only valid right now when write retries are enabled. | ||
* @see enableWriteRetries. | ||
*/ | ||
setMaxRetryAttempts(retryAttempts: number): void; | ||
/** | ||
* Creates a write stream to the given table. | ||
@@ -53,0 +80,0 @@ * Additionally, every table has a special stream named DefaultStream |
@@ -69,2 +69,6 @@ "use strict"; | ||
this._open = false; | ||
this._retrySettings = { | ||
enableWriteRetries: false, | ||
maxRetryAttempts: 4, | ||
}; | ||
} | ||
@@ -78,2 +82,23 @@ /** | ||
/** | ||
* Enables StreamConnections to automatically retry failed appends. | ||
* | ||
* Enabling retries is best suited for cases where users want to achieve at-least-once | ||
* append semantics. Use of automatic retries may complicate patterns where the user | ||
* is designing for exactly-once append semantics. | ||
*/ | ||
enableWriteRetries(enable) { | ||
this._retrySettings.enableWriteRetries = enable; | ||
} | ||
/** | ||
* Change the maximum number of retry attempts on child StreamConnections. | ||
* | ||
* The default value is to retry 4 times. | ||
* | ||
* Only valid right now when write retries are enabled. | ||
* @see enableWriteRetries. | ||
*/ | ||
setMaxRetryAttempts(retryAttempts) { | ||
this._retrySettings.maxRetryAttempts = retryAttempts; | ||
} | ||
/** | ||
* Creates a write stream to the given table. | ||
@@ -80,0 +105,0 @@ * Additionally, every table has a special stream named DefaultStream |
@@ -81,3 +81,3 @@ "use strict"; | ||
let offset; | ||
if (offsetValue) { | ||
if (offsetValue !== undefined && offsetValue !== null) { | ||
offset = { | ||
@@ -84,0 +84,0 @@ value: offsetValue, |
# Changelog | ||
## [4.6.0](https://github.com/googleapis/nodejs-bigquery-storage/compare/v4.5.0...v4.6.0) (2024-05-03) | ||
### Features | ||
* Add support for RANGE type with Write API and adapt pkg ([#437](https://github.com/googleapis/nodejs-bigquery-storage/issues/437)) ([51924ab](https://github.com/googleapis/nodejs-bigquery-storage/commit/51924ab3639253ff79654d90fbada368f5fd5e18)) | ||
* Enable write retry and nack pending writes on reconnect ([#443](https://github.com/googleapis/nodejs-bigquery-storage/issues/443)) ([ce4f88c](https://github.com/googleapis/nodejs-bigquery-storage/commit/ce4f88c668afb8ebf1d5b7ad57f1c4e245a1a8f8)) | ||
## [4.5.0](https://github.com/googleapis/nodejs-bigquery-storage/compare/v4.4.0...v4.5.0) (2024-04-15) | ||
@@ -4,0 +12,0 @@ |
{ | ||
"name": "@google-cloud/bigquery-storage", | ||
"version": "4.5.0", | ||
"version": "4.6.0", | ||
"description": "Client for the BigQuery Storage API", | ||
@@ -5,0 +5,0 @@ "repository": "googleapis/nodejs-bigquery-storage", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
3626071
56952