@cycle/xstream-adapter
Advanced tools
Comparing version 2.0.0 to 3.0.0
"use strict"; | ||
var xstream_1 = require('xstream'); | ||
function logToConsoleError(err) { | ||
var target = err.stack || err; | ||
if (console && console.error) { | ||
console.error(target); | ||
} | ||
else if (console && console.log) { | ||
console.log(target); | ||
} | ||
} | ||
var XStreamAdapter = { | ||
@@ -21,7 +12,3 @@ adapt: function (originStream, originStreamSubscribe) { | ||
start: function (out) { | ||
var observer = { | ||
next: function (value) { return out.shamefullySendNext(value); }, | ||
error: function (err) { return out.shamefullySendError(err); }, | ||
complete: function () { return out.shamefullySendComplete(); }, | ||
}; | ||
var observer = out; | ||
dispose = originStreamSubscribe(originStream, observer); | ||
@@ -36,12 +23,2 @@ }, | ||
}, | ||
dispose: function (sinks, sinkProxies, sources) { | ||
Object.keys(sources).forEach(function (k) { | ||
if (typeof sources[k].dispose === 'function') { | ||
sources[k].dispose(); | ||
} | ||
}); | ||
Object.keys(sinks).forEach(function (k) { | ||
sinks[k].removeListener(sinkProxies[k].stream); | ||
}); | ||
}, | ||
makeSubject: function () { | ||
@@ -51,6 +28,3 @@ var stream = xstream_1.default.create(); | ||
next: function (x) { stream.shamefullySendNext(x); }, | ||
error: function (err) { | ||
logToConsoleError(err); | ||
stream.shamefullySendError(err); | ||
}, | ||
error: function (err) { stream.shamefullySendError(err); }, | ||
complete: function () { stream.shamefullySendComplete(); } | ||
@@ -60,2 +34,5 @@ }; | ||
}, | ||
remember: function (stream) { | ||
return stream.remember(); | ||
}, | ||
isValidStream: function (stream) { | ||
@@ -62,0 +39,0 @@ return (typeof stream.addListener === 'function' && |
{ | ||
"name": "@cycle/xstream-adapter", | ||
"version": "2.0.0", | ||
"version": "3.0.0", | ||
"description": "Cycle.js xstream Stream Adapter", | ||
@@ -34,3 +34,3 @@ "main": "lib/index.js", | ||
"devDependencies": { | ||
"@cycle/base": "^3.0.0", | ||
"@cycle/base": "^4.0.0", | ||
"assert": "^1.3.0", | ||
@@ -37,0 +37,0 @@ "babel-preset-es2015": "^6.6.0", |
import { | ||
StreamAdapter, | ||
Observer, | ||
SinkProxies, | ||
StreamSubscribe, | ||
@@ -9,25 +8,12 @@ DisposeFunction, | ||
} from '@cycle/base'; | ||
import xs, {Stream, Producer} from 'xstream'; | ||
import xs, {Stream, MemoryStream, Listener, Producer} from 'xstream'; | ||
function logToConsoleError(err: any) { | ||
const target = err.stack || err; | ||
if (console && console.error) { | ||
console.error(target); | ||
} else if (console && console.log) { | ||
console.log(target); | ||
} | ||
} | ||
const XStreamAdapter: StreamAdapter = { | ||
adapt(originStream: any, originStreamSubscribe: StreamSubscribe): any { | ||
adapt<T>(originStream: any, originStreamSubscribe: StreamSubscribe): Stream<T> { | ||
if (XStreamAdapter.isValidStream(originStream)) { return originStream; }; | ||
let dispose: any = null; | ||
return xs.create((<Producer<any>>{ | ||
start(out: any) { | ||
const observer: Observer = { | ||
next: (value: any) => out.shamefullySendNext(value), | ||
error: (err: any) => out.shamefullySendError(err), | ||
complete: () => out.shamefullySendComplete(), | ||
}; | ||
return xs.create<T>((<Producer<T>>{ | ||
start(out: Listener<T>) { | ||
const observer: Observer<T> = out; | ||
dispose = originStreamSubscribe(originStream, observer); | ||
@@ -43,14 +29,3 @@ }, | ||
dispose(sinks: any, sinkProxies: SinkProxies, sources: any) { | ||
Object.keys(sources).forEach(k => { | ||
if (typeof sources[k].dispose === 'function') { | ||
sources[k].dispose(); | ||
} | ||
}); | ||
Object.keys(sinks).forEach(k => { | ||
sinks[k].removeListener(sinkProxies[k].stream); | ||
}); | ||
}, | ||
makeSubject(): Subject { | ||
makeSubject<T>(): Subject<T> { | ||
const stream = xs.create(); | ||
@@ -60,6 +35,3 @@ | ||
next: (x: any) => { stream.shamefullySendNext(x); }, | ||
error: (err: any) => { | ||
logToConsoleError(err); | ||
stream.shamefullySendError(err); | ||
}, | ||
error: (err: any) => { stream.shamefullySendError(err); }, | ||
complete: () => { stream.shamefullySendComplete(); } | ||
@@ -71,2 +43,6 @@ }; | ||
remember<T>(stream: Stream<T>): MemoryStream<T> { | ||
return stream.remember(); | ||
}, | ||
isValidStream(stream: any): boolean { | ||
@@ -78,3 +54,3 @@ return ( | ||
streamSubscribe(stream: Stream<any>, observer: Observer) { | ||
streamSubscribe(stream: Stream<any>, observer: Observer<any>) { | ||
stream.addListener(observer); | ||
@@ -81,0 +57,0 @@ return () => stream.removeListener(observer); |
@@ -11,3 +11,3 @@ import {describe, it} from 'mocha'; | ||
assert.strictEqual(typeof XStreamAdapter.adapt, 'function'); | ||
assert.strictEqual(typeof XStreamAdapter.dispose, 'function'); | ||
assert.strictEqual(typeof XStreamAdapter.remember, 'function'); | ||
assert.strictEqual(typeof XStreamAdapter.makeSubject, 'function'); | ||
@@ -71,23 +71,32 @@ assert.strictEqual(typeof XStreamAdapter.isValidStream, 'function'); | ||
it('should not complete a sink stream when dispose() is called', (done) => { | ||
const sinkProxy = XStreamAdapter.makeSubject(); | ||
const sink = xs.periodic(50); | ||
it('should create a remembered subject which can be fed and subscribed to', (done) => { | ||
const subject = XStreamAdapter.makeSubject(); | ||
assert.strictEqual(subject.stream instanceof Stream, true); | ||
assert.strictEqual(XStreamAdapter.isValidStream(subject.stream), true); | ||
const remembered = XStreamAdapter.remember(subject.stream); | ||
const expectedProxy = [0, 1]; | ||
sinkProxy.stream.addListener({ | ||
next: (x) => { | ||
assert.strictEqual(x, expectedProxy.shift()); | ||
if (expectedProxy.length === 0) { | ||
XStreamAdapter.dispose({A: sink}, {A: sinkProxy}, {}); | ||
setTimeout(() => { | ||
done(); | ||
}, 75); | ||
} | ||
}, | ||
error: err => done(err), | ||
complete: () => done('complete should not be called'), | ||
const observer1Expected = [1, 2, 3, 4]; | ||
const observer2Expected = [2, 3, 4]; | ||
XStreamAdapter.streamSubscribe(remembered, { | ||
next: (x) => assert.strictEqual(x, observer1Expected.shift()), | ||
error: done, | ||
complete: () => assert.strictEqual(observer1Expected.length, 0), | ||
}); | ||
sink._add(sinkProxy.stream); | ||
subject.observer.next(1); | ||
subject.observer.next(2); | ||
XStreamAdapter.streamSubscribe(remembered, { | ||
next: (x) => assert.strictEqual(x, observer2Expected.shift()), | ||
error: done, | ||
complete: () => assert.strictEqual(observer2Expected.length, 0), | ||
}); | ||
subject.observer.next(3); | ||
subject.observer.next(4); | ||
subject.observer.complete(); | ||
setTimeout(done, 20); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
338189
7595