Socket
Socket
Sign inDemoInstall

rxjs-async-map

Package Overview
Dependencies
2
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.0 to 0.1.1

19

out/index.js

@@ -6,14 +6,19 @@ "use strict";

const mergeMap_1 = require("rxjs/operators/mergeMap");
const utils_1 = require("./utils");
const notify_1 = require("./notify");
const mapper = (project) => {
const promises = new Array();
const onReady = () => utils_1.shiftWhile(promises, p => p.isReady()).forEach(p => p.notify());
return (value) => {
const notify = new notify_1.NotifyPromise(project(value), onReady);
promises.push(notify);
return notify.promise();
const notifiers = new Array();
const onReady = () => {
// find the first non-ready notifier in the queue,
// while invoking all ready notifiers that we encounter along the way
const notReadyIdx = notifiers.findIndex(notifier => !notifier.notifyIfReady());
if (notReadyIdx > 0) {
// remove all the notifiers we invoked
notifiers.splice(0, notReadyIdx);
}
};
return (value) => new Observable_1.Observable(sub => {
notifiers.push(notify_1.notify(project(value), sub, onReady));
});
};
exports.asyncMap = (project, concurrent) => mergeMap_1.mergeMap(mapper(project), concurrent);
//# sourceMappingURL=index.js.map

@@ -16,3 +16,4 @@ "use strict";

const project = jest.fn();
const output = yield index_1.asyncMap(project, 0)(rxjs_1.Observable.empty()).toArray().toPromise();
const input = rxjs_1.Observable.empty();
const output = yield input.pipe(index_1.asyncMap(project, 0)).toArray().toPromise();
expect(output).toEqual([]);

@@ -23,3 +24,3 @@ }));

const input = rxjs_1.Observable.of('f', 'ba', 'baz');
const output = yield index_1.asyncMap(project, 1)(input).toArray().toPromise();
const output = yield input.pipe(index_1.asyncMap(project, 1)).toArray().toPromise();
expect(output).toEqual([1, 2, 3]);

@@ -35,3 +36,3 @@ expect(project.mock.calls).toEqual([['f'], ['ba'], ['baz']]);

});
yield index_1.asyncMap(project, 2)(input).toArray().toPromise();
yield input.pipe(index_1.asyncMap(project, 2)).toArray().toPromise();
expect(invocations['bar'] - invocations['foo']).toBeLessThan(5);

@@ -51,3 +52,3 @@ expect(invocations['baz']).toBeGreaterThan(invocations['foo']);

const input = rxjs_1.Observable.of('foo', 'bar');
const output = yield index_1.asyncMap(project, 1)(input).toArray().toPromise();
const output = yield input.pipe(index_1.asyncMap(project, 1)).toArray().toPromise();
expect(output).toEqual([1, 2]);

@@ -54,0 +55,0 @@ }));

@@ -1,13 +0,5 @@

export declare class NotifyPromise<T> {
private _resolve;
private _reject;
private _isFulfilled;
private _value?;
private _isRejected;
private _reason?;
private _promise;
constructor(promise: PromiseLike<T>, onReady: (notify: NotifyPromise<T>) => void);
isReady(): boolean;
notify(): void;
promise(): Promise<T>;
import { Observer } from 'rxjs/Observer';
export interface Notifier {
notifyIfReady(): boolean;
}
export declare const notify: <T>(promise: PromiseLike<T>, observer: Observer<T>, onReady: (notifier: Notifier) => void) => Notifier;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class NotifyPromise {
constructor(promise, onReady) {
this._isFulfilled = false;
this._isRejected = false;
this._promise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
promise.then(value => {
this._isFulfilled = true;
this._value = value;
onReady(this);
}, reason => {
this._isRejected = true;
this._reason = reason;
onReady(this);
});
});
}
isReady() {
return this._isFulfilled || this._isRejected;
}
notify() {
if (this._isFulfilled) {
this._resolve(this._value);
}
else if (this._isRejected) {
this._reject(this._reason);
}
else {
throw new Error('attempted to notify non-ready promise');
}
}
promise() {
return this._promise;
}
}
exports.NotifyPromise = NotifyPromise;
exports.notify = (promise, observer, onReady) => {
const notifier = {
notifyIfReady: () => false
};
promise.then(value => {
notifier.notifyIfReady = () => {
observer.next(value);
observer.complete();
return true;
};
onReady(notifier);
}, reason => {
notifier.notifyIfReady = () => {
observer.error(reason);
return true;
};
onReady(notifier);
});
return notifier;
};
//# sourceMappingURL=notify.js.map

