Socket
Socket
Sign inDemoInstall

nanolith

Package Overview
Dependencies
2
Maintainers
1
Versions
95
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.3.2-beta1 to 0.3.2-beta2

2

dist/messenger/messenger.d.ts

@@ -75,3 +75,3 @@ /// <reference types="node" resolution-mode="require"/>

*/
onStream(callback: ConfirmStreamCallback<Messagable>): void;
onStream(callback: ConfirmStreamCallback<Messagable>): RemoveListenerFunction;
/**

@@ -78,0 +78,0 @@ * Listen for messages coming to the `Messenger`.

@@ -1,1 +0,1 @@

import{randomUUID as v4}from"crypto";import{BroadcastChannel}from"worker_threads";import{isMessengerTransferObject}from"./utilities.js";import{listenForStream,prepareWritableToPortStream}from"../streams/index.js";export class Messenger{#e;#s=[];#t=[];#a=v4();#r;#n={on:(e,s)=>{this.#t.push(s)},off:(e,s)=>{const t=this.#t.indexOf(s);t<=-1||this.#t.splice(t,1)},postMessage:e=>{const s={type:"stream-message",sender:this.#a,data:e};this.#e.postMessage(s)}};#i=!1;constructor(e){if(e&&"string"!=typeof e&&!isMessengerTransferObject(e))throw new Error("Must either provide a string to create a new Messenger, or a MessengerTransferData object.");e&&"string"!=typeof e?(this.#r=e.__messengerID,this.#e=new BroadcastChannel(e.__messengerID)):(this.#r="string"==typeof e?e:v4(),this.#e=new BroadcastChannel(this.#r)),this.#e.unref(),this.#c()}#c(){this.#e.onmessage=async e=>{const{data:s}=e;switch(s?.type){case"close":return this.#e.close();case"stream-message":if(s.sender===this.#a)return;if(["stream-start","stream-chunk","stream-finished"].includes(s?.data?.type)&&!this.#i)return;this.#t.map((e=>e(s.data)));break;case"message":if(s.sender===this.#a)return;await Promise.all(this.#s.map((e=>e(s.data))))}}}get ID(){return this.#r}get uniqueKey(){return this.#a}createStream(e){return prepareWritableToPortStream(this.#n,e??{})}onStream(e){this.#i=!0,listenForStream(this.#n,e,1)}onMessage(e){return this.#s.push(e),()=>this.#l(e)}async waitForMessage(e){return new Promise((s=>{const t=async a=>{await e(a)&&(s(a),this.#l(t))};this.onMessage(t)}))}#l(e){const s=this.#s.indexOf(e);s<=-1||this.#s.splice(s,1)}sendMessage(e){const s={type:"message",sender:this.#a,data:e};this.#e.postMessage(s)}get transfer(){return Object.freeze({__messengerID:this.#r})}setRef(e){if(e)return this.#e.ref();this.#e.unref()}close(){this.#e.close(),this.#s=[]}closeAll(){const e={sender:this.#a,type:"close"};this.#e.postMessage(e)}}
import{randomUUID as v4}from"crypto";import{BroadcastChannel}from"worker_threads";import{isMessengerTransferObject}from"./utilities.js";import{listenForStream,prepareWritableToPortStream}from"../streams/index.js";export class Messenger{#e;#s=[];#t=[];#a=v4();#r;#n={on:(e,s)=>{this.#t.push(s)},off:(e,s)=>{const t=this.#t.indexOf(s);t<=-1||this.#t.splice(t,1)},postMessage:e=>{const s={type:"stream-message",sender:this.#a,data:e};this.#e.postMessage(s)}};#i=!1;constructor(e){if(e&&"string"!=typeof e&&!isMessengerTransferObject(e))throw new Error("Must either provide a string to create a new Messenger, or a MessengerTransferData object.");e&&"string"!=typeof e?(this.#r=e.__messengerID,this.#e=new BroadcastChannel(e.__messengerID)):(this.#r="string"==typeof e?e:v4(),this.#e=new BroadcastChannel(this.#r)),this.#e.unref(),this.#c()}#c(){this.#e.onmessage=async e=>{const{data:s}=e;switch(s?.type){case"close":return this.#e.close();case"stream-message":if(s.sender===this.#a)return;if(["stream-start","stream-chunk","stream-finished"].includes(s?.data?.type)&&!this.#i)return;this.#t.map((e=>e(s.data)));break;case"message":if(s.sender===this.#a)return;await Promise.all(this.#s.map((e=>e(s.data))))}}}get ID(){return this.#r}get uniqueKey(){return this.#a}createStream(e){return prepareWritableToPortStream(this.#n,e??{})}onStream(e){return this.#i=!0,listenForStream(this.#n,e,1)}onMessage(e){return this.#s.push(e),()=>this.#l(e)}async waitForMessage(e){return new Promise((s=>{const t=async a=>{await e(a)&&(s(a),this.#l(t))};this.onMessage(t)}))}#l(e){const s=this.#s.indexOf(e);s<=-1||this.#s.splice(s,1)}sendMessage(e){const s={type:"message",sender:this.#a,data:e};this.#e.postMessage(s)}get transfer(){return Object.freeze({__messengerID:this.#r})}setRef(e){if(e)return this.#e.ref();this.#e.unref()}close(){this.#e.close(),this.#s=[]}closeAll(){const e={sender:this.#a,type:"close"};this.#e.postMessage(e)}}

@@ -79,3 +79,3 @@ /// <reference types="node" resolution-mode="require"/>

/**
* Send a single message to all services on the cluster.
* Send the same message to all services on the cluster.
*

@@ -82,0 +82,0 @@ * @param data The data to send to the service.

@@ -1,1 +0,1 @@

import{randomUUID as v4}from"crypto";import{pool}from"../pool/index.js";import{Service}from"../service/index.js";export class ServiceCluster{#e;#r=new Map;constructor(e){this.#e=e}get activeServices(){return this.#r.size}get currentServices(){return[...this.#r.values()]}get activeServiceCalls(){return[...this.#r.values()].reduce(((e,r)=>e+r.service.activeCalls),0)}async launch(e,r={}){if(!e||1===e)return this.#s(r);const s=[];for(let i=1;i<=e;i++)s.push(this.#s(r));return Promise.all(s)}async#s(e={}){if(this.#r.size>=pool.maxConcurrency)return;const r=await this.#e.launchService(e);return this.#i(r),r}addService(e){e instanceof Service&&this.#i(e)}#i(e){const r=v4();this.#r.set(r,{service:e,identifier:r}),e.once("terminated",(()=>{this.#r.delete(r)}))}use(e){if("string"==typeof e&&this.#r.has(e))return this.#r.get(e);const r=[...this.#r.values()];if(!r.length)throw new Error("No running services found on this ServiceCluster!");return 1===r.length?r[0].service:r.reduce(((e,r)=>r.service.activeCalls<e.service.activeCalls?r:e),r[0]).service}notifyAll(e,r){this.#r.forEach((({service:s})=>{s.sendMessage(e,r)}))}closeAll(){const e=[...this.#r.values()].map((({service:e})=>e.close()));return Promise.all(e)}closeAllIdle(){const e=[...this.#r.values()].reduce(((e,{service:r})=>(r.activeCalls<=0&&e.push(r.close()),e)),[]);return Promise.all(e)}}
import{randomUUID as v4}from"crypto";import{pool}from"../pool/index.js";import{Service}from"../service/index.js";export class ServiceCluster{#e;#r=new Map;constructor(e){this.#e=e}get activeServices(){return this.#r.size}get currentServices(){return[...this.#r.values()]}get activeServiceCalls(){return[...this.#r.values()].reduce(((e,r)=>e+r.service.activeCalls),0)}async launch(e,r={}){if(!e||1===e)return this.#i(r);const i=[];for(let s=1;s<=e;s++)i.push(this.#i(r));return Promise.all(i)}async#i(e={}){if(this.#r.size>=pool.maxConcurrency)return;const r=await this.#e.launchService(e);return this.#s(r),r}addService(e){if(!(e instanceof Service))throw new Error("Can only provide Service instances to .addService().");this.#s(e)}#s(e){const r=v4();this.#r.set(r,{service:e,identifier:r}),e.once("terminated",(()=>{this.#r.delete(r)}))}use(e){if("string"==typeof e&&this.#r.has(e))return this.#r.get(e);if(!this.#r.size)throw new Error("No running services found on this ServiceCluster!");const r=[...this.#r.values()];return 1===r.length?r[0].service:r.reduce(((e,r)=>r.service.activeCalls<e.service.activeCalls?r:e),r[0]).service}notifyAll(e,r){this.#r.forEach((({service:i})=>{i.sendMessage(e,r)}))}closeAll(){const e=[...this.#r.values()].map((({service:e})=>e.close()));return Promise.all(e)}closeAllIdle(){const e=[...this.#r.values()].reduce(((e,{service:r})=>(r.activeCalls<=0&&e.push(r.close()),e)),[]);return Promise.all(e)}}

@@ -60,3 +60,3 @@ /// <reference types="node" resolution-mode="require"/>

*/
declare function onStream(callback: OnStreamCallback<Exclude<typeof parentPort, null>>): void;
declare function onStream(callback: OnStreamCallback<Exclude<typeof parentPort, null>>): RemoveListenerFunction;
/**

@@ -63,0 +63,0 @@ * Create a {@link Writable} instance that can be piped into in order to stream data to

@@ -1,1 +0,1 @@

import{parentPort,workerData}from"worker_threads";import{assertIsNotMainThread}from"../utilities/index.js";import{listenForStream,prepareWritableToPortStream}from"../streams/index.js";export const MainThread=Object.freeze({sendMessage:function(e,a){assertIsNotMainThread("MainThread.sendMessage");const r={type:0,data:e};parentPort.postMessage(r,a)},onMessage:function(e){assertIsNotMainThread("MainThread.onMessage");const a=async a=>{0===a.type&&await e(a.data)};return parentPort.on("message",a),()=>parentPort.off("message",a)},onMessengerReceived:function(e){assertIsNotMainThread("MainThread.onMessengerReceived");const{messengers:a}=workerData,r=async r=>{3===r.type&&await e(a[r.data.__messengerID])};return parentPort.on("message",r),()=>parentPort.off("message",r)},waitForMessage:async function(e){return assertIsNotMainThread("MainThread.waitForMessage"),new Promise((a=>{const r=async t=>{if(0!==t.type)return;const{data:s}=t;await e(s)&&(a(s),parentPort.off("message",r))};parentPort.on("message",r)}))},onStream:function(e){assertIsNotMainThread("MainThread.onStream"),listenForStream(parentPort,e)},createStream:async function(e){return assertIsNotMainThread("MainThread.stream"),prepareWritableToPortStream(parentPort,e??{})}});
import{parentPort,workerData}from"worker_threads";import{assertIsNotMainThread}from"../utilities/index.js";import{listenForStream,prepareWritableToPortStream}from"../streams/index.js";export const MainThread=Object.freeze({sendMessage:function(e,a){assertIsNotMainThread("MainThread.sendMessage");const r={type:0,data:e};parentPort.postMessage(r,a)},onMessage:function(e){assertIsNotMainThread("MainThread.onMessage");const a=async a=>{0===a.type&&await e(a.data)};return parentPort.on("message",a),()=>parentPort.off("message",a)},onMessengerReceived:function(e){assertIsNotMainThread("MainThread.onMessengerReceived");const{messengers:a}=workerData,r=async r=>{3===r.type&&await e(a[r.data.__messengerID])};return parentPort.on("message",r),()=>parentPort.off("message",r)},waitForMessage:async function(e){return assertIsNotMainThread("MainThread.waitForMessage"),new Promise((a=>{const r=async t=>{if(0!==t.type)return;const{data:s}=t;await e(s)&&(a(s),parentPort.off("message",r))};parentPort.on("message",r)}))},onStream:function(e){return assertIsNotMainThread("MainThread.onStream"),listenForStream(parentPort,e)},createStream:async function(e){return assertIsNotMainThread("MainThread.stream"),prepareWritableToPortStream(parentPort,e??{})}});

@@ -88,3 +88,3 @@ /// <reference types="node" resolution-mode="require"/>

*/
onStream(callback: OnStreamCallback<typeof this['worker']>): void;
onStream(callback: OnStreamCallback<typeof this['worker']>): RemoveListenerFunction;
/**

@@ -91,0 +91,0 @@ * Listen for messages coming from the service.

@@ -1,1 +0,1 @@

import{randomUUID as v4}from"crypto";import{TypedEmitter}from"tiny-typed-emitter";import{listenForStream,prepareWritableToPortStream}from"../streams/index.js";export class Service extends TypedEmitter{#e;#t=!1;#r=0;#s=new Map;constructor(e){super(),this.#e=e;const t=e=>{this.#s.forEach((({resolve:t,reject:r},s)=>{if(e.key===s)switch(e.type){case 3:t(e.data);break;case 4:r(e.data);break;default:return}}))};this.#e.on("message",t),this.#e.once("exit",(()=>{this.#t=!0,this.#s.clear(),this.emit("terminated"),e.off("message",t)}))}get activeCalls(){return this.#r}get closed(){return this.#t}get threadID(){return this.#e.threadId}get worker(){return this.#e}#a(){if(this.#t)throw new Error("Attempting to execute operations on a service with an exited process!")}async call({name:e,params:t,transferList:r}){this.#a();const s=v4();this.#r++;const a={type:1,name:e,params:t??[],key:s},i=new Promise(((e,t)=>{this.#i({key:s,resolve:e,reject:t})}));this.#e.postMessage(a,r);const o=await i;return this.#r--,o}#i({key:e,...t}){return this.#s.set(e,t),()=>{this.#s.delete(e)}}async close(){this.#t=!0,await this.#e.terminate()}sendMessage(e,t){this.#a();const r={type:0,data:e};this.#e.postMessage(r,t)}createStream(e){return prepareWritableToPortStream(this.#e,e??{})}onStream(e){listenForStream(this.#e,e)}onMessage(e){this.#a();const t=async t=>{0===t.type&&await e(t.data)};return this.#e.on("message",t),()=>this.#e.off("message",t)}async waitForMessage(e){return this.#a(),new Promise((t=>{const r=async s=>{if(0!==s.type)return;const{data:a}=s;e(a)&&(t(a),this.#e.off("message",r))};this.#e.on("message",r)}))}sendMessenger(e){this.#a();const t=e.transfer,r={type:3,data:t},s=new Promise((e=>{const r=s=>{5===s.type&&s.data===t.__messengerID&&(e(void 0),this.#e.off("message",r))};this.#e.on("message",r)}));return this.#e.postMessage(r),s}}
import{randomUUID as v4}from"crypto";import{TypedEmitter}from"tiny-typed-emitter";import{listenForStream,prepareWritableToPortStream}from"../streams/index.js";export class Service extends TypedEmitter{#e;#t=!1;#r=0;#s=new Map;constructor(e){super(),this.#e=e;const t=e=>{this.#s.forEach((({resolve:t,reject:r},s)=>{if(e.key===s)switch(e.type){case 3:t(e.data);break;case 4:r(e.data);break;default:return}}))};this.#e.on("message",t),this.#e.once("exit",(()=>{this.#t=!0,this.#s.clear(),this.emit("terminated"),e.off("message",t)}))}get activeCalls(){return this.#r}get closed(){return this.#t}get threadID(){return this.#e.threadId}get worker(){return this.#e}#a(){if(this.#t)throw new Error("Attempting to execute operations on a service with an exited process!")}async call({name:e,params:t,transferList:r}){this.#a();const s=v4();this.#r++;const a={type:1,name:e,params:t??[],key:s},i=new Promise(((e,t)=>{this.#i({key:s,resolve:e,reject:t})}));this.#e.postMessage(a,r);const o=await i;return this.#r--,o}#i({key:e,...t}){return this.#s.set(e,t),()=>{this.#s.delete(e)}}async close(){this.#t=!0,await this.#e.terminate()}sendMessage(e,t){this.#a();const r={type:0,data:e};this.#e.postMessage(r,t)}createStream(e){return prepareWritableToPortStream(this.#e,e??{})}onStream(e){return listenForStream(this.#e,e)}onMessage(e){this.#a();const t=async t=>{0===t.type&&await e(t.data)};return this.#e.on("message",t),()=>this.#e.off("message",t)}async waitForMessage(e){return this.#a(),new Promise((t=>{const r=async s=>{if(0!==s.type)return;const{data:a}=s;e(a)&&(t(a),this.#e.off("message",r))};this.#e.on("message",r)}))}sendMessenger(e){this.#a();const t=e.transfer,r={type:3,data:t},s=new Promise((e=>{const r=s=>{5===s.type&&s.data===t.__messengerID&&(e(void 0),this.#e.off("message",r))};this.#e.on("message",r)}));return this.#e.postMessage(r),s}}
import { Bytes } from '../constants/shared_map.js';
import { TypedEmitter } from 'tiny-typed-emitter';
import type { SharedMapTransferData, SharedMapOptions, SetWithPreviousHandler } from '../types/shared_map.js';

@@ -10,3 +11,5 @@ import type { CleanKeyOf } from '../types/utilities.js';

*/
export declare class SharedMap<Data extends Record<string, any>> {
export declare class SharedMap<Data extends Record<string, any>> extends TypedEmitter<{
close: () => void;
}> {
#private;

@@ -20,7 +23,7 @@ /**

* A single ID assigned to the entire group of SharedMap instances using the
* allocated memory locations.
* shared memory buffers.
*/
get ID(): string;
/**
* An `enum` designed to help you when assigning a fixed byte size
* An enum designed to help with assigning a fixed byte size
* for the map's values.

@@ -30,3 +33,3 @@ */

/**
* An `enum` designed to help you when assigning a fixed byte size
* An enum designed to help with assigning a fixed byte size
* for the map's values.

@@ -50,3 +53,10 @@ */

get<KeyName extends CleanKeyOf<Data extends SharedMapTransferData<infer Type> ? Type : Data>>(name: KeyName): Promise<string | null>;
/**
* Watch a specific value on the map for changes.
*
* @param name The name of the key for the value to watch.
* @returns An object containing a `current` getter for the current value, and a `stopWatching()` function.
*/
watch<KeyName extends CleanKeyOf<Data extends SharedMapTransferData<infer Type> ? Type : Data>>(name: KeyName): Promise<Readonly<{
readonly changed: boolean;
readonly current: string | null;

@@ -53,0 +63,0 @@ stopWatching(): void;

@@ -1,1 +0,1 @@

import{randomUUID as v4}from"crypto";import{createSharedArrayBuffer,encodeValue,isSharedMapTransferData}from"./utilities.js";import*as Keys from"./keys.js";import{Bytes,NULL_ENCODED,ENCODER,DECODER}from"../constants/shared_map.js";import{BroadcastChannelEmitter}from"./broadcast_channel_emitter.js";export class SharedMap{#e;#t;#s=v4();#r;#n=null;#a=!1;get uniqueKey(){return this.#s}get ID(){return this.#r}static option=Bytes;option=Bytes;get transfer(){return this.#i(),Object.freeze({__keys:this.#e,__values:this.#t,__identifier:this.#r})}constructor(e,{bytes:t,multiplier:s=10}={}){if("object"!=typeof e)throw new Error("Can only provide objects to SharedMap.");if(isSharedMapTransferData(e))return this.#e=e.__keys,this.#t=e.__values,void(this.#r=e.__identifier);const r=Object.entries(e),{preppedKeys:n,preppedValues:a,totalLength:i}=r.reduce(((e,[t,s])=>{let r=encodeValue(ENCODER,s);r.byteLength<=0&&(r=NULL_ENCODED);const n=r.byteLength-1+e.totalLength,a=Keys.createKey({name:t,start:e.totalLength,end:n});return e.preppedKeys.push(a),e.preppedValues.push(r),e.totalLength+=r.byteLength,e}),{preppedKeys:[],preppedValues:[],totalLength:0}),o=ENCODER.encode(n.join());if(this.#e=createSharedArrayBuffer(o.byteLength*s||Bytes.kilobyte),this.#e.set(o),t&&t<i)throw new Error(`${t} isn't enough bytes to store all values. Total byteLength of values is ${i}.`);this.#t=createSharedArrayBuffer(t||i*s||3*Bytes.kilobyte),a.reduce(((e,t)=>(this.#t.set(t,e),e+=t.byteLength)),0),this.#r=v4(),this.#n=new BroadcastChannelEmitter(this.#r);const h=[];this.#n.on("push_to_queue",(e=>{h.push(e),h[0]===e&&this.#n?.send(`ready_${e}`)})),this.#n.on("remove_from_queue",(e=>{const t=h.indexOf(e);-1!==t&&h.splice(t,1),h[0]&&this.#n.send(`ready_${h[0]}`)}))}close(){this.#n&&this.#n.close(),this.#a=!0}#i(){if(this.#a)throw new Error("Cannot perform actions on a closed SharedMap instance!")}#o(){return new Promise((e=>{const t=new BroadcastChannelEmitter(this.#r),s=v4();t.once(`ready_${s}`,(()=>{e([s,t])})),t.send("push_to_queue",s)}))}async#h(e){const[t,s]=await this.#o(),r=await e(s);return s.send("remove_from_queue",t),s.close(),r}#c(e){const t=DECODER.decode(this.#e),s=Keys.matchKey(t,e);if(!s)return null;const{start:r,end:n}=Keys.parseKey(s);if(void 0===r||void 0===n)throw new Error("Failed to parse key");return DECODER.decode(this.#t.subarray(r,n+1))}get(e){return this.#i(),this.#h((()=>this.#c(e)))}async watch(e){const t=new BroadcastChannelEmitter(this.#r);let s=await this.get(e);return t.on(`value_changed_${e}`,(async()=>{s=await this.get(e)})),Object.freeze({get current(){return s},stopWatching(){t.close()}})}#l(e,t){const s=DECODER.decode(this.#e).replace(/\x00/g,""),r=s.match(/\d+(?=\);($|\x00))/g)?.[0];let n=encodeValue(ENCODER,t);if(n.byteLength<=0&&(n=NULL_ENCODED),!r||!Keys.createKeyRegex(e).test(s)){const t=r?+r+1:0,a=t+n.byteLength-1,i=Keys.createKey({name:e,start:t,end:a});return this.#e.set(ENCODER.encode(s.concat(i))),void this.#t.set(n,t)}const a=Keys.matchKey(s,e);if(!a)throw new Error("Failed to parse keys.");const{start:i,end:o}=Keys.parseKey(a),h=o-i+1;if(h===n.byteLength)return void this.#t.set(n,i);const c=i+n.byteLength;if(o!==+r){const e=this.#t.slice(o+1,+r+1);this.#t.set(e,c)}this.#t.set(n,i);const l=s.split(/(?<=;)/g),d=l.findIndex((t=>Keys.createKeyRegex(e).test(t))),y=Keys.createKey({name:e,start:i,end:c-1});l.splice(d,1,y);for(let e=d+1;e<l.length;e++){const{name:t,start:s,end:r}=Keys.parseKey(l[e]),a=r-s+1,i=s-h+n.byteLength,o=i+a-1;l.splice(e,1,Keys.createKey({name:t,start:i,end:o}))}let u=l.join("");u.length<s.length&&(u+="\0".repeat(s.length-u.length)),this.#e.set(ENCODER.encode(u))}set(e,t){return this.#i(),this.#h((async s=>{const r="function"!=typeof t?t:await t(this.#c(e));this.#l(e,r),s.send(`value_changed_${e}`)}))}}
import{randomUUID as v4}from"crypto";import{createSharedArrayBuffer,encodeValue,isSharedMapTransferData}from"./utilities.js";import*as Keys from"./keys.js";import{Bytes,NULL_ENCODED,ENCODER,DECODER}from"../constants/shared_map.js";import{BroadcastChannelEmitter}from"./broadcast_channel_emitter.js";import{TypedEmitter}from"tiny-typed-emitter";export class SharedMap extends TypedEmitter{#e;#t;#s=v4();#r;#n=null;#a=!1;get uniqueKey(){return this.#s}get ID(){return this.#r}static option=Bytes;option=Bytes;get transfer(){return this.#i(),Object.freeze({__keys:this.#e,__values:this.#t,__identifier:this.#r})}constructor(e,{bytes:t,multiplier:s=10}={}){if(super(),"object"!=typeof e)throw new Error("Can only provide objects to SharedMap.");if(isSharedMapTransferData(e))return this.#e=e.__keys,this.#t=e.__values,void(this.#r=e.__identifier);const r=Object.entries(e),{preppedKeys:n,preppedValues:a,totalLength:i}=r.reduce(((e,[t,s])=>{let r=encodeValue(ENCODER,s);r.byteLength<=0&&(r=NULL_ENCODED);const n=r.byteLength-1+e.totalLength,a=Keys.createKey({name:t,start:e.totalLength,end:n});return e.preppedKeys.push(a),e.preppedValues.push(r),e.totalLength+=r.byteLength,e}),{preppedKeys:[],preppedValues:[],totalLength:0}),o=ENCODER.encode(n.join());if(this.#e=createSharedArrayBuffer(o.byteLength*s||Bytes.kilobyte),this.#e.set(o),t&&t<i)throw new Error(`${t} isn't enough bytes to store all values. Total byteLength of values is ${i}.`);this.#t=createSharedArrayBuffer(t||i*s||3*Bytes.kilobyte),a.reduce(((e,t)=>(this.#t.set(t,e),e+=t.byteLength)),0),this.#r=v4(),this.#n=new BroadcastChannelEmitter(this.#r);const h=[];this.#n.on("push_to_queue",(e=>{h.push(e),h[0]===e&&this.#n?.send(`ready_${e}`)})),this.#n.on("remove_from_queue",(e=>{const t=h.indexOf(e);-1!==t&&h.splice(t,1),h[0]&&this.#n.send(`ready_${h[0]}`)}))}close(){this.#n&&this.#n.close(),this.#a=!0,this.emit("close")}#i(){if(this.#a)throw new Error("Cannot perform actions on a closed SharedMap instance!")}#o(){return new Promise((e=>{const t=new BroadcastChannelEmitter(this.#r),s=v4();t.once(`ready_${s}`,(()=>{e([s,t])})),t.send("push_to_queue",s)}))}async#h(e){const[t,s]=await this.#o(),r=await e(s);return s.send("remove_from_queue",t),s.close(),r}#c(e){const t=DECODER.decode(this.#e),s=Keys.matchKey(t,e);if(!s)return null;const{start:r,end:n}=Keys.parseKey(s);if(void 0===r||void 0===n)throw new Error("Failed to parse key");return DECODER.decode(this.#t.subarray(r,n+1))}get(e){return this.#i(),this.#h((()=>this.#c(e)))}async watch(e){const t=new BroadcastChannelEmitter(this.#r);let s=await this.get(e),r=!1;return t.on(`value_changed_${e}`,(e=>{s=DECODER.decode(e),r=!0})),this.once("close",t.close.bind(t)),Object.freeze({get changed(){return r},get current(){return r=!1,s},stopWatching(){t.close()}})}#d(e,t){const s=DECODER.decode(this.#e).replace(/\x00/g,""),r=s.match(/\d+(?=\);($|\x00))/g)?.[0];let n=encodeValue(ENCODER,t);if(n.byteLength<=0&&(n=NULL_ENCODED),!r||!Keys.createKeyRegex(e).test(s)){const t=r?+r+1:0,a=t+n.byteLength-1,i=Keys.createKey({name:e,start:t,end:a});return this.#e.set(ENCODER.encode(s.concat(i))),this.#t.set(n,t),n}const a=Keys.matchKey(s,e);if(!a)throw new Error("Failed to parse keys.");const{start:i,end:o}=Keys.parseKey(a),h=o-i+1;if(h===n.byteLength)return this.#t.set(n,i),n;const c=i+n.byteLength;if(o!==+r){const e=this.#t.slice(o+1,+r+1);this.#t.set(e,c)}this.#t.set(n,i);const d=s.split(/(?<=;)/g),l=d.findIndex((t=>Keys.createKeyRegex(e).test(t))),y=Keys.createKey({name:e,start:i,end:c-1});d.splice(l,1,y);for(let e=l+1;e<d.length;e++){const{name:t,start:s,end:r}=Keys.parseKey(d[e]),a=r-s+1,i=s-h+n.byteLength,o=i+a-1;d.splice(e,1,Keys.createKey({name:t,start:i,end:o}))}let u=d.join("");return u.length<s.length&&(u+="\0".repeat(s.length-u.length)),this.#e.set(ENCODER.encode(u)),n}set(e,t){return this.#i(),this.#h((async s=>{const r="function"!=typeof t?t:await t(this.#c(e)),n=this.#d(e,r);s.send(`value_changed_${e}`,n)}))}}
import { ListenForStreamMode } from '../constants/streams.js';
import type { RemoveListenerFunction } from '../types/messages.js';
import type { Messagable } from '../types/streams.js';
import type { OnStreamCallback, ConfirmStreamCallback } from '../types/streams.js';
export declare function listenForStream<Sender extends Messagable>(sender: Sender, callback: ConfirmStreamCallback<Sender>, mode: ListenForStreamMode.ConfirmFirst): void;
export declare function listenForStream<Sender extends Messagable>(sender: Sender, callback: OnStreamCallback<Sender>, mode?: ListenForStreamMode.AcceptAll): void;
export declare function listenForStream<Sender extends Messagable>(sender: Sender, callback: ConfirmStreamCallback<Sender>, mode: ListenForStreamMode.ConfirmFirst): RemoveListenerFunction;
export declare function listenForStream<Sender extends Messagable>(sender: Sender, callback: OnStreamCallback<Sender>, mode?: ListenForStreamMode.AcceptAll): RemoveListenerFunction;

@@ -1,1 +0,1 @@

import{ReadableFromPort}from"./readable_from_port.js";export function listenForStream(t,e,a=0){t.on("message",(async r=>{if("stream-start"!==r.type)return;const o=()=>{const e=new ReadableFromPort(r.id,t,r.meta??{}),a={type:"stream-ready-to-consume",id:r.id};return t.postMessage(a),e};0!==a?1===a&&await e({metaData:r.meta,accept:()=>o()}):await e(o())}))}
import{ReadableFromPort}from"./readable_from_port.js";export function listenForStream(e,t,a=0){const r=async r=>{if("stream-start"!==r.type)return;const o=()=>{const t=new ReadableFromPort(r.id,e,r.meta??{}),a={type:"stream-ready-to-consume",id:r.id};return e.postMessage(a),t};0!==a?1===a&&await t({metaData:r.meta,accept:()=>o()}):await t(o())};return e.on("message",r),()=>{e.off("message",r)}}

@@ -49,4 +49,4 @@ import type { Awaitable } from './utilities.js';

remove_from_queue: (id: string) => void;
[key: `value_changed_${string}`]: () => Awaitable<void>;
[key: `value_changed_${string}`]: (newEncodedValue: Uint8Array) => void;
[key: `ready_${string}`]: () => void;
};
{
"name": "nanolith",
"version": "0.3.2-beta1",
"version": "0.3.2-beta2",
"description": "Multi-threaded nanoservices in no time with seamless TypeScript support.",

@@ -5,0 +5,0 @@ "main": "./dist/index.js",

@@ -23,3 +23,3 @@ # Nanolith

4. Modern [ESModules](https://hacks.mozilla.org/2018/03/es-modules-a-cartoon-deep-dive/)-only support 📈
5. Steady updates with new features & fixes
5. Steady updates with new features & fixes 🚀

@@ -34,3 +34,3 @@ ### So what can you do with it?

- Stream data between threads with the already familiar [`node:stream`](https://nodejs.org/api/stream.html) API.
- Share memory between threads using the familiar-feeling `SharedMap` class.
- Share memory between threads using the familiar-feeling [`SharedMap`](#-sharing-memory-between-threads) class.

@@ -57,2 +57,3 @@ ## 📖 Table of contents

- [💾 Sharing memory between threads](#-sharing-memory-between-threads)
- [Watching for changes on a shared memory location](#watching-for-changes-on-a-shared-memory-location)
- [🧑‍🏫 Examples](#-examples)

@@ -621,3 +622,3 @@ - [📜 License](#-license)

// Grab the current value of "foo"
// Grab the current value of "foo".
console.log(await myMap.get('foo'));

@@ -684,2 +685,50 @@

Notice that the `.get()` method will always return a stringified version of the value.
### Watching for changes on a shared memory location
Calling `.get()` repeatedly can be cumbersome, which is why the `.watch()` method might be useful for certain use cases. `.watch()` returns an object containing a `current` getter, which will always return the most recent value for the provided key.
```TypeScript
import { SharedMap } from 'nanolith';
const myMap = new SharedMap({ foo: 'bar' });
// Create a "watch" object for the key "foo".
const foo = await myMap.watch('foo');
// Every second, check for changes to the value under
// the key "foo" using the watch object.
const interval = setInterval(() => {
// If the watched value has changed since its
// .current property was last accessed, .changed
// will return "true".
if (!foo.changed) return;
// Log out the new changed value.
console.log(foo.current);
clearInterval(interval);
myMap.close();
}, 1000);
// Change the value of foo
await myMap.set('foo', 'hello world');
```
The output of the following code is:
```shell
hello world
```
Because `current` and `changed` are getters and not static properties, destructuring them will not work properly:
```TypeScript
// This code example is wrong!
import { SharedMap } from 'nanolith';
const myMap = new SharedMap({ foo: 'bar' });
// WRONG! WRONG! WRONG!
const { current, changed, stopWatching } = await myMap.watch('foo');
// WRONG! WRONG! WRONG!
```
## 🧑‍🏫 Examples

@@ -686,0 +735,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc