@yuants/utils
Advanced tools
Comparing version 0.3.0 to 0.3.1
@@ -1,2 +0,2 @@ | ||
import { Observable, Subject, distinctUntilChanged, filter, interval, mergeMap, pipe, tap, } from 'rxjs'; | ||
import { Observable, Subject, concat, distinctUntilChanged, filter, interval, map, mergeMap, of, pairwise, pipe, tap, } from 'rxjs'; | ||
/** | ||
@@ -116,9 +116,43 @@ * 同 groupBy 类似,但是会接受一整个数组,如果下一组数据中没有某个 key,会自动 complete 这个 key 的 Observable | ||
* @public | ||
* @param hashKey - hash key function to group items | ||
* @param keyFunc - hash key function to group items | ||
* @param consumer - consumer function to process each item | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
export const listWatch = (hashKey, consumer) => pipe(batchGroupBy(hashKey), mergeMap((group) => group.pipe( | ||
export const listWatch = (keyFunc, consumer, comparator = () => true) => pipe(batchGroupBy(keyFunc), mergeMap((group) => group.pipe( | ||
// Take first but not complete until group complete | ||
distinctUntilChanged(() => true), switchMapWithComplete(consumer)))); | ||
distinctUntilChanged(comparator), switchMapWithComplete(consumer)))); | ||
/** | ||
* list and watch a source of items, and apply consumer to each newly added item, | ||
* the consumer should return an observable that completes when the item is fully processed, | ||
* | ||
* consumer will be cancelled when the item is removed. | ||
* | ||
* @public | ||
* @param keyFunc - hash key function to group items | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
export const listWatchEvent = (keyFunc = (v) => `${v}`, comparator = (a, b) => a === b) => (source$) => concat(of([]), source$).pipe( | ||
// | ||
map((v) => new Map(v.map((v) => [keyFunc(v), v]))), pairwise(), map(([oldMap, newMap]) => { | ||
const events = []; | ||
for (const [key, item] of oldMap) { | ||
const newItem = newMap.get(key); | ||
if (newItem !== undefined) { | ||
if (!comparator(item, newItem)) { | ||
events.push([item, newItem]); | ||
} | ||
} | ||
else { | ||
events.push([item, undefined]); | ||
} | ||
} | ||
for (const [key, item] of newMap) { | ||
if (!oldMap.has(key)) { | ||
events.push([undefined, item]); | ||
} | ||
} | ||
return events; | ||
})); | ||
//# sourceMappingURL=rx-utils.js.map |
@@ -0,4 +1,4 @@ | ||
import { from, interval, mergeMap, of, take, toArray, zipWith } from 'rxjs'; | ||
import { TestScheduler } from 'rxjs/testing'; | ||
import { rateLimitMap } from './rx-utils'; | ||
import { from, interval, mergeMap, of, take, zipWith } from 'rxjs'; | ||
import { listWatchEvent, rateLimitMap } from './rx-utils'; | ||
const testScheduler = new TestScheduler((actual, expected) => { | ||
@@ -38,2 +38,50 @@ expect(actual).toStrictEqual(expected); | ||
}); | ||
describe('list watch', () => { | ||
it('new items coming behavior', (done) => { | ||
const source$ = from([[0, 1, 2, 3, 4]]); | ||
source$ | ||
.pipe( | ||
// | ||
listWatchEvent(), toArray()) | ||
.subscribe((events) => { | ||
expect(events).toEqual([ | ||
[ | ||
[undefined, 0], | ||
[undefined, 1], | ||
[undefined, 2], | ||
[undefined, 3], | ||
[undefined, 4], | ||
], | ||
]); | ||
done(); | ||
}); | ||
}); | ||
it('item deleted', (done) => { | ||
const source$ = from([[0, 1, 2, 3, 4], [0]]); | ||
source$ | ||
.pipe( | ||
// | ||
listWatchEvent(), toArray()) | ||
.subscribe((events) => { | ||
// expect(true).toBe(true); | ||
expect(events).toEqual([ | ||
[ | ||
[undefined, 0], | ||
[undefined, 1], | ||
[undefined, 2], | ||
[undefined, 3], | ||
[undefined, 4], | ||
], | ||
[ | ||
[1, undefined], | ||
[2, undefined], | ||
[3, undefined], | ||
[4, undefined], | ||
], | ||
]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
describe('group watch', () => { }); | ||
//# sourceMappingURL=rx-utils.test.js.map |
@@ -108,9 +108,23 @@ /** | ||
* @public | ||
* @param hashKey - hash key function to group items | ||
* @param keyFunc - hash key function to group items | ||
* @param consumer - consumer function to process each item | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
export declare const listWatch: <T, K>(hashKey: (item: T) => string, consumer: (item: T) => Observable<K>) => OperatorFunction<T[], K>; | ||
export declare const listWatch: <T, K>(keyFunc: (item: T) => string, consumer: (item: T) => Observable<K>, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T[], K>; | ||
/** | ||
* list and watch a source of items, and apply consumer to each newly added item, | ||
* the consumer should return an observable that completes when the item is fully processed, | ||
* | ||
* consumer will be cancelled when the item is removed. | ||
* | ||
* @public | ||
* @param keyFunc - hash key function to group items | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
export declare const listWatchEvent: <T>(keyFunc?: (item: T) => string, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T[], [old: T | undefined, new: T | undefined][]>; | ||
/** | ||
* NativeSubject is native version of rx's Subject, which can be used in async generator. | ||
@@ -117,0 +131,0 @@ * |
@@ -29,7 +29,20 @@ import { Observable, OperatorFunction, SchedulerLike } from 'rxjs'; | ||
* @public | ||
* @param hashKey - hash key function to group items | ||
* @param keyFunc - hash key function to group items | ||
* @param consumer - consumer function to process each item | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
export declare const listWatch: <T, K>(hashKey: (item: T) => string, consumer: (item: T) => Observable<K>) => OperatorFunction<T[], K>; | ||
export declare const listWatch: <T, K>(keyFunc: (item: T) => string, consumer: (item: T) => Observable<K>, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T[], K>; | ||
/** | ||
* list and watch a source of items, and apply consumer to each newly added item, | ||
* the consumer should return an observable that completes when the item is fully processed, | ||
* | ||
* consumer will be cancelled when the item is removed. | ||
* | ||
* @public | ||
* @param keyFunc - hash key function to group items | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
export declare const listWatchEvent: <T>(keyFunc?: (item: T) => string, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T[], [old: T | undefined, new: T | undefined][]>; | ||
//# sourceMappingURL=rx-utils.d.ts.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.listWatch = exports.rateLimitMap = exports.switchMapWithComplete = exports.batchGroupBy = void 0; | ||
exports.listWatchEvent = exports.listWatch = exports.rateLimitMap = exports.switchMapWithComplete = exports.batchGroupBy = void 0; | ||
const rxjs_1 = require("rxjs"); | ||
@@ -122,10 +122,45 @@ /** | ||
* @public | ||
* @param hashKey - hash key function to group items | ||
* @param keyFunc - hash key function to group items | ||
* @param consumer - consumer function to process each item | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
const listWatch = (hashKey, consumer) => (0, rxjs_1.pipe)((0, exports.batchGroupBy)(hashKey), (0, rxjs_1.mergeMap)((group) => group.pipe( | ||
const listWatch = (keyFunc, consumer, comparator = () => true) => (0, rxjs_1.pipe)((0, exports.batchGroupBy)(keyFunc), (0, rxjs_1.mergeMap)((group) => group.pipe( | ||
// Take first but not complete until group complete | ||
(0, rxjs_1.distinctUntilChanged)(() => true), (0, exports.switchMapWithComplete)(consumer)))); | ||
(0, rxjs_1.distinctUntilChanged)(comparator), (0, exports.switchMapWithComplete)(consumer)))); | ||
exports.listWatch = listWatch; | ||
/** | ||
* list and watch a source of items, and apply consumer to each newly added item, | ||
* the consumer should return an observable that completes when the item is fully processed, | ||
* | ||
* consumer will be cancelled when the item is removed. | ||
* | ||
* @public | ||
* @param keyFunc - hash key function to group items | ||
* @param comparator - comparator function to compare items, return true if they are the same | ||
* @returns | ||
*/ | ||
const listWatchEvent = (keyFunc = (v) => `${v}`, comparator = (a, b) => a === b) => (source$) => (0, rxjs_1.concat)((0, rxjs_1.of)([]), source$).pipe( | ||
// | ||
(0, rxjs_1.map)((v) => new Map(v.map((v) => [keyFunc(v), v]))), (0, rxjs_1.pairwise)(), (0, rxjs_1.map)(([oldMap, newMap]) => { | ||
const events = []; | ||
for (const [key, item] of oldMap) { | ||
const newItem = newMap.get(key); | ||
if (newItem !== undefined) { | ||
if (!comparator(item, newItem)) { | ||
events.push([item, newItem]); | ||
} | ||
} | ||
else { | ||
events.push([item, undefined]); | ||
} | ||
} | ||
for (const [key, item] of newMap) { | ||
if (!oldMap.has(key)) { | ||
events.push([undefined, item]); | ||
} | ||
} | ||
return events; | ||
})); | ||
exports.listWatchEvent = listWatchEvent; | ||
//# sourceMappingURL=rx-utils.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const rxjs_1 = require("rxjs"); | ||
const testing_1 = require("rxjs/testing"); | ||
const rx_utils_1 = require("./rx-utils"); | ||
const rxjs_1 = require("rxjs"); | ||
const testScheduler = new testing_1.TestScheduler((actual, expected) => { | ||
@@ -40,2 +40,50 @@ expect(actual).toStrictEqual(expected); | ||
}); | ||
describe('list watch', () => { | ||
it('new items coming behavior', (done) => { | ||
const source$ = (0, rxjs_1.from)([[0, 1, 2, 3, 4]]); | ||
source$ | ||
.pipe( | ||
// | ||
(0, rx_utils_1.listWatchEvent)(), (0, rxjs_1.toArray)()) | ||
.subscribe((events) => { | ||
expect(events).toEqual([ | ||
[ | ||
[undefined, 0], | ||
[undefined, 1], | ||
[undefined, 2], | ||
[undefined, 3], | ||
[undefined, 4], | ||
], | ||
]); | ||
done(); | ||
}); | ||
}); | ||
it('item deleted', (done) => { | ||
const source$ = (0, rxjs_1.from)([[0, 1, 2, 3, 4], [0]]); | ||
source$ | ||
.pipe( | ||
// | ||
(0, rx_utils_1.listWatchEvent)(), (0, rxjs_1.toArray)()) | ||
.subscribe((events) => { | ||
// expect(true).toBe(true); | ||
expect(events).toEqual([ | ||
[ | ||
[undefined, 0], | ||
[undefined, 1], | ||
[undefined, 2], | ||
[undefined, 3], | ||
[undefined, 4], | ||
], | ||
[ | ||
[1, undefined], | ||
[2, undefined], | ||
[3, undefined], | ||
[4, undefined], | ||
], | ||
]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
describe('group watch', () => { }); | ||
//# sourceMappingURL=rx-utils.test.js.map |
{ | ||
"name": "@yuants/utils", | ||
"version": "0.3.0", | ||
"version": "0.3.1", | ||
"main": "lib/index.js", | ||
@@ -5,0 +5,0 @@ "module": "dist/index.js", |
{ | ||
"libraries/utils/CHANGELOG.json": "f6bf6328abddd15c0ce3c65711637d67947fa891", | ||
"libraries/utils/CHANGELOG.md": "9dca7d4b98f1a8e9715d281c46d739a209fbf7d2", | ||
"libraries/utils/CHANGELOG.json": "18eb86be403fe6e4662786fa563eadeec689f67f", | ||
"libraries/utils/CHANGELOG.md": "9b69ec032dcc6406c4a8ee8f336a5f11540e4165", | ||
"libraries/utils/api-extractor.json": "62f4fd324425b9a235f0c117975967aab09ced0c", | ||
@@ -8,4 +8,4 @@ "libraries/utils/config/jest.config.json": "4bb17bde3ee911163a3edb36a6eb71491d80b1bd", | ||
"libraries/utils/config/typescript.json": "854907e8a821f2050f6533368db160c649c25348", | ||
"libraries/utils/etc/utils.api.md": "cf8829df4ceac3044907c3d1842631b44bdb4b94", | ||
"libraries/utils/package.json": "4ba3b8b3f6dd81abb03cfb26ba029a09681f78e0", | ||
"libraries/utils/etc/utils.api.md": "237b97b46b05e3f335a0d87a38964fc0ab1277d5", | ||
"libraries/utils/package.json": "fe1d868a3d0923c2ffcc0c9f0c1c2aa1e842451f", | ||
"libraries/utils/src/async-iterator-interop.test.ts": "ec6e2470d1ace812d5fac14450a2568ff8a4dfd5", | ||
@@ -16,4 +16,4 @@ "libraries/utils/src/async-iterator-interop.ts": "8dbb6150c1d75ed4d5c11be6fec6290fd8c0a561", | ||
"libraries/utils/src/order-utils.ts": "e0340bdfd925c6a1943871f37638947657dd0e4c", | ||
"libraries/utils/src/rx-utils.test.ts": "55617b7d55f406f8554c5fba24b34cf51a50270e", | ||
"libraries/utils/src/rx-utils.ts": "bf96d329c5de3d4e30cca9fab0e8f4c464c5d376", | ||
"libraries/utils/src/rx-utils.test.ts": "a342280433c9a4a50be2f73694f06155876a6783", | ||
"libraries/utils/src/rx-utils.ts": "6c60bb2e2ed7295292c5fb839f03ffe5c9b9ac5f", | ||
"libraries/utils/tsconfig.json": "22f94ca28b507f8ddcc21b9053158eefd3f726a9", | ||
@@ -20,0 +20,0 @@ "libraries/utils/.rush/temp/shrinkwrap-deps.json": "e85cf86d130fc06964ee303124691ff73a2d3972", |
@@ -471,3 +471,3 @@ { | ||
"canonicalReference": "@yuants/utils!listWatch:var", | ||
"docComment": "/**\n * list and watch a source of items, and apply consumer to each newly added item, the consumer should return an observable that completes when the item is fully processed,\n *\n * consumer will be cancelled when the item is removed.\n *\n * @param hashKey - hash key function to group items\n *\n * @param consumer - consumer function to process each item\n *\n * @returns \n *\n * @public\n */\n", | ||
"docComment": "/**\n * list and watch a source of items, and apply consumer to each newly added item, the consumer should return an observable that completes when the item is fully processed,\n *\n * consumer will be cancelled when the item is removed.\n *\n * @param keyFunc - hash key function to group items\n *\n * @param consumer - consumer function to process each item\n *\n * @param comparator - comparator function to compare items, return true if they are the same\n *\n * @returns \n *\n * @public\n */\n", | ||
"excerptTokens": [ | ||
@@ -480,3 +480,3 @@ { | ||
"kind": "Content", | ||
"text": "<T, K>(hashKey: (item: T) => string, consumer: (item: T) => " | ||
"text": "<T, K>(keyFunc: (item: T) => string, consumer: (item: T) => " | ||
}, | ||
@@ -490,3 +490,3 @@ { | ||
"kind": "Content", | ||
"text": "<K>) => " | ||
"text": "<K>, comparator?: (a: T, b: T) => boolean) => " | ||
}, | ||
@@ -512,2 +512,33 @@ { | ||
{ | ||
"kind": "Variable", | ||
"canonicalReference": "@yuants/utils!listWatchEvent:var", | ||
"docComment": "/**\n * list and watch a source of items, and apply consumer to each newly added item, the consumer should return an observable that completes when the item is fully processed,\n *\n * consumer will be cancelled when the item is removed.\n *\n * @param keyFunc - hash key function to group items\n *\n * @param comparator - comparator function to compare items, return true if they are the same\n *\n * @returns \n *\n * @public\n */\n", | ||
"excerptTokens": [ | ||
{ | ||
"kind": "Content", | ||
"text": "listWatchEvent: " | ||
}, | ||
{ | ||
"kind": "Content", | ||
"text": "<T>(keyFunc?: (item: T) => string, comparator?: (a: T, b: T) => boolean) => " | ||
}, | ||
{ | ||
"kind": "Reference", | ||
"text": "OperatorFunction", | ||
"canonicalReference": "rxjs!OperatorFunction:interface" | ||
}, | ||
{ | ||
"kind": "Content", | ||
"text": "<T[], [old: T | undefined, new: T | undefined][]>" | ||
} | ||
], | ||
"isReadonly": true, | ||
"releaseTag": "Public", | ||
"name": "listWatchEvent", | ||
"variableTypeTokenRange": { | ||
"startIndex": 1, | ||
"endIndex": 4 | ||
} | ||
}, | ||
{ | ||
"kind": "TypeAlias", | ||
@@ -514,0 +545,0 @@ "canonicalReference": "@yuants/utils!NativeSubject:type", |
@@ -51,5 +51,8 @@ ## API Report File for "@yuants/utils" | ||
// @public | ||
export const listWatch: <T, K>(hashKey: (item: T) => string, consumer: (item: T) => Observable<K>) => OperatorFunction<T[], K>; | ||
export const listWatch: <T, K>(keyFunc: (item: T) => string, consumer: (item: T) => Observable<K>, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T[], K>; | ||
// @public | ||
export const listWatchEvent: <T>(keyFunc?: (item: T) => string, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T[], [old: T | undefined, new: T | undefined][]>; | ||
// @public | ||
export type NativeSubject<T> = AsyncIterable<T> & AsyncIterator<T, void, T>; | ||
@@ -56,0 +59,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
185542
2765