rsocket-flowable
Advanced tools
Comparing version
@@ -131,25 +131,15 @@ /** | ||
} | ||
if (this._emitting) { | ||
// Prevent onNext -> request -> onNext -> request -> ... cycles in a | ||
// single event loop by deferring any requests within an onNext invocation | ||
// to the end of the current event loop. Uses `request` instead of | ||
// `callbacks.request` to update `_pending` at the appropriate time and account | ||
// for the possibility of an intervening cancellation. | ||
setTimeout(() => this._request(n), 0); | ||
if (n === this._max) { | ||
this._pending = this._max; | ||
} else { | ||
if (n === this._max) { | ||
this._pending += n; | ||
if (this._pending >= this._max) { | ||
this._pending = this._max; | ||
} else { | ||
this._pending += n; | ||
if (this._pending >= this._max) { | ||
this._pending = this._max; | ||
} | ||
} | ||
if (this._subscription) { | ||
this._subscription.request(n); | ||
} | ||
} | ||
if (this._subscription) { | ||
this._subscription.request(n); | ||
} | ||
}; | ||
this._active = false; | ||
this._emitting = false; | ||
this._max = max; | ||
@@ -218,3 +208,2 @@ this._pending = 0; | ||
} | ||
this._emitting = true; | ||
if (this._pending !== this._max) { | ||
@@ -231,3 +220,2 @@ this._pending--; | ||
} | ||
this._emitting = false; | ||
} | ||
@@ -234,0 +222,0 @@ onSubscribe(subscription) { |
{ | ||
"name": "rsocket-flowable", | ||
"description": "ReactiveStreams for JavaScript", | ||
"version": "0.0.5", | ||
"version": "0.0.6", | ||
"repository": { | ||
@@ -6,0 +6,0 @@ "type": "git", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
139028
-3.16%1211
-1.94%