Socket
Socket
Sign inDemoInstall

rsocket-flowable

Package Overview
Dependencies
Maintainers
6
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rsocket-flowable - npm Package Compare versions

Comparing version 0.0.28 to 0.0.29-alpha.0

build/Flowable.js.flow

197

build/Flowable.js

@@ -15,29 +15,39 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {
IPublisher,
ISubscriber,
IPartialSubscriber,
ISubscription,
} from 'rsocket-types';
var _FlowableMapOperator = _interopRequireDefault(
require('./FlowableMapOperator')
);
var _FlowableTakeOperator = _interopRequireDefault(
require('./FlowableTakeOperator')
);
var _Invariant = _interopRequireDefault(require('./Invariant'));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}
function _defineProperty(obj, key, value) {
if (key in obj) {
Object.defineProperty(obj, key, {
value: value,
enumerable: true,
configurable: true,
writable: true,
});
} else {
obj[key] = value;
}
return obj;
}
import FlowableMapOperator from './FlowableMapOperator';
import FlowableTakeOperator from './FlowableTakeOperator';
import invariant from './Invariant';
export type Source<T> = (subscriber: ISubscriber<T>) => void;
/**
* Implements the ReactiveStream `Publisher` interface with Rx-style operators.
*/
export default class Flowable<T> implements IPublisher<T> {
_max: number;
_source: Source<T>;
static just<U>(...values: Array<U>): Flowable<U> {
return new Flowable(subscriber => {
class Flowable {
static just(...values) {
return new Flowable((subscriber) => {
let cancelled = false;

@@ -49,3 +59,3 @@ let i = 0;

},
request: n => {
request: (n) => {
while (!cancelled && n > 0 && i < values.length) {

@@ -63,4 +73,4 @@ subscriber.onNext(values[i++]);

static error<U = empty>(error: Error): Flowable<U> {
return new Flowable(subscriber => {
static error(error) {
return new Flowable((subscriber) => {
subscriber.onSubscribe({

@@ -75,4 +85,4 @@ cancel: () => {},

static never<U = empty>(): Flowable<U> {
return new Flowable(subscriber => {
static never() {
return new Flowable((subscriber) => {
subscriber.onSubscribe({

@@ -85,3 +95,3 @@ cancel: () => {},

constructor(source: Source<T>, max?: number = Number.MAX_SAFE_INTEGER) {
constructor(source, max = Number.MAX_SAFE_INTEGER) {
this._max = max;

@@ -91,6 +101,4 @@ this._source = source;

subscribe(
subscriberOrCallback?: ?(IPartialSubscriber<T> | ((T) => void)),
): void {
let partialSubscriber: ?IPartialSubscriber<T>;
subscribe(subscriberOrCallback) {
let partialSubscriber;
if (typeof subscriberOrCallback === 'function') {

@@ -105,21 +113,21 @@ partialSubscriber = this._wrapCallback(subscriberOrCallback);

lift<R>(
onSubscribeLift: (subscriber: ISubscriber<R>) => ISubscriber<T>,
): Flowable<R> {
return new Flowable(subscriber =>
this._source(onSubscribeLift(subscriber)),
lift(onSubscribeLift) {
return new Flowable((subscriber) =>
this._source(onSubscribeLift(subscriber))
);
}
map<R>(fn: (data: T) => R): Flowable<R> {
return this.lift(subscriber => new FlowableMapOperator(subscriber, fn));
map(fn) {
return this.lift(
(subscriber) => new _FlowableMapOperator.default(subscriber, fn)
);
}
take(toTake: number): Flowable<T> {
take(toTake) {
return this.lift(
subscriber => new FlowableTakeOperator(subscriber, toTake),
(subscriber) => new _FlowableTakeOperator.default(subscriber, toTake)
);
}
_wrapCallback(callback: (T) => void): IPartialSubscriber<T> {
_wrapCallback(callback) {
const max = this._max;

@@ -137,13 +145,49 @@ return {

* @private
*/
class FlowableSubscriber<T> implements ISubscriber<T> {
_active: boolean;
_emitting: boolean;
_max: number;
_pending: number;
_started: boolean;
_subscriber: IPartialSubscriber<T>;
_subscription: ?ISubscription;
*/ exports.default = Flowable;
class FlowableSubscriber {
constructor(subscriber, max) {
_defineProperty(
this,
'_cancel',
constructor(subscriber?: ?IPartialSubscriber<T>, max: number) {
() => {
if (!this._active) {
return;
}
this._active = false;
if (this._subscription) {
this._subscription.cancel();
}
}
);
_defineProperty(
this,
'_request',
(n) => {
(0, _Invariant.default)(
Number.isInteger(n) && n >= 1 && n <= this._max,
'Flowable: Expected request value to be an integer with a ' +
'value greater than 0 and less than or equal to %s, got ' +
'`%s`.',
this._max,
n
);
if (!this._active) {
return;
}
if (n === this._max) {
this._pending = this._max;
} else {
this._pending += n;
if (this._pending >= this._max) {
this._pending = this._max;
}
}
if (this._subscription) {
this._subscription.request(n);
}
}
);
this._active = false;

@@ -156,4 +200,3 @@ this._max = max;

}
onComplete(): void {
onComplete() {
if (!this._active) {

@@ -164,3 +207,3 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called',
: 'onSubscribe has not been called'
);

@@ -181,4 +224,3 @@ return;

}
onError(error: Error): void {
onError(error) {
if (this._started && !this._active) {

@@ -189,3 +231,3 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called',
: 'onSubscribe has not been called'
);

@@ -198,4 +240,3 @@ return;

}
onNext(data: T): void {
onNext(data) {
if (!this._active) {

@@ -206,3 +247,3 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called',
: 'onSubscribe has not been called'
);

@@ -214,3 +255,3 @@ return;

'Flowable: Invalid call to onNext(), all request()ed values have been ' +
'published.',
'published.'
);

@@ -231,4 +272,3 @@ return;

}
onSubscribe(subscription?: ?ISubscription): void {
onSubscribe(subscription) {
if (this._started) {

@@ -251,37 +291,2 @@ console.warn('Flowable: Invalid call to onSubscribe(): already called.');

}
_cancel = () => {
if (!this._active) {
return;
}
this._active = false;
if (this._subscription) {
this._subscription.cancel();
}
};
_request = (n: number) => {
invariant(
Number.isInteger(n) && n >= 1 && n <= this._max,
'Flowable: Expected request value to be an integer with a ' +
'value greater than 0 and less than or equal to %s, got ' +
'`%s`.',
this._max,
n,
);
if (!this._active) {
return;
}
if (n === this._max) {
this._pending = this._max;
} else {
this._pending += n;
if (this._pending >= this._max) {
this._pending = this._max;
}
}
if (this._subscription) {
this._subscription.request(n);
}
};
}

@@ -15,9 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {ISubscriber, ISubscription} from 'rsocket-types';
/**

@@ -28,8 +28,4 @@ * An operator that acts like Array.map, applying a given function to

*/
export default class FlowableMapOperator<T, R> implements ISubscriber<T> {
_fn: (t: T) => R;
_subscriber: ISubscriber<R>;
_subscription: ?ISubscription;
constructor(subscriber: ISubscriber<R>, fn: (t: T) => R) {
class FlowableMapOperator {
constructor(subscriber, fn) {
this._fn = fn;

@@ -40,11 +36,11 @@ this._subscriber = subscriber;

onComplete(): void {
onComplete() {
this._subscriber.onComplete();
}
onError(error: Error): void {
onError(error) {
this._subscriber.onError(error);
}
onNext(t: T): void {
onNext(t) {
try {

@@ -61,3 +57,3 @@ this._subscriber.onNext(this._fn(t));

onSubscribe(subscription: ISubscription): void {
onSubscribe(subscription) {
this._subscription = subscription;

@@ -67,1 +63,2 @@ this._subscriber.onSubscribe(subscription);

}
exports.default = FlowableMapOperator;

@@ -1,13 +0,7 @@

import type {IPublisher, ISubscription, ISubscriber} from 'rsocket-types';
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
export default class FlowableProcessor<T, R>
implements IPublisher<R>, ISubscriber<T>, ISubscription {
_transformer: (T) => R;
_source: IPublisher<T>;
_sink: ISubscriber<R>;
_subscription: ISubscription;
_done: boolean;
_error: Error;
constructor(source: IPublisher<T>, fn?: (T) => R) {
class FlowableProcessor {
constructor(source, fn) {
this._source = source;

@@ -19,7 +13,7 @@ this._transformer = fn;

onSubscribe(subscription: ISubscription) {
onSubscribe(subscription) {
this._subscription = subscription;
}
onNext(t: T) {
onNext(t) {
if (!this._sink) {

@@ -36,8 +30,9 @@ console.warn('premature onNext for processor, dropping value');

(interimVal, mapper) => mapper(interimVal),
val,
val
);
this._sink.onNext(finalVal);
}
onError(error: Error) {
onError(error) {
this._error = error;

@@ -60,3 +55,3 @@ if (!this._sink) {

subscribe(subscriber: ISubscriber<R>) {
subscribe(subscriber) {
if (this._source.subscribe) {

@@ -75,3 +70,3 @@ this._source.subscribe(this);

map<S>(fn: (R) => S) {
map(fn) {
this._mappers.push(fn);

@@ -81,3 +76,3 @@ return this;

request(n: number) {
request(n) {
this._subscription && this._subscription.request(n);

@@ -90,1 +85,2 @@ }

}
exports.default = FlowableProcessor;

@@ -15,9 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {ISubscriber, ISubscription} from 'rsocket-types';
/**

@@ -27,7 +27,4 @@ * An operator that `request()`s the given number of items immediately upon

*/
export default class FlowableRequestOperator<T> implements ISubscriber<T> {
_subscriber: ISubscriber<T>;
_toRequest: number;
constructor(subscriber: ISubscriber<T>, toRequest: number) {
class FlowableRequestOperator {
constructor(subscriber, toRequest) {
this._subscriber = subscriber;

@@ -37,15 +34,15 @@ this._toRequest = toRequest;

onComplete(): void {
onComplete() {
this._subscriber.onComplete();
}
onError(error: Error): void {
onError(error) {
this._subscriber.onError(error);
}
onNext(t: T): void {
onNext(t) {
this._subscriber.onNext(t);
}
onSubscribe(subscription: ISubscription): void {
onSubscribe(subscription) {
this._subscriber.onSubscribe(subscription);

@@ -55,1 +52,2 @@ subscription.request(this._toRequest);

}
exports.default = FlowableRequestOperator;

@@ -15,9 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
import type {ISubscriber, ISubscription} from 'rsocket-types';
/**

@@ -28,8 +28,4 @@ * An operator that requests a fixed number of values from its source

*/
export default class FlowableTakeOperator<T> implements ISubscriber<T> {
_subscriber: ISubscriber<T>;
_subscription: ?ISubscription;
_toTake: number;
constructor(subscriber: ISubscriber<T>, toTake: number) {
class FlowableTakeOperator {
constructor(subscriber, toTake) {
this._subscriber = subscriber;

@@ -40,11 +36,11 @@ this._subscription = null;

onComplete(): void {
onComplete() {
this._subscriber.onComplete();
}
onError(error: Error): void {
onError(error) {
this._subscriber.onError(error);
}
onNext(t: T): void {
onNext(t) {
try {

@@ -64,3 +60,3 @@ this._subscriber.onNext(t);

onSubscribe(subscription: ISubscription): void {
onSubscribe(subscription) {
this._subscription = subscription;

@@ -73,3 +69,3 @@ this._subscriber.onSubscribe(subscription);

_cancelAndComplete(): void {
_cancelAndComplete() {
if (!this._subscription) {

@@ -82,1 +78,2 @@ throw new Error('subscription is null');

}
exports.default = FlowableTakeOperator;

@@ -15,8 +15,13 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.every = every;
import Flowable from './Flowable';
var _Flowable = _interopRequireDefault(require('./Flowable'));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}

@@ -34,4 +39,4 @@ /**

*/
export function every(ms: number): Flowable<number> {
return new Flowable(subscriber => {
function every(ms) {
return new _Flowable.default((subscriber) => {
let intervalId = null;

@@ -46,3 +51,3 @@ let pending = 0;

},
request: n => {
request: (n) => {
if (n < Number.MAX_SAFE_INTEGER) {

@@ -49,0 +54,0 @@ pending += n;

@@ -15,15 +15,38 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
Object.defineProperty(exports, 'Flowable', {
enumerable: true,
get: function () {
return _Flowable.default;
},
});
Object.defineProperty(exports, 'Single', {
enumerable: true,
get: function () {
return _Single.default;
},
});
Object.defineProperty(exports, 'FlowableProcessor', {
enumerable: true,
get: function () {
return _FlowableProcessor.default;
},
});
Object.defineProperty(exports, 'every', {
enumerable: true,
get: function () {
return _FlowableTimer.every;
},
});
import Flowable from './Flowable';
import Single from './Single';
import FlowableProcessor from './FlowableProcessor';
import {every} from './FlowableTimer';
/**
* The public API of the `flowable` package.
*/
export {Flowable, FlowableProcessor, Single, every};
var _Flowable = _interopRequireDefault(require('./Flowable'));
var _Single = _interopRequireDefault(require('./Single'));
var _FlowableProcessor = _interopRequireDefault(require('./FlowableProcessor'));
var _FlowableTimer = require('./FlowableTimer');
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}

@@ -7,3 +7,3 @@ /**

*
* @flow
*
*/

@@ -21,7 +21,3 @@ 'use strict';

*/
function invariant(
condition: mixed,
format: string,
...args: Array<mixed>
): void {
function invariant(condition, format, ...args) {
if (!condition) {

@@ -33,3 +29,3 @@ let error;

'Minified exception occurred; use the non-minified ' +
'dev environment for the full error message and additional helpful warnings.',
'dev environment for the full error message and additional helpful warnings.'
);

@@ -42,3 +38,3 @@ } else {

(error: any).framesToPop = 1; // Skip invariant's own stack frame.
error.framesToPop = 1; // Skip invariant's own stack frame.

@@ -45,0 +41,0 @@ throw error;

@@ -15,26 +15,9 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
* @flow
*
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.default = void 0;
export type Source<T> = (subject: IFutureSubject<T>) => void;
export type CancelCallback = () => void;
export interface IPartialFutureSubscriber<T> {
+onComplete?: (value: T) => void,
+onError?: (error: Error) => void,
+onSubscribe?: (cancel: CancelCallback) => void,
}
export interface IFutureSubscriber<T> {
+onComplete: (value: T) => void,
+onError: (error: Error) => void,
+onSubscribe: (cancel: CancelCallback) => void,
}
export interface IFutureSubject<T> {
+onComplete: (value: T) => void,
+onError: (error: Error) => void,
+onSubscribe: (cancel?: ?CancelCallback) => void,
}
/**

@@ -73,7 +56,5 @@ * Represents a lazy computation that will either produce a value of type T

*/
export default class Single<T> {
_source: Source<T>;
static of<U>(value: U): Single<U> {
return new Single(subscriber => {
class Single {
static of(value) {
return new Single((subscriber) => {
subscriber.onSubscribe();

@@ -84,4 +65,4 @@ subscriber.onComplete(value);

static error<U = empty>(error: Error): Single<U> {
return new Single(subscriber => {
static error(error) {
return new Single((subscriber) => {
subscriber.onSubscribe();

@@ -92,4 +73,4 @@ subscriber.onError(error);

static never<U = empty>(): Single<U> {
return new Single(subscriber => {
static never() {
return new Single((subscriber) => {
subscriber.onSubscribe();

@@ -99,7 +80,7 @@ });

constructor(source: Source<T>) {
constructor(source) {
this._source = source;
}
subscribe(partialSubscriber?: ?IPartialFutureSubscriber<T>): void {
subscribe(partialSubscriber) {
const subscriber = new FutureSubscriber(partialSubscriber);

@@ -113,4 +94,4 @@ try {

flatMap<R>(fn: (data: T) => Single<R>): Single<R> {
return new Single(subscriber => {
flatMap(fn) {
return new Single((subscriber) => {
let currentCancel;

@@ -122,9 +103,9 @@ const cancel = () => {

this._source({
onComplete: value => {
onComplete: (value) => {
fn(value).subscribe({
onComplete: mapValue => {
onComplete: (mapValue) => {
subscriber.onComplete(mapValue);
},
onError: error => subscriber.onError(error),
onSubscribe: _cancel => {
onError: (error) => subscriber.onError(error),
onSubscribe: (_cancel) => {
currentCancel = _cancel;

@@ -134,4 +115,4 @@ },

},
onError: error => subscriber.onError(error),
onSubscribe: _cancel => {
onError: (error) => subscriber.onError(error),
onSubscribe: (_cancel) => {
currentCancel = _cancel;

@@ -148,8 +129,8 @@ subscriber.onSubscribe(cancel);

*/
map<R>(fn: (data: T) => R): Single<R> {
return new Single(subscriber => {
map(fn) {
return new Single((subscriber) => {
return this._source({
onComplete: value => subscriber.onComplete(fn(value)),
onError: error => subscriber.onError(error),
onSubscribe: cancel => subscriber.onSubscribe(cancel),
onComplete: (value) => subscriber.onComplete(fn(value)),
onError: (error) => subscriber.onError(error),
onSubscribe: (cancel) => subscriber.onSubscribe(cancel),
});

@@ -159,3 +140,3 @@ });

then(successFn?: (data: T) => void, errorFn?: (error: Error) => void): void {
then(successFn, errorFn) {
this.subscribe({

@@ -170,9 +151,5 @@ onComplete: successFn || (() => {}),

* @private
*/
class FutureSubscriber<T> implements IFutureSubscriber<T> {
_active: boolean;
_started: boolean;
_subscriber: IPartialFutureSubscriber<T>;
constructor(subscriber?: ?IPartialFutureSubscriber<T>) {
*/ exports.default = Single;
class FutureSubscriber {
constructor(subscriber) {
this._active = false;

@@ -183,3 +160,3 @@ this._started = false;

onComplete(value: T): void {
onComplete(value) {
if (!this._active) {

@@ -190,4 +167,5 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called',
: 'onSubscribe has not been called'
);
return;

@@ -208,3 +186,3 @@ }

onError(error: Error): void {
onError(error) {
if (this._started && !this._active) {

@@ -215,4 +193,5 @@ console.warn(

? 'onComplete/onError was already called'
: 'onSubscribe has not been called',
: 'onSubscribe has not been called'
);
return;

@@ -225,3 +204,3 @@ }

onSubscribe(cancel?: ?CancelCallback): void {
onSubscribe(cancel) {
if (this._started) {

@@ -228,0 +207,0 @@ console.warn('Single: Invalid call to onSubscribe(): already called.');

{
"name": "rsocket-flowable",
"description": "ReactiveStreams for JavaScript",
"version": "0.0.28",
"version": "0.0.29-alpha.0",
"repository": {

@@ -11,3 +11,3 @@ "type": "git",

"main": "build/index.js",
"gitHead": "1dd3eb28183991d663392d87877225bf862946e2"
"gitHead": "1b149fbd3dd66fbfdbaa91a8c95264214114c546"
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc