@trivago/samsa
Advanced tools
Comparing version 0.3.0-beta.2 to 0.3.0-beta.3
@@ -11,6 +11,7 @@ /// <reference types="node" /> | ||
private projection; | ||
private maxKeyBufferSize; | ||
private keyBuffer; | ||
private subProcesses; | ||
private keyBufferProcessInterval; | ||
constructor(primaryKTable: KTable, foreignKTable: KTable, projection: JoinProjection<any, any, any>); | ||
constructor(primaryKTable: KTable, foreignKTable: KTable, projection: JoinProjection<any, any, any>, maxKeyBufferSize?: number); | ||
private startProcessInterval; | ||
@@ -22,4 +23,4 @@ finishUp(): Promise<unknown>; | ||
} | ||
export declare const innerJoin: <P extends any, F extends any, R extends any>(primaryStream: Readable, foreignStream: Readable, project?: JoinProjection<P, F, any>, kTableConfig?: KTableConfig, window?: number) => Joiner; | ||
export declare const join: <P extends any, F extends any, R extends any>(primaryStream: Readable, foreignStream: Readable, project?: JoinProjection<P, F, any>, kTableConfig?: KTableConfig, window?: number) => Joiner; | ||
export declare const innerJoin: <P extends any, F extends any, R extends any>(primaryStream: Readable, foreignStream: Readable, project?: JoinProjection<P, F, any>, kTableConfig?: KTableConfig, maxKeyBufferSize?: number, window?: number) => Joiner; | ||
export declare const join: <P extends any, F extends any, R extends any>(primaryStream: Readable, foreignStream: Readable, project?: JoinProjection<P, F, any>, kTableConfig?: KTableConfig, maxKeyBufferSize?: number, window?: number) => Joiner; | ||
export {}; |
@@ -84,3 +84,4 @@ "use strict"; | ||
__extends(Joiner, _super); | ||
function Joiner(primaryKTable, foreignKTable, projection) { | ||
function Joiner(primaryKTable, foreignKTable, projection, maxKeyBufferSize) { | ||
if (maxKeyBufferSize === void 0) { maxKeyBufferSize = 10000; } | ||
var _this = _super.call(this) || this; | ||
@@ -90,2 +91,3 @@ _this.primaryKTable = primaryKTable; | ||
_this.projection = projection; | ||
_this.maxKeyBufferSize = maxKeyBufferSize; | ||
_this.keyBuffer = new Set(); | ||
@@ -152,3 +154,3 @@ _this.subProcesses = new Set(); | ||
this.keyBuffer.add(key); | ||
if (this.keyBuffer.size >= 1000) { | ||
if (this.keyBuffer.size >= this.maxKeyBufferSize) { | ||
clearInterval(this.keyBufferProcessInterval); | ||
@@ -169,3 +171,3 @@ this.finalizeBuffer(); | ||
}(ObjectTransform_1.ObjectTransform)); | ||
exports.innerJoin = function (primaryStream, foreignStream, project, kTableConfig, | ||
exports.innerJoin = function (primaryStream, foreignStream, project, kTableConfig, maxKeyBufferSize, | ||
/** | ||
@@ -178,2 +180,3 @@ * Window currently won't do anything, until we can get a PR to RocksDB. | ||
if (kTableConfig === void 0) { kTableConfig = {}; } | ||
if (maxKeyBufferSize === void 0) { maxKeyBufferSize = 10000; } | ||
if (window === void 0) { window = 0; } | ||
@@ -183,3 +186,3 @@ var batchAge = kTableConfig.batchAge, batchSize = kTableConfig.batchSize; | ||
var foreignTable = new KTable_1.KTable(batchSize, batchAge); | ||
var joiner = new Joiner(primaryTable, foreignTable, project); | ||
var joiner = new Joiner(primaryTable, foreignTable, project, maxKeyBufferSize); | ||
var primaryKeyStream = primaryStream.pipe(primaryTable); | ||
@@ -186,0 +189,0 @@ var foreignKeyStream = foreignStream.pipe(foreignTable); |
{ | ||
"name": "@trivago/samsa", | ||
"version": "0.3.0-beta.2", | ||
"version": "0.3.0-beta.3", | ||
"types": "lib/index.d.ts", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -27,3 +27,4 @@ import { Readable, TransformCallback } from "stream"; | ||
public foreignKTable: KTable, | ||
private projection: JoinProjection<any, any, any> | ||
private projection: JoinProjection<any, any, any>, | ||
private maxKeyBufferSize: number = 10000 | ||
) { | ||
@@ -103,3 +104,3 @@ super(); | ||
if (this.keyBuffer.size >= 1000) { | ||
if (this.keyBuffer.size >= this.maxKeyBufferSize) { | ||
clearInterval(this.keyBufferProcessInterval); | ||
@@ -124,2 +125,3 @@ this.finalizeBuffer(); | ||
kTableConfig: KTableConfig = {}, | ||
maxKeyBufferSize: number = 10000, | ||
/** | ||
@@ -136,3 +138,8 @@ * Window currently won't do anything, until we can get a PR to RocksDB. | ||
const joiner = new Joiner(primaryTable, foreignTable, project); | ||
const joiner = new Joiner( | ||
primaryTable, | ||
foreignTable, | ||
project, | ||
maxKeyBufferSize | ||
); | ||
@@ -139,0 +146,0 @@ const primaryKeyStream = primaryStream.pipe(primaryTable); |
201275
4832