Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rxjs-websockets

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rxjs-websockets - npm Package Compare versions

Comparing version 7.0.0-alpha.1 to 7.0.0-beta.1

14

lib/index.d.ts
import { Observable } from 'rxjs';
declare type WebSocketPayload = string | ArrayBuffer | Blob;
export interface Connection<T extends WebSocketPayload = WebSocketPayload> {
connectionStatus: Observable<number>;
messages: Observable<T>;
}
export interface IWebSocket {

@@ -15,4 +11,10 @@ close(): any;

}
export declare type WebSocketFactory = (url: string, protocols?: string | string[]) => IWebSocket;
export default function connect<T extends WebSocketPayload = WebSocketPayload>(url: string, input: Observable<WebSocketPayload>, protocols?: string | string[], websocketFactory?: WebSocketFactory): Connection<T>;
export declare type WebSocketFactory = (url: string, protocols: string | string[]) => IWebSocket;
export declare type GetWebSocketResponses<T = WebSocketPayload> = (input: Observable<WebSocketPayload>) => Observable<T>;
export interface WebSocketOptions {
protocols: string | string[];
makeWebSocket: WebSocketFactory;
}
export declare const normalClosureMessage = "Normal closure";
export default function makeWebSocketObservable<T extends WebSocketPayload = WebSocketPayload>(url: string, { protocols, makeWebSocket }: WebSocketOptions): Observable<GetWebSocketResponses<T>>;
export {};

@@ -5,33 +5,32 @@ "use strict";

