Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@quilted/threads

Package Overview
Dependencies
Maintainers
1
Versions
79
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@quilted/threads - npm Package Compare versions

Comparing version 0.1.15 to 1.0.0

build/cjs/abort-signal/accept.cjs

2

build/typescript/encoding.d.ts

@@ -1,2 +0,2 @@

export { createBasicEncoder, createBasicEncoderWithOverrides, } from './encoding/basic.ts';
export { createBasicEncoder } from './encoding/basic.ts';
//# sourceMappingURL=encoding.d.ts.map

@@ -1,8 +0,7 @@

import type { ThreadEncodingStrategy, ThreadEncodingStrategyApi } from '../types.ts';
import { type MemoryRetainer } from '../memory.ts';
export declare function createBasicEncoderWithOverrides({ encode: encodeOverride, decode: decodeOverride, }?: {
encode?(value: unknown, api: ThreadEncodingStrategyApi & Pick<ThreadEncodingStrategy, 'encode'>): ReturnType<ThreadEncodingStrategy['encode']> | undefined;
decode?(value: unknown, retainedBy: Iterable<MemoryRetainer> | undefined, api: ThreadEncodingStrategyApi & Pick<ThreadEncodingStrategy, 'decode'>): unknown;
}): (api: ThreadEncodingStrategyApi) => ThreadEncodingStrategy;
export declare const createBasicEncoder: (api: ThreadEncodingStrategyApi) => ThreadEncodingStrategy;
import type { ThreadEncoder } from '../types.ts';
/**
* Creates an encoder that converts most common JavaScript types into a format
* that can be transferred via message passing.
*/
export declare function createBasicEncoder(): ThreadEncoder;
//# sourceMappingURL=basic.d.ts.map

@@ -1,11 +0,8 @@

export { createThread } from './thread.ts';
export type { ThreadOptions } from './thread.ts';
export { retain, release, StackFrame, isMemoryManageable } from './memory.ts';
export type { MemoryManageable, MemoryRetainer } from './memory.ts';
export { RELEASE_METHOD, RETAIN_METHOD, RETAINED_BY, ENCODE_METHOD, } from './constants.ts';
export { targetFromIframe, targetFromInsideIframe, targetFromWebWorker, targetFromMessagePort, targetFromBrowserWebSocket, } from './targets.ts';
export { createBasicEncoder, createBasicEncoderWithOverrides, } from './encoding.ts';
export { createThreadAbortSignal, acceptThreadAbortSignal } from './abort.ts';
export type { ThreadAbortSignal } from './abort.ts';
export type { Thread, ThreadTarget, ThreadCallable, ThreadExposable, ThreadSafeArgument, ThreadSafeReturnType, ThreadEncodingStrategy, ThreadEncodingStrategyApi, ThreadEncodable, AnyFunction, } from './types.ts';
export { createThread, createThreadFromBrowserWebSocket, createThreadFromIframe, createThreadFromInsideIframe, createThreadFromMessagePort, createThreadFromWebWorker, type ThreadOptions, } from './targets.ts';
export { createBasicEncoder } from './encoding.ts';
export { createThreadAbortSignal, acceptThreadAbortSignal, type ThreadAbortSignal, } from './abort-signal.ts';
export type { Thread, ThreadTarget, ThreadCallable, ThreadCallableFunction, ThreadSafeArgument, ThreadSafeReturnType, ThreadSafeReturnValueType, ThreadEncoder, ThreadEncoderApi, ThreadEncodable, AnyFunction, } from './types.ts';
//# sourceMappingURL=index.d.ts.map

@@ -5,2 +5,7 @@ import { RETAINED_BY, RETAIN_METHOD, RELEASE_METHOD } from './constants.ts';

export type { MemoryRetainer, MemoryManageable };
/**
* A simple representation of a called function. This object allows this library to
* release references to functions immediately when the function call that transferred
* them into this thread is completed.
*/
export declare class StackFrame {

@@ -11,6 +16,43 @@ private readonly memoryManaged;

}
/**
* Indicates that a value is being manually memory-managed across threads by this library.
*/
export declare function isMemoryManageable(value: unknown): value is MemoryManageable;
/**
* Marks a value as being used so it will not be automatically released. Calling `retain` will,
* by default, deeply retain the value — that is, it will traverse into nested array elements
* and object properties, and retain every `retain`-able thing it finds.
*
* You will typically use this alongside also storing that value in a variable that lives outside
* the context of the function where that value was received.
*
* @example
* import {retain} from '@quilted/threads';
*
* const allUsers = new Set<User>();
*
* async function sayHello(user: User) {
* allUsers.add(user);
* retain(user);
* return `Hey, ${await user.fullName()}!`;
* }
*/
export declare function retain(value: any, { deep }?: {
deep?: boolean | undefined;
}): boolean;
/**
* Once you are no longer using the a `retain`-ed value, you can use this function to mark it as
* being unused. Like `retain()`, this function will apply to all nested array elements and object
* properties.
*
* @example
* import {retain} from '@quilted/threads';
*
* const allUsers = new Set<User>();
*
* function removeUser(user: User) {
* allUsers.delete(user);
* release(user);
* }
*/
export declare function release(value: any, { deep }?: {

@@ -17,0 +59,0 @@ deep?: boolean | undefined;

@@ -1,6 +0,7 @@

export { targetFromIframe } from './targets/iframe/iframe.ts';
export { targetFromInsideIframe } from './targets/iframe/nested.ts';
export { targetFromMessagePort } from './targets/message-port.ts';
export { targetFromBrowserWebSocket } from './targets/web-socket-browser.ts';
export { targetFromWebWorker } from './targets/web-worker.ts';
export { createThread, type ThreadOptions } from './targets/target.ts';
export { createThreadFromIframe } from './targets/iframe/iframe.ts';
export { createThreadFromInsideIframe } from './targets/iframe/nested.ts';
export { createThreadFromMessagePort } from './targets/message-port.ts';
export { createThreadFromBrowserWebSocket } from './targets/web-socket-browser.ts';
export { createThreadFromWebWorker } from './targets/web-worker.ts';
//# sourceMappingURL=targets.d.ts.map

@@ -1,5 +0,25 @@

import type { ThreadTarget } from '../../types.ts';
export declare function targetFromIframe(iframe: HTMLIFrameElement, { targetOrigin }?: {
import { type ThreadOptions } from '../target.ts';
/**
* Creates a thread from an iframe nested on a top-level document. To create
* a thread from the contents of this iframe, use `createThreadFromInsideIframe()`
* instead.
*
* @see https://developer.mozilla.org/en-US/docs/Web/HTML/Element/iframe
*
* @example
* import {createThreadFromIframe} from '@quilted/threads';
*
* const iframe = document.createElement('iframe');
* const thread = createThreadFromInsideIframe(iframe);
* await thread.sendMessage('Hello world!');
*/
export declare function createThreadFromIframe<Self = Record<string, never>, Target = Record<string, never>>(iframe: HTMLIFrameElement, { targetOrigin, ...options }?: ThreadOptions<Self, Target> & {
/**
* The target origin to use when sending `postMessage` events to the child frame.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage#targetorigin
* @default '*'
*/
targetOrigin?: string;
}): ThreadTarget;
}): import("../../types.ts").ThreadCallable<Target>;
//# sourceMappingURL=iframe.d.ts.map

@@ -1,5 +0,24 @@

import type { ThreadTarget } from '../../types.ts';
export declare function targetFromInsideIframe({ targetOrigin, }?: {
import { type ThreadOptions } from '../target.ts';
/**
* Creates a thread from within an iframe nested in a top-level document. To create
* a thread from this iframe in the top-level document, use `createThreadFromIframe()`
* instead.
*
* @see https://developer.mozilla.org/en-US/docs/Web/HTML/Element/iframe
*
* @example
* import {createThreadFromInsideIframe} from '@quilted/threads';
*
* const thread = createThreadFromInsideIframe();
* await thread.sendMessage('Hello world!');
*/
export declare function createThreadFromInsideIframe<Self = Record<string, never>, Target = Record<string, never>>({ targetOrigin, ...options }?: ThreadOptions<Self, Target> & {
/**
* The target origin to use when sending `postMessage` events to the parent frame.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage#targetorigin
* @default '*'
*/
targetOrigin?: string;
}): ThreadTarget;
}): import("../../types.ts").ThreadCallable<Target>;
//# sourceMappingURL=nested.d.ts.map

@@ -1,3 +0,21 @@

import type { ThreadTarget } from '../types.ts';
export declare function targetFromMessagePort(port: MessagePort): ThreadTarget;
import { type ThreadOptions } from './target.ts';
/**
* Creates a thread from a `WebSocket` instance in the browser.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
*
* @example
* import {createThreadFromMessagePort} from '@quilted/threads';
*
* const channel = new MessageChannel();
* const threadOne = createThreadFromMessagePort(channel.port1);
* const threadTwo = createThreadFromMessagePort(channel.port2, {
* expose: {
* sendMessage: (message) => console.log(message),
* },
* });
*
* await threadOne.sendMessage('Hello world!');
*/
export declare function createThreadFromMessagePort<Self = Record<string, never>, Target = Record<string, never>>(port: MessagePort, options?: ThreadOptions<Self, Target>): import("../types.ts").ThreadCallable<Target>;
//# sourceMappingURL=message-port.d.ts.map

@@ -1,3 +0,15 @@

import type { ThreadTarget } from '../types.ts';
export declare function targetFromBrowserWebSocket(websocket: WebSocket): ThreadTarget;
import { type ThreadOptions } from './target.ts';
/**
* Creates a thread from a `WebSocket` instance in the browser.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
*
* @example
* import {createThreadFromBrowserWebSocket} from '@quilted/threads';
*
* const websocket = new WebSocket('ws://localhost:8080');
* const thread = createThreadFromBrowserWebSocket(websocket);
* await thread.sendMessage('Hello world!');
*/
export declare function createThreadFromBrowserWebSocket<Self = Record<string, never>, Target = Record<string, never>>(websocket: WebSocket, options?: ThreadOptions<Self, Target>): import("../types.ts").ThreadCallable<Target>;
//# sourceMappingURL=web-socket-browser.d.ts.map

@@ -1,3 +0,22 @@

import type { ThreadTarget } from '../types.ts';
export declare function targetFromWebWorker(worker: Worker): ThreadTarget;
import { type ThreadOptions } from './target.ts';
/**
* Creates a thread from a web worker. This function can be used either from a JavaScript
* environment that *created* a web worker, or from within a web worker that has been
* created.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers
*
* @example
* import {createThreadFromWebWorker} from '@quilted/threads';
*
* // If inside a web worker:
* const thread = createThreadFromWebWorker(self);
*
* // If in an environment that creates a worker:
* const worker = new Worker('worker.js');
* const thread = createThreadFromWebWorker(worker);
*
* await thread.sendMessage('Hello world!');
*/
export declare function createThreadFromWebWorker<Self = Record<string, never>, Target = Record<string, never>>(worker: Worker, options?: ThreadOptions<Self, Target>): import("../types.ts").ThreadCallable<Target>;
//# sourceMappingURL=web-worker.d.ts.map
import type { RELEASE_METHOD, RETAIN_METHOD, ENCODE_METHOD, RETAINED_BY } from './constants.ts';
/**
* A thread represents a target JavaScript environment that exposes a set
* of callable, asynchronous methods. The thread takes care of automatically
* encoding and decoding its arguments and return values, so you can interact
* with it as if its methods were implemented in the same environment as your
* own code.
*/
export type Thread<Target> = ThreadCallable<Target>;
/**
* An object backing a `Thread` that provides the message-passing interface
* that allows communication to flow between environments. This message-passing
* interface is based on the [`postMessage` interface](https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage),
* which is easily adaptable to many JavaScript objects and environments.
*/
export interface ThreadTarget {
/**
* Sends a message to the target thread. The message will be encoded before sending,
* and the consumer may also pass an array of "transferable" objects that should be
* transferred (rather than copied) to the other environment, if supported.
*/
send(message: any, transferables?: Transferable[]): void;
listen(options: {
/**
* Listens for messages coming in to the thread. This method must call the provided
* listener for each message as it is received. The thread will then decode the message
* and handle its content. This method may be passed an `AbortSignal` to abort the
* listening process.
*/
listen(listener: (value: any) => void, options: {
signal?: AbortSignal;
}): AsyncGenerator<any, void, void>;
}): void;
}
export interface ThreadExposableFunction<Args extends any[], ReturnType> {
(...args: ThreadSafeArgument<Args>): ReturnType extends Promise<any> ? ReturnType : ReturnType extends AsyncGenerator<any, any, any> ? ReturnType : ReturnType | Promise<ReturnType>;
}
export type ThreadExposable<T> = {
[K in keyof T]: T[K] extends (...args: infer Args) => infer ReturnType ? ThreadExposableFunction<Args, ReturnType> : never;
};
/**
* A function type that can be called over a thread. It is the same as defining a
* normal function type, but with the additional restriction that the function must
* always return an asynchronous value (either a promise or an async generator). Additionally,
* all arguments to that function must also be thread-callable
*/
export interface ThreadCallableFunction<Args extends any[], ReturnType> {
(...args: ThreadSafeArgument<Args>): ThreadSafeReturnType<ReturnType>;
}
/**
* A mapped object type that takes an object with methods, and converts it into the
* an object with the same methods that can be called over a thread.
*/
export type ThreadCallable<T> = {

@@ -22,11 +50,30 @@ [K in keyof T]: T[K] extends (...args: infer Args) => infer ReturnType ? ThreadCallableFunction<Args, ReturnType> : never;

export type MaybePromise<T> = T extends Promise<any> ? T : T | Promise<T>;
export type ThreadSafeReturnType<T> = T extends AsyncGenerator<any, any, any> ? T : T extends Generator<infer T, infer R, infer N> ? AsyncGenerator<T, R, N> : T extends Promise<any> ? T : T extends infer U | Promise<infer U> ? Promise<U> : T extends (...args: infer Args) => infer TypeReturned ? (...args: Args) => ThreadSafeReturnType<TypeReturned> : T extends (infer ArrayElement)[] ? ThreadSafeReturnType<ArrayElement>[] : T extends readonly (infer ArrayElement)[] ? readonly ThreadSafeReturnType<ArrayElement>[] : T extends object ? {
[K in keyof T]: ThreadSafeReturnType<T[K]>;
/**
* Converts the return type of a function into the type it will be when
* passed over a thread.
*/
export type ThreadSafeReturnType<T> = T extends AsyncGenerator<infer T, infer R, infer N> ? AsyncGenerator<ThreadSafeReturnValueType<T>, ThreadSafeReturnValueType<R>, ThreadSafeReturnValueType<N>> : T extends Generator<infer T, infer R, infer N> ? Generator<ThreadSafeReturnValueType<T>, ThreadSafeReturnValueType<R>, ThreadSafeReturnValueType<N>> | AsyncGenerator<ThreadSafeReturnValueType<T>, ThreadSafeReturnValueType<R>, ThreadSafeReturnValueType<N>> : T extends Promise<infer U> ? Promise<ThreadSafeReturnValueType<U>> : T extends infer U | Promise<infer U> ? Promise<ThreadSafeReturnValueType<U>> : Promise<ThreadSafeReturnValueType<T>>;
/**
* Converts an object into the type it will be when passed over a thread.
*/
export type ThreadSafeReturnValueType<T> = T extends (...args: infer Args) => infer ReturnType ? ThreadCallableFunction<Args, ReturnType> : T extends (infer ArrayElement)[] ? ThreadSafeReturnValueType<ArrayElement>[] : T extends readonly (infer ArrayElement)[] ? readonly ThreadSafeReturnValueType<ArrayElement>[] : T extends Set<infer U> ? Set<ThreadSafeReturnValueType<U>> : T extends Map<infer K, infer U> ? Map<K, ThreadSafeReturnValueType<U>> : T extends object ? {
[K in keyof T]: ThreadSafeReturnValueType<T[K]>;
} : T;
/**
* Converts an object into the type it could be if accepted as an argument to a function
* called over a thread.
*/
export type ThreadSafeArgument<T> = T extends (...args: infer Args) => infer TypeReturned ? TypeReturned extends Promise<any> ? (...args: Args) => TypeReturned : TypeReturned extends AsyncGenerator<any, any, any> ? (...args: Args) => TypeReturned : TypeReturned extends Generator<infer T, infer R, infer N> ? (...args: Args) => AsyncGenerator<T, R, N> : TypeReturned extends boolean ? (...args: Args) => boolean | Promise<boolean> : (...args: Args) => TypeReturned | Promise<TypeReturned> : {
[K in keyof T]: ThreadSafeArgument<T[K]>;
};
/**
* An object that can retain a reference to a `MemoryManageable` object.
*/
export interface MemoryRetainer {
add(manageable: MemoryManageable): void;
}
/**
* An object transferred between threads that must have its memory manually managed,
* in order to release the reference to a corresponding object on the original thread.
*/
export interface MemoryManageable {

@@ -37,14 +84,40 @@ readonly [RETAINED_BY]: Set<MemoryRetainer>;

}
export interface ThreadEncodingStrategy {
encode(value: unknown): [any, Transferable[]?];
decode(value: unknown, retainedBy?: Iterable<MemoryRetainer>): unknown;
call(id: string, args: any[]): any;
release(id: string): void;
terminate?(): void;
/**
* An object that can encode and decode values communicated between two threads.
*/
export interface ThreadEncoder {
/**
* Encodes a value before sending it to another thread. Should return a tuple where
* the first item is the encoded value, and the second item is an array of elements
* that can be transferred to the other thread, instead of being copied.
*/
encode(value: unknown, api: ThreadEncoderApi): [any, Transferable[]?];
/**
* Decodes a value received from another thread.
*/
decode(value: unknown, api: ThreadEncoderApi, retainedBy?: Iterable<MemoryRetainer>): unknown;
}
export interface ThreadEncodingStrategyApi {
uuid(): string;
release(id: string): void;
call(id: string, args: any[], retainedBy?: Iterable<MemoryRetainer>): Promise<any>;
export interface ThreadEncoderApi {
/**
* Controls how the thread encoder will handle functions.
*/
functions?: {
/**
* Retrieve a function by its serialized ID. This function will be called while
* decoding responses from the other "side" of a thread. The implementer of this
* API should return a proxy function that will call the function on the other
* thread, or `undefined` to prevent the function from being being decoded.
*/
get(id: string): AnyFunction | undefined;
/**
* Stores a function during encoding. The implementer of this API should return
* a unique ID for the function, or `undefined` to prevent the function from
* being encoded.
*/
add(func: AnyFunction): string | undefined;
};
}
/**
* An object that provides a custom process to encode its value.
*/
export interface ThreadEncodable {

@@ -51,0 +124,0 @@ [ENCODE_METHOD](api: {

# @quilted/threads
## 1.0.0
### Major Changes
- [#588](https://github.com/lemonmade/quilt/pull/588) [`837c8677`](https://github.com/lemonmade/quilt/commit/837c8677566b7e6d182496e07e9c998fc6b7802d) Thanks [@lemonmade](https://github.com/lemonmade)! - Clean up threads for a first version
### Patch Changes
- [`00d90d10`](https://github.com/lemonmade/quilt/commit/00d90d10f4eb97fe55712adcc8b34aa3d3ec1aa1) Thanks [@lemonmade](https://github.com/lemonmade)! - Update signals dependency and add dedicated package for signal utilities
- [#587](https://github.com/lemonmade/quilt/pull/587) [`1180dde2`](https://github.com/lemonmade/quilt/commit/1180dde278793006b8ae153804130cad6dab36c2) Thanks [@lemonmade](https://github.com/lemonmade)! - First major version for `@quilted/events`
- [`e45f766b`](https://github.com/lemonmade/quilt/commit/e45f766bce9e8632fe17d9e9c2e3d446d0783feb) Thanks [@lemonmade](https://github.com/lemonmade)! - Simplify thread creation and add helpers for transferring signals over threads
- Updated dependencies [[`1180dde2`](https://github.com/lemonmade/quilt/commit/1180dde278793006b8ae153804130cad6dab36c2)]:
- @quilted/events@1.0.0
## 0.1.15

@@ -4,0 +21,0 @@

{
"name": "@quilted/threads",
"description": "",
"description": "Helpers for communicating between JavaScript environments using message passing.",
"type": "module",
"version": "0.1.15",
"version": "1.0.0",
"license": "MIT",

@@ -26,11 +26,34 @@ "engines": {

"require": "./build/cjs/index.cjs"
},
"./signals": {
"types": "./build/typescript/signals.d.ts",
"quilt:source": "./source/signals.ts",
"quilt:esnext": "./build/esnext/signals.esnext",
"import": "./build/esm/signals.mjs",
"require": "./build/cjs/signals.cjs"
}
},
"types": "./build/typescript/index.d.ts",
"typesVersions": {
"*": {
"signals": [
"./build/typescript/signals.d.ts"
]
}
},
"sideEffects": false,
"dependencies": {
"@quilted/events": "^0.1.0"
"@quilted/events": "^1.0.0"
},
"peerDependencies": {
"@preact/signals-core": "^1.4.0"
},
"peerDependenciesMeta": {
"@preact/signals-core": {
"optional": true
}
},
"devDependencies": {
"@quilted/testing": "0.1.5"
"@preact/signals-core": "^1.4.0",
"@quilted/testing": "0.1.6"
},

@@ -37,0 +60,0 @@ "eslintConfig": {

# `@quilted/threads`
Helpers for communicating between JavaScript environments using message passing. This makes it easy to offload expensive work to sandboxed environments, like [web workers](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers), [iframes](https://developer.mozilla.org/en-US/docs/Web/HTML/Element/iframe), and [WebSockets](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket).
## Installation
```bash
# npm
npm install @quilted/threads --save
# pnpm
pnpm install @quilted/threads --save
# yarn
yarn add @quilted/threads
```
## Usage
### Creating a "thread"
A "thread" in this library represents a target JavaScript environment that can be communicated with via message passing. Each thread can expose a set of methods that are callable from other threads. This library provides helpers for creating threads from a number of common web platform objects:
```ts
// Create a thread from a web worker.
import {createThreadFromWebWorker} from '@quilted/threads';
const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);
// If you are creating a thread from inside a web worker, pass `self` instead:
const thread = createThreadFromWebWorker(self);
// Create a thread from a target iframe. This is usually done from a top-level
// page, after it has constructed nested iframes.
import {createThreadFromIframe} from '@quilted/threads';
const iframe = document.querySelector('iframe#my-iframe');
const thread = createThreadFromIframe(iframe);
// Create a thread from within a nested iframe.
import {createThreadFromInsideIframe} from '@quilted/threads';
const thread = createThreadFromInsideIframe();
// Create a thread from a WebSocket.
import {createThreadFromBrowserWebSocket} from '@quilted/threads';
const socket = new WebSocket('ws://localhost:8080');
const thread = createThreadFromBrowserWebSocket(socket);
```
To expose methods on a thread, pass them as an `expose` option to your thread creation function:
```ts
import {createThreadFromWebWorker} from '@quilted/threads';
// We are in a nested worker, and we’ll expose a single `add()` method to
// a paired thread.
const thread = createThreadFromWebWorker(self, {
expose: {
add(a: number, b: number) {
return a + b;
},
},
});
```
The `Thread` object returned by each of these functions returns an object that you can use to call methods on the paired thread. Because these methods are asynchronous, these "proxy methods" will always return a promise for the result of calling the exposed function.
```ts
import {createThreadFromWebWorker} from '@quilted/threads';
// We are on the top-level page, so we create our worker, wrap it in a thread,
// and call its exposed method.
const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);
const result = await thread.add(1, 2);
// result === 3
```
Threads will continue listening and sending messages indefinitely. To stop a thread, you can pass an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to the `signal` option on any thread creation function:
```ts
import {createThreadFromWebWorker} from '@quilted/threads';
const abort = new AbortController();
const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker, {signal: abort.signal});
const result = await thread.doWork();
abort.abort();
worker.terminate();
```
### Restrictions on thread functions
Not all types of arguments are supported for functions proxied via message passing by `@quilted/threads`. Only the following simple types can be used:
- Strings, numbers, `true`, `false`, `null`, and `undefined`
- Objects whose keys and values are all simple types
- Arrays whose values are all simple types
- Functions, but they will become asynchronous when proxied, and all functions accepted by arguments in those functions, or returned as part of return values, will have the same argument limitations (also note the [memory management implications of functions](#memory-management) detailed below)
This excludes many types, but of particular note are the following restrictions:
- No `WeakMap` or `WeakSet`
- No `ArrayBuffer` or typed arrays
- Instances of classes will transfer, but only their own properties — that is, properties on their prototype chain **will not** be transferred (additionally, no effort is made to preserve `instanceof` or similar checks on the transferred value)
### Memory management
Implementing functions using message passing always leaks memory. The implementation in this library involves storing a unique identifier for each function sent between sibling threads. When this identifier is received by the sibling, it recognizes it as a “function identifier”. It then maps this function to its existing representation for that ID (if it has been sent before), or creates a new function for it. This function, when called, will send a message to the original source of the function, listing the ID of the function to call (alongside the arguments and other metadata). However, because the two environments need to be able to reference the function and its proxy by ID, it can never release either safely.
`@quilted/threads` implements some smart defaults that make memory management a little easier. By default, a function is only retained for the lifetime of its “parent” — the function call that caused the function to be passed. Let’s look at an example of a thread that accepts a function (here, as the `user.fullName` method):
```ts
import {createThreadFromWebWorker} from '@quilted/threads';
const thread = createThreadFromWebWorker(self, {
expose: {sayHello},
});
interface User {
fullName(): string | Promise<string>;
}
async function sayHello(user: User) {
return `Hey, ${await user.fullName()}!`;
}
```
The paired thread would call this method like so:
```ts
// back on the main thread:
import {createThreadFromWebWorker} from '@quilted/threads';
const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);
const user = {
fullName() {
return 'Winston';
},
};
const message = await thread.sayHello(user);
console.log(user);
```
A simple implementation would retain the `user.fullName` function forever, even after the `sayHello()` call was long gone, and even if `user` would otherwise have been garbage collected. However, with `@quilted/threads`, this function is automatically released after `sayHello` is done. It does so by marking the function as used (“retained”) when `sayHello` starts, then marking it as unused when `sayHello` is finished. When a function is marked as completely unused, it automatically cleans up after itself by removing the memory in the receiving `Endpoint`, and sending a message to its source `Thread` to release that memory, too.
```ts
async function sayHello(user: User) {
// user.fullName is retained automatically here
return `Hey, ${await user.fullName()}!`;
// just before we finish up and send the message with the result,
// we release user, which also releases user.fullName
}
```
This automatic behavior is problematic if you want to hold on to a function received via `@quilted/threads` and call it later, after the function that received it has finished. To address this need, this library provides two functions for manual memory management: `retain` and `release`.
#### `retain()`
As noted above, you will `retain()` a value when you want to prevent its automatic release. Calling `retain` will, by default, deeply retain the value — that is, it will traverse into nested array elements and object properties, and retain every `retain`-able thing it finds. You will typically use this alongside also storing that value in a variable that lives outside the context of the function.
```ts
import {retain} from '@quilted/threads';
const allUsers = new Set<User>();
async function sayHello(user: User) {
allUsers.add(user);
retain(user);
return `Hey, ${await user.fullName()}!`;
}
```
Once you have explicitly `retain`ed a value, it will never be released until the `Thread` is terminated, or a matching number of `release()` calls are performed on the object.
#### `release()`
Once you are no longer using the a `retain`-ed value, you must `release` it. Like `retain()`, this function will apply to all nested array elements and object properties.
```ts
import {retain} from '@quilted/threads';
const allUsers = new Set<User>();
function removeUser(user: User) {
allUsers.delete(user);
release(user);
}
```
Once an object is fully released, any attempt to call its proxied functions will result in an error.
### Sharing special objects across threads
#### [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)
[`AbortSignal`s](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) allow you to communicate that an asynchronous operation should stop. Because all methods exposed through `@quilted/threads` are asynchronous, you may find many uses
for `AbortSignal`s. However, it can be a bit tricky to communicate an abort signal across threads yourself. To make this easier, this library provides a pair of utilities to create a "thread-safe" `AbortSignal` on one thread, and to "accept" that signal on another thread. In the thread sending a signal, use the `createThreadAbortSignal()` function from this library, passing it an `AbortSignal`:
```ts
import {
createThreadFromWebWorker,
createThreadAbortSignal,
} from '@quilted/threads';
const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);
const abort = new AbortController();
await thread.calculateResult({signal: createThreadSignal(abort.signal)});
```
On the receiving thread, use the `acceptThreadAbortSignal()` to turn it back into a "live" `AbortSignal`, in the current thread’s JavaScript environment:
```ts
import {
createThreadFromWebWorker,
acceptThreadAbortSignal,
type ThreadAbortSignal,
} from '@quilted/threads';
const thread = createThreadFromWebWorker(self, {
expose: {calculateResult},
});
function calculateResult({signal: threadSignal}: {signal: ThreadAbortSignal}) {
const signal = acceptThreadAbortSignal(threadSignal);
return await figureOutResult({signal});
}
```
#### [Preact signals](https://github.com/preactjs/signals)
[Preact signals](https://github.com/preactjs/signals) are a powerful tool for managing state in JavaScript applications. Signals represent mutable state that can be subscribed to, so they can be useful for sharing state between JavaScript environments connected by `@quilted/threads`. This library provides a collection of helpers for working with signals across threads.
Like the `AbortSignal` utilities documented above, a pair of utilities is provided to create a "thread-safe" Preact signal on one thread, and "accepting" that signal on another thread. In the thread sending a signal, use the `createThreadSignal()` function from this library, passing it a Preact signal:
```ts
import {signal} from '@preact/signals-core';
import {createThreadFromWebWorker} from '@quilted/threads';
import {createThreadSignal} from '@quilted/threads/signals';
const result = signal(32);
const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);
await thread.calculateResult(createThreadSignal(result));
```
On the receiving thread, use the `acceptThreadSignal()` to turn it back into a "live" Preact signal, in the current thread’s JavaScript environment:
```ts
import {signal} from '@preact/signals-core';
import {createThreadFromWebWorker} from '@quilted/threads';
import {acceptThreadSignal, type ThreadSignal} from '@quilted/threads/signals';
const thread = createThreadFromWebWorker(self, {
expose: {calculateResult},
});
function calculateResult(resultThreadSignal: ThreadSignal<number>) {
const result = acceptThreadSignal(resultThreadSignal);
const correctedResult = await figureOutResult();
result.value = correctedAge;
}
```
Both `createThreadSignal()` and `acceptThreadSignal()` accept an optional second argument, which must be an options object. The only option accepted is `signal`, which is an `AbortSignal` that allows you to stop synchronizing the Preact signal’s value between threads.

@@ -1,4 +0,4 @@

export const RETAIN_METHOD = Symbol.for('Threads::Retain');
export const RELEASE_METHOD = Symbol.for('Threads::Release');
export const RETAINED_BY = Symbol.for('Threads::RetainedBy');
export const ENCODE_METHOD = Symbol.for('Threads::Encode');
export const RETAIN_METHOD = Symbol.for('quilt.threads.retain');
export const RELEASE_METHOD = Symbol.for('quilt.threads.release');
export const RETAINED_BY = Symbol.for('quilt.threads.retained-by');
export const ENCODE_METHOD = Symbol.for('quilt.threads.encode');

@@ -1,4 +0,1 @@

export {
createBasicEncoder,
createBasicEncoderWithOverrides,
} from './encoding/basic.ts';
export {createBasicEncoder} from './encoding/basic.ts';

@@ -1,15 +0,8 @@

import {
RETAINED_BY,
RETAIN_METHOD,
ENCODE_METHOD,
RELEASE_METHOD,
} from '../constants.ts';
import {ENCODE_METHOD} from '../constants.ts';
import type {
ThreadEncodingStrategy,
ThreadEncodingStrategyApi,
ThreadEncoder,
ThreadEncoderApi,
ThreadEncodable,
AnyFunction,
} from '../types.ts';
import {
StackFrame,
isBasicObject,

@@ -23,2 +16,3 @@ isMemoryManageable,

const SET = '_@s';
const URL_ID = '_@u';
const DATE = '_@d';

@@ -28,194 +22,120 @@ const REGEXP = '_@r';

export function createBasicEncoderWithOverrides({
encode: encodeOverride,
decode: decodeOverride,
}: {
encode?(
value: unknown,
api: ThreadEncodingStrategyApi & Pick<ThreadEncodingStrategy, 'encode'>,
): ReturnType<ThreadEncodingStrategy['encode']> | undefined;
decode?(
value: unknown,
retainedBy: Iterable<MemoryRetainer> | undefined,
api: ThreadEncodingStrategyApi & Pick<ThreadEncodingStrategy, 'decode'>,
): unknown;
} = {}) {
function createBasicEncoder(
api: ThreadEncodingStrategyApi,
): ThreadEncodingStrategy {
const functionsToId = new Map<AnyFunction, string>();
const idsToFunction = new Map<string, AnyFunction>();
const idsToProxy = new Map<string, AnyFunction>();
/**
* Creates an encoder that converts most common JavaScript types into a format
* that can be transferred via message passing.
*/
export function createBasicEncoder(): ThreadEncoder {
return {
encode,
decode,
};
const encodeOverrideApi = {...api, encode};
const decodeOverrideApi = {...api, decode};
type EncodeResult = ReturnType<ThreadEncoder['encode']>;
return {
encode,
decode,
call(id, args) {
const stackFrame = new StackFrame();
const func = idsToFunction.get(id);
function encode(
value: unknown,
api: ThreadEncoderApi,
seen: Map<any, EncodeResult> = new Map(),
): EncodeResult {
if (value == null) return [value];
if (func == null) {
throw new Error(
'You attempted to call a function that was already released.',
);
}
const seenValue = seen.get(value);
if (seenValue) return seenValue;
const retainedBy = isMemoryManageable(func)
? [stackFrame, ...func[RETAINED_BY]]
: [stackFrame];
seen.set(value, [undefined]);
const result = func(...(decode(args, retainedBy) as any[]));
if (typeof value === 'object') {
const transferables: Transferable[] = [];
const encodeValue = (value: any) => {
const [fieldValue, nestedTransferables = []] = encode(value, api, seen);
transferables.push(...nestedTransferables);
return fieldValue;
};
if (result == null || typeof result.then !== 'function') {
stackFrame.release();
}
if (typeof (value as any)[ENCODE_METHOD] === 'function') {
const result = (value as ThreadEncodable)[ENCODE_METHOD]({
encode: encodeValue,
});
return (async () => {
try {
const resolved = await result;
return resolved;
} finally {
stackFrame.release();
}
})();
},
release(id) {
const func = idsToFunction.get(id);
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
if (func) {
idsToFunction.delete(id);
functionsToId.delete(func);
}
},
terminate() {
functionsToId.clear();
idsToFunction.clear();
idsToProxy.clear();
},
};
return fullResult;
}
type EncodeResult = ReturnType<ThreadEncodingStrategy['encode']>;
if (Array.isArray(value)) {
const result = value.map((item) => encodeValue(item));
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
function encode(
value: unknown,
seen: Map<any, EncodeResult> = new Map(),
): EncodeResult {
if (value == null) return [value];
if (value instanceof RegExp) {
const result = [{[REGEXP]: [value.source, value.flags]}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
const seenValue = seen.get(value);
if (seenValue) return seenValue;
if (value instanceof URL) {
const result = [{[URL_ID]: value.href}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
seen.set(value, [undefined]);
if (value instanceof Date) {
const result = [{[DATE]: value.toISOString()}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
const override = encodeOverride?.(value, encodeOverrideApi);
if (value instanceof Map) {
const entries = [...value.entries()].map(([key, value]) => {
return [encodeValue(key), encodeValue(value)];
});
const result = [{[MAP]: entries}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
if (override !== undefined) {
seen.set(value, override);
return override;
if (value instanceof Set) {
const entries = [...value].map((entry) => encodeValue(entry));
const result = [{[SET]: entries}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
if (typeof value === 'object') {
const transferables: Transferable[] = [];
const encodeValue = (value: any) => {
const [fieldValue, nestedTransferables = []] = encode(value, seen);
transferables.push(...nestedTransferables);
return fieldValue;
};
const valueIsIterator = isIterator(value);
if (typeof (value as any)[ENCODE_METHOD] === 'function') {
const result = (value as ThreadEncodable)[ENCODE_METHOD]({
encode: encodeValue,
});
if (isBasicObject(value) || valueIsIterator) {
const result: Record<string, any> = {};
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
for (const key of Object.keys(value)) {
result[key] = encodeValue((value as any)[key]);
}
if (Array.isArray(value)) {
const result = value.map((item) => encodeValue(item));
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
if (valueIsIterator) {
result.next ??= encodeValue((value as any).next.bind(value));
result.return ??= encodeValue((value as any).return.bind(value));
result.throw ??= encodeValue((value as any).throw.bind(value));
result[ASYNC_ITERATOR] = true;
}
if (value instanceof RegExp) {
const result = [{[REGEXP]: [value.source, value.flags]}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
if (value instanceof Date) {
const result = [{[DATE]: value.toISOString()}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
if (value instanceof Map) {
const entries = [...value.entries()].map(([key, value]) => {
return [encodeValue(key), encodeValue(value)];
});
const result = [{[MAP]: entries}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
if (value instanceof Set) {
const entries = [...value].map((entry) => encodeValue(entry));
const result = [{[SET]: entries}];
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
const valueIsIterator = isIterator(value);
if (isBasicObject(value) || valueIsIterator) {
const result: Record<string, any> = {};
for (const key of Object.keys(value)) {
result[key] = encodeValue((value as any)[key]);
}
if (valueIsIterator) {
result.next ??= encodeValue((value as any).next.bind(value));
result.return ??= encodeValue((value as any).return.bind(value));
result.throw ??= encodeValue((value as any).throw.bind(value));
result[ASYNC_ITERATOR] = true;
}
const fullResult: EncodeResult = [result, transferables];
seen.set(value, fullResult);
return fullResult;
}
return fullResult;
}
}
if (typeof value === 'function') {
if (functionsToId.has(value)) {
const id = functionsToId.get(value)!;
const result: EncodeResult = [{[FUNCTION]: id}];
seen.set(value, result);
return result;
}
if (typeof value === 'function') {
const id = api.functions?.add(value);
const id = api.uuid();
if (id == null) return [id];
functionsToId.set(value, id);
idsToFunction.set(id, value);
const result: EncodeResult = [{[FUNCTION]: id}];
seen.set(value, result);
return result;
}
const result: EncodeResult = [value];
const result: EncodeResult = [{[FUNCTION]: id}];
seen.set(value, result);

@@ -226,123 +146,82 @@

function decode(
value: unknown,
retainedBy?: Iterable<MemoryRetainer>,
): any {
const override = decodeOverride?.(value, retainedBy, decodeOverrideApi);
const result: EncodeResult = [value];
seen.set(value, result);
if (override !== undefined) return override;
return result;
}
if (typeof value === 'object') {
if (value == null) {
return value as any;
}
function decode(
value: unknown,
api: ThreadEncoderApi,
retainedBy?: Iterable<MemoryRetainer>,
): any {
if (typeof value === 'object') {
if (value == null) {
return value as any;
}
if (Array.isArray(value)) {
return value.map((value) => decode(value, retainedBy));
}
if (Array.isArray(value)) {
return value.map((value) => decode(value, api, retainedBy));
}
if (REGEXP in value) {
return new RegExp(...(value as {[REGEXP]: [string, string]})[REGEXP]);
}
if (REGEXP in value) {
return new RegExp(...(value as {[REGEXP]: [string, string]})[REGEXP]);
}
if (DATE in value) {
return new Date((value as {[DATE]: string})[DATE]);
}
if (URL_ID in value) {
return new URL((value as {[URL_ID]: string})[URL_ID]);
}
if (MAP in value) {
return new Map(
(value as {[MAP]: [any, any]})[MAP].map(([key, value]) => [
decode(key, retainedBy),
decode(value, retainedBy),
]),
);
}
if (DATE in value) {
return new Date((value as {[DATE]: string})[DATE]);
}
if (SET in value) {
return new Set(
(value as {[SET]: any[]})[SET].map((entry) =>
decode(entry, retainedBy),
),
);
}
if (MAP in value) {
return new Map(
(value as {[MAP]: [any, any]})[MAP].map(([key, value]) => [
decode(key, api, retainedBy),
decode(value, api, retainedBy),
]),
);
}
if (FUNCTION in value) {
const id = (value as {[FUNCTION]: string})[FUNCTION];
if (SET in value) {
return new Set(
(value as {[SET]: any[]})[SET].map((entry) =>
decode(entry, api, retainedBy),
),
);
}
if (idsToProxy.has(id)) {
return idsToProxy.get(id)! as any;
}
if (FUNCTION in value) {
const id = (value as {[FUNCTION]: string})[FUNCTION];
let retainCount = 0;
let released = false;
const func = api.functions?.get(id);
const release = () => {
retainCount -= 1;
if (retainCount === 0) {
released = true;
idsToProxy.delete(id);
api.release(id);
}
};
const retain = () => {
retainCount += 1;
};
const retainers = new Set(retainedBy);
const proxy = (...args: any[]) => {
if (released) {
throw new Error(
'You attempted to call a function that was already released.',
);
}
if (!idsToProxy.has(id)) {
throw new Error(
'You attempted to call a function that was already revoked.',
);
}
return api.call(id, args);
};
Object.defineProperties(proxy, {
[RELEASE_METHOD]: {value: release, writable: false},
[RETAIN_METHOD]: {value: retain, writable: false},
[RETAINED_BY]: {value: retainers, writable: false},
});
for (const retainer of retainers) {
retainer.add(proxy as any);
if (retainedBy && isMemoryManageable(func)) {
for (const retainer of retainedBy) {
retainer.add(func);
}
}
idsToProxy.set(id, proxy);
return func;
}
return proxy as any;
}
const result: Record<string | symbol, any> = {};
const result: Record<string | symbol, any> = {};
for (const key of Object.keys(value)) {
if (key === ASYNC_ITERATOR) {
result[Symbol.asyncIterator] = () => result;
} else {
result[key] = decode((value as any)[key], retainedBy);
}
for (const key of Object.keys(value)) {
if (key === ASYNC_ITERATOR) {
result[Symbol.asyncIterator] = () => result;
} else {
result[key] = decode((value as any)[key], api, retainedBy);
}
return result;
}
return value;
return result;
}
return value;
}
return createBasicEncoder;
}
export const createBasicEncoder = createBasicEncoderWithOverrides();
function isIterator(value: any) {

@@ -349,0 +228,0 @@ return (

@@ -1,3 +0,1 @@

export {createThread} from './thread.ts';
export type {ThreadOptions} from './thread.ts';
export {retain, release, StackFrame, isMemoryManageable} from './memory.ts';

@@ -12,14 +10,16 @@ export type {MemoryManageable, MemoryRetainer} from './memory.ts';

export {
targetFromIframe,
targetFromInsideIframe,
targetFromWebWorker,
targetFromMessagePort,
targetFromBrowserWebSocket,
createThread,
createThreadFromBrowserWebSocket,
createThreadFromIframe,
createThreadFromInsideIframe,
createThreadFromMessagePort,
createThreadFromWebWorker,
type ThreadOptions,
} from './targets.ts';
export {createBasicEncoder} from './encoding.ts';
export {
createBasicEncoder,
createBasicEncoderWithOverrides,
} from './encoding.ts';
export {createThreadAbortSignal, acceptThreadAbortSignal} from './abort.ts';
export type {ThreadAbortSignal} from './abort.ts';
createThreadAbortSignal,
acceptThreadAbortSignal,
type ThreadAbortSignal,
} from './abort-signal.ts';
export type {

@@ -29,9 +29,10 @@ Thread,

ThreadCallable,
ThreadExposable,
ThreadCallableFunction,
ThreadSafeArgument,
ThreadSafeReturnType,
ThreadEncodingStrategy,
ThreadEncodingStrategyApi,
ThreadSafeReturnValueType,
ThreadEncoder,
ThreadEncoderApi,
ThreadEncodable,
AnyFunction,
} from './types.ts';

@@ -7,2 +7,7 @@ import {RETAINED_BY, RETAIN_METHOD, RELEASE_METHOD} from './constants.ts';

/**
* A simple representation of a called function. This object allows this library to
* release references to functions immediately when the function call that transferred
* them into this thread is completed.
*/
export class StackFrame {

@@ -27,2 +32,5 @@ private readonly memoryManaged = new Set<MemoryManageable>();

/**
* Indicates that a value is being manually memory-managed across threads by this library.
*/
export function isMemoryManageable(value: unknown): value is MemoryManageable {

@@ -34,2 +42,21 @@ return Boolean(

/**
* Marks a value as being used so it will not be automatically released. Calling `retain` will,
* by default, deeply retain the value — that is, it will traverse into nested array elements
* and object properties, and retain every `retain`-able thing it finds.
*
* You will typically use this alongside also storing that value in a variable that lives outside
* the context of the function where that value was received.
*
* @example
* import {retain} from '@quilted/threads';
*
* const allUsers = new Set<User>();
*
* async function sayHello(user: User) {
* allUsers.add(user);
* retain(user);
* return `Hey, ${await user.fullName()}!`;
* }
*/
export function retain(value: any, {deep = true} = {}): boolean {

@@ -80,2 +107,17 @@ return retainInternal(value, deep, new Map());

/**
* Once you are no longer using the a `retain`-ed value, you can use this function to mark it as
* being unused. Like `retain()`, this function will apply to all nested array elements and object
* properties.
*
* @example
* import {retain} from '@quilted/threads';
*
* const allUsers = new Set<User>();
*
* function removeUser(user: User) {
* allUsers.delete(user);
* release(user);
* }
*/
export function release(value: any, {deep = true} = {}): boolean {

@@ -82,0 +124,0 @@ return releaseInternal(value, deep, new Map());

@@ -1,5 +0,6 @@

export {targetFromIframe} from './targets/iframe/iframe.ts';
export {targetFromInsideIframe} from './targets/iframe/nested.ts';
export {targetFromMessagePort} from './targets/message-port.ts';
export {targetFromBrowserWebSocket} from './targets/web-socket-browser.ts';
export {targetFromWebWorker} from './targets/web-worker.ts';
export {createThread, type ThreadOptions} from './targets/target.ts';
export {createThreadFromIframe} from './targets/iframe/iframe.ts';
export {createThreadFromInsideIframe} from './targets/iframe/nested.ts';
export {createThreadFromMessagePort} from './targets/message-port.ts';
export {createThreadFromBrowserWebSocket} from './targets/web-socket-browser.ts';
export {createThreadFromWebWorker} from './targets/web-worker.ts';

@@ -1,9 +0,41 @@

import {on} from '@quilted/events';
import type {ThreadTarget} from '../../types.ts';
import {NestedAbortController} from '@quilted/events';
import {
createThread,
type ThreadTarget,
type ThreadOptions,
} from '../target.ts';
import {CHECK_MESSAGE, RESPONSE_MESSAGE} from './shared.ts';
export function targetFromIframe(
/**
* Creates a thread from an iframe nested on a top-level document. To create
* a thread from the contents of this iframe, use `createThreadFromInsideIframe()`
* instead.
*
* @see https://developer.mozilla.org/en-US/docs/Web/HTML/Element/iframe
*
* @example
* import {createThreadFromIframe} from '@quilted/threads';
*
* const iframe = document.createElement('iframe');
* const thread = createThreadFromInsideIframe(iframe);
* await thread.sendMessage('Hello world!');
*/
export function createThreadFromIframe<
Self = Record<string, never>,
Target = Record<string, never>,
>(
iframe: HTMLIFrameElement,
{targetOrigin = '*'}: {targetOrigin?: string} = {},
): ThreadTarget {
{
targetOrigin = '*',
...options
}: ThreadOptions<Self, Target> & {
/**
* The target origin to use when sending `postMessage` events to the child frame.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage#targetorigin
* @default '*'
*/
targetOrigin?: string;
} = {},
) {
let connected = false;

@@ -16,3 +48,5 @@

const connectedPromise = new Promise<void>((resolve) => {
const abort = new AbortController();
const abort = options.signal
? new NestedAbortController(options.signal)
: new AbortController();

@@ -33,29 +67,38 @@ window.addEventListener(

abort.signal.addEventListener(
'abort',
() => {
resolve();
},
{once: true},
);
sendMessage(CHECK_MESSAGE);
});
return {
send(message, transfer) {
if (!connected) {
return connectedPromise.then(() => sendMessage(message, transfer));
}
return createThread(
{
send(message, transfer) {
if (!connected) {
return connectedPromise.then(() => {
if (connected) return sendMessage(message, transfer);
});
}
return sendMessage(message, transfer);
return sendMessage(message, transfer);
},
listen(listen, {signal}) {
self.addEventListener(
'message',
(event) => {
if (event.source !== iframe.contentWindow) return;
if (event.data === RESPONSE_MESSAGE) return;
listen(event.data);
},
{signal},
);
},
},
async *listen({signal}) {
const messages = on<WindowEventHandlersEventMap, 'message'>(
self,
'message',
{
signal,
},
);
for await (const message of messages) {
if (message.source !== iframe.contentWindow) continue;
if (message.data === RESPONSE_MESSAGE) continue;
yield message.data;
}
},
};
options,
);
}

@@ -1,8 +0,33 @@

import {on} from '@quilted/events';
import type {ThreadTarget} from '../../types.ts';
import {NestedAbortController} from '@quilted/events';
import {createThread, type ThreadOptions} from '../target.ts';
import {CHECK_MESSAGE, RESPONSE_MESSAGE} from './shared.ts';
export function targetFromInsideIframe({
/**
* Creates a thread from within an iframe nested in a top-level document. To create
* a thread from this iframe in the top-level document, use `createThreadFromIframe()`
* instead.
*
* @see https://developer.mozilla.org/en-US/docs/Web/HTML/Element/iframe
*
* @example
* import {createThreadFromInsideIframe} from '@quilted/threads';
*
* const thread = createThreadFromInsideIframe();
* await thread.sendMessage('Hello world!');
*/
export function createThreadFromInsideIframe<
Self = Record<string, never>,
Target = Record<string, never>,
>({
targetOrigin = '*',
}: {targetOrigin?: string} = {}): ThreadTarget {
...options
}: ThreadOptions<Self, Target> & {
/**
* The target origin to use when sending `postMessage` events to the parent frame.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage#targetorigin
* @default '*'
*/
targetOrigin?: string;
} = {}) {
if (typeof self === 'undefined' || self.parent == null) {

@@ -16,3 +41,5 @@ throw new Error(

const abort = new AbortController();
const abort = options.signal
? new NestedAbortController(options.signal)
: new AbortController();

@@ -23,5 +50,9 @@ const ready = () => {

// Handles wrappers that want to connect after the page has already loaded
self.addEventListener('message', ({data}) => {
if (data === CHECK_MESSAGE) respond();
});
self.addEventListener(
'message',
({data}) => {
if (data === CHECK_MESSAGE) respond();
},
{signal: options.signal},
);

@@ -48,21 +79,20 @@ respond();

return {
send(message, transfer) {
return parent.postMessage(message, targetOrigin, transfer);
return createThread(
{
send(message, transfer) {
return parent.postMessage(message, targetOrigin, transfer);
},
listen(listen, {signal}) {
self.addEventListener(
'message',
(event) => {
if (event.data === CHECK_MESSAGE) return;
listen(event.data);
},
{signal},
);
},
},
async *listen({signal}) {
const messages = on<WindowEventHandlersEventMap, 'message'>(
self,
'message',
{
signal,
},
);
for await (const message of messages) {
if (message.data === CHECK_MESSAGE) continue;
yield message.data;
}
},
};
options,
);
}

@@ -1,21 +0,44 @@

import {on} from '@quilted/events';
import type {ThreadTarget} from '../types.ts';
import {createThread, type ThreadOptions} from './target.ts';
export function targetFromMessagePort(port: MessagePort): ThreadTarget {
return {
send(...args: [any, Transferable[]]) {
port.postMessage(...args);
},
async *listen({signal}) {
const messages = on<MessagePortEventMap, 'message'>(port, 'message', {
signal,
});
/**
* Creates a thread from a `WebSocket` instance in the browser.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
*
* @example
* import {createThreadFromMessagePort} from '@quilted/threads';
*
* const channel = new MessageChannel();
* const threadOne = createThreadFromMessagePort(channel.port1);
* const threadTwo = createThreadFromMessagePort(channel.port2, {
* expose: {
* sendMessage: (message) => console.log(message),
* },
* });
*
* await threadOne.sendMessage('Hello world!');
*/
export function createThreadFromMessagePort<
Self = Record<string, never>,
Target = Record<string, never>,
>(port: MessagePort, options?: ThreadOptions<Self, Target>) {
return createThread(
{
send(...args: [any, Transferable[]]) {
port.postMessage(...args);
},
listen(listener, {signal}) {
port.addEventListener(
'message',
(event) => {
listener(event.data);
},
{signal},
);
port.start();
for await (const message of messages) {
yield message.data;
}
port.start();
},
},
};
options,
);
}

@@ -1,29 +0,48 @@

import {on, once} from '@quilted/events';
import type {ThreadTarget} from '../types.ts';
import {createThread, type ThreadOptions} from './target.ts';
export function targetFromBrowserWebSocket(websocket: WebSocket): ThreadTarget {
return {
async send(message) {
if (websocket.readyState !== websocket.OPEN) {
await once(websocket, 'open');
}
/**
* Creates a thread from a `WebSocket` instance in the browser.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
*
* @example
* import {createThreadFromBrowserWebSocket} from '@quilted/threads';
*
* const websocket = new WebSocket('ws://localhost:8080');
* const thread = createThreadFromBrowserWebSocket(websocket);
* await thread.sendMessage('Hello world!');
*/
export function createThreadFromBrowserWebSocket<
Self = Record<string, never>,
Target = Record<string, never>,
>(websocket: WebSocket, options?: ThreadOptions<Self, Target>) {
return createThread(
{
async send(message) {
if (websocket.readyState !== websocket.OPEN) {
await new Promise<void>((resolve) => {
websocket.addEventListener(
'open',
() => {
resolve();
},
{once: true},
);
});
}
websocket.send(JSON.stringify(message));
websocket.send(JSON.stringify(message));
},
listen(listener, {signal}) {
websocket.addEventListener(
'message',
(event) => {
listener(JSON.parse(event.data));
},
{signal},
);
},
},
async *listen({signal}) {
const messages = on<WebSocketEventMap, 'message'>(websocket, 'message', {
signal,
});
if (websocket.readyState !== websocket.OPEN) {
await once(websocket, 'open', {signal});
}
if (signal?.aborted) return;
for await (const message of messages) {
yield JSON.parse(message.data);
}
},
};
options,
);
}

@@ -1,19 +0,43 @@

import {on} from '@quilted/events';
import type {ThreadTarget} from '../types.ts';
import {createThread, type ThreadOptions} from './target.ts';
export function targetFromWebWorker(worker: Worker): ThreadTarget {
return {
send(...args: [any, Transferable[]]) {
worker.postMessage(...args);
/**
* Creates a thread from a web worker. This function can be used either from a JavaScript
* environment that *created* a web worker, or from within a web worker that has been
* created.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers
*
* @example
* import {createThreadFromWebWorker} from '@quilted/threads';
*
* // If inside a web worker:
* const thread = createThreadFromWebWorker(self);
*
* // If in an environment that creates a worker:
* const worker = new Worker('worker.js');
* const thread = createThreadFromWebWorker(worker);
*
* await thread.sendMessage('Hello world!');
*/
export function createThreadFromWebWorker<
Self = Record<string, never>,
Target = Record<string, never>,
>(worker: Worker, options?: ThreadOptions<Self, Target>) {
return createThread(
{
send(...args: [any, Transferable[]]) {
worker.postMessage(...args);
},
listen(listener, {signal}) {
worker.addEventListener(
'message',
(event) => {
listener(event.data);
},
{signal},
);
},
},
async *listen({signal}) {
const messages = on<WorkerEventMap, 'message'>(worker, 'message', {
signal,
});
for await (const message of messages) {
yield message.data;
}
},
};
options,
);
}
import {describe, it, expect} from '@quilted/testing';
import {createThread, targetFromMessagePort} from '../index.ts';
import {MessageChannel} from './utiltiies.ts';
import {createThreadFromMessagePort, type ThreadCallable} from '../index.ts';
import {MessageChannel} from './utilities.ts';

@@ -12,7 +12,8 @@ describe('thread', () => {

const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1);
createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
expose: {hello: () => 'world'},

@@ -25,12 +26,13 @@ });

it('proxies function calls', async () => {
interface EndpointApi {
type EndpointApi = ThreadCallable<{
greet(getName: () => string): string;
}
}>;
const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1);
createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
expose: {

@@ -45,10 +47,11 @@ greet: async (getName) => `Hello, ${await getName()}!`,

it('proxies generators', async () => {
interface EndpointApi {
type EndpointApi = ThreadCallable<{
iterate(): Generator<number, void, void>;
}
}>;
const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1);

@@ -58,3 +61,3 @@ let yielded = 0;

createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
expose: {

@@ -80,5 +83,6 @@ *iterate() {

const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1);

@@ -88,3 +92,3 @@ let yielded = 0;

createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
expose: {

@@ -112,8 +116,8 @@ async *iterate() {

const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
{signal: abort.signal},
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1, {signal: abort.signal});
createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
expose: {

@@ -130,5 +134,5 @@ greet: () => 'Hello, world!',

it('rejects all in-flight requests when a thread terminates', async () => {
interface EndpointApi {
type EndpointApi = ThreadCallable<{
greet(): string;
}
}>;

@@ -138,8 +142,8 @@ const abort = new AbortController();

const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
{signal: abort.signal},
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1, {signal: abort.signal});
createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
expose: {

@@ -159,5 +163,5 @@ // eslint-disable-next-line @typescript-eslint/no-empty-function

it('rejects all in-flight requests when a target thread terminates', async () => {
interface EndpointApi {
type EndpointApi = ThreadCallable<{
greet(): string;
}
}>;

@@ -167,7 +171,8 @@ const abort = new AbortController();

const {port1, port2} = new MessageChannel();
const threadOne = createThread<Record<string, never>, EndpointApi>(
targetFromMessagePort(port1),
);
const threadOne = createThreadFromMessagePort<
Record<string, never>,
EndpointApi
>(port1);
createThread<EndpointApi>(targetFromMessagePort(port2), {
createThreadFromMessagePort<EndpointApi>(port2, {
signal: abort.signal,

@@ -174,0 +179,0 @@ expose: {

@@ -8,23 +8,40 @@ import type {

/**
* A thread represents a target JavaScript environment that exposes a set
* of callable, asynchronous methods. The thread takes care of automatically
* encoding and decoding its arguments and return values, so you can interact
* with it as if its methods were implemented in the same environment as your
* own code.
*/
export type Thread<Target> = ThreadCallable<Target>;
/**
* An object backing a `Thread` that provides the message-passing interface
* that allows communication to flow between environments. This message-passing
* interface is based on the [`postMessage` interface](https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage),
* which is easily adaptable to many JavaScript objects and environments.
*/
export interface ThreadTarget {
/**
* Sends a message to the target thread. The message will be encoded before sending,
* and the consumer may also pass an array of "transferable" objects that should be
* transferred (rather than copied) to the other environment, if supported.
*/
send(message: any, transferables?: Transferable[]): void;
listen(options: {signal?: AbortSignal}): AsyncGenerator<any, void, void>;
}
export interface ThreadExposableFunction<Args extends any[], ReturnType> {
(...args: ThreadSafeArgument<Args>): ReturnType extends Promise<any>
? ReturnType
: ReturnType extends AsyncGenerator<any, any, any>
? ReturnType
: ReturnType | Promise<ReturnType>;
/**
* Listens for messages coming in to the thread. This method must call the provided
* listener for each message as it is received. The thread will then decode the message
* and handle its content. This method may be passed an `AbortSignal` to abort the
* listening process.
*/
listen(listener: (value: any) => void, options: {signal?: AbortSignal}): void;
}
export type ThreadExposable<T> = {
[K in keyof T]: T[K] extends (...args: infer Args) => infer ReturnType
? ThreadExposableFunction<Args, ReturnType>
: never;
};
/**
* A function type that can be called over a thread. It is the same as defining a
* normal function type, but with the additional restriction that the function must
* always return an asynchronous value (either a promise or an async generator). Additionally,
* all arguments to that function must also be thread-callable
*/
export interface ThreadCallableFunction<Args extends any[], ReturnType> {

@@ -34,2 +51,6 @@ (...args: ThreadSafeArgument<Args>): ThreadSafeReturnType<ReturnType>;

/**
* A mapped object type that takes an object with methods, and converts it into the
* an object with the same methods that can be called over a thread.
*/
export type ThreadCallable<T> = {

@@ -43,20 +64,57 @@ [K in keyof T]: T[K] extends (...args: infer Args) => infer ReturnType

export type ThreadSafeReturnType<T> = T extends AsyncGenerator<any, any, any>
? T
/**
* Converts the return type of a function into the type it will be when
* passed over a thread.
*/
export type ThreadSafeReturnType<T> = T extends AsyncGenerator<
infer T,
infer R,
infer N
>
? AsyncGenerator<
ThreadSafeReturnValueType<T>,
ThreadSafeReturnValueType<R>,
ThreadSafeReturnValueType<N>
>
: T extends Generator<infer T, infer R, infer N>
? AsyncGenerator<T, R, N>
: T extends Promise<any>
? T
?
| Generator<
ThreadSafeReturnValueType<T>,
ThreadSafeReturnValueType<R>,
ThreadSafeReturnValueType<N>
>
| AsyncGenerator<
ThreadSafeReturnValueType<T>,
ThreadSafeReturnValueType<R>,
ThreadSafeReturnValueType<N>
>
: T extends Promise<infer U>
? Promise<ThreadSafeReturnValueType<U>>
: T extends infer U | Promise<infer U>
? Promise<U>
: T extends (...args: infer Args) => infer TypeReturned
? (...args: Args) => ThreadSafeReturnType<TypeReturned>
? Promise<ThreadSafeReturnValueType<U>>
: Promise<ThreadSafeReturnValueType<T>>;
/**
* Converts an object into the type it will be when passed over a thread.
*/
export type ThreadSafeReturnValueType<T> = T extends (
...args: infer Args
) => infer ReturnType
? ThreadCallableFunction<Args, ReturnType>
: T extends (infer ArrayElement)[]
? ThreadSafeReturnType<ArrayElement>[]
? ThreadSafeReturnValueType<ArrayElement>[]
: T extends readonly (infer ArrayElement)[]
? readonly ThreadSafeReturnType<ArrayElement>[]
? readonly ThreadSafeReturnValueType<ArrayElement>[]
: T extends Set<infer U>
? Set<ThreadSafeReturnValueType<U>>
: T extends Map<infer K, infer U>
? Map<K, ThreadSafeReturnValueType<U>>
: T extends object
? {[K in keyof T]: ThreadSafeReturnType<T[K]>}
? {[K in keyof T]: ThreadSafeReturnValueType<T[K]>}
: T;
/**
* Converts an object into the type it could be if accepted as an argument to a function
* called over a thread.
*/
export type ThreadSafeArgument<T> = T extends (

@@ -76,2 +134,5 @@ ...args: infer Args

/**
* An object that can retain a reference to a `MemoryManageable` object.
*/
export interface MemoryRetainer {

@@ -81,2 +142,6 @@ add(manageable: MemoryManageable): void;

/**
* An object transferred between threads that must have its memory manually managed,
* in order to release the reference to a corresponding object on the original thread.
*/
export interface MemoryManageable {

@@ -88,20 +153,48 @@ readonly [RETAINED_BY]: Set<MemoryRetainer>;

export interface ThreadEncodingStrategy {
encode(value: unknown): [any, Transferable[]?];
decode(value: unknown, retainedBy?: Iterable<MemoryRetainer>): unknown;
call(id: string, args: any[]): any;
release(id: string): void;
terminate?(): void;
}
/**
* An object that can encode and decode values communicated between two threads.
*/
export interface ThreadEncoder {
/**
* Encodes a value before sending it to another thread. Should return a tuple where
* the first item is the encoded value, and the second item is an array of elements
* that can be transferred to the other thread, instead of being copied.
*/
encode(value: unknown, api: ThreadEncoderApi): [any, Transferable[]?];
export interface ThreadEncodingStrategyApi {
uuid(): string;
release(id: string): void;
call(
id: string,
args: any[],
/**
* Decodes a value received from another thread.
*/
decode(
value: unknown,
api: ThreadEncoderApi,
retainedBy?: Iterable<MemoryRetainer>,
): Promise<any>;
): unknown;
}
export interface ThreadEncoderApi {
/**
* Controls how the thread encoder will handle functions.
*/
functions?: {
/**
* Retrieve a function by its serialized ID. This function will be called while
* decoding responses from the other "side" of a thread. The implementer of this
* API should return a proxy function that will call the function on the other
* thread, or `undefined` to prevent the function from being being decoded.
*/
get(id: string): AnyFunction | undefined;
/**
* Stores a function during encoding. The implementer of this API should return
* a unique ID for the function, or `undefined` to prevent the function from
* being encoded.
*/
add(func: AnyFunction): string | undefined;
};
}
/**
* An object that provides a custom process to encode its value.
*/
export interface ThreadEncodable {

@@ -108,0 +201,0 @@ [ENCODE_METHOD](api: {encode(value: any): unknown}): any;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc