@ovotech/bigquery-pg-sink
Comparing version 1.0.1 to 2.0.0
/// <reference types="node" />
+ import { RowMetadata } from '@google-cloud/bigquery';
import { Writable } from 'stream';
@@ -6,10 +7,9 @@ import { PGSinkStreamOptions } from './types';
private pg;
- private table;
private insert;
- constructor({ pg, table, insert, highWaterMark }: PGSinkStreamOptions);
+ constructor({ pg, insert, highWaterMark }: PGSinkStreamOptions);
_writev?(chunks: Array<{
- chunk: any;
+ chunk: RowMetadata;
encoding: string;
}>, callback: (error?: Error | undefined) => void): Promise<void>;
- _write(chunk: any, _: string, callback: (err?: Error | undefined) => void): Promise<void>;
+ _write(chunk: RowMetadata, _: string, callback: (err?: Error | undefined) => void): Promise<void>;
}
@@ -5,6 +5,5 @@ "use strict";
class BigQueryPGSinkStream extends stream_1.Writable {
- constructor({ pg, table, insert, highWaterMark = 200 }) {
+ constructor({ pg, insert, highWaterMark = 200 }) {
super({ objectMode: true, highWaterMark });
this.pg = pg;
- this.table = table;
this.insert = insert;
@@ -15,3 +14,6 @@ }
const rows = chunks.map(chunk => chunk.chunk);
- await this.pg.query(...this.insert(this.table, rows));
+ const inserts = this.insert(rows);
+ for (const insert of inserts) {
+ await this.pg.query(insert.query, insert.values);
+ }
callback();
@@ -25,3 +27,6 @@ }
try {
- await this.pg.query(...this.insert(this.table, [chunk]));
+ const inserts = this.insert([chunk]);
+ for (const insert of inserts) {
+ await this.pg.query(insert.query, insert.values);
+ }
callback();
@@ -28,0 +33,0 @@ }
import { Client } from 'pg';
+ import { RowMetadata } from '@google-cloud/bigquery';
+ export interface InsertBatch {
+ query: string;
+ values: any[];
+ }
export interface PGSinkStreamOptions {
pg: Client;
- table: string;
- insert: (table: string, rows: any[]) => [string, any[]];
+ insert: (rows: RowMetadata) => InsertBatch[];
highWaterMark?: number;
}
{
"name": "@ovotech/bigquery-pg-sink",
"description": "Stream BigQuery query results into a postgres database",
- "version": "1.0.1",
+ "version": "2.0.0",
"main": "dist/index.js",
@@ -32,3 +32,6 @@ "source": "src/index.ts",
},
- "gitHead": "08920fa2c6cc9a25f01f9de4c1b2283291d93b1b"
+ "dependencies": {
+ "@google-cloud/bigquery": "^4.7.0"
+ },
+ "gitHead": "4968304ed26cf3d30b50b5b6feb90734074ae1e8"
}
@@ -10,8 +10,43 @@ # BigQuery PG Sink
```
+ #### Creating a Sink
+ ```
+ const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
+ const pgSink = new BigQueryPGSinkStream({
+ pg,
+ insert: insertQuery,
+ });
+ bigquery
+ .createQueryStream('___BIGQUERY_QUERY_STRING___')
+ .pipe(pgSink)
+ ```
+ #### Creating insert functions
+ You can directly map each record returned to a single insert query:
```typescript
- import { BigQueryPGSinkStream } from '@ovotech/bigquery-pg-sink';
+ import { RowMetadata } from '@google-cloud/bigquery';
+ import { BigQueryPGSinkStream, InsertBatch } from '@ovotech/bigquery-pg-sink';
import { Client } from 'pg';
- export const insertQuery = (table: string, rows: any[]): [string, any[]] => {
+ export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
+ return rows.map(bigQueryResult => ({
+ query: `INSERT INTO table
+ (
+ id,
+ balance
+ ) VALUES ($1, $2)
+ `,
+ values: [bigQueryResult.id, bigQueryResult.balance],
+ }));
+ };
+ ```
+ It is possible to speed up insertion by using a bulk insert; however, this means you need to programmatically build the query based on the number of rows passed to your insertQuery function:
+ ```typescript
+ import { RowMetadata } from '@google-cloud/bigquery';
+ import { BigQueryPGSinkStream, InsertBatch } from '@ovotech/bigquery-pg-sink';
+ import { Client } from 'pg';
+ export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
// transform each result into a flat array of values
@@ -25,3 +60,3 @@ // i.e. [1, 200, 2, 300]
}).flat();
// generate the values insert string
@@ -38,4 +73,5 @@ // i.e. ($1,$2,$3,.....)
.join(',');
- return [
- `INSERT INTO ${table}
+ return [{
+ query: `INSERT INTO table
(
@@ -46,17 +82,5 @@ id,
`,
- flatRows,
- ];
+ values: flatRows,
+ }];
};
- const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
- const pgSink = new BigQueryPGSinkStream({
- pg: db,
- table: 'NAME_OF_TABLE_IN_DATABASE',
- insert: insertQuery,
- });
- bigquery
- .createQueryStream('___BIGQUERY_QUERY_STRING___')
- .pipe(pgSink)
```
@@ -63,0 +87,0 @@
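The hunks above cut out the middle of the README's bulk-insert example (the code that flattens the rows and builds the placeholder string). For reference, a minimal self-contained sketch of a v2-style bulk insert function is shown below; it reuses the `id` and `balance` columns from the README example, while the hypothetical `account_balances` table name and the placeholder-building logic are assumptions rather than the package's exact code:

```typescript
import { RowMetadata } from '@google-cloud/bigquery';
import { InsertBatch } from '@ovotech/bigquery-pg-sink';

export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
  // Flatten each BigQuery row into one array of parameter values,
  // e.g. two rows become [1, 200, 2, 300].
  const flatRows = rows.map((row: any) => [row.id, row.balance]).flat();

  // Build one "($1,$2)" group per row, i.e. ($1,$2),($3,$4),...
  const placeholders = rows
    .map((_: any, i: number) => `($${i * 2 + 1},$${i * 2 + 2})`)
    .join(',');

  // Return a single multi-row INSERT per batch instead of one query per row.
  return [
    {
      query: `INSERT INTO account_balances (id, balance) VALUES ${placeholders}`,
      values: flatRows,
    },
  ];
};
```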
@@ -0,21 +1,26 @@
+ import { RowMetadata } from '@google-cloud/bigquery';
import { Client } from 'pg';
import { Writable } from 'stream';
- import { PGSinkStreamOptions } from './types';
+ import { InsertBatch, PGSinkStreamOptions } from './types';
export class BigQueryPGSinkStream extends Writable {
private pg: Client;
- private table: string;
- private insert: (table: string, rows: any[]) => [string, any[]];
+ private insert: (rows: RowMetadata) => InsertBatch[];
- constructor({ pg, table, insert, highWaterMark = 200 }: PGSinkStreamOptions) {
+ constructor({ pg, insert, highWaterMark = 200 }: PGSinkStreamOptions) {
super({ objectMode: true, highWaterMark });
this.pg = pg;
- this.table = table;
this.insert = insert;
}
- async _writev?(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | undefined) => void) {
+ async _writev?(
+ chunks: Array<{ chunk: RowMetadata; encoding: string }>,
+ callback: (error?: Error | undefined) => void,
+ ): Promise<void> {
try {
const rows = chunks.map(chunk => chunk.chunk);
- await this.pg.query(...this.insert(this.table, rows));
+ const inserts = this.insert(rows);
+ for (const insert of inserts) {
+ await this.pg.query(insert.query, insert.values);
+ }
callback();
@@ -27,5 +32,8 @@ } catch (error) {
- async _write(chunk: any, _: string, callback: (err?: Error | undefined) => void): Promise<void> {
+ async _write(chunk: RowMetadata, _: string, callback: (err?: Error | undefined) => void): Promise<void> {
try {
- await this.pg.query(...this.insert(this.table, [chunk]));
+ const inserts = this.insert([chunk]);
+ for (const insert of inserts) {
+ await this.pg.query(insert.query, insert.values);
+ }
callback();
@@ -32,0 +40,0 @@ } catch (error) {
import { Client } from 'pg';
+ import { RowMetadata } from '@google-cloud/bigquery';
+ export interface InsertBatch {
+ query: string;
+ values: any[];
+ }
export interface PGSinkStreamOptions {
pg: Client;
- table: string;
- insert: (table: string, rows: any[]) => [string, any[]];
+ insert: (rows: RowMetadata) => InsertBatch[];
highWaterMark?: number;
}
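Taken together, the diffs amount to one breaking change between 1.0.1 and 2.0.0: the `table` option is removed, and `insert` now receives only the rows and returns an array of `InsertBatch` objects, each of which the sink runs as its own parameterised query. A hedged before/after sketch of adapting an insert function (the `accounts` table and its columns are illustrative, and the v1 version is simplified to the single-row case):

```typescript
import { RowMetadata } from '@google-cloud/bigquery';
import { InsertBatch } from '@ovotech/bigquery-pg-sink';

// v1 (1.0.1): insert received the configured table name and returned a single
// [query, values] tuple that was spread straight into pg.query(...).
// (Single-row case shown for brevity.)
export const insertQueryV1 = (table: string, rows: any[]): [string, any[]] => [
  `INSERT INTO ${table} (id, balance) VALUES ($1, $2)`,
  [rows[0].id, rows[0].balance],
];

// v2 (2.0.0): insert receives only the rows and returns InsertBatch[]; the
// table name now lives inside the function, and the sink executes each batch
// as its own query.
export const insertQueryV2 = (rows: RowMetadata): InsertBatch[] =>
  rows.map((row: any) => ({
    query: 'INSERT INTO accounts (id, balance) VALUES ($1, $2)',
    values: [row.id, row.balance],
  }));
```

The sink options change accordingly: the stream is now constructed as `new BigQueryPGSinkStream({ pg, insert })`, with no `table` field.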
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
+ Added@google-cloud/bigquery@4.7.0(transitive)
+ Added@google-cloud/common@2.4.0(transitive)
+ Added@google-cloud/paginator@2.0.3(transitive)
+ Added@google-cloud/projectify@1.0.4(transitive)
+ Added@google-cloud/promisify@1.0.4(transitive)
+ Added@tootallnate/once@1.1.2(transitive)
+ Addedabort-controller@3.0.0(transitive)
+ Addedagent-base@6.0.2(transitive)
+ Addedarrify@2.0.1(transitive)
+ Addedbase64-js@1.5.1(transitive)
+ Addedbig.js@5.2.2(transitive)
+ Addedbignumber.js@9.1.2(transitive)
+ Addedbuffer-equal-constant-time@1.0.1(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addeddebug@4.3.7(transitive)
+ Addedduplexify@3.7.1, 4.1.3(transitive)
+ Addedecdsa-sig-formatter@1.0.11(transitive)
+ Addedend-of-stream@1.4.4(transitive)
+ Addedent@2.2.1(transitive)
+ Addedevent-target-shim@5.0.1(transitive)
+ Addedextend@3.0.2(transitive)
+ Addedfast-text-encoding@1.0.6(transitive)
+ Addedgaxios@2.3.4(transitive)
+ Addedgcp-metadata@3.5.0(transitive)
+ Addedgoogle-auth-library@5.10.1(transitive)
+ Addedgoogle-p12-pem@2.0.5(transitive)
+ Addedgtoken@4.1.4(transitive)
+ Addedhttp-proxy-agent@4.0.1(transitive)
+ Addedhttps-proxy-agent@5.0.1(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedis@3.3.0(transitive)
+ Addedis-stream@2.0.1(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedjson-bigint@0.3.1(transitive)
+ Addedjwa@2.0.0(transitive)
+ Addedjws@4.0.0(transitive)
+ Addedlru-cache@5.1.1(transitive)
+ Addedmime@2.6.0(transitive)
+ Addedms@2.1.3(transitive)
+ Addednode-fetch@2.7.0(transitive)
+ Addednode-forge@0.10.0(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedpunycode@1.4.1(transitive)
+ Addedreadable-stream@2.3.8, 3.6.2(transitive)
+ Addedretry-request@4.2.2(transitive)
+ Addedsafe-buffer@5.1.2, 5.2.1(transitive)
+ Addedstream-events@1.0.5(transitive)
+ Addedstream-shift@1.0.3(transitive)
+ Addedstring-format-obj@1.1.1(transitive)
+ Addedstring_decoder@1.1.1, 1.3.0(transitive)
+ Addedstubs@3.0.0(transitive)
+ Addedteeny-request@6.0.3(transitive)
+ Addedtr46@0.0.3(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addeduuid@3.4.0, 7.0.3(transitive)
+ Addedwebidl-conversions@3.0.1(transitive)
+ Addedwhatwg-url@5.0.0(transitive)
+ Addedwrappy@1.0.2(transitive)
+ Addedyallist@3.1.1(transitive)