rxjs-websockets
An rxjs websocket library with a simple implementation built with flexibility in mind. Great for use with angular 2 or any other rxjs project. Supports the browser and node.js.
Comparisons to other rxjs websocket libraries:
- observable-socket provides the input stream for the user, in rxjs-websockets the input stream is taken as a parameter allowing the user to choose the appropriate subject or observable for their needs. queueing-subject can be used to achieve the same semantics as observable-socket. rxjs-websockets exposes the websocket connection status as an observable, with observable-socket the WebSocket object must be used directly to listen for connection status changes.
- rxjs built-in websocket subject: Implemented as a Subject so lacks the flexibility that rxjs-websockets and observable-socket provide. It does not provide any ability to monitor the web socket connection state.
How to install (with webpack/angular-cli)
Install the dependency:
npm install -S rxjs-websockets
npm install -S queueing-subject
How to use
import { QueueingSubject } from 'queueing-subject'
import websocketConnect from 'rxjs-websockets'
const input = new QueueingSubject()
const { messages, connectionStatus } = websocketConnect('ws://localhost/websocket-path', input)
input.next({ whateverField: 'some data' })
const connectionStatusSubscription = connectionStatus.subscribe(numberConnected => {
console.log('number of connected websockets:', numberConnected)
})
const messagesSubscription = messages.subscribe(message => {
console.log('received message:', JSON.stringify(message))
})
messagesSubscription.unsubscribe()
connectionStatusSubscription.unsubscribe()
messages
is a cold observable, means the websocket connection is attempted lazily when a subscription is made to the messages
observable. Advanced users of this library will find it important to understand the distinction between hot and cold observables, for most it will be sufficient to use the share operator as shown below.
How to use with angular 2
You can write your own service to provide a websocket using this library as follows:
import { Injectable } from '@angular/core'
import { QueueingSubject } from 'queueing-subject'
import { Observable } from 'rxjs/Observable'
import websocketConnect from 'rxjs-websockets'
@Injectable()
export class ServerSocket {
private inputStream: QueueingSubject<any>
public messages: Observable<any>
public connect() {
if (this.messages)
return
this.messages = websocketConnect(
'ws://127.0.0.1:4201/ws',
this.inputStream = new QueueingSubject<any>()
).messages.share()
}
public send(message: any):void {
this.inputStream.next(message)
}
}
This service could be used like this:
import { Component } from '@angular/core'
import { Subscription } from 'rxjs/Subscription'
import { ServerSocket } from './server-socket.service'
@Component({
selector: 'socket-user',
templateUrl: './socket-user.component.html',
styleUrls: ['./socket-user.component.scss']
})
export class SocketUserComponent {
private socketSubscription: Subscription
constructor(private socket: ServerSocket) {}
ngOnInit() {
this.socket.connect()
this.socketSubscription = this.socket.messages.subscribe(message:any => {
console.log('received message from server: ', message)
})
this.socket.send({ type: 'helloServer' })
}
ngOnDestroy() {
this.socketSubscription.unsubscribe()
}
}
Reconnecting on failure
This can be done with built-in rxjs operators:
const input = new QueueingSubject<any>()
const { messages, connectionStatus } = websocketConnect(`ws://server`, input)
messages.retryWhen(errors => errors.delay(1000)).subscribe(message => {
console.log(message)
})
How to use with alternate WebSocket implementations
You can supply a websocket factory function (that takes a URL and returns an object that is compatible with WebSocket) as such:
const { messages } = websocketConnect(
'ws://127.0.0.1:4201/ws',
this.inputStream = new QueueingSubject<any>(),
url => new WebSocket(url)
)