@@ -11,30 +11,29 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const Observable_1 = require("rxjs/Observable");
const Observer_1 = require("rxjs/Observer");
const jasmine_promise_tools_1 = require("jasmine-promise-tools");
const notify_1 = require("./notify");
describe('NotifyPromise', () => {
describe('Notify', () => {
it('calls onReady when wrapped promise resolves', () => __awaiter(this, void 0, void 0, function* () {
const onReady = jest.fn(p => p.notify());
const notify = new notify_1.NotifyPromise(Promise.resolve(), onReady);
yield notify.promise();
const onReady = jest.fn(p => p.notifyIfReady());
yield new Observable_1.Observable(sub => {
notify_1.notify(Promise.resolve(), sub, onReady);
}).toPromise();
expect(onReady).toHaveBeenCalledTimes(1);
expect(notify.isReady()).toBe(true);
}));
it('calls onReady when wrapped promise rejects', () => __awaiter(this, void 0, void 0, function* () {
const onReady = jest.fn(p => p.notify());
const notify = new notify_1.NotifyPromise(Promise.reject('error'), onReady);
const err = yield jasmine_promise_tools_1.expectToReject(notify.promise());
const onReady = jest.fn(p => p.notifyIfReady());
const promise = new Observable_1.Observable(sub => {
notify_1.notify(Promise.reject('error'), sub, onReady);
}).toPromise();
const err = yield jasmine_promise_tools_1.expectToReject(promise);
expect(err).toBe('error');
expect(notify.isReady()).toBe(true);
expect(onReady).toHaveBeenCalledTimes(1);
}));
it('signals non-readiness when the wrapped promise has not been resolved yet', () => {
const onReady = jest.fn();
const notify = new notify_1.NotifyPromise(Promise.reject('error'), onReady);
expect(notify.isReady()).toBe(false);
const notifier = notify_1.notify(Promise.reject('error'), Observer_1.empty, onReady);
expect(notifier.notifyIfReady()).toBe(false);
});
it('throws when attempting to notify while the wrapped promise has not been resolved yet', () => {
const onReady = jest.fn();
const notify = new notify_1.NotifyPromise(Promise.reject('error'), onReady);
expect(() => notify.notify()).toThrow(/attempted to notify non-ready promise/);
});
});
//# sourceMappingURL=notify.spec.js.map
{
"name": "rxjs-async-map",
"version": "0.1.0",
"version": "0.1.1",
"description": "Map an observable using an async function with a configurable concurreny level, while preserving element order.",

@@ -29,2 +29,9 @@ "main": "out/index.js",

},
"greenkeeper": {
"ignore": [
"@types/jest",
"jest",
"ts-jest"
]
},
"jest": {

@@ -31,0 +38,0 @@ "collectCoverage": true,

@@ -5,2 +5,5 @@ # rxjs-async-map

[![CircleCI](https://img.shields.io/circleci/project/github/srijs/rxjs-async-map.svg)](https://circleci.com/gh/srijs/rxjs-async-map/tree/master)
[![Greenkeeper badge](https://badges.greenkeeper.io/srijs/rxjs-async-map.svg)](https://greenkeeper.io/)
## Installation

@@ -27,3 +30,3 @@

const source = Observable.of(1, 2, 3);
const source = of(1, 2, 3);

@@ -30,0 +33,0 @@ // Map over the observable using the async function, while running

@@ -8,3 +8,4 @@ import { Observable } from 'rxjs';

const project = jest.fn();
const output = await asyncMap(project, 0)(Observable.empty()).toArray().toPromise();
const input = Observable.empty();
const output = await input.pipe(asyncMap(project, 0)).toArray().toPromise();

@@ -17,3 +18,3 @@ expect(output).toEqual([]);

const input = Observable.of('f', 'ba', 'baz');
const output = await asyncMap(project, 1)(input).toArray().toPromise();
const output = await input.pipe(asyncMap(project, 1)).toArray().toPromise();

@@ -32,3 +33,3 @@ expect(output).toEqual([1, 2, 3]);

});
await asyncMap(project, 2)(input).toArray().toPromise();
await input.pipe(asyncMap(project, 2)).toArray().toPromise();

@@ -49,3 +50,3 @@ expect(invocations['bar'] - invocations['foo']).toBeLessThan(5);

const input = Observable.of('foo', 'bar');
const output = await asyncMap(project, 1)(input).toArray().toPromise();
const output = await input.pipe(asyncMap(project, 1)).toArray().toPromise();

@@ -52,0 +53,0 @@ expect(output).toEqual([1, 2]);

import { Observable } from 'rxjs/Observable';
import { mergeMap } from 'rxjs/operators/mergeMap';
import { shiftWhile } from './utils';
import { NotifyPromise } from './notify';
import { Notifier, notify } from './notify';
const mapper = <T, U>(project: (value: T) => PromiseLike<U>) => {
const promises = new Array<NotifyPromise<U>>();
const notifiers = new Array<Notifier>();
const onReady = () =>
shiftWhile(promises, p => p.isReady()).forEach(p => p.notify());
const onReady = () => {
// find the first non-ready notifier in the queue,
// while invoking all ready notifiers that we encounter along the way
const notReadyIdx = notifiers.findIndex(notifier => !notifier.notifyIfReady());
if (notReadyIdx > 0) {
// remove all the notifiers we invoked
notifiers.splice(0, notReadyIdx);
}
};
return (value: T) => {
const notify = new NotifyPromise(project(value), onReady);
promises.push(notify);
return notify.promise();
};
return (value: T) => new Observable<U>(sub => {
notifiers.push(notify(project(value), sub, onReady));
});
};

@@ -26,2 +29,3 @@

concurrent: number
) => mergeMap(mapper(project), concurrent);
): (source: Observable<T>) => Observable<U> =>
mergeMap(mapper(project), concurrent);

@@ -0,22 +1,26 @@

import { Observable } from 'rxjs/Observable';
import { empty as emptyObserver } from 'rxjs/Observer';
import { expectToReject } from 'jasmine-promise-tools';
import { NotifyPromise } from './notify';
import { notify } from './notify';
describe('NotifyPromise', () => {
describe('Notify', () => {
it('calls onReady when wrapped promise resolves', async () => {
const onReady = jest.fn(p => p.notify());
const notify = new NotifyPromise(Promise.resolve(), onReady);
await notify.promise();
const onReady = jest.fn(p => p.notifyIfReady());
await new Observable<void>(sub => {
notify(Promise.resolve(), sub, onReady);
}).toPromise();
expect(onReady).toHaveBeenCalledTimes(1);
expect(notify.isReady()).toBe(true);
});
it('calls onReady when wrapped promise rejects', async () => {
const onReady = jest.fn(p => p.notify());
const notify = new NotifyPromise(Promise.reject('error'), onReady);
const err = await expectToReject(notify.promise());
const onReady = jest.fn(p => p.notifyIfReady());
const promise = new Observable<void>(sub => {
notify(Promise.reject('error'), sub, onReady);
}).toPromise();
const err = await expectToReject(promise);
expect(err).toBe('error');
expect(notify.isReady()).toBe(true);
expect(onReady).toHaveBeenCalledTimes(1);
});

@@ -26,13 +30,6 @@

const onReady = jest.fn();
const notify = new NotifyPromise(Promise.reject('error'), onReady);
const notifier = notify(Promise.reject('error'), emptyObserver, onReady);
expect(notify.isReady()).toBe(false);
expect(notifier.notifyIfReady()).toBe(false);
});
it('throws when attempting to notify while the wrapped promise has not been resolved yet', () => {
const onReady = jest.fn();
const notify = new NotifyPromise(Promise.reject('error'), onReady);
expect(() => notify.notify()).toThrow(/attempted to notify non-ready promise/);
});
});

@@ -1,49 +0,37 @@

export class NotifyPromise<T> {
private _resolve: (item: T) => void;
private _reject: (reason: Error) => void;
import { Observer } from 'rxjs/Observer';
private _isFulfilled = false;
private _value?: T;
private _isRejected = false;
private _reason?: Error;
export interface Notifier {
notifyIfReady(): boolean;
}
private _promise: Promise<T>;
export const notify = <T>(
promise: PromiseLike<T>,
observer: Observer<T>,
onReady: (notifier: Notifier) => void
): Notifier => {
const notifier = {
notifyIfReady: () => false
};
constructor(promise: PromiseLike<T>, onReady: (notify: NotifyPromise<T>) => void) {
this._promise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
promise.then(
value => {
notifier.notifyIfReady = () => {
observer.next(value);
observer.complete();
promise.then(
value => {
this._isFulfilled = true;
this._value = value;
onReady(this);
},
reason => {
this._isRejected = true;
this._reason = reason;
onReady(this);
}
);
});
}
return true;
};
onReady(notifier);
},
reason => {
notifier.notifyIfReady = () => {
observer.error(reason);
isReady() {
return this._isFulfilled || this._isRejected;
}
notify() {
if (this._isFulfilled) {
this._resolve(this._value);
} else if (this._isRejected) {
this._reject(this._reason);
} else {
throw new Error('attempted to notify non-ready promise');
return true;
};
onReady(notifier);
}
}
);
promise(): Promise<T> {
return this._promise;
}
}
return notifier;
};

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc