Socket
Socket
Sign inDemoInstall

@ovotech/bigquery-pg-sink

Package Overview
Dependencies
Maintainers
146
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@ovotech/bigquery-pg-sink - npm Package Compare versions

Comparing version 1.0.1 to 2.0.0

8

dist/PGSinkStream.d.ts
/// <reference types="node" />
import { RowMetadata } from '@google-cloud/bigquery';
import { Writable } from 'stream';

@@ -6,10 +7,9 @@ import { PGSinkStreamOptions } from './types';

private pg;
private table;
private insert;
constructor({ pg, table, insert, highWaterMark }: PGSinkStreamOptions);
constructor({ pg, insert, highWaterMark }: PGSinkStreamOptions);
_writev?(chunks: Array<{
chunk: any;
chunk: RowMetadata;
encoding: string;
}>, callback: (error?: Error | undefined) => void): Promise<void>;
_write(chunk: any, _: string, callback: (err?: Error | undefined) => void): Promise<void>;
_write(chunk: RowMetadata, _: string, callback: (err?: Error | undefined) => void): Promise<void>;
}

@@ -5,6 +5,5 @@ "use strict";

class BigQueryPGSinkStream extends stream_1.Writable {
constructor({ pg, table, insert, highWaterMark = 200 }) {
constructor({ pg, insert, highWaterMark = 200 }) {
super({ objectMode: true, highWaterMark });
this.pg = pg;
this.table = table;
this.insert = insert;

@@ -15,3 +14,6 @@ }

const rows = chunks.map(chunk => chunk.chunk);
await this.pg.query(...this.insert(this.table, rows));
const inserts = this.insert(rows);
for (const insert of inserts) {
await this.pg.query(insert.query, insert.values);
}
callback();

@@ -25,3 +27,6 @@ }

try {
await this.pg.query(...this.insert(this.table, [chunk]));
const inserts = this.insert([chunk]);
for (const insert of inserts) {
await this.pg.query(insert.query, insert.values);
}
callback();

@@ -28,0 +33,0 @@ }

import { Client } from 'pg';
import { RowMetadata } from '@google-cloud/bigquery';
/**
 * A single parameterized SQL statement produced by an insert mapper:
 * the query text plus the positional values bound to its placeholders.
 */
export interface InsertBatch {
  /** Positional parameter values bound to the query's $1..$n placeholders. */
  values: any[];
  /** Parameterized SQL text, e.g. `INSERT INTO t (id) VALUES ($1)`. */
  query: string;
}
/**
 * Options for constructing a BigQueryPGSinkStream.
 *
 * NOTE(review): the captured diff left two conflicting `insert` declarations
 * in this interface (v1 and v2 signatures), which is invalid TypeScript.
 * Kept only the v2.0.0 signature, matching the v2 constructor
 * `constructor({ pg, insert, highWaterMark = 200 })`.
 */
export interface PGSinkStreamOptions {
  /** Connected node-postgres client used to execute the generated inserts. */
  pg: Client;
  /** Target table name. NOTE(review): the v2 constructor no longer destructures this — confirm it is still required. */
  table: string;
  /** Maps a batch of BigQuery rows to one or more parameterized insert statements. */
  insert: (rows: RowMetadata) => InsertBatch[];
  /** Object-mode Writable high-water mark; defaults to 200 in the stream constructor. */
  highWaterMark?: number;
}
{
"name": "@ovotech/bigquery-pg-sink",
"description": "Stream BigQuery query results into a postgres database",
"version": "1.0.1",
"version": "2.0.0",
"main": "dist/index.js",

@@ -32,3 +32,6 @@ "source": "src/index.ts",

},
"gitHead": "08920fa2c6cc9a25f01f9de4c1b2283291d93b1b"
"dependencies": {
"@google-cloud/bigquery": "^4.7.0"
},
"gitHead": "4968304ed26cf3d30b50b5b6feb90734074ae1e8"
}

@@ -10,8 +10,43 @@ # BigQuery PG Sink

