Comparing version 1.0.0 to 1.0.1
@@ -23,3 +23,3 @@ const { | ||
next(data) { | ||
this.sinks.forEach(s => s.next(data)) | ||
this.sinks.concat().forEach(s => s.next(data)) | ||
} | ||
@@ -31,6 +31,6 @@ complete(err) { | ||
} | ||
exports.share = source => { | ||
exports.share = () => source => { | ||
const share = new Share(null, source) | ||
return sink => { | ||
sink.defer([share.remove, share, this]) | ||
sink.defer([share.remove, share, sink]) | ||
share.add(sink); | ||
@@ -42,3 +42,3 @@ } | ||
const buffer = [] | ||
share.next = function(data) { | ||
share.next = function (data) { | ||
buffer.push(data) | ||
@@ -52,4 +52,4 @@ if (buffer.length > bufferSize) { | ||
sink.defer([share.remove, share, this]) | ||
buffer.forEach(cache => sink.next(cache)) | ||
share.add(sink); | ||
buffer.forEach(cache => sink.next(cache)) | ||
} | ||
@@ -136,4 +136,4 @@ } | ||
const array = new Array(nTotal) | ||
// const defers = new Array(nTotal) | ||
// for (let i = 0; i < nTotal; ++i) defers[i] = sources[i](new CombineLatest(sink, i, array, context)) | ||
// const defers = new Array(nTotal) | ||
// for (let i = 0; i < nTotal; ++i) defers[i] = sources[i](new CombineLatest(sink, i, array, context)) | ||
sources.forEach((source, i) => source(new CombineLatest(sink, i, array, context))) | ||
@@ -140,0 +140,0 @@ } |
@@ -28,3 +28,5 @@ declare namespace Rx { | ||
switchMapTo(source: Observable): Observable | ||
BufferTime(miniseconds: number): Observable | ||
bufferTime(miniseconds: number): Observable | ||
catchError(selector: (e: Error | any) => Observable): Observable | ||
toPromise(): Promise<any> | ||
@@ -34,6 +36,8 @@ subscribe(n: (d: any) => void, e: (d: Error) => void, c: () => void): Sink | ||
interface Creator { | ||
(f: (sink: Sink) => void): Observable | ||
of(...args: Array<any>): Observable | ||
from(source: Array<any> | Promise | any): Observable | ||
fromArray(array: Array<any>): Observable | ||
create(f: (sink: Sink) => void): Observable | ||
bindCallback(f: Function, thisArg: any, ...args: Array<any>): Observable | ||
bindCallback(f: (...args: Array<any>, callback: (res: any) => void) => void, thisArg: any, ...args: Array<any>): Observable | ||
bindNodeCallback(f: (...args: Array<any>, callback: (err: Error | any, res: any) => void) => void, thisArg: any, ...args: Array<any>): Observable | ||
iif(condition: () => Boolean, trueSource: Observable, falseSource: Observable): Observable | ||
@@ -45,2 +49,5 @@ race(...sources: Array<Observable>): Observable | ||
combineLatest(...sources: Array<Observable>): Observable | ||
never(): Observable | ||
empty(): Observable | ||
throwError(e: Error | any): Observable | ||
} | ||
@@ -67,5 +74,6 @@ } | ||
export function deliver(Class: Function): Deliver; | ||
export function create(f: (sink: Sink) => void): Observable | ||
export function from(source: Array<any> | Promise | any): Observable | ||
export function fromArray(array: Array<any>): Observable | ||
export function bindCallback(f: Function, thisArg: any, ...args: Array<any>): Observable | ||
export function bindCallback(f: (...args: Array<any>, callback: (res: any) => void) => void, thisArg: any, ...args: Array<any>): Observable | ||
export function bindNodeCallback(f: (...args: Array<any>, callback: (err: Error | any, res: any) => void) => void, thisArg: any, ...args: Array<any>): Observable | ||
export function iif(condition: () => Boolean, trueSource: Observable, falseSource: Observable): Observable | ||
@@ -77,3 +85,7 @@ export function race(...sources: Array<Observable>): Observable | ||
export function combineLatest(...sources: Array<Observable>): Observable | ||
export function never(): Observable | ||
export function empty(): Observable | ||
export function throwError(e: Error | any): Observable | ||
export function take(count: number): Observable | ||
@@ -104,3 +116,4 @@ export function takeUntil(source: Observable): Observable | ||
export function switchMapTo(source: Observable): Observable | ||
export function BufferTime(miniseconds: number): Observable | ||
export function bufferTime(miniseconds: number): Observable | ||
export function catchError(selector: (e: Error | any) => Observable): Observable | ||
export const toPromise: Observer<Sink> | ||
@@ -107,0 +120,0 @@ export function subscribe(n: (d: any) => void, e: (d: Error) => void, c: () => void): Observer<Sink> |
18
index.js
@@ -85,3 +85,15 @@ const { | ||
exports.delay = deliver(Delay) | ||
class CatchError extends Sink { | ||
init(selector) { | ||
this.selector = selector | ||
} | ||
complete(err) { | ||
if (err) { | ||
this.selector(err)(this.sink) | ||
} else { | ||
super.complete() | ||
} | ||
} | ||
} | ||
exports.catchError = deliver(CatchError) | ||
Object.assign(exports, require('./combination'), require('./filtering'), require('./mathematical'), require('./producer'), require('./transformation')) | ||
@@ -92,8 +104,8 @@ | ||
const rxProxy = { | ||
get: (target, prop) => (...args) => new Proxy(exports[prop](...args)(target), rxProxy) | ||
get: (target, prop) => target[prop] || ((...args) => new Proxy(exports[prop](...args)(target), rxProxy)) | ||
} | ||
exports.rx = new Proxy({}, { | ||
exports.rx = new Proxy(f => new Proxy(f, rxProxy), { | ||
get: (target, prop) => (...args) => new Proxy(exports[prop](...args), rxProxy), | ||
set: (target, prop, value) => exports[prop] = value | ||
}) |
{ | ||
"name": "fastrx", | ||
"version": "1.0.0", | ||
"version": "1.0.1", | ||
"description": "fast rxjs implemention", | ||
@@ -24,2 +24,2 @@ "main": "index.js", | ||
"homepage": "https://github.com/langhuihui/rx4rx#readme" | ||
} | ||
} |
@@ -8,6 +8,5 @@ const { | ||
} = require('./combination') | ||
exports.create = f => sink => f(sink) | ||
exports.subject = source => { | ||
let subSink = null | ||
const observable = share(sink => { | ||
const observable = share()(sink => { | ||
subSink = sink | ||
@@ -105,4 +104,4 @@ source && source(subSink) | ||
} | ||
exports.never = noop | ||
exports.never = () => noop | ||
exports.throwError = e => sink => sink.complete(e) | ||
exports.empty = exports.throwError() | ||
exports.empty = () => exports.throwError() |
34281
1042