@kjots/stream-observable
Advanced tools
Comparing version 0.0.6 to 0.0.7
/// <reference types="node" /> | ||
import { Observable } from 'rxjs'; | ||
import { Writable } from 'stream'; | ||
import { Observable, Observer } from 'rxjs'; | ||
import ReadableStream = NodeJS.ReadableStream; | ||
export declare class ObserverWritable<T> extends Writable { | ||
private readonly observer; | ||
constructor(observer: Observer<T>); | ||
onError: (error: any) => void; | ||
_write(chunk: any, encoding: string, callback: () => void): void; | ||
_final(callback: () => void): void; | ||
} | ||
export declare class StreamObservable<T> extends Observable<T> { | ||
private observerWritable; | ||
constructor(stream: ReadableStream); | ||
} | ||
export declare function streamObservable<T>(stream: ReadableStream): Observable<T>; |
@@ -5,22 +5,37 @@ "use strict"; | ||
const rxjs_1 = require("rxjs"); | ||
function streamObservable(stream) { | ||
return new rxjs_1.Observable((observer) => { | ||
const observerWritable = new stream_1.Writable({ | ||
objectMode: true, | ||
write(chunk, encoding, callback) { | ||
observer.next(chunk); | ||
callback(); | ||
}, | ||
final(callback) { | ||
observer.complete(); | ||
callback(); | ||
} | ||
class ObserverWritable extends stream_1.Writable { | ||
constructor(observer) { | ||
super({ objectMode: true }); | ||
this.onError = (error) => this.observer.error(error); | ||
this.observer = observer; | ||
} | ||
_write(chunk, encoding, callback) { | ||
this.observer.next(chunk); | ||
callback(); | ||
} | ||
_final(callback) { | ||
this.observer.complete(); | ||
callback(); | ||
} | ||
} | ||
exports.ObserverWritable = ObserverWritable; | ||
class StreamObservable extends rxjs_1.Observable { | ||
constructor(stream) { | ||
super((observer) => { | ||
this.observerWritable = new ObserverWritable(observer); | ||
stream.on('error', this.observerWritable.onError); | ||
stream.pipe(this.observerWritable); | ||
return () => { | ||
stream.unpipe(this.observerWritable); | ||
stream.off('error', this.observerWritable.onError); | ||
delete this.observerWritable; | ||
}; | ||
}); | ||
stream | ||
.on('error', error => observer.error(error)) | ||
.pipe(observerWritable); | ||
return () => stream.unpipe(observerWritable); | ||
}); | ||
} | ||
} | ||
exports.StreamObservable = StreamObservable; | ||
function streamObservable(stream) { | ||
return new StreamObservable(stream); | ||
} | ||
exports.streamObservable = streamObservable; | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "@kjots/stream-observable", | ||
"version": "0.0.6", | ||
"version": "0.0.7", | ||
"description": "Stream Observable", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -7,26 +7,48 @@ import { Writable } from 'stream'; | ||
export function streamObservable<T>(stream: ReadableStream): Observable<T> { | ||
return new Observable((observer: Observer<T>) => { | ||
const observerWritable = new Writable({ | ||
objectMode: true, | ||
export class ObserverWritable<T> extends Writable { | ||
private readonly observer: Observer<T>; | ||
write(chunk: any, encoding: string, callback: () => void) { | ||
observer.next(chunk); | ||
constructor(observer: Observer<T>) { | ||
super({ objectMode: true }); | ||
callback(); | ||
}, | ||
this.observer = observer; | ||
} | ||
final(callback: () => void) { | ||
observer.complete(); | ||
public onError = (error: any) => this.observer.error(error); | ||
callback(); | ||
} | ||
}); | ||
public _write(chunk: any, encoding: string, callback: () => void) { | ||
this.observer.next(chunk); | ||
stream | ||
.on('error', error => observer.error(error)) | ||
.pipe(observerWritable); | ||
callback(); | ||
} | ||
return () => stream.unpipe(observerWritable); | ||
}); | ||
public _final(callback: () => void) { | ||
this.observer.complete(); | ||
callback(); | ||
} | ||
} | ||
export class StreamObservable<T> extends Observable<T> { | ||
private observerWritable!: ObserverWritable<T>; | ||
constructor(stream: ReadableStream) { | ||
super((observer: Observer<T>) => { | ||
this.observerWritable = new ObserverWritable<T>(observer); | ||
stream.on('error', this.observerWritable.onError); | ||
stream.pipe(this.observerWritable); | ||
return () => { | ||
stream.unpipe(this.observerWritable); | ||
stream.off('error', this.observerWritable.onError); | ||
delete this.observerWritable; | ||
}; | ||
}); | ||
} | ||
} | ||
export function streamObservable<T>(stream: ReadableStream): Observable<T> { | ||
return new StreamObservable<T>(stream); | ||
} |
Sorry, the diff of this file is not supported yet
17098
92