google-cloud-bigquery
Comparing version 0.2.4 to 0.2.5
CHANGELOG.md

@@ -5,2 +5,12 @@ # Change Log
+<a name="0.2.5"></a>
+## [0.2.5](https://github.com/nicolasdao/google-cloud-bigquery/compare/v0.2.4...v0.2.5) (2018-12-03)
+
+### Features
+
+* Add support for testing if the schema has changed + update schema ([8b750e7](https://github.com/nicolasdao/google-cloud-bigquery/commit/8b750e7))
+
 <a name="0.2.4"></a>
@@ -7,0 +17,0 @@ ## [0.2.4](https://github.com/nicolasdao/google-cloud-bigquery/compare/v0.2.3...v0.2.4) (2018-11-22)
index.js
@@ -11,3 +11,4 @@ /**
 const bigQuery = require('./src')
-const { fitToSchema } = require('./src/format')
+const { fitToSchema, fieldsToSchema } = require('./src/format')
 const { obj } = require('./utils')
@@ -82,2 +83,17 @@ const _getToken = auth => new Promise((onSuccess, onFailure) => auth.getToken((err, token) => err ? onFailure(err) : onSuccess(token)))
 				fromStorage: ({ sources=[] }) => __getToken().then(token => bigQuery.table.createFromStorage(projectId, db, table, sources, token))
 			},
+			schema: {
+				isDiff: (schema) => __getToken().then(token => bigQuery.table.get(projectId, db, table, token)).then(({ data }) => {
+					if (!schema)
+						throw new Error('Missing required \'schema\' argument.')
+					if (Object.keys(schema).length == 0)
+						throw new Error('Wrong argument \'schema\'. This object must at least contain one property.')
+					if (!data.schema || !(data.schema.fields || []).some(x => x))
+						return true
+					const currentSchema = fieldsToSchema(data.schema.fields)
+					return !obj.same(schema, currentSchema)
+				}),
+				update: (schema) => __getToken().then(token => bigQuery.table.update(projectId, db, table, schema, token))
+			}
@@ -84,0 +100,0 @@ }),
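The two methods added above give the client a check-then-update flow. A minimal usage sketch, assuming a `db` object configured as in the README examples further down (table name and schema are placeholders):

```js
const newSchema = { id: 'integer', username: 'string', deleted_date: 'timestamp' }

db.table('user').schema.isDiff(newSchema)
	.then(different => different
		? db.table('user').schema.update(newSchema) // PATCHes the table definition with the new fields
		: null)
	// isDiff rejects when no schema is passed, e.g. "Missing required 'schema' argument."
	.catch(err => console.error(err.message))
```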
package.json

 {
 	"name": "google-cloud-bigquery",
-	"version": "0.2.4",
+	"version": "0.2.5",
 	"description": "Node.js package to create BigQuery table from Google Cloud Storage or load data into Google Cloud BigQuery tables including automatically updating the tables' schema.",
@@ -5,0 +5,0 @@ "main": "index.js",
README.md
@@ -36,3 +36,3 @@ # Google Cloud BigQuery · [![NPM](https://img.shields.io/npm/v/google-cloud-bigquery.svg?style=flat)](https://www.npmjs.com/package/google-cloud-bigquery) [![Tests](https://travis-ci.org/nicolasdao/google-cloud-bigquery.svg?branch=master)](https://travis-ci.org/nicolasdao/google-cloud-bigquery) [![License](https://img.shields.io/badge/License-BSD%203--Clause-blue.svg)](https://opensource.org/licenses/BSD-3-Clause) [![Neap](https://neap.co/img/made_by_neap.svg)](#this-is-what-we-re-up-to)
 ## Show Me The Code
-### Basics
+### Creating A New Table
@@ -49,62 +49,68 @@ ```js
```js
const YOUR_TABLE = 'user'

db.table(YOUR_TABLE).exists()
	.then(yes => yes
		? console.log(`Table '${YOUR_TABLE}' already exists in DB '${YOUR_DB}'`)
		: db.table(YOUR_TABLE).create.new({
			schema: {
				id: 'integer',
				username: 'string',
				friends: [{
					id: 'integer',
					username: 'string',
					score: 'float'
				}],
				country: {
					code: 'string',
					name: 'string'
				},
				married: 'boolean',
				tags: ['string'],
				inserted_date: 'timestamp'
			}
		}).then(() => console.log(`Table '${YOUR_TABLE}' successfully added to DB '${YOUR_DB}'`)))
```
### Inserting Data

```js
db.table(YOUR_TABLE).insert.values({ data: [{
	id: 1,
	username: 'Nicolas',
	inserted_date: new Date()
}, {
	id: 2,
	username: 'Brendan',
	country: {
		code: 'AU',
		name: 'Australia'
	},
	friends: [{
		id: 1,
		username: 'Nicolas',
		score: 0.87
	}, {
		id: 3,
		username: 'Boris',
		score: 0.9
	}],
	inserted_date: new Date()
}, {
	id: '3',
	username: 'Boris',
	tags: ['admin', 1],
	inserted_date: Date.now()/1000
}]
})
```
### Getting Data

```js
db.query.execute({
	sql: `select * from ${YOUR_DB}.${YOUR_TABLE} where id = @id`,
	params: { id: 2 }
})
	.then(({ data }) => console.log(JSON.stringify(data, null, '  ')))

// Query Output
```
@@ -141,4 +147,42 @@ // ============
-### Extra Precautions While Inserting Data
+### Updating The Table's Schema

With BigQuery, only two types of schema updates are possible:
1. Adding new fields
2. Relaxing a field's constraint from `REQUIRED` to `NULLABLE`

The second type of update is not useful here, as this project always creates nullable fields. The following example shows how to perform a schema update when the local schema differs from the current BigQuery schema:
```js
// Let's add a new 'deleted_date' field to our local schema
const newSchema = {
	id: 'integer',
	username: 'string',
	friends: [{
		id: 'integer',
		username: 'string',
		score: 'float'
	}],
	country: {
		code: 'string',
		name: 'string'
	},
	married: 'boolean',
	tags: ['string'],
	inserted_date: 'timestamp',
	deleted_date: 'timestamp'
}

db.table(YOUR_TABLE).schema.isDiff(newSchema)
	.then(yes => yes
		? Promise.resolve(console.log(`Schema changes detected. Updating now...`))
			.then(() => db.table(YOUR_TABLE).schema.update(newSchema))
			.then(() => console.log(`Schema successfully updated.`))
		: console.log(`No schema updates found`)
	)
```
## Extra Precautions While Inserting Data

BigQuery's casting capabilities are quite limited. When a value does not fit the table's type, that row either fails the entire insert or is completely ignored (we use the latter setting). To make sure that as much data as possible is inserted, we've added an option called `forcedSchema` to the `db.table('some-table').insert.values` API:
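The diff truncates the README here, so the original `forcedSchema` example is not shown. What follows is a hedged sketch of how such an option would plug into the insert call, reusing the loosely typed row from the examples above (the exact coercion behavior is an assumption, not confirmed by this diff):

```js
// Hypothetical illustration only; the real README example is cut off in this diff.
db.table(YOUR_TABLE).insert.values({
	data: [{
		id: '3',            // string rather than integer
		username: 'Boris',
		tags: ['admin', 1]  // mixed types in what should be a string array
	}],
	// Assumed behavior: each row is coerced to this schema before being inserted
	forcedSchema: {
		id: 'integer',
		username: 'string',
		tags: ['string']
	}
})
```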
@@ -145,0 +189,0 @@
src/format.js

@@ -327,3 +327,12 @@ /**
 const _primitiveTypes = { 'number': 'INTEGER', 'integer': 'INTEGER', 'float': 'FLOAT', 'timestamp': 'TIMESTAMP', 'boolean': 'BOOLEAN', 'string': 'STRING' }
-const transpileSchema = (schema={}) => {
+/**
+ * Converts a schema object (e.g., { id: 'integer' }) into a BigQuery fields definition.
+ * @param  {Object}  schema                e.g., { id: 'integer', tags: ['string'] }
+ * @return {[Field]} results.fields
+ * @return {String}  results.fields.name   Field's name
+ * @return {String}  results.fields.type   One of the following values: 'INTEGER', 'FLOAT', 'TIMESTAMP', 'BOOLEAN', 'STRING', 'RECORD'
+ * @return {String}  results.fields.mode   One of the following values: 'NULLABLE', 'REPEATED', 'REQUIRED'
+ * @return {[Field]} results.fields.fields If 'type' is 'RECORD', then these are the fields of the record
+ */
+const schemaToFields = (schema={}) => {
 	const keys = Object.keys(schema)
@@ -344,3 +353,3 @@ if (!keys.some(x => x))
 			type = 'RECORD'
-			fields = transpileSchema(v).fields
+			fields = schemaToFields(v).fields
 		} else {
@@ -353,3 +362,3 @@ const arg = v[0]
 			type = 'RECORD'
-			fields = transpileSchema(arg).fields
+			fields = schemaToFields(arg).fields
 		}
@@ -366,2 +375,22 @@ }
+const fieldsToSchema = fields => {
+	fields = fields || []
+	return fields.reduce((acc, { name, type, mode, fields }) => {
+		const t = type.toLowerCase().trim()
+		const m = mode.toLowerCase().trim()
+		if (t != 'record' && m != 'repeated')
+			acc[name] = t
+		else if (t == 'record') {
+			const v = fieldsToSchema(fields)
+			if (m == 'repeated')
+				acc[name] = [v]
+			else
+				acc[name] = v
+		} else if (m == 'repeated')
+			acc[name] = [t]
+		return acc
+	}, {})
+}
 const bigQueryResultToJson = (data={}) => {
@@ -405,4 +434,5 @@ if (data && data.schema && data.schema.fields && data.rows) {
 	fitToSchema,
-	transpileSchema,
-	bigQueryResultToJson
+	schemaToFields,
+	bigQueryResultToJson,
+	fieldsToSchema
 }
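Taken together, `schemaToFields` and `fieldsToSchema` are inverse conversions, which is what lets `schema.isDiff` compare a local schema against the one BigQuery reports. A short sketch of the expected round trip, with the output shape inferred from the two functions above (nullable mode is the project's default):

```js
const { schemaToFields, fieldsToSchema } = require('./src/format')

const schema = {
	id: 'integer',
	tags: ['string'],
	country: { code: 'string' }
}

// schemaToFields returns BigQuery field definitions, presumably of the form:
// { fields: [
//   { name: 'id',      type: 'INTEGER', mode: 'NULLABLE' },
//   { name: 'tags',    type: 'STRING',  mode: 'REPEATED' },
//   { name: 'country', type: 'RECORD',  mode: 'NULLABLE',
//     fields: [{ name: 'code', type: 'STRING', mode: 'NULLABLE' }] } ] }
const { fields } = schemaToFields(schema)

// fieldsToSchema reverses the mapping, so the round trip should be the identity:
console.log(fieldsToSchema(fields)) // -> { id: 'integer', tags: ['string'], country: { code: 'string' } }
```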
src/index.js

@@ -10,3 +10,3 @@ /**
 const { fetch } = require('../utils')
-const { transpileSchema, bigQueryResultToJson } = require('./format')
+const { schemaToFields, bigQueryResultToJson } = require('./format')
@@ -209,3 +209,3 @@ // BigQuery Jobs APIs doc: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
 		},
-		schema: transpileSchema(schema)
+		schema: schemaToFields(schema)
 	}
@@ -218,2 +218,18 @@ return fetch.post(BIGQUERY_TABLE_URL(projectId,db), {
+const updateTable = (projectId, db, table, schema={}, token) => Promise.resolve(null).then(() => {
+	_validateRequiredParams({ projectId, db, table, token })
+	const payload = {
+		tableReference: {
+			datasetId: db,
+			projectId,
+			tableId: table
+		},
+		schema: schemaToFields(schema)
+	}
+	return fetch.patch(BIGQUERY_TABLE_URL(projectId, db, table), {
+		'Content-Type': 'application/json',
+		Authorization: `Bearer ${token}`
+	}, JSON.stringify(payload))
+})
 const getJob = (projectId, locationId, jobId, token) => fetch.get(
@@ -234,3 +250,4 @@ BIGQUERY_JOB_URL(projectId, locationId, jobId), {
 		loadData: loadData,
-		insert: insertData
+		insert: insertData,
+		update: updateTable
 	},
@@ -237,0 +254,0 @@ job: {
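`updateTable` is the low-level call behind the `schema.update` method exposed in index.js; it PATCHes the table definition with the transpiled fields. A direct-usage sketch (project, dataset, table, and token values are placeholders):

```js
const bigQuery = require('./src')
const token = '<oauth2-access-token>' // normally acquired through the token helpers in index.js

bigQuery.table.update('my-project', 'my_dataset', 'user', { id: 'integer', deleted_date: 'timestamp' }, token)
	.then(({ status, data }) => console.log(status, data)) // fetch.patch resolves with { status, data }
```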
test/format.js

@@ -10,3 +10,6 @@ /**
 const { assert } = require('chai')
-const { cleanData, fitToSchema, transpileSchema, bigQueryResultToJson } = require('../src/format')
+const { cleanData, fitToSchema, schemaToFields, bigQueryResultToJson, fieldsToSchema } = require('../src/format')
+const schemaSample = require('./mocks/schema')
+const fieldSchemaSample = require('./mocks/bigquerySchema')
+const { obj } = require('../utils')
@@ -330,3 +333,3 @@ const NULL_FLOAT = 5e-16
-describe('#transpileSchema', () => {
+describe('#schemaToFields', () => {
 	it('Should transform a schema to a Google BigQuery table schema', () => {
@@ -358,3 +361,3 @@ const schema = {
-		const { fields=[] } = transpileSchema(schema)
+		const { fields=[] } = schemaToFields(schema)
 		assert.equal(fields.length, 8, '01')
@@ -671,2 +674,9 @@
 })
+describe('#fieldsToSchema', () => {
+	it('Should convert a BigQuery fields schema into a schema', () => {
+		const schema = fieldsToSchema(fieldSchemaSample.fields)
+		assert.isOk(obj.same(schema, schemaSample), '01')
+	})
+})
 })
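The two mock files referenced above are not part of this diff. The following are hypothetical contents, inferred only from how `fieldsToSchema` consumes them; the real fixtures are certainly larger:

```js
// test/mocks/schema.js (hypothetical contents)
module.exports = {
	id: 'integer',
	username: 'string',
	tags: ['string']
}

// test/mocks/bigquerySchema.js (hypothetical contents)
module.exports = {
	fields: [
		{ name: 'id', type: 'INTEGER', mode: 'NULLABLE' },
		{ name: 'username', type: 'STRING', mode: 'NULLABLE' },
		{ name: 'tags', type: 'STRING', mode: 'REPEATED' }
	]
}
```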
@@ -107,3 +107,3 @@ /**
 const isObj = obj => {
-	if (!obj || typeof(obj) != 'object')
+	if (!obj || typeof(obj) != 'object' || Array.isArray(obj) || (obj instanceof Date))
 		return false
@@ -150,13 +150,77 @@
-const objAreSame = (obj1, obj2) => {
-	const o = getDiff(obj1, obj2)
-	return Object.keys(o || {}).length == 0
-}
-const arrayObjAreDiff = (objArr_01, objArr_02) => {
-	objArr_01 = objArr_01 || []
-	objArr_02 = objArr_02 || []
-	if (objArr_01.length != objArr_02.length)
-		return false
-	return objArr_01.some(h1 => !objArr_02.some(h2 => objAreSame(h1, h2)))
-}
+/**
+ * Performs a deep equality test between two values.
+ * @param  {Object}  o_1                 Can be anything, incl. a primitive type
+ * @param  {Object}  o_2                 Can be anything, incl. a primitive type
+ * @param  {Boolean} options.throwError  Default false. If set to true, a failed test throws an exception with the details.
+ * @return {Boolean}                     Whether or not the test passes
+ */
+const objAreSame = (o_1, o_2, options={}) => {
+	const failed = msg => {
+		if (options.throwError)
+			throw new Error(msg)
+		else
+			return false
+	}
+	if (o_1 === o_2)
+		return true
+	if (o_1 === null || o_1 === undefined)
+		return failed(`The first object is non-truthy while the second is truthy`)
+	if (o_2 === null || o_2 === undefined)
+		return failed(`The second object is non-truthy while the first is truthy`)
+
+	const o_1_type = o_1 instanceof Date ? 'date' : Array.isArray(o_1) ? 'array' : typeof(o_1)
+	const o_2_type = o_2 instanceof Date ? 'date' : Array.isArray(o_2) ? 'array' : typeof(o_2)
+	if (o_1_type != o_2_type)
+		return failed(`Object types do not match (${o_1_type} != ${o_2_type})`)
+	if (o_1_type == 'date')
+		return o_1.toString() == o_2.toString() ? true : failed(`Dates don't match (${o_1} != ${o_2})`)
+	if (o_1_type == 'object') {
+		const o_1_keys = Object.keys(o_1)
+		const o_2_keys = Object.keys(o_2)
+		if (o_1_keys.length > o_2_keys.length) {
+			const additionalKey = o_1_keys.find(key => !o_2_keys.some(k => k == key))
+			return failed(`Property '${additionalKey}' in the first object does not exist in the second`)
+		}
+		if (o_1_keys.length < o_2_keys.length) {
+			const additionalKey = o_2_keys.find(key => !o_1_keys.some(k => k == key))
+			return failed(`Property '${additionalKey}' in the second object does not exist in the first`)
+		}
+		const additionalKey = o_2_keys.find(key => !o_1_keys.some(k => k == key))
+		if (additionalKey)
+			return failed(`Property '${additionalKey}' in the second object does not exist in the first`)
+		return o_1_keys.reduce((isSame, key) => {
+			if (!isSame)
+				return isSame
+			const o_1_val = o_1[key]
+			const o_2_val = o_2[key]
+			try {
+				return objAreSame(o_1_val, o_2_val, { throwError: true })
+			} catch(err) {
+				return failed(`Differences in property '${key}': ${err.message}`)
+			}
+		}, true)
+	}
+	if (o_1_type == 'array') {
+		if (o_1.length != o_2.length)
+			return failed(`Arrays don't have the same number of items`)
+		return o_1.reduce((isSame, obj_1) => {
+			if (!isSame)
+				return isSame
+			return o_2.some(obj_2 => objAreSame(obj_1, obj_2)) ? true : failed(`No object in the second array matches object ${JSON.stringify(obj_1, null, ' ')}`)
+		}, true)
+	}
+	return failed(`Those two values are not equal: ${o_1}, ${o_2}`)
+}
@@ -225,5 +289,4 @@
 	diff: getDiff,
-	same: objAreSame,
-	arrayAreDiff: arrayObjAreDiff
+	same: objAreSame
 	}
 }
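A few examples of how the rewritten `obj.same` behaves, following the implementation above (note that array comparison is order-insensitive):

```js
const { obj } = require('./utils')

obj.same({ a: 1, b: { c: 'x' } }, { b: { c: 'x' }, a: 1 })  // true  - key order is irrelevant
obj.same({ a: 1 }, { a: 1, b: 2 })                          // false - extra property in the second object
obj.same([1, 2], [2, 1])                                    // true  - each item only needs a match somewhere
obj.same(new Date('2018-12-03'), new Date('2018-12-03'))    // true  - dates compared via toString()

// With throwError set, the failure reason surfaces as an exception:
try {
	obj.same({ a: 1 }, { a: 2 }, { throwError: true })
} catch (err) {
	console.log(err.message) // Differences in property 'a': Those two values are not equal: 1, 2
}
```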
@@ -15,2 +15,7 @@ /**
+const patchData = (url, headers={}, body) => Promise.resolve(null).then(() => {
+	return fetch(url, { method: 'PATCH', headers, body })
+		.then(res => res.json().then(data => ({ status: res.status, data })))
+})
 const getData = (url, headers={}) => Promise.resolve(null).then(() => {
@@ -22,4 +27,5 @@ return fetch(url, { method: 'GET', headers })
 module.exports = {
+	'get': getData,
 	post: postData,
-	'get': getData
+	patch: patchData
 }
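And a quick sketch of the new `patch` helper in use (URL, headers, and payload are placeholders; the require path depends on where the caller lives):

```js
const { fetch } = require('./utils')
const token = '<oauth2-access-token>' // placeholder

fetch.patch('https://example.com/api/tables/user', {
	'Content-Type': 'application/json',
	Authorization: `Bearer ${token}`
}, JSON.stringify({ description: 'new description' }))
	.then(({ status, data }) => console.log(status, data))
```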