@sweepbright/iter-helpers
Advanced tools
Comparing version 0.4.0 to 0.5.0
import { Operator } from "./Operator"; | ||
export declare function batch<T>(batchSize: number): Operator<T, T[]>; | ||
export type BatchOptions = { | ||
size: number; | ||
} | { | ||
timeFrame: number; | ||
} | { | ||
size: number; | ||
timeFrame: number; | ||
}; | ||
export declare function batch<T>(sizeOrOptions: number | BatchOptions): Operator<T, T[]>; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.batch = void 0; | ||
function batch(batchSize) { | ||
return async function* batchOperator(input) { | ||
let currentBatch = []; | ||
for await (const value of input) { | ||
currentBatch.push(value); | ||
if (currentBatch.length === batchSize) { | ||
yield currentBatch; | ||
currentBatch = []; | ||
} | ||
} | ||
if (currentBatch.length > 0) { | ||
yield currentBatch; | ||
} | ||
}; | ||
const Bufferize_1 = require("./Bufferize"); | ||
function batch(sizeOrOptions) { | ||
const options = typeof sizeOrOptions === "number" | ||
? { size: sizeOrOptions } | ||
: sizeOrOptions; | ||
let size = Infinity; | ||
let timeFrame = undefined; | ||
if ("size" in options) { | ||
size = options.size; | ||
} | ||
if ("timeFrame" in options) { | ||
timeFrame = options.timeFrame; | ||
} | ||
return (0, Bufferize_1.bufferize)({ | ||
timeFrame, | ||
getInitialValue: () => [], | ||
reducer(acc, value) { | ||
acc.push(value); | ||
return acc; | ||
}, | ||
shouldFlush: (acc) => acc.length >= size, | ||
}); | ||
} | ||
exports.batch = batch; | ||
//# sourceMappingURL=Batch.js.map |
@@ -0,4 +1,6 @@ | ||
import { BatchOptions } from "./Batch"; | ||
import { ConcurrentMapOptions } from "./ConcurrentMap"; | ||
import { Iter } from "./Iter"; | ||
import { Operator } from "./Operator"; | ||
import { BufferizeOptions } from "./Bufferize"; | ||
declare class Chain<I> implements AsyncIterable<I> { | ||
@@ -52,3 +54,3 @@ private source; | ||
*/ | ||
batch(batchSize: number): Chain<I[]>; | ||
batch(options: number | BatchOptions): Chain<I[]>; | ||
/** | ||
@@ -76,2 +78,7 @@ * Caclulates the intervals of the items. | ||
skip(size: number): Chain<I>; | ||
bufferize<O>(options: BufferizeOptions<I, O>): Chain<O>; | ||
/** | ||
* Called once, when the iteration is done | ||
*/ | ||
onEnd(cb: () => void): Chain<I>; | ||
} | ||
@@ -78,0 +85,0 @@ /** |
@@ -10,5 +10,7 @@ "use strict"; | ||
const Map_1 = require("./Map"); | ||
const Bufferize_1 = require("./Bufferize"); | ||
const Skip_1 = require("./Skip"); | ||
const Take_1 = require("./Take"); | ||
const Tap_1 = require("./Tap"); | ||
const OnEnd_1 = require("./OnEnd"); | ||
class Chain { | ||
@@ -88,4 +90,4 @@ constructor(source) { | ||
*/ | ||
batch(batchSize) { | ||
return this.pipe((0, Batch_1.batch)(batchSize)); | ||
batch(options) { | ||
return this.pipe((0, Batch_1.batch)(options)); | ||
} | ||
@@ -129,2 +131,11 @@ /** | ||
} | ||
bufferize(options) { | ||
return this.pipe((0, Bufferize_1.bufferize)(options)); | ||
} | ||
/** | ||
* Called once, when the iteration is done | ||
*/ | ||
onEnd(cb) { | ||
return this.pipe((0, OnEnd_1.onEnd)(cb)); | ||
} | ||
} | ||
@@ -131,0 +142,0 @@ /** |
@@ -16,2 +16,3 @@ "use strict"; | ||
exports.Fifo = void 0; | ||
const sleep_1 = require("./tests/sleep"); | ||
class Fifo { | ||
@@ -72,3 +73,3 @@ constructor(options) { | ||
if (__classPrivateFieldGet(this, _Fifo_instances, "m", _Fifo_isDrain).call(this)) { | ||
return Promise.resolve(); | ||
return (0, sleep_1.sleep)(0); | ||
} | ||
@@ -75,0 +76,0 @@ const onDrainPromise = __classPrivateFieldGet(this, _Fifo_onDrainPromise, "f") ?? |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.interval = void 0; | ||
const Bufferize_1 = require("./Bufferize"); | ||
function interval(size) { | ||
return async function* intervalOperator(input) { | ||
let currentInterval = null; | ||
let currentLength = 0; | ||
for await (const value of input) { | ||
if (currentInterval === null) { | ||
currentInterval = [value, value]; | ||
return (0, Bufferize_1.bufferize)({ | ||
getInitialValue: () => null, | ||
reducer(acc, value) { | ||
if (acc === null) { | ||
return [value, value]; | ||
} | ||
else { | ||
currentInterval[1] = value; | ||
} | ||
currentLength++; | ||
if (currentLength === size) { | ||
yield currentInterval; | ||
currentInterval = null; | ||
currentLength = 0; | ||
} | ||
} | ||
if (currentLength > 0 && currentInterval !== null) { | ||
yield currentInterval; | ||
} | ||
}; | ||
acc[1] = value; | ||
return acc; | ||
}, | ||
shouldFlush: (_, __, bufferizedItemsCount) => bufferizedItemsCount >= size, | ||
}); | ||
} | ||
exports.interval = interval; | ||
//# sourceMappingURL=Interval.js.map |
@@ -16,1 +16,3 @@ export * from "./Batch"; | ||
export * from "./Interval"; | ||
export * from "./Bufferize"; | ||
export * from "./OnEnd"; |
@@ -32,2 +32,4 @@ "use strict"; | ||
__exportStar(require("./Interval"), exports); | ||
__exportStar(require("./Bufferize"), exports); | ||
__exportStar(require("./OnEnd"), exports); | ||
//# sourceMappingURL=main.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const Chain_1 = require("../Chain"); | ||
const Range_1 = require("../Range"); | ||
const sleep_1 = require("./sleep"); | ||
describe("chain.batch", () => { | ||
it("groups items into batches", async () => { | ||
const result = await (0, Chain_1.chain)([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) | ||
.batch(3) | ||
const result = await (0, Chain_1.chain)((0, Range_1.range)(0, 10)).batch(3).toArray(); | ||
expect(result).toEqual([[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]); | ||
}); | ||
it("groups items into batches, based on a time frame", async () => { | ||
const result = await (0, Chain_1.chain)((0, Range_1.range)(0, 10)) | ||
.tap(() => (0, sleep_1.sleep)(10)) | ||
.batch({ | ||
timeFrame: 25, | ||
}) | ||
.toArray(); | ||
expect(result).toEqual([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]); | ||
expect(result).toEqual([[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]); | ||
}); | ||
it("groups items into batches, based on a size or a time frame (size takes precedence)", async () => { | ||
const result = await (0, Chain_1.chain)((0, Range_1.range)(0, 10)) | ||
.tap(() => (0, sleep_1.sleep)(10)) | ||
.batch({ | ||
timeFrame: 50, | ||
size: 2, | ||
}) | ||
.toArray(); | ||
expect(result).toEqual([ | ||
[0, 1], | ||
[2, 3], | ||
[4, 5], | ||
[6, 7], | ||
[8, 9], | ||
]); | ||
}); | ||
it("groups items into a single batch if the size is Infinity", async () => { | ||
const result = await (0, Chain_1.chain)((0, Range_1.range)(0, 10)).batch(Infinity).toArray(); | ||
expect(result).toEqual([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]); | ||
}); | ||
}); | ||
//# sourceMappingURL=batch.spec.js.map |
@@ -50,9 +50,5 @@ "use strict"; | ||
}) | ||
// This trick allows to call `end` only once, when all items | ||
// have been pushed | ||
.batch(Infinity) | ||
.tap(() => { | ||
.onEnd(() => { | ||
f.end(); | ||
}) | ||
.flatten(); | ||
}); | ||
const fifoReader = (0, Chain_1.chain)(f) | ||
@@ -75,3 +71,25 @@ .tap((item) => log(`Read: ${item}`)) | ||
}); | ||
it("allows for back-pressure behavior when the highWatermark is 1", async () => { | ||
const f = new Fifo_1.Fifo({ | ||
highWatermark: 1, | ||
}); | ||
const input = (0, Chain_1.chain)((0, Range_1.range)(0, 10)) | ||
.tap(async (item) => { | ||
await f.waitDrain(); | ||
f.push(item); | ||
}) | ||
.onEnd(() => { | ||
f.end(); | ||
}); | ||
async function* readFifo() { | ||
yield* f; | ||
} | ||
const output = (0, Chain_1.chain)(readFifo()); | ||
const [itemsWritten, itemsRead] = await Promise.all([ | ||
input.toArray(), | ||
output.toArray(), | ||
]); | ||
expect(itemsWritten).toEqual(itemsRead); | ||
}); | ||
}); | ||
//# sourceMappingURL=fifo.spec.js.map |
@@ -35,3 +35,18 @@ "use strict"; | ||
}); | ||
it("groups items into intervals, even if the size of the interval is 0", async () => { | ||
const result = await (0, Chain_1.chain)((0, Range_1.range)(0, 10)).interval(0).toArray(); | ||
expect(result).toEqual([ | ||
[0, 0], | ||
[1, 1], | ||
[2, 2], | ||
[3, 3], | ||
[4, 4], | ||
[5, 5], | ||
[6, 6], | ||
[7, 7], | ||
[8, 8], | ||
[9, 9], | ||
]); | ||
}); | ||
}); | ||
//# sourceMappingURL=interval.spec.js.map |
@@ -1,1 +0,1 @@ | ||
export declare function sleep(ms: number): Promise<unknown>; | ||
export declare function sleep(ms: number): Promise<void>; |
{ | ||
"name": "@sweepbright/iter-helpers", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "repository": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
110111
110
1623