var defaultProtocols = [];
var defaultWebsocketFactory = function (url, protocols) {
if (protocols === void 0) { protocols = defaultProtocols; }
return new WebSocket(url, protocols);
};
function connect(url, input, protocols, websocketFactory) {
if (protocols === void 0) { protocols = defaultProtocols; }
if (websocketFactory === void 0) { websocketFactory = defaultWebsocketFactory; }
var connectionStatus = new rxjs_1.BehaviorSubject(0);
var messages = new rxjs_1.Observable(function (observer) {
var socket = websocketFactory(url, protocols);
var defaultWebsocketFactory = function (url, protocols) { return new WebSocket(url, protocols); };
exports.normalClosureMessage = 'Normal closure';
function makeWebSocketObservable(url, _a) {
var _b = _a.protocols, protocols = _b === void 0 ? defaultProtocols : _b, _c = _a.makeWebSocket, makeWebSocket = _c === void 0 ? defaultWebsocketFactory : _c;
return new rxjs_1.Observable(function (observer) {
var inputSubscription;
var open = false;
var messages = new rxjs_1.Subject();
var getWebSocketResponses = function (input) {
if (inputSubscription) {
setClosedStatus();
observer.error(new Error('Web socket message factory function called more than once'));
}
else {
inputSubscription = input.subscribe(function (data) { socket.send(data); });
return messages;
}
};
var socket = makeWebSocket(url, protocols);
var isSocketOpen = false;
var forcedClose = false;
var closed = function () {
if (!open)
return;
connectionStatus.next(connectionStatus.getValue() - 1);
open = false;
};
var setClosedStatus = function () { isSocketOpen = false; };
socket.onopen = function () {
open = true;
connectionStatus.next(connectionStatus.getValue() + 1);
inputSubscription = input.subscribe(function (data) {
socket.send(data);
});
isSocketOpen = true;
observer.next(getWebSocketResponses);
};
socket.onmessage = function (message) {
observer.next(message.data);
messages.next(message.data);
};
socket.onerror = function (error) {
closed();
setClosedStatus();
observer.error(new Error(error.message));

@@ -41,9 +40,12 @@ };

// prevent observer.complete() being called after observer.error(...)
if (!open)
if (!isSocketOpen)
return;
closed();
if (forcedClose)
setClosedStatus();
if (forcedClose) {
observer.complete();
else
observer.error(new Error(event.reason));
messages.complete();
}
else {
observer.error(new Error(event.code === 1000 ? exports.normalClosureMessage : event.reason));
}
};

@@ -54,4 +56,4 @@ return function () {

inputSubscription.unsubscribe();
if (open) {
closed();
if (isSocketOpen) {
setClosedStatus();
socket.close();

@@ -61,5 +63,4 @@ }

});
return { messages: messages, connectionStatus: connectionStatus };
}
exports.default = connect;
exports.default = makeWebSocketObservable;
//# sourceMappingURL=index.js.map
import 'mocha';
import 'rxjs';
"use strict";
var __extends = (this && this.__extends) || (function () {
var extendStatics = function (d, b) {
extendStatics = Object.setPrototypeOf ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; };
return extendStatics(d, b);
};
return function (d, b) {
extendStatics(d, b);
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
})();
Object.defineProperty(exports, "__esModule", { value: true });
require("mocha");
require("rxjs");
var testing_1 = require("rxjs/testing");

@@ -27,6 +13,6 @@ var rxjs_1 = require("rxjs");

chai.use(sinonChai);
chai.should();
var expect = chai.expect;
describe('rxjs-websockets', function () {
var scheduler;
var expect;
var expect$;
var flush;

@@ -37,3 +23,3 @@ var cold;

scheduler = new testing_1.TestScheduler(chai.assert.deepEqual);
expect = scheduler.expectObservable.bind(scheduler);
expect$ = scheduler.expectObservable.bind(scheduler);
flush = scheduler.flush.bind(scheduler);

@@ -45,3 +31,3 @@ cold = scheduler.createColdObservable.bind(scheduler);

function MockSocket() {
this.close = function () { };
this.close = sinon.stub();
}

@@ -52,36 +38,49 @@ // forwards input as output

}());
var connectHelper = function (input, mockSocket, protocols) { return _1.default('url', input, protocols, function () { return mockSocket; }); };
var connectHelper = function (mockSocket, protocols) {
return _1.default('url', { protocols: protocols, makeWebSocket: function () { return mockSocket; } });
};
it('connects to websocket lazily and retrieves data', function () {
var mockSocket = new MockSocket();
var _a = connectHelper(hot('abcde|'), mockSocket), connectionStatus = _a.connectionStatus, messages = _a.messages;
scheduler.schedule(function () { return mockSocket.onopen(); }, 15);
expect(rxjs_1.of(null).pipe(operators_1.delay(14, scheduler)).pipe(operators_1.switchMapTo(messages))).toBe('--cde');
var socket = connectHelper(mockSocket);
var input = hot('abcde|');
expect$(rxjs_1.of(null).pipe(
// delay subscription to websocket by 10ms, TODO: find some way to
// verify the connection is attempted lazily
operators_1.delay(10, scheduler), operators_1.switchMap(function () {
return socket.pipe(operators_1.switchMap(function (factory) {
// ensure factory is called when socket is open
expect(scheduler.now()).to.equal(20);
return factory(input);
}));
}))).toBe('--cde');
// websocket opens at 20ms
scheduler.schedule(function () {
// if one of the expectations raises an error this won't be defined
if (mockSocket.onopen)
mockSocket.onopen();
}, 20);
flush();
});
it('closes websocket on unsubscribe', function () {
var mockSocket = new /** @class */ (function (_super) {
__extends(class_1, _super);
function class_1() {
var _this = _super !== null && _super.apply(this, arguments) || this;
_this.close = sinon.stub();
return _this;
}
return class_1;
}(MockSocket));
var messages = connectHelper(cold('a|'), mockSocket).messages;
var mockSocket = new MockSocket();
var socket = connectHelper(mockSocket);
scheduler.schedule(function () { return mockSocket.onopen(); }, 10);
expect(messages, '--!').toBe('-a');
expect$(socket.pipe(operators_1.switchMap(function (factory) { return factory(cold('a|')); })), '--!').toBe('-a');
flush();
mockSocket.close.should.have.been.calledOnce;
expect(mockSocket.close).to.have.been.calledOnce;
});
it('errors on unclean websocket close', function () {
var mockSocket = new MockSocket();
var messages = connectHelper(cold('a'), mockSocket).messages;
var socket = connectHelper(mockSocket);
scheduler.schedule(function () { return mockSocket.onopen(); }, 10);
scheduler.schedule(function () { return mockSocket.onclose({ reason: 'Normal closure' }); }, 30);
expect(messages.pipe(operators_1.catchError(function (error) { return throwError_1.throwError(error.message); })))
.toBe('-a-#', undefined, 'Normal closure');
expect$(socket.pipe(operators_1.switchMap(function (factory) { return factory(cold('a')); }),
// rethrow error as string... can't get expectation to match the error
operators_1.catchError(function (error) { return throwError_1.throwError(error.message); }))).toBe('-a-#', undefined, 'Normal closure');
flush();
});
xit('raises Error with normalClosureMessage when socket was closed normally', function () {
// TODO:
});
});
//# sourceMappingURL=index.spec.js.map
{
"name": "rxjs-websockets",
"version": "7.0.0-alpha.1",
"description": "rxjs 5 websockets library, ideal for use with angular 2",
"version": "7.0.0-beta.1",
"description": "rxjs 6 websockets library",
"main": "lib/index.js",

@@ -12,9 +12,14 @@ "typings": "lib/index",

"keywords": [
"angular",
"angular2",
"redux",
"redux-observable",
"rxjs",
"websocket"
"vuex",
"websocket",
"websockets"
],
"scripts": {
"build": "tsc -p src",
"build:watch": "tsc -w -p src",
"build:watch": "tsc -w --preserveWatchOutput -p src",
"test": "npm run build && npm run mocha",

@@ -32,14 +37,14 @@ "watch": "concurrently 'npm run build:watch' 'npm run mocha:watch'",

"devDependencies": {
"@types/chai": "^4.0.1",
"@types/mocha": "^2.2.41",
"@types/sinon": "^2.3.3",
"@types/sinon-chai": "^2.7.28",
"chai": "^4.1.0",
"concurrently": "^3.5.0",
"mocha": "^3.4.2",
"@types/chai": "^4.1.7",
"@types/mocha": "^5.2.5",
"@types/sinon": "^7.0.3",
"@types/sinon-chai": "^3.2.2",
"chai": "^4.2.0",
"concurrently": "^4.1.0",
"mocha": "^5.2.0",
"rxjs": "^6.1.0",
"sinon": "^3.2.1",
"sinon-chai": "^2.13.0",
"sinon": "^7.2.2",
"sinon-chai": "^3.3.0",
"typescript": "^3.2.0"
}
}
import 'mocha'
import 'rxjs'
import { TestScheduler } from 'rxjs/testing'
import { Observable, of } from 'rxjs'
import { delay, switchMapTo, catchError } from 'rxjs/operators';
import { throwError } from 'rxjs/internal/observable/throwError';
import { of } from 'rxjs'
import { delay, catchError, switchMap } from 'rxjs/operators'
import { throwError } from 'rxjs/internal/observable/throwError'
import * as chai from 'chai'

@@ -14,7 +13,7 @@ import * as sinon from 'sinon'

chai.use(sinonChai)
chai.should()
const { expect } = chai
describe('rxjs-websockets', () => {
let scheduler: TestScheduler
let expect: typeof scheduler.expectObservable
let expect$: typeof scheduler.expectObservable
let flush: typeof scheduler.flush

@@ -25,3 +24,3 @@ let cold: typeof scheduler.createColdObservable

scheduler = new TestScheduler(chai.assert.deepEqual)
expect = scheduler.expectObservable.bind(scheduler)
expect$ = scheduler.expectObservable.bind(scheduler)
flush = scheduler.flush.bind(scheduler)

@@ -36,3 +35,3 @@ cold = scheduler.createColdObservable.bind(scheduler)

onclose: Function
close = () => {}
close = sinon.stub()
// forwards input as output

@@ -42,9 +41,33 @@ send(data: string) { this.onmessage({ data }) }

const connectHelper = (input, mockSocket, protocols?) => connect('url', input, protocols, () => mockSocket)
const connectHelper = (mockSocket, protocols?: string | string[]) =>
connect('url', { protocols, makeWebSocket: () => mockSocket })
it('connects to websocket lazily and retrieves data', () => {
const mockSocket = new MockSocket()
const { connectionStatus, messages } = connectHelper(hot('abcde|'), mockSocket)
scheduler.schedule(() => mockSocket.onopen(), 15)
expect(of(null).pipe(delay(14, scheduler)).pipe(switchMapTo(messages))).toBe('--cde')
const socket = connectHelper(mockSocket)
const input = hot('abcde|')
expect$(
of(null).pipe(
// delay subscription to websocket by 10ms, TODO: find some way to
// verify the connection is attempted lazily
delay(10, scheduler),
switchMap(() => {
return socket.pipe(
switchMap(factory => {
// ensure factory is called when socket is open
expect(scheduler.now()).to.equal(20)
return factory(input)
})
)
})
)
).toBe('--cde')
// websocket opens at 20ms
scheduler.schedule(() => {
// if one of the expectations raises an error this won't be defined
if (mockSocket.onopen)
mockSocket.onopen()
}, 20)
flush()

@@ -54,11 +77,13 @@ })

it('closes websocket on unsubscribe', () => {
const mockSocket = new class extends MockSocket {
close = sinon.stub()
}
const { messages } = connectHelper(cold('a|'), mockSocket)
const mockSocket = new MockSocket()
const socket = connectHelper(mockSocket)
scheduler.schedule(() => mockSocket.onopen(), 10)
expect(messages, '--!').toBe('-a')
expect$(
socket.pipe(switchMap(factory => factory(cold('a|')))),
'--!',
).toBe('-a')
flush()
mockSocket.close.should.have.been.calledOnce
expect(mockSocket.close).to.have.been.calledOnce
})

@@ -68,9 +93,18 @@

const mockSocket = new MockSocket()
const { messages } = connectHelper(cold('a'), mockSocket)
const socket = connectHelper(mockSocket)
scheduler.schedule(() => mockSocket.onopen(), 10)
scheduler.schedule(() => mockSocket.onclose({ reason: 'Normal closure' }), 30)
expect(messages.pipe(catchError(error => throwError(error.message))))
.toBe('-a-#', undefined, 'Normal closure')
expect$(
socket.pipe(
switchMap(factory => factory(cold('a'))),
// rethrow error as string... can't get expectation to match the error
catchError(error => throwError(error.message))
),
).toBe('-a-#', undefined, 'Normal closure')
flush()
})
xit('raises Error with normalClosureMessage when socket was closed normally', () => {
// TODO:
});
})

@@ -1,4 +0,5 @@

import { Observable, Subscription, BehaviorSubject } from 'rxjs'
import { Observable, Subscription, Subject } from 'rxjs'
interface EventWithReason {
interface EventWithCodeAndReason {
code: number
reason: string

@@ -13,7 +14,2 @@ }

export interface Connection<T extends WebSocketPayload = WebSocketPayload> {
connectionStatus: Observable<number>,
messages: Observable<T>,
}
export interface IWebSocket {

@@ -32,58 +28,78 @@ close(): any

export type WebSocketFactory = (url: string, protocols?: string | string[]) => IWebSocket
export type WebSocketFactory = (url: string, protocols: string | string[]) => IWebSocket
const defaultProtocols = [];
export type GetWebSocketResponses<T = WebSocketPayload> = (
input: Observable<WebSocketPayload>
) => Observable<T>
const defaultWebsocketFactory: WebSocketFactory = (url: string, protocols: string | string[] = defaultProtocols): IWebSocket => new WebSocket(url, protocols)
const defaultProtocols = []
export default function connect<T extends WebSocketPayload = WebSocketPayload>(
const defaultWebsocketFactory: WebSocketFactory = (
url: string,
input: Observable<WebSocketPayload>,
protocols: string | string[] = defaultProtocols,
websocketFactory: WebSocketFactory = defaultWebsocketFactory,
): Connection<T> {
const connectionStatus = new BehaviorSubject<number>(0)
protocols: string | string[],
): IWebSocket => new WebSocket(url, protocols)
const messages = new Observable<T>(observer => {
const socket = websocketFactory(url, protocols)
export interface WebSocketOptions {
protocols: string | string[]
makeWebSocket: WebSocketFactory
}
export const normalClosureMessage = 'Normal closure'
export default function makeWebSocketObservable<T extends WebSocketPayload = WebSocketPayload>(
url: string,
{
protocols = defaultProtocols,
makeWebSocket = defaultWebsocketFactory
}: WebSocketOptions,
): Observable<GetWebSocketResponses<T>> {
return new Observable<GetWebSocketResponses<T>>(observer => {
let inputSubscription: Subscription
const messages = new Subject<T>()
let open = false
const getWebSocketResponses: GetWebSocketResponses<T> = (input: Observable<WebSocketPayload>) => {
if (inputSubscription) {
setClosedStatus()
observer.error(new Error('Web socket message factory function called more than once'))
} else {
inputSubscription = input.subscribe(data => { socket.send(data) })
return messages
}
}
const socket = makeWebSocket(url, protocols)
let isSocketOpen = false
let forcedClose = false
const closed = () => {
if (! open)
return
const setClosedStatus = () => { isSocketOpen = false }
connectionStatus.next(connectionStatus.getValue() - 1)
open = false
}
socket.onopen = () => {
open = true
connectionStatus.next(connectionStatus.getValue() + 1)
inputSubscription = input.subscribe(data => {
socket.send(data)
})
isSocketOpen = true
observer.next(getWebSocketResponses)
}
socket.onmessage = (message: { data: T }) => {
observer.next(message.data)
messages.next(message.data)
}
socket.onerror = (error: EventWithMessage) => {
closed()
setClosedStatus()
observer.error(new Error(error.message))
}
socket.onclose = (event: EventWithReason) => {
socket.onclose = (event: EventWithCodeAndReason) => {
// prevent observer.complete() being called after observer.error(...)
if (! open)
if (! isSocketOpen)
return
closed()
if (forcedClose)
setClosedStatus()
if (forcedClose) {
observer.complete()
else
observer.error(new Error(event.reason))
messages.complete()
}
else {
observer.error(new Error(event.code === 1000 ? normalClosureMessage : event.reason))
}
}

@@ -96,4 +112,4 @@

if (open) {
closed()
if (isSocketOpen) {
setClosedStatus()
socket.close()

@@ -103,4 +119,2 @@ }

})
return { messages, connectionStatus }
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc