Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@kjots/observable-stream

Package Overview
Dependencies
Maintainers
1
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@kjots/observable-stream - npm Package Compare versions

Comparing version 0.1.0 to 0.2.0

11

CHANGELOG.md

@@ -6,2 +6,13 @@ # Change Log

# [0.2.0](https://github.com/kjots/stream-utils/compare/v0.1.0...v0.2.0) (2020-01-21)
### Features
* **observable-stream:** update `through()` function to assemble pipeline on subscribe ([2244f1e](https://github.com/kjots/stream-utils/commit/2244f1e9d33562a0eaac62258812f327d77549eb))
# 0.1.0 (2020-01-17)

@@ -8,0 +19,0 @@

6

dist/index.js

@@ -67,3 +67,3 @@ "use strict";

function through(...transforms) {
return (observable) => {
return (observable) => new rxjs_1.Observable((observer) => {
const errorSubject = new rxjs_1.Subject();

@@ -74,6 +74,6 @@ return rxjs_1.merge(errorSubject, stream_observable_1.streamObservable(transforms

.pipe(transform), observableStream(observable))
.on('end', () => errorSubject.complete())));
};
.on('end', () => errorSubject.complete()))).subscribe(observer);
});
}
exports.through = through;
//# sourceMappingURL=index.js.map
{
"name": "@kjots/observable-stream",
"version": "0.1.0",
"version": "0.2.0",
"description": "Observable Stream",

@@ -26,6 +26,6 @@ "main": "dist/index.js",

"dependencies": {
"@kjots/stream-observable": "^0.1.0",
"@kjots/stream-observable": "^0.2.0",
"rxjs": "^6.5.4"
},
"gitHead": "7d873c1fced46df9dc8288ea7c67a5f18984abdb"
"gitHead": "0e390df221a6d3d2f9864c57bc26114641e978f5"
}

@@ -5,3 +5,3 @@ import { streamObservable } from '@kjots/stream-observable';

import { merge, Observable, Subject } from 'rxjs';
import { merge, Observable, Observer, Subject } from 'rxjs';

@@ -98,3 +98,3 @@ import ReadableStream = NodeJS.ReadableStream;

export function through<T, R = T>(...transforms: Array<ReadWriteStream>): (observable: Observable<T>) => Observable<R> {
return (observable: Observable<T>) => {
return (observable: Observable<T>) => new Observable<R>((observer: Observer<R>) => {
const errorSubject = new Subject<any>();

@@ -104,10 +104,11 @@

transforms
.reduce((stream, transform) =>
stream
.reduce(
(stream, transform) => stream
.on('error', error => errorSubject.error(error))
.pipe(transform), observableStream(observable)
.pipe(transform),
observableStream(observable)
)
.on('end', () => errorSubject.complete())
));
};
)).subscribe(observer);
});
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc