Comparing version 0.1.18 to 0.1.20
{ | ||
"name": "streamops", | ||
"version": "0.1.18", | ||
"version": "0.1.20", | ||
"main": "./src/main.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -291,3 +291,7 @@ const {END_SIGNAL, NEEDS_END_SIGNAL} = require('./CONSTANTS'); | ||
}, | ||
operators | ||
{ | ||
...operators, | ||
END_SIGNAL, | ||
NEEDS_END_SIGNAL | ||
} | ||
); | ||
@@ -294,0 +298,0 @@ } |
@@ -65,6 +65,7 @@ class Dam {} | ||
if (yieldIncomplete && this.batchBuffer.length > 0) { | ||
yield this.batchBuffer; | ||
const finalBatch = this.batchBuffer.slice(); | ||
this.batchBuffer = []; // Clear the buffer | ||
yield finalBatch; | ||
} | ||
yield END_SIGNAL; // Propagate end signal | ||
return; | ||
return; // Don't propagate END_SIGNAL, just return | ||
} | ||
@@ -75,4 +76,5 @@ | ||
if (this.batchBuffer.length >= size) { | ||
yield this.batchBuffer.slice(0, size); | ||
const batch = this.batchBuffer.slice(0, size); | ||
this.batchBuffer = this.batchBuffer.slice(size); | ||
yield batch; | ||
} | ||
@@ -79,0 +81,0 @@ } |
@@ -26,2 +26,18 @@ const createStreamOps = require('../src/createStreamOps'); | ||
test('basic batching with incomplete batch yielding', async () => { | ||
const pipeline = [ | ||
function* () { | ||
yield 1; yield 2; yield 3; yield 4; yield 5; | ||
}, | ||
streamOps.batch(2, {yieldIncomplete: true}) | ||
]; | ||
const results = []; | ||
for await (const item of streamOps(pipeline)) { | ||
results.push(item); | ||
} | ||
expect(results).toEqual([[1,2], [3,4], [5]]); | ||
}); | ||
test('basic batching without incomplete batch yielding', async () => { | ||
@@ -113,2 +129,38 @@ const pipeline = [ | ||
}); | ||
test('batch operator properly handles END_SIGNAL with array spreading', async () => { | ||
const streamOps = createStreamOps(); | ||
const pipeline = [ | ||
// Step 1: Generate initial array | ||
function* () { | ||
yield [1, 2, 3]; | ||
}, | ||
// Step 2: Spread array items individually | ||
function* (items) { | ||
for (const item of items) { | ||
yield item; | ||
} | ||
}, | ||
// Step 3: Batch items | ||
streamOps.batch(2, { yieldIncomplete: true }), | ||
// Step 4: Log batches (optional) | ||
function* (batch) { | ||
console.log('Batch:', batch); | ||
yield batch; | ||
} | ||
]; | ||
const results = []; | ||
for await (const item of streamOps(pipeline)) { | ||
console.log('Result:', item); | ||
results.push(item); | ||
} | ||
console.log('Final Results:', results); | ||
expect(results).toEqual([[1, 2], [3]]); | ||
}); | ||
}); |
@@ -41,3 +41,3 @@ const streaming = require('../src/createStreamOps.js')({ | ||
expect(results).toEqual([2, 4, 6]); | ||
});return; | ||
}); | ||
@@ -599,11 +599,12 @@ test('pipeline with aggregation / meh reducer', async () => { | ||
yield 2; | ||
console.log('First generator done'); | ||
console.log('First generator done', streaming.END_SIGNAL); | ||
}, | ||
function* (input) { | ||
streaming.withEndSignal(function* (input) { | ||
receivedInputs.push(input); | ||
yield input; | ||
if (input === undefined) { | ||
console.log('Second generator received undefined'); | ||
if (input === streaming.END_SIGNAL) { | ||
console.log('Second generator received END_SIGNAL'); | ||
} else { | ||
yield input; | ||
} | ||
} | ||
}) | ||
]; | ||
@@ -617,5 +618,5 @@ | ||
expect(results).toEqual([1, 2]); | ||
expect(receivedInputs).toContain(undefined); | ||
expect(receivedInputs).toContain(streaming.END_SIGNAL); | ||
}); | ||
}); |
121152
3793