New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@kjots/stream-observable

Package Overview
Dependencies
Maintainers
1
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@kjots/stream-observable - npm Package Compare versions

Comparing version 0.0.6 to 0.0.7

14

dist/index.d.ts
/// <reference types="node" />
import { Observable } from 'rxjs';
import { Writable } from 'stream';
import { Observable, Observer } from 'rxjs';
import ReadableStream = NodeJS.ReadableStream;
export declare class ObserverWritable<T> extends Writable {
private readonly observer;
constructor(observer: Observer<T>);
onError: (error: any) => void;
_write(chunk: any, encoding: string, callback: () => void): void;
_final(callback: () => void): void;
}
export declare class StreamObservable<T> extends Observable<T> {
private observerWritable;
constructor(stream: ReadableStream);
}
export declare function streamObservable<T>(stream: ReadableStream): Observable<T>;

49

dist/index.js

@@ -5,22 +5,37 @@ "use strict";

const rxjs_1 = require("rxjs");
function streamObservable(stream) {
return new rxjs_1.Observable((observer) => {
const observerWritable = new stream_1.Writable({
objectMode: true,
write(chunk, encoding, callback) {
observer.next(chunk);
callback();
},
final(callback) {
observer.complete();
callback();
}
class ObserverWritable extends stream_1.Writable {
constructor(observer) {
super({ objectMode: true });
this.onError = (error) => this.observer.error(error);
this.observer = observer;
}
_write(chunk, encoding, callback) {
this.observer.next(chunk);
callback();
}
_final(callback) {
this.observer.complete();
callback();
}
}
exports.ObserverWritable = ObserverWritable;
class StreamObservable extends rxjs_1.Observable {
constructor(stream) {
super((observer) => {
this.observerWritable = new ObserverWritable(observer);
stream.on('error', this.observerWritable.onError);
stream.pipe(this.observerWritable);
return () => {
stream.unpipe(this.observerWritable);
stream.off('error', this.observerWritable.onError);
delete this.observerWritable;
};
});
stream
.on('error', error => observer.error(error))
.pipe(observerWritable);
return () => stream.unpipe(observerWritable);
});
}
}
exports.StreamObservable = StreamObservable;
function streamObservable(stream) {
return new StreamObservable(stream);
}
exports.streamObservable = streamObservable;
//# sourceMappingURL=index.js.map
{
"name": "@kjots/stream-observable",
"version": "0.0.6",
"version": "0.0.7",
"description": "Stream Observable",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -7,26 +7,48 @@ import { Writable } from 'stream';

export function streamObservable<T>(stream: ReadableStream): Observable<T> {
return new Observable((observer: Observer<T>) => {
const observerWritable = new Writable({
objectMode: true,
export class ObserverWritable<T> extends Writable {
private readonly observer: Observer<T>;
write(chunk: any, encoding: string, callback: () => void) {
observer.next(chunk);
constructor(observer: Observer<T>) {
super({ objectMode: true });
callback();
},
this.observer = observer;
}
final(callback: () => void) {
observer.complete();
public onError = (error: any) => this.observer.error(error);
callback();
}
});
public _write(chunk: any, encoding: string, callback: () => void) {
this.observer.next(chunk);
stream
.on('error', error => observer.error(error))
.pipe(observerWritable);
callback();
}
return () => stream.unpipe(observerWritable);
});
public _final(callback: () => void) {
this.observer.complete();
callback();
}
}
export class StreamObservable<T> extends Observable<T> {
private observerWritable!: ObserverWritable<T>;
constructor(stream: ReadableStream) {
super((observer: Observer<T>) => {
this.observerWritable = new ObserverWritable<T>(observer);
stream.on('error', this.observerWritable.onError);
stream.pipe(this.observerWritable);
return () => {
stream.unpipe(this.observerWritable);
stream.off('error', this.observerWritable.onError);
delete this.observerWritable;
};
});
}
}
export function streamObservable<T>(stream: ReadableStream): Observable<T> {
return new StreamObservable<T>(stream);
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc