@kjots/observable-stream
Advanced tools
Comparing version 0.0.6 to 0.0.7
/// <reference types="node" /> | ||
import { Readable } from 'stream'; | ||
import { Observable } from 'rxjs'; | ||
import ReadableStream = NodeJS.ReadableStream; | ||
import ReadWriteStream = NodeJS.ReadWriteStream; | ||
export declare class ObservableStream<T> extends Readable { | ||
private readonly observable; | ||
private values; | ||
private error; | ||
private complete; | ||
private stopped; | ||
constructor(observable: Observable<T>); | ||
_read(): void; | ||
private _subscribe; | ||
private _flush; | ||
private _close; | ||
} | ||
export declare function observableStream<T>(observable: Observable<T>): ReadableStream; | ||
export declare function through<T, R = T>(...transforms: Array<ReadWriteStream>): (observable: Observable<T>) => Observable<R>; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const stream_observable_1 = require("@kjots/stream-observable"); | ||
const readable_stream_1 = require("readable-stream"); | ||
const stream_1 = require("stream"); | ||
const rxjs_1 = require("rxjs"); | ||
class ObservableStream extends stream_1.Readable { | ||
constructor(observable) { | ||
super({ objectMode: true }); | ||
this.observable = observable; | ||
} | ||
_read() { | ||
this.stopped = false; | ||
if (this.values === undefined) { | ||
this._subscribe(); | ||
} | ||
this._flush(); | ||
} | ||
_subscribe() { | ||
this.values = []; | ||
this.observable.subscribe({ | ||
next: (value) => { | ||
this.values.push(value); | ||
this._flush(); | ||
}, | ||
error: (error) => { | ||
this.error = error; | ||
this._flush(); | ||
}, | ||
complete: () => { | ||
this.complete = true; | ||
this._flush(); | ||
} | ||
}); | ||
} | ||
_flush() { | ||
if (this.stopped) { | ||
return; | ||
} | ||
while (this.values.length !== 0) { | ||
this.stopped = !this.push(this.values.shift()); | ||
if (this.stopped) { | ||
return; | ||
} | ||
} | ||
if (this.error) { | ||
this.emit('error', this.error); | ||
this._close(); | ||
} | ||
else if (this.complete) { | ||
this.push(null); | ||
this._close(); | ||
} | ||
} | ||
_close() { | ||
delete this.values; | ||
delete this.error; | ||
delete this.complete; | ||
this.stopped = true; | ||
} | ||
} | ||
exports.ObservableStream = ObservableStream; | ||
function observableStream(observable) { | ||
const stream = readable_stream_1.Readable({ objectMode: true }); | ||
stream._read = onceify(() => observable.subscribe(data => stream.push(data), error => stream.emit('error', error), () => stream.push(null))); | ||
return stream; | ||
return new ObservableStream(observable); | ||
} | ||
@@ -15,15 +69,10 @@ exports.observableStream = observableStream; | ||
const errorSubject = new rxjs_1.Subject(); | ||
return rxjs_1.merge(errorSubject, stream_observable_1.streamObservable(transforms.reduce((stream, transform) => stream.on('error', error => errorSubject.error(error)).pipe(transform), observableStream(observable)))); | ||
return rxjs_1.merge(errorSubject, stream_observable_1.streamObservable(transforms | ||
.reduce((stream, transform) => stream | ||
.on('error', error => errorSubject.error(error)) | ||
.pipe(transform), observableStream(observable)) | ||
.on('end', () => errorSubject.complete()))); | ||
}; | ||
} | ||
exports.through = through; | ||
function onceify(fn) { | ||
let invoked = false; | ||
return function (...args) { | ||
if (!invoked) { | ||
invoked = true; | ||
return fn.call(this, ...args); | ||
} | ||
}; | ||
} | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "@kjots/observable-stream", | ||
"version": "0.0.6", | ||
"version": "0.0.7", | ||
"description": "Observable Stream", | ||
@@ -38,3 +38,2 @@ "main": "dist/index.js", | ||
"@types/node": "^10.14.4", | ||
"@types/through2": "^2.0.34", | ||
"chai": "^4.2.0", | ||
@@ -44,3 +43,2 @@ "mocha": "^6.0.2", | ||
"rimraf": "^2.6.3", | ||
"through2": "^3.0.1", | ||
"ts-node": "^8.0.3", | ||
@@ -51,6 +49,5 @@ "tslint": "^5.15.0", | ||
"dependencies": { | ||
"@kjots/stream-observable": "^0.0.5", | ||
"readable-stream": "^3.3.0", | ||
"@kjots/stream-observable": "^0.0.7", | ||
"rxjs": "^6.4.0" | ||
} | ||
} |
115
src/index.ts
import { streamObservable } from '@kjots/stream-observable'; | ||
import { Readable } from 'readable-stream'; | ||
import { Readable } from 'stream'; | ||
import { merge, Observable, Subject } from 'rxjs'; | ||
@@ -9,16 +10,88 @@ | ||
export function observableStream<T>(observable: Observable<T>): ReadableStream { | ||
const stream = Readable({ objectMode: true }); | ||
export class ObservableStream<T> extends Readable { | ||
private readonly observable: Observable<T>; | ||
stream._read = onceify(() => | ||
observable.subscribe( | ||
data => stream.push(data), | ||
error => stream.emit('error', error), | ||
() => stream.push(null) | ||
) | ||
); | ||
private values!: Array<T>; | ||
private error!: any; | ||
private complete!: boolean; | ||
return stream; | ||
private stopped!: boolean; | ||
constructor(observable: Observable<T>) { | ||
super({ objectMode: true }); | ||
this.observable = observable; | ||
} | ||
public _read() { | ||
this.stopped = false; | ||
if (this.values === undefined) { | ||
this._subscribe(); | ||
} | ||
this._flush(); | ||
} | ||
private _subscribe() { | ||
this.values = []; | ||
this.observable.subscribe({ | ||
next: (value: T) => { | ||
this.values.push(value); | ||
this._flush(); | ||
}, | ||
error: (error: any) => { | ||
this.error = error; | ||
this._flush(); | ||
}, | ||
complete: () => { | ||
this.complete = true; | ||
this._flush(); | ||
} | ||
}); | ||
} | ||
private _flush() { | ||
if (this.stopped) { | ||
return; | ||
} | ||
while (this.values.length !== 0) { | ||
this.stopped = !this.push(this.values.shift()); | ||
if (this.stopped) { | ||
return; | ||
} | ||
} | ||
if (this.error) { | ||
this.emit('error', this.error); | ||
this._close(); | ||
} else if (this.complete) { | ||
this.push(null); | ||
this._close(); | ||
} | ||
} | ||
private _close() { | ||
delete this.values; | ||
delete this.error; | ||
delete this.complete; | ||
this.stopped = true; | ||
} | ||
} | ||
export function observableStream<T>(observable: Observable<T>): ReadableStream { | ||
return new ObservableStream(observable); | ||
} | ||
export function through<T, R = T>(...transforms: Array<ReadWriteStream>): (observable: Observable<T>) => Observable<R> { | ||
@@ -29,17 +102,11 @@ return (observable: Observable<T>) => { | ||
return merge(errorSubject, streamObservable( | ||
transforms.reduce((stream, transform) => stream.on('error', error => errorSubject.error(error)).pipe(transform), observableStream(observable)) | ||
transforms | ||
.reduce((stream, transform) => | ||
stream | ||
.on('error', error => errorSubject.error(error)) | ||
.pipe(transform), observableStream(observable) | ||
) | ||
.on('end', () => errorSubject.complete()) | ||
)); | ||
}; | ||
} | ||
function onceify(fn: (this: any, ...args: Array<any>) => any): (this: any, ...args: Array<any>) => any { | ||
let invoked = false; | ||
return function (this: any, ...args: Array<any>): any { | ||
if (!invoked) { | ||
invoked = true; | ||
return fn.call(this, ...args); | ||
} | ||
}; | ||
} |
Sorry, the diff of this file is not supported yet
20037
2
11
176
+ Added@kjots/stream-observable@0.0.7(transitive)
- Removedreadable-stream@^3.3.0
- Removed@kjots/stream-observable@0.0.5(transitive)
- Removedany-observable@0.2.0(transitive)
- Removedinherits@2.0.4(transitive)
- Removedreadable-stream@3.6.2(transitive)
- Removedrxjs-compat@6.6.7(transitive)
- Removedsafe-buffer@5.2.1(transitive)
- Removedstream-to-observable@0.2.0(transitive)
- Removedstring_decoder@1.3.0(transitive)
- Removedutil-deprecate@1.0.2(transitive)