
Research
Security News
Lazarus Strikes npm Again with New Wave of Malicious Packages
The Socket Research Team has discovered six new malicious npm packages linked to North Korea’s Lazarus Group, designed to steal credentials and deploy backdoors.
rxjs-websockets
Advanced tools
An rxjs websocket library with a simple and flexible implementation. Supports the browser and node.js.
npm install -S rxjs-websockets
# or
yarn add rxjs-websockets
For rxjs 6 support, rxjs-websockets 8 is needed.
npm install -S rxjs-websockets@8
# or
yarn add rxjs-websockets@8
import { QueueingSubject } from 'queueing-subject'
import { Subscription } from 'rxjs'
import { share, switchMap } from 'rxjs/operators'
import makeWebSocketObservable, {
GetWebSocketResponses,
// WebSocketPayload = string | ArrayBuffer | Blob
WebSocketPayload,
normalClosureMessage,
} from 'rxjs-websockets'
// this subject queues as necessary to ensure every message is delivered
const input$ = new QueueingSubject<string>()
// queue up a request to be sent when the websocket connects
input$.next('some data')
// create the websocket observable, does *not* open the websocket connection
const socket$ = makeWebSocketObservable('ws://localhost/websocket-path')
const messages$: Observable<WebSocketPayload> = socket$.pipe(
// the observable produces a value once the websocket has been opened
switchMap((getResponses: GetWebSocketResponses) => {
console.log('websocket opened')
return getResponses(input$)
}),
share(),
)
const messagesSubscription: Subscription = messages.subscribe(
(message: string) => {
console.log('received message:', message)
// respond to server
input$.next('i got your message')
},
(error: Error) => {
const { message } = error
if (message === normalClosureMessage) {
console.log('server closed the websocket connection normally')
} else {
console.log('socket was disconnected due to error:', message)
}
},
() => {
// The clean termination only happens in response to the last
// subscription to the observable being unsubscribed, any
// other closure is considered an error.
console.log('the connection was closed in response to the user')
},
)
function closeWebsocket() {
// this also caused the websocket connection to be closed
messagesSubscription.unsubscribe()
}
setTimeout(closeWebsocket, 2000)
The observable returned by makeWebSocketObservable
is cold, this means the websocket connection is attempted lazily as subscriptions are made to it. Advanced users of this library will find it important to understand the distinction between hot and cold observables, for most it will be sufficient to use the share operator as shown in the example above. The share
operator ensures at most one websocket connection is attempted regardless of the number of subscriptions to the observable while ensuring the socket is closed when the last subscription is unsubscribed. When only one subscription is made the operator has no effect.
By default the websocket supports binary messages so the payload type is string | ArrayBuffer | Blob
, when you only need string
messages the generic parameter to makeWebSocketObservable
can be used:
const socket$ = makeWebSocketObservable<string>('ws://localhost/websocket-path')
const input$ = new QueueingSubject<string>()
const messages$: Observable<string> = socket$.pipe(
switchMap((getResponses: GetWebSocketResponses<string>) => getResponses(input$)),
share(),
)
This can be done with the built-in rxjs operator retryWhen
:
import { Subject } from 'rxjs'
import { switchMap, retryWhen } from 'rxjs/operators'
import makeWebSocketObservable from 'rxjs-websockets'
const input$ = new Subject<string>()
const socket$ = makeWebSocketObservable('ws://localhost/websocket-path')
const messages$ = socket$.pipe(
switchMap((getResponses) => getResponses(input$)),
retryWhen((errors) => errors.pipe(delay(1000))),
)
A custom websocket factory function can be supplied that takes a URL and returns an object that is compatible with WebSocket:
import makeWebSocketObservable, { WebSocketOptions } from 'rxjs-websockets'
const options: WebSocketOptions = {
// this is used to create the websocket compatible object,
// the default is shown here
makeWebSocket: (url: string, protocols?: string | string[]) => new WebSocket(url, protocols),
// optional argument, passed to `makeWebSocket`
// protocols: '...',
}
const socket$ = makeWebSocketObservable('ws://127.0.0.1:4201/ws', options)
This example shows how to use the map
operator to handle JSON encoding of outgoing messages and parsing of responses:
import { Observable } from 'rxjs'
import makeWebSocketObservable, { WebSocketOptions } from 'rxjs-websockets'
function makeJsonWebSocketObservable(
url: string,
options?: WebSocketOptions,
): Observable<unknown> {
const socket$ = makeWebSocketObservable<string>(url, options)
return socket$.pipe(
map(
(getResponses: GetWebSocketReponses<string>) => (input$: Observable<object>) =>
getResponses(input$.pipe(map((request) => JSON.stringify(request)))).pipe(
map((response) => JSON.parse(response)),
),
),
)
}
The function above can be used identically to makeWebSocketObservable
only the requests/responses will be transparently encoded/decoded.
FAQs
rxjs 7 websockets library
The npm package rxjs-websockets receives a total of 1,972 weekly downloads. As such, rxjs-websockets popularity was classified as popular.
We found that rxjs-websockets demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
The Socket Research Team has discovered six new malicious npm packages linked to North Korea’s Lazarus Group, designed to steal credentials and deploy backdoors.
Security News
Socket CEO Feross Aboukhadijeh discusses the open web, open source security, and how Socket tackles software supply chain attacks on The Pair Program podcast.
Security News
Opengrep continues building momentum with the alpha release of its Playground tool, demonstrating the project's rapid evolution just two months after its initial launch.