rxjs-websockets
An rxjs websocket library with a simple implementation, built with flexibility in mind. Great for use with Angular 2 or any other rxjs project.
Comparisons to other rxjs websocket libraries:
- observable-socket provides the input stream for the user. In rxjs-websockets the input stream is taken as a parameter, allowing the user to choose the appropriate subject or observable for their needs; queueing-subject can be used to achieve the same semantics as observable-socket. rxjs-websockets exposes the websocket connection status as an observable, whereas with observable-socket the WebSocket object must be used directly to listen for connection status changes.
- rxjs built-in websocket subject: implemented as a Subject, so it lacks the flexibility that rxjs-websockets and observable-socket provide. It does not provide any ability to monitor the websocket connection state.
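To make the queueing semantics mentioned above concrete, here is a minimal, dependency-free sketch of a subject that buffers values until a listener is attached. `MiniQueueingSubject` is a hypothetical name used only for illustration; it is not the queueing-subject package's actual implementation and it skips everything else an rxjs Subject does (errors, completion, operators).

```typescript
// Hypothetical stand-in for illustration only; this is NOT the
// queueing-subject package's implementation.
type Listener<T> = (value: T) => void

class MiniQueueingSubject<T> {
  private queue: T[] = []
  private listeners: Listener<T>[] = []

  // Deliver immediately if someone is listening, otherwise buffer the value.
  next(value: T): void {
    if (this.listeners.length === 0) {
      this.queue.push(value)
      return
    }
    this.listeners.forEach(listener => listener(value))
  }

  // A new listener first receives everything queued so far, in order.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener)
    const pending = this.queue
    this.queue = []
    pending.forEach(value => listener(value))
    // Return an unsubscribe function.
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener)
    }
  }
}

const input = new MiniQueueingSubject<string>()
const sent: string[] = []

input.next('queued before connect')
input.next('also queued')
// Stands in for the websocket subscribing to the input stream.
const unsubscribe = input.subscribe(message => sent.push(message))
input.next('sent immediately')
unsubscribe()
// Nobody is listening any more, so this value is buffered again.
input.next('queued again')
```

Here `sent` ends up holding the first three messages in the order they were passed to `next`, while the fourth waits in the queue for the next listener. This is the behaviour that makes it safe to call `input.next(...)` before the websocket is ready.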
How to install (with webpack/angular-cli)
Install the dependencies:
npm install -S rxjs-websockets
npm install -S queueing-subject
How to use
import { QueueingSubject } from 'queueing-subject'
import websocketConnect from 'rxjs-websockets'
// values passed to this subject are sent to the websocket; a QueueingSubject
// buffers them until something subscribes to it
const input = new QueueingSubject()
const {messages, connectionStatus} = websocketConnect('ws://localhost/websocket-path', input)

// send data to the server
input.next({ whateverField: 'some data' })

const connectionStatusSubscription = connectionStatus.subscribe(numberConnected => {
  console.log('number of connected websockets:', numberConnected)
})

const messagesSubscription = messages.subscribe(message => {
  console.log('received message:', JSON.stringify(message))
})

// unsubscribe from both observables when they are no longer needed
messagesSubscription.unsubscribe()
connectionStatusSubscription.unsubscribe()
How to use with Angular 2
You can write your own service to provide a websocket using this library as follows:
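import { Injectable } from '@angular/core'
import { QueueingSubject } from 'queueing-subject'
import { Observable } from 'rxjs/Observable'
import websocketConnect from 'rxjs-websockets'
// patches Observable.prototype with the share() operator used below
import 'rxjs/add/operator/share'

@Injectable()
export class ServerSocket {
  private inputStream: QueueingSubject<any>
  public messages: Observable<any>

  public connect() {
    // connect() may be called more than once; only the first call has an effect
    if (this.messages)
      return

    // share() lets every subscriber use the same messages observable
    this.messages = websocketConnect(
      'ws://127.0.0.1:4201/ws',
      this.inputStream = new QueueingSubject<any>()
    ).messages.share()
  }

  public send(message: any): void {
    // call connect() first, otherwise inputStream is still undefined
    this.inputStream.next(message)
  }
}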
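The service above calls `share()` so that several components can subscribe to `messages` without each one triggering its own connection. The following dependency-free sketch shows the reference-counting idea behind that, assuming each subscription to the unshared observable would otherwise open a separate websocket. `shareConnection` and the fake connection are hypothetical names for illustration; rxjs's real `share()` operator is more general than this.

```typescript
// Hypothetical sketch of reference-counted sharing; this is NOT how rxjs
// implements share(), only an illustration of the idea.
type Handler<T> = (value: T) => void
type Teardown = () => void

function shareConnection<T>(connect: (emit: Handler<T>) => Teardown) {
  const handlers: Handler<T>[] = []
  let teardown: Teardown | null = null

  return function subscribe(handler: Handler<T>): Teardown {
    handlers.push(handler)
    // Open the underlying connection only for the first subscriber.
    if (handlers.length === 1) {
      teardown = connect(value => handlers.slice().forEach(h => h(value)))
    }
    let active = true
    return () => {
      if (!active) return
      active = false
      handlers.splice(handlers.indexOf(handler), 1)
      // Close the connection once the last subscriber has left.
      if (handlers.length === 0 && teardown) {
        teardown()
        teardown = null
      }
    }
  }
}

// Fake connection that counts how often it is opened and closed.
let opened = 0
let closed = 0
let push: Handler<string> = () => {}
const subscribeShared = shareConnection<string>(emit => {
  opened += 1
  push = emit
  return () => { closed += 1 }
})

const received: string[] = []
const stopA = subscribeShared(m => received.push('A:' + m))
const stopB = subscribeShared(m => received.push('B:' + m))
push('hello') // both subscribers see the same message
stopA()
stopB() // last subscriber gone, so the fake connection is closed
```

With two subscribers the fake connection is opened once and closed once, and both handlers receive `'hello'`. In the service, the same effect means that components subscribing to `ServerSocket.messages` share a single websocket.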
This service could be used like this:
import { Component, OnDestroy, OnInit } from '@angular/core'
import { Subscription } from 'rxjs/Subscription'
import { ServerSocket } from './server-socket.service'

@Component({
  selector: 'socket-user',
  templateUrl: './socket-user.component.html',
  styleUrls: ['./socket-user.component.scss']
})
export class SocketUserComponent implements OnInit, OnDestroy {
  private socketSubscription: Subscription

  constructor(private socket: ServerSocket) {}

  ngOnInit() {
    this.socket.connect()

    // a typed arrow function parameter must be wrapped in parentheses
    this.socketSubscription = this.socket.messages.subscribe((message: any) => {
      console.log('received message from server: ', message)
    })

    // send a message to the server
    this.socket.send({ type: 'helloServer' })
  }

  ngOnDestroy() {
    // stop listening when the component is destroyed
    this.socketSubscription.unsubscribe()
  }
}