google-cloud-bigquery
Advanced tools
Comparing version 0.2.7 to 0.2.9
@@ -5,2 +5,12 @@ # Change Log | ||
<a name="0.2.9"></a> | ||
## [0.2.9](https://github.com/nicolasdao/google-cloud-bigquery/compare/v0.2.8...v0.2.9) (2018-12-09) | ||
<a name="0.2.8"></a> | ||
## [0.2.8](https://github.com/nicolasdao/google-cloud-bigquery/compare/v0.2.7...v0.2.8) (2018-12-09) | ||
<a name="0.2.7"></a> | ||
@@ -7,0 +17,0 @@ ## [0.2.7](https://github.com/nicolasdao/google-cloud-bigquery/compare/v0.2.6...v0.2.7) (2018-12-04) |
10
index.js
@@ -12,3 +12,3 @@ /** | ||
const { fitToSchema, fieldsToSchema } = require('./src/format') | ||
const { obj } = require('./utils') | ||
const { obj, promise: { retry } } = require('./utils') | ||
@@ -47,2 +47,8 @@ const _getToken = auth => new Promise((onSuccess, onFailure) => auth.getToken((err, token) => err ? onFailure(err) : onSuccess(token))) | ||
const _retryInsert = (...args) => retry( | ||
() => bigQuery.table.insert(...args), | ||
() => true, | ||
{ ignoreFailure: true, retryInterval: [200, 800], retryAttempts: 10 } | ||
) | ||
return { | ||
@@ -75,3 +81,3 @@ db: { | ||
const dd = forcedSchema ? d.map(x => fitToSchema(x,forcedSchema)) : d | ||
const _insert = insert || bigQuery.table.insert | ||
const _insert = insert || _retryInsert | ||
return _insert(projectId, db, table, dd, token, { templateSuffix, skipInvalidRows }).then(res => { | ||
@@ -78,0 +84,0 @@ res = res || {} |
{ | ||
"name": "google-cloud-bigquery", | ||
"version": "0.2.7", | ||
"version": "0.2.9", | ||
"description": "Node.js package to create BigQuery table from Google Cloud Storage or load data into Google Cloud BigQuery tables including automatically updating the tables' schema.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
# Google Cloud BigQuery · [![NPM](https://img.shields.io/npm/v/google-cloud-bigquery.svg?style=flat)](https://www.npmjs.com/package/google-cloud-bigquery) [![Tests](https://travis-ci.org/nicolasdao/google-cloud-bigquery.svg?branch=master)](https://travis-ci.org/nicolasdao/google-cloud-bigquery) [![License](https://img.shields.io/badge/License-BSD%203--Clause-blue.svg)](https://opensource.org/licenses/BSD-3-Clause) [![Neap](https://neap.co/img/made_by_neap.svg)](#this-is-what-we-re-up-to) | ||
__*Google Cloud BigQuery*__ is node.js package to create BigQuery table from Google Cloud Storage or load data into Google Cloud BigQuery tables including automatically updating the tables' schema. | ||
__*Google Cloud BigQuery*__ is a node.js package to maintain BigQuery tables, either explicitly or using Google Cloud Storage (including automatically updating the tables' schema). | ||
@@ -8,2 +8,3 @@ # Table of Contents | ||
> * [How To Use It](#how-to-use-it) | ||
> * [Useful Code Snippets](#snippets-to-put-it-all-together) | ||
> * [About Neap](#this-is-what-we-re-up-to) | ||
@@ -26,7 +27,7 @@ > * [License](#license) | ||
2. Have a both a BigQuery DB and a Bucket in the same region. | ||
2. Have both a BigQuery DB and a Bucket in the same region (the bucket is only needed if you wish to maintain the BigQuery schema using data stored in Google Cloud Storage). | ||
3. Have a Service Account set up with the following 2 roles: | ||
- `roles/storage.objectAdmin` | ||
- `roles/bigquery.admin` | ||
- `roles/storage.objectAdmin` (only needed if you wish to maintain the BigQuery schema using data stored in Google Cloud Storage) | ||
@@ -213,9 +214,55 @@ 4. Get the JSON keys file for that Service Account above | ||
This object is guaranteed to comply to the schema so as much data is being inserted. | ||
This object is guaranteed to comply with the schema, which ensures that all the data is inserted. | ||
> Notice the usage of `bigQuery.job.get` to check the status of the job. The signature of that API is as follows: | ||
> `bigQuery.job.get({ projectId: 'your-project-id', location: 'asia-northeast1', jobId: 'a-job-id' })` | ||
## Snippets To Put It All Together | ||
### Idempotent Script To Keep Your DB Tables In Sync | ||
The code snippet below shows how you can create new tables if they don't exist yet, and update their schema if it has changed compared with the local version. | ||
```js | ||
const { join } = require('path') | ||
const { client } = require('google-cloud-bigquery') | ||
// The line below assumes you have a file 'schema.js' located under 'path-to-your-schema-file' | ||
// organised in a way where the 'schema' object below is structured as follows: | ||
// schema.table_01 This is the schema of 'table_01' | ||
// schema.table_02 This is the schema of 'table_02' | ||
const schema = require('path-to-your-schema-file/schema.js') | ||
const bigQuery = client.new({ jsonKeyFile: join(__dirname, './service-account.json') }) | ||
const db = bigQuery.db.get('your-dataset-id') | ||
const tbl_01 = db.table('table_01') | ||
const tbl_02 = db.table('table_02') | ||
const maintainTablesScript = () => { | ||
console.log('\nChecking for BigQuery tables updates...') | ||
return [tbl_01, tbl_02].map(table => ({ table, schema: schema[table.name] })).reduce((job, { table, schema }) => job | ||
.then(() => | ||
table.exists() | ||
.then(tableExists => tableExists | ||
? console.log(` - Table '${table.name}': Table already exists in DB '${db.name}'.`) | ||
: Promise.resolve(console.log(` - Table '${table.name}': Table not found. Creating it now...`)) | ||
.then(() => table.create.new({ schema })) | ||
.then(() => console.log(` - Table '${table.name}': Table successfully created.`)) | ||
) | ||
.then(() => table.schema.isDiff(schema)) | ||
.then(schemaHasChanged => schemaHasChanged | ||
? Promise.resolve(console.log(` - Table '${table.name}': Schema changes detected in table. Updating now...`)) | ||
.then(() => table.schema.update(schema)) | ||
.then(() => console.log(` - Table '${table.name}': Schema successfully updated.`)) | ||
: console.log(` - Table '${table.name}': No schema updates found.`) | ||
) | ||
) | ||
.catch(err => { | ||
console.log(` - Table '${table.name}': Oops... An error occured: ${err.message}`) | ||
}), | ||
Promise.resolve(null)) | ||
} | ||
maintainTablesScript() | ||
``` | ||
# This Is What We re Up To | ||
@@ -222,0 +269,0 @@ We are Neap, an Australian Technology consultancy powering the startup ecosystem in Sydney. We simply love building Tech and also meeting new people, so don't hesitate to connect with us at [https://neap.co](https://neap.co). |
@@ -44,9 +44,13 @@ /** | ||
* @param {Function} fn [description] | ||
* @param {Function} successFn Returns a boolean that determines whether a response is valid or not. | ||
* Can be a normal function or a promise | ||
* @param {Function} failureFn (Optional) Returns a boolean that determines whether an exception can be ignored or not. | ||
* Can be a normal function or a promise | ||
* @param {Function} successFn (res, options) => Returns a promise or a value. The value is a boolean or an object that determines | ||
* whether a response is valid or not. If the value is an object, that object might contain | ||
* a 'retryInterval' which overrides the optional value. | ||
* @param {Function} failureFn (Optional) (error, options) => Returns a promise or a value. The value is a boolean or an object that determines | ||
* whether an exception can be ignored (and the call retried) or not. If the value is an object, that object might contain | ||
* a 'retryInterval' which overrides the optional value. | ||
* @param {Number} options.retryAttempts default: 5. Number of retry | ||
* @param {Number} options.retryInterval default: 5000. Time interval in milliseconds between each retry | ||
* @param {Number} options.attemptsCount Current retry count. When that counter reaches the 'retryAttempts', the function stops. | ||
* @param {Number} options.timeOut If specified, 'retryAttempts' and 'attemptsCount' are ignored | ||
* @param {Number} options.retryInterval default: 5000. Time interval in milliseconds between each retry. It can also be a 2 items array. | ||
* In that case, the retryInterval is a random number between the 2 ranges (e.g., [10, 100] => 54) | ||
* @param {Boolean} options.ignoreError In case of constant failure to pass the 'successFn' test, this function will either throw an error | ||
@@ -61,33 +65,59 @@ * or return the current result without throwing an error if this flag is set to true. | ||
'function fn, function successFn, function failureFn, object options={}', | ||
({ fn, successFn, failureFn, options={} }) => Promise.resolve(null) | ||
.then(() => fn()).then(data => ({ error: null, data })) | ||
.catch(error => { | ||
if (options.ignoreFailure && !failureFn) | ||
failureFn = () => true | ||
return { error, data: null } | ||
}) | ||
.then(({ error, data }) => Promise.resolve(null).then(() => { | ||
if (error && failureFn) | ||
return failureFn(error) | ||
else if (error) | ||
throw error | ||
else | ||
return successFn(data) | ||
}) | ||
.then(passed => { | ||
if (!error && passed) | ||
return data | ||
else if ((!error && !passed) || (error && passed)) { | ||
const { retryAttempts=5, retryInterval=5000, attemptsCount=0 } = options | ||
if (attemptsCount < retryAttempts) | ||
return delay(retryInterval).then(() => retry(fn, successFn, merge(options, { attemptsCount:attemptsCount+1 }))) | ||
else if (options.ignoreError) | ||
({ fn, successFn, failureFn, options={} }) => { | ||
const start = Date.now() | ||
return Promise.resolve(null) | ||
.then(() => fn()).then(data => ({ error: null, data })) | ||
.catch(error => { | ||
if (options.ignoreFailure && !failureFn) | ||
failureFn = () => true | ||
return { error, data: null } | ||
}) | ||
.then(({ error, data }) => Promise.resolve(null) | ||
.then(() => { | ||
if (error && failureFn) | ||
return failureFn(error, options) | ||
else if (error) | ||
throw error | ||
else | ||
return successFn(data, options) | ||
}) | ||
.then(passed => { | ||
if (!error && passed) | ||
return data | ||
else | ||
throw new Error(options.errorMsg ? options.errorMsg : `${retryAttempts} attempts to retry the procedure failed to pass the test`) | ||
} else | ||
throw error | ||
}))) | ||
else if ((!error && !passed) || (error && passed)) { | ||
let { retryAttempts=5, retryInterval=5000, attemptsCount=0, timeOut=null, startTime=null } = options | ||
if (timeOut > 0) { | ||
startTime = startTime || start | ||
if (Date.now() - startTime < timeOut) { | ||
const explicitRetryInterval = passed && passed.retryInterval > 0 ? passed.retryInterval : null | ||
const i = (!explicitRetryInterval && Array.isArray(retryInterval) && retryInterval.length > 1) | ||
? (() => { | ||
if (typeof(retryInterval[0]) != 'number' || typeof(retryInterval[1]) != 'number') | ||
throw new Error(`Wrong argument exception. When 'options.retryInterval' is an array, all elements must be numbers. Current: [${retryInterval.join(', ')}].`) | ||
if (retryInterval[0] > retryInterval[1]) | ||
throw new Error(`Wrong argument exception. When 'options.retryInterval' is an array, the first element must be strictly greater than the second. Current: [${retryInterval.join(', ')}].`) | ||
return math.randomNumber(retryInterval[0], retryInterval[1]) | ||
})() | ||
: (explicitRetryInterval || retryInterval) | ||
return delay(i).then(() => failureFn | ||
? retry(fn, successFn, failureFn, merge(options, { startTime })) | ||
: retry(fn, successFn, merge(options, { startTime }))) | ||
} else | ||
throw new Error('timeout') | ||
} else if (attemptsCount < retryAttempts) | ||
return delay(retryInterval).then(() => failureFn | ||
? retry(fn, successFn, failureFn, merge(options, { attemptsCount:attemptsCount+1 })) | ||
: retry(fn, successFn, merge(options, { attemptsCount:attemptsCount+1 }))) | ||
else if (options.ignoreError) | ||
return data | ||
else | ||
throw new Error(options.errorMsg ? options.errorMsg : `${retryAttempts} attempts to retry the procedure failed to pass the test`) | ||
} else | ||
throw error | ||
})) | ||
}) | ||
module.exports = { | ||
@@ -94,0 +124,0 @@ delay, |
174034
4549
303