```
#### Creating a Sink
```
const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
const pgSink = new BigQueryPGSinkStream({
pg: db,
insert: insertQuery,
});
bigquery
.createQueryStream('___BIGQUERY_QUERY_STRING___')
.pipe(pgSink)
```
#### Creating insert functions
You can directly map each record returned to a single insert query
```typescript
import { BigQueryPGSinkStream } from '@ovotech/bigquery-pg-sink';
import { RowMetadata } from '@google-cloud/bigquery';
import { BigQueryPGSinkStream, InsertBatch } from '@ovotech/bigquery-pg-sink';
import { Client } from 'pg';
export const insertQuery = (table: string, rows: any[]): [string, any[]] => {
export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
return rows.map(bigQueryResult => ({
query: `INSERT INTO table
(
id,
balance
) VALUES ($1, $2)
`,
values: [bigQueryResult.id, bigQueryResult.balance],
}));
```
It is possible to speed up the insertion by using a bulk insert; however, this means you need to programmatically build up the query based on the number of rows passed to your insertQuery function
```typescript
import { RowMetadata } from '@google-cloud/bigquery';
import { BigQueryPGSinkStream, InsertBatch } from '@ovotech/bigquery-pg-sink';
import { Client } from 'pg';
export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
// transform each result into a flat array of values

@@ -25,3 +60,3 @@ // i.e. [1, 200, 2, 300]

}).flat();
// generate the values insert string

@@ -38,4 +73,5 @@ // i.e. ($1,$2,$3,.....)

.join(',');
return [
`INSERT INTO ${table}
return [{
query: `INSERT INTO table
(

@@ -46,17 +82,5 @@ id,

`,
flatRows,
];
values: flatRows,
}];
};
const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
const pgSink = new BigQueryPGSinkStream({
pg: db,
table: 'NAME_OF_TABLE_IN_DATABASE',
insert: insertQuery,
});
bigquery
.createQueryStream('___BIGQUERY_QUERY_STRING___')
.pipe(pgSink)
```

@@ -63,0 +87,0 @@

@@ -0,21 +1,26 @@

import { RowMetadata } from '@google-cloud/bigquery';
import { Client } from 'pg';
import { Writable } from 'stream';
import { PGSinkStreamOptions } from './types';
import { InsertBatch, PGSinkStreamOptions } from './types';
export class BigQueryPGSinkStream extends Writable {
private pg: Client;
private table: string;
private insert: (table: string, rows: any[]) => [string, any[]];
private insert: (rows: RowMetadata) => InsertBatch[];
constructor({ pg, table, insert, highWaterMark = 200 }: PGSinkStreamOptions) {
constructor({ pg, insert, highWaterMark = 200 }: PGSinkStreamOptions) {
super({ objectMode: true, highWaterMark });
this.pg = pg;
this.table = table;
this.insert = insert;
}
async _writev?(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | undefined) => void) {
async _writev?(
chunks: Array<{ chunk: RowMetadata; encoding: string }>,
callback: (error?: Error | undefined) => void,
): Promise<void> {
try {
const rows = chunks.map(chunk => chunk.chunk);
await this.pg.query(...this.insert(this.table, rows));
const inserts = this.insert(rows);
for (const insert of inserts) {
await this.pg.query(insert.query, insert.values);
}
callback();

@@ -27,5 +32,8 @@ } catch (error) {

async _write(chunk: any, _: string, callback: (err?: Error | undefined) => void): Promise<void> {
async _write(chunk: RowMetadata, _: string, callback: (err?: Error | undefined) => void): Promise<void> {
try {
await this.pg.query(...this.insert(this.table, [chunk]));
const inserts = this.insert([chunk]);
for (const insert of inserts) {
await this.pg.query(insert.query, insert.values);
}
callback();

@@ -32,0 +40,0 @@ } catch (error) {

import { Client } from 'pg';
import { RowMetadata } from '@google-cloud/bigquery';
/**
 * One parameterized insert produced from a batch of BigQuery rows.
 * `query` holds the SQL text; `values` supplies its bound parameters.
 */
export interface InsertBatch {
  /** SQL statement with $1..$n placeholders. */
  query: string;
  /** Values substituted for the placeholders, in order. */
  values: any[];
}
/**
 * Configuration accepted by BigQueryPGSinkStream.
 *
 * NOTE(review): the diff capture duplicated the `insert` property with both
 * the v1 `(table, rows) => [string, any[]]` and v2 `(rows) => InsertBatch[]`
 * signatures — invalid TypeScript. Resolved to the v2.0.0 signature used by
 * the v2 constructor and README examples.
 */
export interface PGSinkStreamOptions {
  /** node-postgres client the sink writes through. */
  pg: Client;
  /** Target table name. NOTE(review): unused by the v2 constructor — verify whether it should remain. */
  table: string;
  /** Converts BigQuery result rows into parameterized insert statements. */
  insert: (rows: RowMetadata) => InsertBatch[];
  /** Object-mode buffer size for the Writable stream; the constructor defaults it to 200. */
  highWaterMark?: number;
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc