observable-webworker
Simple API for using [web workers](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers) with [RxJS](https://rxjs-dev.firebaseapp.com/guide/overview) observables
- fromWorker function from main thread side
- materialize and dematerialize are used as a robust transport of streaming errors
- Use mergeMap, switchMap or exhaustMap in your worker if the input stream outputs multiple items that generate their own stream of results
- Transferable parts of message payloads, so large binaries can be transferred efficiently without copying. See the Transferable section for usage
- switchMap operator, or parallelisation of computation with mergeMap
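The materialize and dematerialize item above refers to RxJS operators that turn next, error and complete events into plain notification objects and back again. The following is a rough, library-free sketch of why that helps errors survive a thread boundary. The `Notification` type and both helper functions are illustrative stand-ins written for this example, not the library's own code.

```typescript
// Plain-object notifications, similar in spirit to what RxJS materialize() emits.
type Notification<T> =
  | { kind: 'N'; value: T } // a next value
  | { kind: 'E'; error: string } // an error, reduced to a serialisable message
  | { kind: 'C' }; // completion

// "Materialize": run a producer and record everything it does as messages
// that could be sent with postMessage.
function materializeAll<T>(produce: () => Iterable<T>): Notification<T>[] {
  const out: Notification<T>[] = [];
  try {
    for (const value of produce()) {
      out.push({ kind: 'N', value });
    }
    out.push({ kind: 'C' });
  } catch (e) {
    out.push({ kind: 'E', error: e instanceof Error ? e.message : String(e) });
  }
  return out;
}

// "Dematerialize": replay the messages on the receiving side, re-raising errors.
function dematerializeAll<T>(messages: Notification<T>[], onNext: (value: T) => void): void {
  for (const message of messages) {
    if (message.kind === 'N') {
      onNext(message.value);
    } else if (message.kind === 'E') {
      throw new Error(message.error);
    } else {
      return;
    }
  }
}

// Hypothetical worker-side producer that fails part way through.
function* flaky(): Generator<number> {
  yield 1;
  yield 2;
  throw new Error('boom');
}

const wire = materializeAll(flaky);
const received: number[] = [];
let caught = '';
try {
  dematerializeAll(wire, value => received.push(value));
} catch (e) {
  caught = (e as Error).message;
}
```

The values emitted before the failure still arrive, and the error is re-raised on the receiving side rather than being lost in the worker.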
https://cloudnc.github.io/observable-webworker
https://dev.to/zakhenry/observable-webworkers-with-angular-8-4k6
Install the npm package: observable-webworker
# with npm
npm install observable-webworker
# or with yarn
yarn add observable-webworker
// src/readme/hello.ts
import { fromWorker } from 'observable-webworker';
import { of } from 'rxjs';
const input$ = of('Hello from main thread');
fromWorker<string, string>(() => new Worker('./hello.worker', { type: 'module' }), input$).subscribe(message => {
  console.log(message); // Outputs 'Hello from webworker'
});
// src/readme/hello.worker.ts
import { DoWork, ObservableWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@ObservableWorker()
export class HelloWorker implements DoWork<string, string> {
  public work(input$: Observable<string>): Observable<string> {
    return input$.pipe(
      map(message => {
        console.log(message); // outputs 'Hello from main thread'
        return `Hello from webworker`;
      }),
    );
  }
}
You must export your worker class (export class ...) from the file if you're using a minifier. If you don't, your class will be removed from the bundle, causing your worker to do nothing!
You'll probably need to export the class anyway as you are unit testing it, right?!
If decorators are not something you use regularly and you prefer direct functions, simply use the runWorker function instead.
// src/readme/hello-no-decorator.worker.ts#L5-L16
class HelloWorker implements DoWork<string, string> {
  public work(input$: Observable<string>): Observable<string> {
    return input$.pipe(
      map(message => {
        console.log(message); // outputs 'Hello from main thread'
        return `Hello from webworker`;
      }),
    );
  }
}

runWorker(HelloWorker);
If either your input or output (or both!) streams are passing large messages to or from the worker, it is highly recommended to use message types that implement the Transferable interface (ArrayBuffer, MessagePort, ImageBitmap).
Bear in mind that when transferring a message to a webworker, the main thread relinquishes ownership of the data.
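To make the loss of ownership concrete, here is a minimal sketch that uses the standard `structuredClone` function with a transfer list, which relies on the same structured-clone-with-transfer mechanism as `postMessage`. It assumes a runtime that provides `structuredClone` (recent browsers or Node 17+); the variable names are illustrative only.

```typescript
// Create a small buffer and write a recognisable value into it.
const original = new ArrayBuffer(8);
new Uint8Array(original)[0] = 42;

// Listing the buffer in `transfer` moves it instead of copying it.
const moved = structuredClone(original, { transfer: [original] });

// After the transfer the sender's buffer is detached and reports zero length,
// while the receiver holds the full contents.
const senderBytes = original.byteLength;
const receiverBytes = moved.byteLength;
const firstByte = new Uint8Array(moved)[0];
```

Any code on the sending side that still reads from the original buffer after the transfer will see an empty buffer, so transfer only data the sender no longer needs.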
To use Transferables with observable-webworker, a slightly more complex interface is provided for both sides of the main/worker thread.
If the main thread is transferring Transferables to the worker, simply add a callback to the fromWorker function call to select which elements of the input stream are transferable.
// src/readme/transferable.main.ts#L7-L11
return fromWorker<ArrayBuffer, string>(
  () => new Worker('./transferable.worker', { type: 'module' }),
  input$,
  input => [input],
);
If the worker is transferring Transferables to the main thread, simply implement DoTransferableWork, which will require you to add an additional method selectTransferables?(output: O): Transferable[]; which you implement to select which elements of the output object are Transferable.
Both strategies are compatible with each other, so if for example you're computing the hash of a large ArrayBuffer in a worker, you would only need to add the transferable selector callback in the main thread in order to mark the ArrayBuffer as being transferable in the input. The library will handle the rest, and you can just use DoWork in the worker thread, as the return type string is not Transferable.
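For a worker whose output mixes metadata with a large binary, a selector might look like the sketch below. The `Thumbnail` message type, the `SelectsTransferables` interface and the `ThumbnailSelector` class are hypothetical names invented for this example. The interface is only a local stand-in for the selector method quoted above, so that the snippet runs without the library; a real worker would implement DoTransferableWork and its work method as well.

```typescript
// Hypothetical output message: a small label plus a large binary payload.
interface Thumbnail {
  name: string;
  pixels: ArrayBuffer;
}

// Local stand-in for the selector half of the worker interface (an assumption
// made for this sketch, not the library's own declaration).
interface SelectsTransferables<O> {
  selectTransferables(output: O): ArrayBuffer[];
}

class ThumbnailSelector implements SelectsTransferables<Thumbnail> {
  // Only the buffer is worth transferring; the name string is copied as usual.
  public selectTransferables(output: Thumbnail): ArrayBuffer[] {
    return [output.pixels];
  }
}

const thumbnail: Thumbnail = { name: 'cat.png', pixels: new ArrayBuffer(16) };
const selected = new ThumbnailSelector().selectTransferables(thumbnail);
```

Returning only the heavy parts of the message keeps the selector cheap, and everything not listed is copied as normal.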
If you have a large amount of work that needs to be done, you can use the fromWorkerPool function to automatically manage a pool of workers to allow true concurrency of work, distributed evenly across all available cores.
The worker pool strategy has the following features:
- Work can be provided as an Observable, Array, or Iterable
- Worker count defaults to navigator.hardwareConcurrency - 1 to keep the main core free
- For Observable, work is considered remaining while the observable is not completed
- For Array, work remains while there are items in the array
- For Iterable, work remains while the iterator is not result.done
- The default operator is mergeAll(), which means the output from the webworker(s) is output as soon as available
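The scheduling behaviour listed above can be pictured with a small single-threaded simulation. This is only a sketch of the idea (pull the next unit from an iterator whenever a worker becomes free, and emit results in completion order); it is not the library's implementation. The `Unit` type, its `cost` field and the `simulatePool` function are hypothetical.

```typescript
// A unit of work with a made-up processing time.
interface Unit {
  id: string;
  cost: number;
}

// Returns the ids in the order their results would become available.
function simulatePool(units: Iterable<Unit>, workerCount: number): string[] {
  const iterator = units[Symbol.iterator]();
  const busy: { id: string; finishesAt: number }[] = [];
  const completed: string[] = [];
  let clock = 0;
  let exhausted = false;

  // Hand the next unit to a free worker; work remains until the iterator is done.
  const startNext = (): void => {
    if (exhausted) {
      return;
    }
    const next = iterator.next();
    if (next.done) {
      exhausted = true;
      return;
    }
    busy.push({ id: next.value.id, finishesAt: clock + next.value.cost });
  };

  // Each worker starts with exactly one unit.
  for (let i = 0; i < workerCount; i++) {
    startNext();
  }

  // Whichever worker finishes first emits its result and takes the next unit.
  while (busy.length > 0) {
    busy.sort((a, b) => a.finishesAt - b.finishesAt);
    const done = busy.shift()!;
    clock = done.finishesAt;
    completed.push(done.id);
    startNext();
  }
  return completed;
}

const order = simulatePool(
  [
    { id: 'a', cost: 5 },
    { id: 'b', cost: 1 },
    { id: 'c', cost: 1 },
    { id: 'd', cost: 1 },
  ],
  2,
);
```

With two workers, the slow unit `a` occupies one worker while the other works through `b`, `c` and `d`, so results arrive out of input order, which mirrors the "output as soon as available" behaviour described for the default operator.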
In this simple example, we have a function that receives an array of files and returns an observable of the MD5 sum hashes of those files. For simplicity, we're passing the primitives back and forth, however in reality you are likely to want to construct your own interface to define the messages being passed to and from the worker.
// src/readme/worker-pool.main.ts
import { Observable } from 'rxjs';
import { fromWorkerPool } from 'observable-webworker';
export function computeHashes(files: File[]): Observable<string> {
  return fromWorkerPool<File, string>(() => new Worker('./worker-pool-hash.worker', { type: 'module' }), files);
}
// src/readme/worker-pool-hash.worker.ts
import * as md5 from 'js-md5';
import { DoWorkUnit, ObservableWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@ObservableWorker()
export class WorkerPoolHashWorker implements DoWorkUnit<File, string> {
  public workUnit(input: File): Observable<string> {
    return this.readFileAsArrayBuffer(input).pipe(map(arrayBuffer => md5(arrayBuffer)));
  }

  private readFileAsArrayBuffer(blob: Blob): Observable<ArrayBuffer> {
    return new Observable(observer => {
      if (!(blob instanceof Blob)) {
        observer.error(new Error('`blob` must be an instance of File or Blob.'));
        return;
      }

      const reader = new FileReader();

      reader.onerror = err => observer.error(err);
      reader.onload = () => observer.next(reader.result as ArrayBuffer);
      reader.onloadend = () => observer.complete();

      reader.readAsArrayBuffer(blob);

      return () => reader.abort();
    });
  }
}
Note here that the worker class implements DoWorkUnit<File, string>. This is different to before where we implemented DoWork which had the slightly more complex signature of inputting an observable and outputting one.
If using the fromWorkerPool strategy, you must only implement DoWorkUnit as it relies on the completion of the returned observable to indicate that the unit of work is finished processing, and the next unit of work can be transferred to the worker.
Commonly, a worker that implements DoWorkUnit only needs to return a single value, so you may instead return a Promise from the workUnit method.
// src/app/doc/async-work.worker.ts#L7-L14
@ObservableWorker()
export class FactorizationWorker implements DoWorkUnit<number, number[]> {
  public async workUnit(input: number): Promise<number[]> {
    return factorize(input);
  }
}