New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cycle/xstream-adapter

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cycle/xstream-adapter - npm Package Compare versions

Comparing version 2.0.0 to 3.0.0

33

lib/index.js
"use strict";
var xstream_1 = require('xstream');
function logToConsoleError(err) {
var target = err.stack || err;
if (console && console.error) {
console.error(target);
}
else if (console && console.log) {
console.log(target);
}
}
var XStreamAdapter = {

@@ -21,7 +12,3 @@ adapt: function (originStream, originStreamSubscribe) {

start: function (out) {
var observer = {
next: function (value) { return out.shamefullySendNext(value); },
error: function (err) { return out.shamefullySendError(err); },
complete: function () { return out.shamefullySendComplete(); },
};
var observer = out;
dispose = originStreamSubscribe(originStream, observer);

@@ -36,12 +23,2 @@ },

},
dispose: function (sinks, sinkProxies, sources) {
Object.keys(sources).forEach(function (k) {
if (typeof sources[k].dispose === 'function') {
sources[k].dispose();
}
});
Object.keys(sinks).forEach(function (k) {
sinks[k].removeListener(sinkProxies[k].stream);
});
},
makeSubject: function () {

@@ -51,6 +28,3 @@ var stream = xstream_1.default.create();

next: function (x) { stream.shamefullySendNext(x); },
error: function (err) {
logToConsoleError(err);
stream.shamefullySendError(err);
},
error: function (err) { stream.shamefullySendError(err); },
complete: function () { stream.shamefullySendComplete(); }

@@ -60,2 +34,5 @@ };

},
remember: function (stream) {
return stream.remember();
},
isValidStream: function (stream) {

@@ -62,0 +39,0 @@ return (typeof stream.addListener === 'function' &&

4

package.json
{
"name": "@cycle/xstream-adapter",
"version": "2.0.0",
"version": "3.0.0",
"description": "Cycle.js xstream Stream Adapter",

@@ -34,3 +34,3 @@ "main": "lib/index.js",

"devDependencies": {
"@cycle/base": "^3.0.0",
"@cycle/base": "^4.0.0",
"assert": "^1.3.0",

@@ -37,0 +37,0 @@ "babel-preset-es2015": "^6.6.0",

import {
StreamAdapter,
Observer,
SinkProxies,
StreamSubscribe,

@@ -9,25 +8,12 @@ DisposeFunction,

} from '@cycle/base';
import xs, {Stream, Producer} from 'xstream';
import xs, {Stream, MemoryStream, Listener, Producer} from 'xstream';
function logToConsoleError(err: any) {
const target = err.stack || err;
if (console && console.error) {
console.error(target);
} else if (console && console.log) {
console.log(target);
}
}
const XStreamAdapter: StreamAdapter = {
adapt(originStream: any, originStreamSubscribe: StreamSubscribe): any {
adapt<T>(originStream: any, originStreamSubscribe: StreamSubscribe): Stream<T> {
if (XStreamAdapter.isValidStream(originStream)) { return originStream; };
let dispose: any = null;
return xs.create((<Producer<any>>{
start(out: any) {
const observer: Observer = {
next: (value: any) => out.shamefullySendNext(value),
error: (err: any) => out.shamefullySendError(err),
complete: () => out.shamefullySendComplete(),
};
return xs.create<T>((<Producer<T>>{
start(out: Listener<T>) {
const observer: Observer<T> = out;
dispose = originStreamSubscribe(originStream, observer);

@@ -43,14 +29,3 @@ },

dispose(sinks: any, sinkProxies: SinkProxies, sources: any) {
Object.keys(sources).forEach(k => {
if (typeof sources[k].dispose === 'function') {
sources[k].dispose();
}
});
Object.keys(sinks).forEach(k => {
sinks[k].removeListener(sinkProxies[k].stream);
});
},
makeSubject(): Subject {
makeSubject<T>(): Subject<T> {
const stream = xs.create();

@@ -60,6 +35,3 @@

next: (x: any) => { stream.shamefullySendNext(x); },
error: (err: any) => {
logToConsoleError(err);
stream.shamefullySendError(err);
},
error: (err: any) => { stream.shamefullySendError(err); },
complete: () => { stream.shamefullySendComplete(); }

@@ -71,2 +43,6 @@ };

remember<T>(stream: Stream<T>): MemoryStream<T> {
return stream.remember();
},
isValidStream(stream: any): boolean {

@@ -78,3 +54,3 @@ return (

streamSubscribe(stream: Stream<any>, observer: Observer) {
streamSubscribe(stream: Stream<any>, observer: Observer<any>) {
stream.addListener(observer);

@@ -81,0 +57,0 @@ return () => stream.removeListener(observer);

@@ -11,3 +11,3 @@ import {describe, it} from 'mocha';

assert.strictEqual(typeof XStreamAdapter.adapt, 'function');
assert.strictEqual(typeof XStreamAdapter.dispose, 'function');
assert.strictEqual(typeof XStreamAdapter.remember, 'function');
assert.strictEqual(typeof XStreamAdapter.makeSubject, 'function');

@@ -71,23 +71,32 @@ assert.strictEqual(typeof XStreamAdapter.isValidStream, 'function');

it('should not complete a sink stream when dispose() is called', (done) => {
const sinkProxy = XStreamAdapter.makeSubject();
const sink = xs.periodic(50);
it('should create a remembered subject which can be fed and subscribed to', (done) => {
const subject = XStreamAdapter.makeSubject();
assert.strictEqual(subject.stream instanceof Stream, true);
assert.strictEqual(XStreamAdapter.isValidStream(subject.stream), true);
const remembered = XStreamAdapter.remember(subject.stream);
const expectedProxy = [0, 1];
sinkProxy.stream.addListener({
next: (x) => {
assert.strictEqual(x, expectedProxy.shift());
if (expectedProxy.length === 0) {
XStreamAdapter.dispose({A: sink}, {A: sinkProxy}, {});
setTimeout(() => {
done();
}, 75);
}
},
error: err => done(err),
complete: () => done('complete should not be called'),
const observer1Expected = [1, 2, 3, 4];
const observer2Expected = [2, 3, 4];
XStreamAdapter.streamSubscribe(remembered, {
next: (x) => assert.strictEqual(x, observer1Expected.shift()),
error: done,
complete: () => assert.strictEqual(observer1Expected.length, 0),
});
sink._add(sinkProxy.stream);
subject.observer.next(1);
subject.observer.next(2);
XStreamAdapter.streamSubscribe(remembered, {
next: (x) => assert.strictEqual(x, observer2Expected.shift()),
error: done,
complete: () => assert.strictEqual(observer2Expected.length, 0),
});
subject.observer.next(3);
subject.observer.next(4);
subject.observer.complete();
setTimeout(done, 20);
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc