Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@kjots/observable-stream

Package Overview
Dependencies
Maintainers
1
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@kjots/observable-stream - npm Package Compare versions

Comparing version 0.0.6 to 0.0.7

13

dist/index.d.ts
/// <reference types="node" />
import { Readable } from 'stream';
import { Observable } from 'rxjs';
import ReadableStream = NodeJS.ReadableStream;
import ReadWriteStream = NodeJS.ReadWriteStream;
export declare class ObservableStream<T> extends Readable {
private readonly observable;
private values;
private error;
private complete;
private stopped;
constructor(observable: Observable<T>);
_read(): void;
private _subscribe;
private _flush;
private _close;
}
export declare function observableStream<T>(observable: Observable<T>): ReadableStream;
export declare function through<T, R = T>(...transforms: Array<ReadWriteStream>): (observable: Observable<T>) => Observable<R>;

77

dist/index.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const stream_observable_1 = require("@kjots/stream-observable");
const readable_stream_1 = require("readable-stream");
const stream_1 = require("stream");
const rxjs_1 = require("rxjs");
class ObservableStream extends stream_1.Readable {
constructor(observable) {
super({ objectMode: true });
this.observable = observable;
}
_read() {
this.stopped = false;
if (this.values === undefined) {
this._subscribe();
}
this._flush();
}
_subscribe() {
this.values = [];
this.observable.subscribe({
next: (value) => {
this.values.push(value);
this._flush();
},
error: (error) => {
this.error = error;
this._flush();
},
complete: () => {
this.complete = true;
this._flush();
}
});
}
_flush() {
if (this.stopped) {
return;
}
while (this.values.length !== 0) {
this.stopped = !this.push(this.values.shift());
if (this.stopped) {
return;
}
}
if (this.error) {
this.emit('error', this.error);
this._close();
}
else if (this.complete) {
this.push(null);
this._close();
}
}
_close() {
delete this.values;
delete this.error;
delete this.complete;
this.stopped = true;
}
}
exports.ObservableStream = ObservableStream;
function observableStream(observable) {
const stream = readable_stream_1.Readable({ objectMode: true });
stream._read = onceify(() => observable.subscribe(data => stream.push(data), error => stream.emit('error', error), () => stream.push(null)));
return stream;
return new ObservableStream(observable);
}

@@ -15,15 +69,10 @@ exports.observableStream = observableStream;

const errorSubject = new rxjs_1.Subject();
return rxjs_1.merge(errorSubject, stream_observable_1.streamObservable(transforms.reduce((stream, transform) => stream.on('error', error => errorSubject.error(error)).pipe(transform), observableStream(observable))));
return rxjs_1.merge(errorSubject, stream_observable_1.streamObservable(transforms
.reduce((stream, transform) => stream
.on('error', error => errorSubject.error(error))
.pipe(transform), observableStream(observable))
.on('end', () => errorSubject.complete())));
};
}
exports.through = through;
function onceify(fn) {
let invoked = false;
return function (...args) {
if (!invoked) {
invoked = true;
return fn.call(this, ...args);
}
};
}
//# sourceMappingURL=index.js.map

7

package.json
{
"name": "@kjots/observable-stream",
"version": "0.0.6",
"version": "0.0.7",
"description": "Observable Stream",

@@ -38,3 +38,2 @@ "main": "dist/index.js",

"@types/node": "^10.14.4",
"@types/through2": "^2.0.34",
"chai": "^4.2.0",

@@ -44,3 +43,2 @@ "mocha": "^6.0.2",

"rimraf": "^2.6.3",
"through2": "^3.0.1",
"ts-node": "^8.0.3",

@@ -51,6 +49,5 @@ "tslint": "^5.15.0",

"dependencies": {
"@kjots/stream-observable": "^0.0.5",
"readable-stream": "^3.3.0",
"@kjots/stream-observable": "^0.0.7",
"rxjs": "^6.4.0"
}
}
import { streamObservable } from '@kjots/stream-observable';
import { Readable } from 'readable-stream';
import { Readable } from 'stream';
import { merge, Observable, Subject } from 'rxjs';

@@ -9,16 +10,88 @@

export function observableStream<T>(observable: Observable<T>): ReadableStream {
const stream = Readable({ objectMode: true });
export class ObservableStream<T> extends Readable {
private readonly observable: Observable<T>;
stream._read = onceify(() =>
observable.subscribe(
data => stream.push(data),
error => stream.emit('error', error),
() => stream.push(null)
)
);
private values!: Array<T>;
private error!: any;
private complete!: boolean;
return stream;
private stopped!: boolean;
constructor(observable: Observable<T>) {
super({ objectMode: true });
this.observable = observable;
}
public _read() {
this.stopped = false;
if (this.values === undefined) {
this._subscribe();
}
this._flush();
}
private _subscribe() {
this.values = [];
this.observable.subscribe({
next: (value: T) => {
this.values.push(value);
this._flush();
},
error: (error: any) => {
this.error = error;
this._flush();
},
complete: () => {
this.complete = true;
this._flush();
}
});
}
private _flush() {
if (this.stopped) {
return;
}
while (this.values.length !== 0) {
this.stopped = !this.push(this.values.shift());
if (this.stopped) {
return;
}
}
if (this.error) {
this.emit('error', this.error);
this._close();
} else if (this.complete) {
this.push(null);
this._close();
}
}
private _close() {
delete this.values;
delete this.error;
delete this.complete;
this.stopped = true;
}
}
export function observableStream<T>(observable: Observable<T>): ReadableStream {
return new ObservableStream(observable);
}
export function through<T, R = T>(...transforms: Array<ReadWriteStream>): (observable: Observable<T>) => Observable<R> {

@@ -29,17 +102,11 @@ return (observable: Observable<T>) => {

return merge(errorSubject, streamObservable(
transforms.reduce((stream, transform) => stream.on('error', error => errorSubject.error(error)).pipe(transform), observableStream(observable))
transforms
.reduce((stream, transform) =>
stream
.on('error', error => errorSubject.error(error))
.pipe(transform), observableStream(observable)
)
.on('end', () => errorSubject.complete())
));
};
}
function onceify(fn: (this: any, ...args: Array<any>) => any): (this: any, ...args: Array<any>) => any {
let invoked = false;
return function (this: any, ...args: Array<any>): any {
if (!invoked) {
invoked = true;
return fn.call(this, ...args);
}
};
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc