
rxjs-websockets
An rxjs websocket library with a simple implementation built with flexibility in mind. Great for use with Angular 2 or any other rxjs project. Supports the browser and Node.js.
Install the dependency:
npm install -S rxjs-websockets
# the following dependency is recommended for most users
npm install -S queueing-subject
import { QueueingSubject } from 'queueing-subject'
import websocketConnect from 'rxjs-websockets'

// this subject queues as necessary to ensure every message is delivered
const input = new QueueingSubject()

// this method returns an object which contains two observables
const { messages, connectionStatus } = websocketConnect('ws://localhost/websocket-path', input)

// this value will be stringified before being sent to the server
input.next({ whateverField: 'some data' })

// the connectionStatus stream provides the current number of websocket
// connections immediately to each new observer and updates as it changes
const connectionStatusSubscription = connectionStatus.subscribe(numberConnected => {
  console.log('number of connected websockets:', numberConnected)
})

// the websocket connection is created lazily when the messages observable is
// subscribed to
const messagesSubscription = messages.subscribe(message => {
  // message is the message from the server parsed with JSON.parse(...)
  console.log('received message:', JSON.stringify(message))
})

// this will close the websocket
messagesSubscription.unsubscribe()

// closing the websocket does not close the connection status observable, it
// can be used to monitor future connection status changes
connectionStatusSubscription.unsubscribe()
messages is a cold observable, meaning the websocket connection is attempted lazily when a subscription is made to the messages observable. Advanced users of this library will find it important to understand the distinction between hot and cold observables; for most it will be sufficient to use the share operator as shown below.
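To make the hot/cold distinction concrete, here is a minimal, dependency-free sketch. It does not use rxjs at all: `ColdSource`, `coldMessages`, `shareSource` and `connectionStats` are hypothetical names invented for this illustration, and the ref-counting wrapper only approximates the sharing behaviour of the share operator.

```typescript
// Hypothetical model of a cold source and a ref-counted shared wrapper.
// None of these names come from rxjs or rxjs-websockets.
type Listener<T> = (value: T) => void
type Teardown = () => void
type ColdSource<T> = (listener: Listener<T>) => Teardown

// Tracks how many pretend "connections" are open now and were ever opened.
const connectionStats = { open: 0, opened: 0 }

// A cold source: every subscription opens its own connection.
const coldMessages: ColdSource<string> = listener => {
  connectionStats.open += 1
  connectionStats.opened += 1
  listener('hello')
  return () => {
    connectionStats.open -= 1
  }
}

// Wraps a cold source so all listeners share one upstream subscription,
// opened for the first listener and closed when the last one leaves.
function shareSource<T>(source: ColdSource<T>): ColdSource<T> {
  const listeners = new Set<Listener<T>>()
  let upstream: Teardown | undefined
  return listener => {
    listeners.add(listener)
    if (listeners.size === 1) {
      upstream = source(value => listeners.forEach(l => l(value)))
    }
    return () => {
      listeners.delete(listener)
      if (listeners.size === 0 && upstream) {
        upstream()
        upstream = undefined
      }
    }
  }
}
```

Subscribing twice to `coldMessages` opens two connections, while subscribing twice to `shareSource(coldMessages)` opens one. A listener that joins the shared source late misses values emitted before it subscribed, which is the usual trade-off of a hot stream.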
You can write your own service to provide a websocket using this library as follows:
// file: server-socket.service.ts
import { Injectable } from '@angular/core'
import { QueueingSubject } from 'queueing-subject'
import { Observable } from 'rxjs/Observable'
import websocketConnect from 'rxjs-websockets'
// patches Observable.prototype with share(), which is used below
import 'rxjs/add/operator/share'

@Injectable()
export class ServerSocket {
  private inputStream: QueueingSubject<any>
  public messages: Observable<any>

  public connect() {
    if (this.messages)
      return

    // Using share() causes a single websocket to be created when the first
    // observer subscribes. This socket is shared with subsequent observers
    // and closed when the observer count falls to zero.
    this.messages = websocketConnect(
      'ws://127.0.0.1:4201/ws',
      this.inputStream = new QueueingSubject<any>()
    ).messages.share()
  }

  public send(message: any): void {
    // If the websocket is not connected then the QueueingSubject will ensure
    // that messages are queued and delivered when the websocket reconnects.
    // A regular Subject can be used to discard messages sent when the websocket
    // is disconnected.
    this.inputStream.next(message)
  }
}
This service could be used like this:
import { Component, OnDestroy, OnInit } from '@angular/core'
import { Subscription } from 'rxjs/Subscription'
import { ServerSocket } from './server-socket.service'

@Component({
  selector: 'socket-user',
  templateUrl: './socket-user.component.html',
  styleUrls: ['./socket-user.component.scss']
})
export class SocketUserComponent implements OnInit, OnDestroy {
  private socketSubscription: Subscription

  constructor(private socket: ServerSocket) {}

  ngOnInit() {
    this.socket.connect()

    // the parameter needs parentheses when it carries a type annotation
    this.socketSubscription = this.socket.messages.subscribe((message: any) => {
      console.log('received message from server: ', message)
    })

    // send message to server, if the socket is not connected it will be sent
    // as soon as the connection becomes available thanks to QueueingSubject
    this.socket.send({ type: 'helloServer' })
  }

  ngOnDestroy() {
    this.socketSubscription.unsubscribe()
  }
}
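The queueing behaviour that the comments above rely on can be pictured with a small stand-in. This is only a sketch of the idea and not the queueing-subject package's implementation: `MiniQueueingSubject`, `outgoing`, `sent` and `detach` are hypothetical names, and the class supports a single listener for brevity.

```typescript
// Hypothetical stand-in for the idea behind QueueingSubject: buffer values
// while nobody is listening, then flush them to the next listener.
class MiniQueueingSubject<T> {
  private queue: T[] = []
  private listener: ((value: T) => void) | undefined

  // Deliver immediately when someone is listening, otherwise buffer.
  next(value: T): void {
    if (this.listener) {
      this.listener(value)
    } else {
      this.queue.push(value)
    }
  }

  // Attach a listener (for example a freshly opened websocket) and flush
  // anything that was queued while nobody was listening.
  subscribe(listener: (value: T) => void): () => void {
    this.listener = listener
    const pending = this.queue
    this.queue = []
    pending.forEach(value => listener(value))
    return () => {
      if (this.listener === listener) this.listener = undefined
    }
  }
}

// Values are stringified on the way out, mirroring what the README says
// happens to values passed to input.next(...).
const sent: string[] = []
const outgoing = new MiniQueueingSubject<{ type: string }>()

outgoing.next({ type: 'helloServer' }) // queued: nothing is listening yet
const detach = outgoing.subscribe(message => {
  sent.push(JSON.stringify(message))
})
outgoing.next({ type: 'ping' }) // delivered immediately
detach()
outgoing.next({ type: 'later' }) // queued again until the next subscribe
```

A plain subject would have dropped the first and last values instead of holding them, which is the trade-off the `send` comment in the service describes.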
Reconnecting after the connection fails can be done with built-in rxjs operators:
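// patch in the operators used below (rxjs 5 style imports)
import 'rxjs/add/operator/retryWhen'
import 'rxjs/add/operator/delay'

const input = new QueueingSubject<any>()
const { messages, connectionStatus } = websocketConnect(`ws://server`, input)

// try to reconnect every second
messages.retryWhen(errors => errors.delay(1000)).subscribe(message => {
  console.log(message)
})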
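What the retry pipeline above does can be sketched without rxjs: each time the source reports an error, a new attempt is scheduled after a fixed delay. The names `retryWithDelay`, `Connect`, `Handlers` and `Schedule` are hypothetical, and the scheduler is passed in as a parameter (an assumption made here so the sketch stays deterministic and testable) rather than calling a timer directly.

```typescript
// Hypothetical, rxjs-free model of "resubscribe after a delay on error".
type Teardown = () => void
type Schedule = (task: () => void, delayMs: number) => void

interface Handlers<T> {
  next: (value: T) => void
  error: (err: unknown) => void
}

// Opens one connection attempt and returns a function that closes it.
type Connect<T> = (handlers: Handlers<T>) => Teardown

function retryWithDelay<T>(
  connect: Connect<T>,
  delayMs: number,
  schedule: Schedule
): (next: (value: T) => void) => Teardown {
  return next => {
    let stopped = false
    let teardown: Teardown = () => {}

    const attempt = (): void => {
      if (stopped) return
      teardown = connect({
        next,
        // On failure, queue another attempt instead of giving up.
        error: () => schedule(attempt, delayMs)
      })
    }

    attempt()

    // Stops future attempts and closes the current connection.
    return () => {
      stopped = true
      teardown()
    }
  }
}
```

In an application the scheduler would typically wrap a timer, for example `(task, ms) => { setTimeout(task, ms) }`. The sketch assumes the scheduler runs tasks later rather than synchronously, as a real timer does.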
You can supply a websocket factory function (that takes a URL and returns an object that is compatible with WebSocket) as such:
const { messages } = websocketConnect(
  'ws://127.0.0.1:4201/ws',
  this.inputStream = new QueueingSubject<any>(),
  url => new WebSocket(url)
)
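One use for a custom factory is substituting a fake socket in tests. The sketch below is an assumption-heavy illustration: `WebSocketLike` is a guess at a minimal WebSocket-compatible surface (the README does not spell out which members the library touches), and `FakeWebSocket`, `fakeFactory`, `created` and `simulateMessage` are hypothetical names.

```typescript
// Assumed minimal surface of a WebSocket-compatible object. The members the
// library actually requires may differ; check its typings before relying on this.
interface WebSocketLike {
  onopen: (() => void) | null
  onmessage: ((event: { data: string }) => void) | null
  onclose: (() => void) | null
  send(data: string): void
  close(): void
}

// In-memory fake that records outgoing data instead of using the network.
class FakeWebSocket implements WebSocketLike {
  onopen: (() => void) | null = null
  onmessage: ((event: { data: string }) => void) | null = null
  onclose: (() => void) | null = null
  readonly url: string
  readonly sent: string[] = []
  closed = false

  constructor(url: string) {
    this.url = url
  }

  send(data: string): void {
    this.sent.push(data)
  }

  close(): void {
    this.closed = true
    if (this.onclose) this.onclose()
  }

  // Test helper: pretend the server pushed a message.
  simulateMessage(data: string): void {
    if (this.onmessage) this.onmessage({ data })
  }
}

// Factory with the shape described above: takes a URL, returns a socket.
// It also remembers every socket it creates so a test can inspect them.
const created: FakeWebSocket[] = []
const fakeFactory = (url: string): WebSocketLike => {
  const socket = new FakeWebSocket(url)
  created.push(socket)
  return socket
}
```

Under these assumptions, `fakeFactory` would be passed as the third argument in place of `url => new WebSocket(url)`, and a test could then call `simulateMessage` on the recorded socket to drive the messages stream.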
FAQs
rxjs 7 websockets library
The npm package rxjs-websockets receives a total of 1,972 weekly downloads. As such, the popularity of rxjs-websockets is classified as popular.
We found that rxjs-websockets has an unhealthy version release cadence and level of project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.