prom-utils
Advanced tools
Comparing version 0.3.0 to 0.4.0
@@ -0,1 +1,6 @@ | ||
# 0.4.0 | ||
- Added `batchBytes` option to `batchQueue` which triggers a queue flush if the total size of | ||
the queue is greater than or equal to `batchBytes`. | ||
# 0.3.0 | ||
@@ -2,0 +7,0 @@ |
@@ -7,2 +7,3 @@ "use strict"; | ||
exports.pausable = exports.defer = exports.batchQueue = exports.rateLimit = void 0; | ||
const object_sizeof_1 = __importDefault(require("object-sizeof")); | ||
const debug_1 = __importDefault(require("debug")); | ||
@@ -76,2 +77,3 @@ const debug = (0, debug_1.default)('prom-utils'); | ||
let prom; | ||
let size = 0; | ||
/** | ||
@@ -95,2 +97,4 @@ * Call fn on queue and clear the queue. | ||
queue = []; | ||
// Reset the size | ||
size = 0; | ||
debug('queue reset'); | ||
@@ -107,6 +111,6 @@ } | ||
await prom; | ||
// Start a timer if the queue is empty and timeout is set | ||
if (queue.length === 0 && timeout) { | ||
// Start a timer if timeout is set and the queue is empty | ||
if (timeout && queue.length === 0) { | ||
timeoutId = setTimeout(() => { | ||
debug('timeout cb'); | ||
debug('setTimeout cb'); | ||
prom = flush(); | ||
@@ -120,6 +124,16 @@ }, timeout); | ||
if (queue.length === batchSize) { | ||
debug('batchSize reached'); | ||
debug('batchSize reached %d', queue.length); | ||
// Wait for queue to be flushed | ||
await flush(); | ||
} | ||
else if (options.batchBytes) { | ||
// Determine size of object and add to sum | ||
size += (0, object_sizeof_1.default)(item); | ||
// Batch bytes reached | ||
if (size >= options.batchBytes) { | ||
debug('batchBytes reached %d', size); | ||
// Wait for queue to be flushed | ||
await flush(); | ||
} | ||
} | ||
}; | ||
@@ -126,0 +140,0 @@ const obj = { flush, enqueue }; |
@@ -8,2 +8,3 @@ export declare type QueueResult = { | ||
batchSize?: number; | ||
batchBytes?: number; | ||
timeout?: number; | ||
@@ -10,0 +11,0 @@ } |
{ | ||
"name": "prom-utils", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"description": "Promise utilities for looping: rate limiting, queueing/batching, etc.", | ||
@@ -21,2 +21,3 @@ "author": "GovSpend", | ||
"batch", | ||
"bytes", | ||
"queue", | ||
@@ -28,3 +29,4 @@ "concurrency", | ||
"deferred", | ||
"pause" | ||
"pause", | ||
"pausable" | ||
], | ||
@@ -49,4 +51,5 @@ "license": "ISC", | ||
"dependencies": { | ||
"debug": "^4.3.4" | ||
"debug": "^4.3.4", | ||
"object-sizeof": "^1.6.3" | ||
} | ||
} |
@@ -46,2 +46,3 @@ # prom-utils | ||
batchSize?: number | ||
batchBytes?: number | ||
timeout?: number | ||
@@ -48,0 +49,0 @@ } |
@@ -0,1 +1,2 @@ | ||
import sizeof from 'object-sizeof' | ||
import _debug from 'debug' | ||
@@ -72,2 +73,3 @@ import { Deferred, Queue, QueueResult } from './types' | ||
let prom: Promise<any> | ||
let size = 0 | ||
@@ -92,2 +94,4 @@ /** | ||
queue = [] | ||
// Reset the size | ||
size = 0 | ||
debug('queue reset') | ||
@@ -105,6 +109,6 @@ } | ||
await prom | ||
// Start a timer if the queue is empty and timeout is set | ||
if (queue.length === 0 && timeout) { | ||
// Start a timer if timeout is set and the queue is empty | ||
if (timeout && queue.length === 0) { | ||
timeoutId = setTimeout(() => { | ||
debug('timeout cb') | ||
debug('setTimeout cb') | ||
prom = flush() | ||
@@ -118,5 +122,14 @@ }, timeout) | ||
if (queue.length === batchSize) { | ||
debug('batchSize reached') | ||
debug('batchSize reached %d', queue.length) | ||
// Wait for queue to be flushed | ||
await flush() | ||
} else if (options.batchBytes) { | ||
// Determine size of object and add to sum | ||
size += sizeof(item) | ||
// Batch bytes reached | ||
if (size >= options.batchBytes) { | ||
debug('batchBytes reached %d', size) | ||
// Wait for queue to be flushed | ||
await flush() | ||
} | ||
} | ||
@@ -123,0 +136,0 @@ } |
@@ -118,2 +118,36 @@ import { describe, expect, test } from '@jest/globals' | ||
}) | ||
test('should flush queue if batchBytes is reached', async () => { | ||
const calls: any[] = [] | ||
const fn = async (records: any[]) => { | ||
calls.push(records) | ||
} | ||
const batchSize = 5 | ||
const batchBytes = 8 | ||
const queue = batchQueue(fn, { batchSize, batchBytes }) | ||
const records = ['Joe', 'Frank', 'Bob'] | ||
for (const record of records) { | ||
await queue.enqueue(record) | ||
} | ||
await queue.flush() | ||
expect(calls).toEqual([['Joe', 'Frank'], ['Bob']]) | ||
}) | ||
test('should flush queue if batchSize is reached before batchBytes', async () => { | ||
const calls: any[] = [] | ||
const fn = async (records: any[]) => { | ||
calls.push(records) | ||
} | ||
const batchSize = 2 | ||
const batchBytes = 100 | ||
const queue = batchQueue(fn, { batchSize, batchBytes }) | ||
const records = ['Joe', 'Frank', 'Bob'] | ||
for (const record of records) { | ||
await queue.enqueue(record) | ||
} | ||
await queue.flush() | ||
expect(calls).toEqual([['Joe', 'Frank'], ['Bob']]) | ||
}) | ||
}) | ||
@@ -120,0 +154,0 @@ |
@@ -9,2 +9,3 @@ export type QueueResult = { | ||
batchSize?: number | ||
batchBytes?: number | ||
timeout?: number | ||
@@ -11,0 +12,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
25804
716
99
2
+ Addedobject-sizeof@^1.6.3
+ Addedbase64-js@1.5.1(transitive)
+ Addedbuffer@5.7.1(transitive)
+ Addedieee754@1.2.1(transitive)
+ Addedobject-sizeof@1.6.3(transitive)