Socket
Socket
Sign inDemoInstall

rsocket-flowable

Package Overview
Dependencies
Maintainers
6
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rsocket-flowable - npm Package Compare versions

Comparing version 0.0.27 to 0.0.28

197

build/Flowable.js

@@ -15,39 +15,29 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
var _FlowableMapOperator = _interopRequireDefault(
require('./FlowableMapOperator')
);
var _FlowableTakeOperator = _interopRequireDefault(
require('./FlowableTakeOperator')
);
var _Invariant = _interopRequireDefault(require('./Invariant'));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}
function _defineProperty(obj, key, value) {
if (key in obj) {
Object.defineProperty(obj, key, {
value: value,
enumerable: true,
configurable: true,
writable: true,
});
} else {
obj[key] = value;
}
return obj;
}
import type {
IPublisher,
ISubscriber,
IPartialSubscriber,
ISubscription,
} from 'rsocket-types';
import FlowableMapOperator from './FlowableMapOperator';
import FlowableTakeOperator from './FlowableTakeOperator';
import invariant from './Invariant';
export type Source<T> = (subscriber: ISubscriber<T>) => void;
/**
* Implements the ReactiveStream `Publisher` interface with Rx-style operators.
*/
class Flowable {
static just(...values) {
return new Flowable((subscriber) => {
export default class Flowable<T> implements IPublisher<T> {
_max: number;
_source: Source<T>;
static just<U>(...values: Array<U>): Flowable<U> {
return new Flowable(subscriber => {
let cancelled = false;

@@ -59,3 +49,3 @@ let i = 0;

},
request: (n) => {
request: n => {
while (!cancelled && n > 0 && i < values.length) {

@@ -73,4 +63,4 @@ subscriber.onNext(values[i++]);

static error(error) {
return new Flowable((subscriber) => {
static error<U = empty>(error: Error): Flowable<U> {
return new Flowable(subscriber => {
subscriber.onSubscribe({

@@ -85,4 +75,4 @@ cancel: () => {},

static never() {
return new Flowable((subscriber) => {
static never<U = empty>(): Flowable<U> {
return new Flowable(subscriber => {
subscriber.onSubscribe({

@@ -95,3 +85,3 @@ cancel: () => {},

constructor(source, max = Number.MAX_SAFE_INTEGER) {
constructor(source: Source<T>, max?: number = Number.MAX_SAFE_INTEGER) {
this._max = max;

@@ -101,4 +91,6 @@ this._source = source;

subscribe(subscriberOrCallback) {
let partialSubscriber;
subscribe(
subscriberOrCallback?: ?(IPartialSubscriber<T> | ((T) => void)),
): void {
let partialSubscriber: ?IPartialSubscriber<T>;
if (typeof subscriberOrCallback === 'function') {

@@ -113,21 +105,21 @@ partialSubscriber = this._wrapCallback(subscriberOrCallback);

lift(onSubscribeLift) {
return new Flowable((subscriber) =>
this._source(onSubscribeLift(subscriber))
lift<R>(
onSubscribeLift: (subscriber: ISubscriber<R>) => ISubscriber<T>,
): Flowable<R> {
return new Flowable(subscriber =>
this._source(onSubscribeLift(subscriber)),
);
}
map(fn) {
return this.lift(
(subscriber) => new _FlowableMapOperator.default(subscriber, fn)
);
map<R>(fn: (data: T) => R): Flowable<R> {
return this.lift(subscriber => new FlowableMapOperator(subscriber, fn));
}
take(toTake) {
take(toTake: number): Flowable<T> {
return this.lift(
(subscriber) => new _FlowableTakeOperator.default(subscriber, toTake)
subscriber => new FlowableTakeOperator(subscriber, toTake),
);
}
_wrapCallback(callback) {
_wrapCallback(callback: (T) => void): IPartialSubscriber<T> {
const max = this._max;

@@ -145,49 +137,13 @@ return {

* @private
*/ exports.default = Flowable;
class FlowableSubscriber {
constructor(subscriber, max) {
_defineProperty(
this,
'_cancel',
*/
class FlowableSubscriber<T> implements ISubscriber<T> {
_active: boolean;
_emitting: boolean;
_max: number;
_pending: number;
_started: boolean;
_subscriber: IPartialSubscriber<T>;
_subscription: ?ISubscription;
() => {
if (!this._active) {
return;
}
this._active = false;
if (this._subscription) {
this._subscription.cancel();
}
}
);
_defineProperty(
this,
'_request',
(n) => {
(0, _Invariant.default)(
Number.isInteger(n) && n >= 1 && n <= this._max,
'Flowable: Expected request value to be an integer with a ' +
'value greater than 0 and less than or equal to %s, got ' +
'`%s`.',
this._max,
n
);
if (!this._active) {
return;
}
if (n === this._max) {
this._pending = this._max;
} else {
this._pending += n;
if (this._pending >= this._max) {
this._pending = this._max;
}
}
if (this._subscription) {
this._subscription.request(n);
}
}
);
constructor(subscriber?: ?IPartialSubscriber<T>, max: number) {
this._active = false;

@@ -200,3 +156,4 @@ this._max = max;

}
onComplete() {
onComplete(): void {
if (!this._active) {

@@ -207,3 +164,3 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called'
: 'onSubscribe has not been called',
);

@@ -224,3 +181,4 @@ return;

}
onError(error) {
onError(error: Error): void {
if (this._started && !this._active) {

@@ -231,3 +189,3 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called'
: 'onSubscribe has not been called',
);

@@ -240,3 +198,4 @@ return;

}
onNext(data) {
onNext(data: T): void {
if (!this._active) {

@@ -247,3 +206,3 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called'
: 'onSubscribe has not been called',
);

@@ -255,3 +214,3 @@ return;

'Flowable: Invalid call to onNext(), all request()ed values have been ' +
'published.'
'published.',
);

@@ -272,3 +231,4 @@ return;

}
onSubscribe(subscription) {
onSubscribe(subscription?: ?ISubscription): void {
if (this._started) {

@@ -291,2 +251,37 @@ console.warn('Flowable: Invalid call to onSubscribe(): already called.');

}
_cancel = () => {
if (!this._active) {
return;
}
this._active = false;
if (this._subscription) {
this._subscription.cancel();
}
};
_request = (n: number) => {
invariant(
Number.isInteger(n) && n >= 1 && n <= this._max,
'Flowable: Expected request value to be an integer with a ' +
'value greater than 0 and less than or equal to %s, got ' +
'`%s`.',
this._max,
n,
);
if (!this._active) {
return;
}
if (n === this._max) {
this._pending = this._max;
} else {
this._pending += n;
if (this._pending >= this._max) {
this._pending = this._max;
}
}
if (this._subscription) {
this._subscription.request(n);
}
};
}

@@ -15,9 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {ISubscriber, ISubscription} from 'rsocket-types';
/**

@@ -28,4 +28,8 @@ * An operator that acts like Array.map, applying a given function to

*/
class FlowableMapOperator {
constructor(subscriber, fn) {
export default class FlowableMapOperator<T, R> implements ISubscriber<T> {
_fn: (t: T) => R;
_subscriber: ISubscriber<R>;
_subscription: ?ISubscription;
constructor(subscriber: ISubscriber<R>, fn: (t: T) => R) {
this._fn = fn;

@@ -36,11 +40,11 @@ this._subscriber = subscriber;

onComplete() {
onComplete(): void {
this._subscriber.onComplete();
}
onError(error) {
onError(error: Error): void {
this._subscriber.onError(error);
}
onNext(t) {
onNext(t: T): void {
try {

@@ -57,3 +61,3 @@ this._subscriber.onNext(this._fn(t));

onSubscribe(subscription) {
onSubscribe(subscription: ISubscription): void {
this._subscription = subscription;

@@ -63,2 +67,1 @@ this._subscriber.onSubscribe(subscription);

}
exports.default = FlowableMapOperator;

@@ -1,7 +0,13 @@

'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {IPublisher, ISubscription, ISubscriber} from 'rsocket-types';
class FlowableProcessor {
constructor(source, fn) {
export default class FlowableProcessor<T, R>
implements IPublisher<R>, ISubscriber<T>, ISubscription {
_transformer: (T) => R;
_source: IPublisher<T>;
_sink: ISubscriber<R>;
_subscription: ISubscription;
_done: boolean;
_error: Error;
constructor(source: IPublisher<T>, fn?: (T) => R) {
this._source = source;

@@ -13,7 +19,7 @@ this._transformer = fn;

onSubscribe(subscription) {
onSubscribe(subscription: ISubscription) {
this._subscription = subscription;
}
onNext(t) {
onNext(t: T) {
if (!this._sink) {

@@ -30,9 +36,8 @@ console.warn('premature onNext for processor, dropping value');

(interimVal, mapper) => mapper(interimVal),
val
val,
);
this._sink.onNext(finalVal);
}
onError(error) {
onError(error: Error) {
this._error = error;

@@ -55,3 +60,3 @@ if (!this._sink) {

subscribe(subscriber) {
subscribe(subscriber: ISubscriber<R>) {
if (this._source.subscribe) {

@@ -70,3 +75,3 @@ this._source.subscribe(this);

map(fn) {
map<S>(fn: (R) => S) {
this._mappers.push(fn);

@@ -76,3 +81,3 @@ return this;

request(n) {
request(n: number) {
this._subscription && this._subscription.request(n);

@@ -85,2 +90,1 @@ }

}
exports.default = FlowableProcessor;

@@ -15,9 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {ISubscriber, ISubscription} from 'rsocket-types';
/**

@@ -27,4 +27,7 @@ * An operator that `request()`s the given number of items immediately upon

*/
class FlowableRequestOperator {
constructor(subscriber, toRequest) {
export default class FlowableRequestOperator<T> implements ISubscriber<T> {
_subscriber: ISubscriber<T>;
_toRequest: number;
constructor(subscriber: ISubscriber<T>, toRequest: number) {
this._subscriber = subscriber;

@@ -34,15 +37,15 @@ this._toRequest = toRequest;

onComplete() {
onComplete(): void {
this._subscriber.onComplete();
}
onError(error) {
onError(error: Error): void {
this._subscriber.onError(error);
}
onNext(t) {
onNext(t: T): void {
this._subscriber.onNext(t);
}
onSubscribe(subscription) {
onSubscribe(subscription: ISubscription): void {
this._subscriber.onSubscribe(subscription);

@@ -52,2 +55,1 @@ subscription.request(this._toRequest);

}
exports.default = FlowableRequestOperator;

@@ -15,9 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {ISubscriber, ISubscription} from 'rsocket-types';
/**

@@ -28,4 +28,8 @@ * An operator that requests a fixed number of values from its source

*/
class FlowableTakeOperator {
constructor(subscriber, toTake) {
export default class FlowableTakeOperator<T> implements ISubscriber<T> {
_subscriber: ISubscriber<T>;
_subscription: ?ISubscription;
_toTake: number;
constructor(subscriber: ISubscriber<T>, toTake: number) {
this._subscriber = subscriber;

@@ -36,11 +40,11 @@ this._subscription = null;

onComplete() {
onComplete(): void {
this._subscriber.onComplete();
}
onError(error) {
onError(error: Error): void {
this._subscriber.onError(error);
}
onNext(t) {
onNext(t: T): void {
try {

@@ -60,3 +64,3 @@ this._subscriber.onNext(t);

onSubscribe(subscription) {
onSubscribe(subscription: ISubscription): void {
this._subscription = subscription;

@@ -69,3 +73,3 @@ this._subscriber.onSubscribe(subscription);

_cancelAndComplete() {
_cancelAndComplete(): void {
if (!this._subscription) {

@@ -78,2 +82,1 @@ throw new Error('subscription is null');

}
exports.default = FlowableTakeOperator;

@@ -15,13 +15,8 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.every = every;
var _Flowable = _interopRequireDefault(require('./Flowable'));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}
import Flowable from './Flowable';

@@ -39,4 +34,4 @@ /**

*/
function every(ms) {
return new _Flowable.default((subscriber) => {
export function every(ms: number): Flowable<number> {
return new Flowable(subscriber => {
let intervalId = null;

@@ -51,3 +46,3 @@ let pending = 0;

},
request: (n) => {
request: n => {
if (n < Number.MAX_SAFE_INTEGER) {

@@ -54,0 +49,0 @@ pending += n;

@@ -15,38 +15,15 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
Object.defineProperty(exports, 'Flowable', {
enumerable: true,
get: function () {
return _Flowable.default;
},
});
Object.defineProperty(exports, 'Single', {
enumerable: true,
get: function () {
return _Single.default;
},
});
Object.defineProperty(exports, 'FlowableProcessor', {
enumerable: true,
get: function () {
return _FlowableProcessor.default;
},
});
Object.defineProperty(exports, 'every', {
enumerable: true,
get: function () {
return _FlowableTimer.every;
},
});
var _Flowable = _interopRequireDefault(require('./Flowable'));
var _Single = _interopRequireDefault(require('./Single'));
var _FlowableProcessor = _interopRequireDefault(require('./FlowableProcessor'));
var _FlowableTimer = require('./FlowableTimer');
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}
import Flowable from './Flowable';
import Single from './Single';
import FlowableProcessor from './FlowableProcessor';
import {every} from './FlowableTimer';
/**
* The public API of the `flowable` package.
*/
export {Flowable, FlowableProcessor, Single, every};

@@ -7,3 +7,3 @@ /**

*
*
* @flow
*/

@@ -21,3 +21,7 @@ 'use strict';

*/
function invariant(condition, format, ...args) {
function invariant(
condition: mixed,
format: string,
...args: Array<mixed>
): void {
if (!condition) {

@@ -29,3 +33,3 @@ let error;

'Minified exception occurred; use the non-minified ' +
'dev environment for the full error message and additional helpful warnings.'
'dev environment for the full error message and additional helpful warnings.',
);

@@ -38,3 +42,3 @@ } else {

error.framesToPop = 1; // Skip invariant's own stack frame.
(error: any).framesToPop = 1; // Skip invariant's own stack frame.

@@ -41,0 +45,0 @@ throw error;

@@ -15,9 +15,26 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
export type Source<T> = (subject: IFutureSubject<T>) => void;
export type CancelCallback = () => void;
export interface IPartialFutureSubscriber<T> {
+onComplete?: (value: T) => void,
+onError?: (error: Error) => void,
+onSubscribe?: (cancel: CancelCallback) => void,
}
export interface IFutureSubscriber<T> {
+onComplete: (value: T) => void,
+onError: (error: Error) => void,
+onSubscribe: (cancel: CancelCallback) => void,
}
export interface IFutureSubject<T> {
+onComplete: (value: T) => void,
+onError: (error: Error) => void,
+onSubscribe: (cancel?: ?CancelCallback) => void,
}
/**

@@ -56,5 +73,7 @@ * Represents a lazy computation that will either produce a value of type T

*/
class Single {
static of(value) {
return new Single((subscriber) => {
export default class Single<T> {
_source: Source<T>;
static of<U>(value: U): Single<U> {
return new Single(subscriber => {
subscriber.onSubscribe();

@@ -65,4 +84,4 @@ subscriber.onComplete(value);

static error(error) {
return new Single((subscriber) => {
static error<U = empty>(error: Error): Single<U> {
return new Single(subscriber => {
subscriber.onSubscribe();

@@ -73,4 +92,4 @@ subscriber.onError(error);

static never() {
return new Single((subscriber) => {
static never<U = empty>(): Single<U> {
return new Single(subscriber => {
subscriber.onSubscribe();

@@ -80,7 +99,7 @@ });

constructor(source) {
constructor(source: Source<T>) {
this._source = source;
}
subscribe(partialSubscriber) {
subscribe(partialSubscriber?: ?IPartialFutureSubscriber<T>): void {
const subscriber = new FutureSubscriber(partialSubscriber);

@@ -94,4 +113,4 @@ try {

flatMap(fn) {
return new Single((subscriber) => {
flatMap<R>(fn: (data: T) => Single<R>): Single<R> {
return new Single(subscriber => {
let currentCancel;

@@ -103,9 +122,9 @@ const cancel = () => {

this._source({
onComplete: (value) => {
onComplete: value => {
fn(value).subscribe({
onComplete: (mapValue) => {
onComplete: mapValue => {
subscriber.onComplete(mapValue);
},
onError: (error) => subscriber.onError(error),
onSubscribe: (_cancel) => {
onError: error => subscriber.onError(error),
onSubscribe: _cancel => {
currentCancel = _cancel;

@@ -115,4 +134,4 @@ },

},
onError: (error) => subscriber.onError(error),
onSubscribe: (_cancel) => {
onError: error => subscriber.onError(error),
onSubscribe: _cancel => {
currentCancel = _cancel;

@@ -129,8 +148,8 @@ subscriber.onSubscribe(cancel);

*/
map(fn) {
return new Single((subscriber) => {
map<R>(fn: (data: T) => R): Single<R> {
return new Single(subscriber => {
return this._source({
onComplete: (value) => subscriber.onComplete(fn(value)),
onError: (error) => subscriber.onError(error),
onSubscribe: (cancel) => subscriber.onSubscribe(cancel),
onComplete: value => subscriber.onComplete(fn(value)),
onError: error => subscriber.onError(error),
onSubscribe: cancel => subscriber.onSubscribe(cancel),
});

@@ -140,3 +159,3 @@ });

then(successFn, errorFn) {
then(successFn?: (data: T) => void, errorFn?: (error: Error) => void): void {
this.subscribe({

@@ -151,5 +170,9 @@ onComplete: successFn || (() => {}),

* @private
*/ exports.default = Single;
class FutureSubscriber {
constructor(subscriber) {
*/
class FutureSubscriber<T> implements IFutureSubscriber<T> {
_active: boolean;
_started: boolean;
_subscriber: IPartialFutureSubscriber<T>;
constructor(subscriber?: ?IPartialFutureSubscriber<T>) {
this._active = false;

@@ -160,3 +183,3 @@ this._started = false;

onComplete(value) {
onComplete(value: T): void {
if (!this._active) {

@@ -167,5 +190,4 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called'
: 'onSubscribe has not been called',
);
return;

@@ -186,3 +208,3 @@ }

onError(error) {
onError(error: Error): void {
if (this._started && !this._active) {

@@ -193,5 +215,4 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called'
: 'onSubscribe has not been called',
);
return;

@@ -204,3 +225,3 @@ }

onSubscribe(cancel) {
onSubscribe(cancel?: ?CancelCallback): void {
if (this._started) {

@@ -207,0 +228,0 @@ console.warn('Single: Invalid call to onSubscribe(): already called.');

{
"name": "rsocket-flowable",
"description": "ReactiveStreams for JavaScript",
"version": "0.0.27",
"version": "0.0.28",
"repository": {

@@ -11,3 +11,3 @@ "type": "git",

"main": "build/index.js",
"gitHead": "23da9b0e9377ba52d3294cf4763cbbf3de7ba7b1"
"gitHead": "1dd3eb28183991d663392d87877225bf862946e2"
}

@@ -0,0 +0,0 @@ # rsocket-flowable

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc