New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@consento/api

Package Overview
Dependencies
Maintainers
2
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@consento/api - npm Package Compare versions

Comparing version 0.1.3 to 0.2.0

notifications/mapOutputToInput.d.ts

22

index.js
"use strict";
function __export(m) {
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p];
}
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } });
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p);
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.setup = void 0;
const notifications_1 = require("./notifications");
const crypto_1 = require("@consento/crypto");
__export(require("./types"));
__export(require("@consento/crypto"));
__exportStar(require("./types"), exports);
__exportStar(require("@consento/crypto"), exports);
var notifications_2 = require("./notifications");
exports.isSuccessNotification = notifications_2.isSuccess;
exports.isErrorNotification = notifications_2.isError;
Object.defineProperty(exports, "isSuccessNotification", { enumerable: true, get: function () { return notifications_2.isSuccess; } });
Object.defineProperty(exports, "isErrorNotification", { enumerable: true, get: function () { return notifications_2.isError; } });
function setup({ cryptoCore, notificationTransport }) {

@@ -14,0 +22,0 @@ return {

@@ -1,2 +0,2 @@

import { ISender, IReceiver, IEncodable, ICancelable } from '@consento/crypto';
import { ISender, IReceiver, IEncodable } from '@consento/crypto';
import { INotifications, INotificationsTransport, INotificationsOptions, IConnection, INotificationProcessor, INotification, ISuccessNotification, INotificationError, IBodyFilter } from './types';

@@ -12,12 +12,28 @@ export declare function isSuccess(input: INotification): input is ISuccessNotification;

constructor({ transport }: INotificationsOptions);
reset(receivers: IReceiver[]): ICancelable<boolean[]>;
subscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>;
unsubscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>;
reset(receivers: IReceiver[], opts?: {
signal?: AbortSignal;
}): Promise<boolean[]>;
subscribe(receivers: IReceiver[], { force, signal }?: {
force?: boolean;
signal?: AbortSignal;
}): Promise<boolean[]>;
unsubscribe(receivers: IReceiver[], { force, signal }?: {
force: boolean;
signal?: AbortSignal;
}): Promise<boolean[]>;
send(sender: ISender, message: IEncodable): Promise<string[]>;
receive<T extends IEncodable>(receiver: IReceiver, filter?: IBodyFilter<T>, timeout?: number): ICancelable<{
afterSubscribe: ICancelable<T>;
receive<T extends IEncodable>(receiver: IReceiver, { filter, timeout, signal }?: {
filter?: IBodyFilter<T>;
timeout?: number;
signal?: AbortSignal;
}): Promise<{
afterSubscribe: Promise<T>;
}>;
sendAndReceive<T extends IEncodable = IEncodable>(connection: IConnection, message: IEncodable, filter?: IBodyFilter<T>, timeout?: number): ICancelable<{
afterSubscribe: ICancelable<T>;
sendAndReceive<T extends IEncodable = IEncodable>(connection: IConnection, message: IEncodable, opts?: {
filter?: IBodyFilter<T>;
timeout?: number;
signal?: AbortSignal;
}): Promise<{
afterSubscribe: Promise<T>;
}>;
}
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Notifications = exports.isError = exports.isSuccess = void 0;
/* eslint-disable @typescript-eslint/method-signature-style */
/* eslint-disable @typescript-eslint/no-throw-literal */

@@ -7,2 +9,3 @@ const crypto_1 = require("@consento/crypto");

const events_1 = require("events");
const mapOutputToInput_1 = require("./mapOutputToInput");
function isSuccess(input) {

@@ -16,37 +19,2 @@ return input.type === types_1.ENotificationType.success;

exports.isError = isError;
const wait = async (time) => await new Promise(resolve => setTimeout(resolve, time));
async function untilTrue(op) {
while (!op()) {
await wait(0);
}
}
// TODO: move to @consento/crypto
function isCancelable(promise) {
// @ts-ignore TS2339
return typeof promise.cancel === 'function';
}
// TODO: move to @consento/crypto
async function maybeChild(child, promise) {
if (isCancelable(promise)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
child(promise);
}
return await promise;
}
async function mapOutputToInput({ input: inputData, op }) {
if (!Array.isArray(inputData)) {
throw new Error(`Expected input to be list but was ${typeof inputData}`);
}
const outputData = await op(inputData);
if (!Array.isArray(outputData)) {
throw new Error(`Expected list response from ${op.toString()}`);
}
const received = new Map();
for (let i = 0; i < inputData.length; i++) {
const inputEntry = inputData[i];
const outputEntry = outputData[i];
received.set(inputEntry, outputEntry);
}
return received;
}
class EmptyTransport extends events_1.EventEmitter {

@@ -66,2 +34,17 @@ async subscribe(receivers) {

}
function interceptCallback(callback, setup) {
const tearDown = setup();
return (error, t) => {
const p = tearDown();
if (p instanceof Promise) {
p.then(() => callback(error, t), err => callback(error !== null && error !== void 0 ? error : err, t));
}
else {
callback(error, t);
}
};
}
function toCallback(resolve, reject) {
return (error, t) => (error !== null && error !== undefined) ? reject(error) : resolve(t);
}
class Notifications {

@@ -144,57 +127,48 @@ constructor({ transport }) {

}
// eslint-disable-next-line @typescript-eslint/promise-function-async
reset(receivers) {
return crypto_1.cancelable(function* (child) {
const received = yield mapOutputToInput({
input: receivers,
op: async (input) => await maybeChild(child, this._transport.reset(input))
});
this._receivers = {};
return receivers.map(receiver => {
const changed = received.get(receiver);
if (changed) {
this._receivers[receiver.idBase64] = receiver;
}
return changed;
});
}, this);
async reset(receivers, opts) {
const received = await mapOutputToInput_1.mapOutputToInput({
input: receivers,
op: async (input) => await this._transport.reset(input, opts)
});
this._receivers = {};
return receivers.map(receiver => {
const changed = received.get(receiver);
if (changed) {
this._receivers[receiver.idBase64] = receiver;
}
return changed;
});
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
subscribe(receivers, force = false) {
return crypto_1.cancelable(function* (child) {
if (receivers.length === 0) {
return [];
async subscribe(receivers, { force, signal } = {}) {
if (receivers.length === 0) {
return [];
}
const received = await mapOutputToInput_1.mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined),
op: async (input) => await this._transport.subscribe(input, { signal })
});
return receivers.map(receiver => {
const changed = received.get(receiver) || false;
if (changed) {
this._receivers[receiver.idBase64] = receiver;
}
const received = yield mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined),
op: async (input) => await maybeChild(child, this._transport.subscribe(input))
});
return receivers.map(receiver => {
const changed = received.get(receiver) || false;
if (changed) {
this._receivers[receiver.idBase64] = receiver;
}
return changed;
});
}, this);
return changed;
});
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
unsubscribe(receivers, force = false) {
return crypto_1.cancelable(function* (child) {
if (receivers.length === 0) {
return [];
async unsubscribe(receivers, { force, signal } = { force: false }) {
if (receivers.length === 0) {
return [];
}
const received = await mapOutputToInput_1.mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined),
op: async (input) => await this._transport.unsubscribe(input, { signal })
});
return receivers.map(receiver => {
const changed = received.get(receiver) || false;
if (changed) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete this._receivers[receiver.idBase64];
}
const received = yield mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined),
op: async (input) => await maybeChild(child, this._transport.unsubscribe(input))
});
return receivers.map(receiver => {
const changed = received.get(receiver) || false;
if (changed) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete this._receivers[receiver.idBase64];
}
return changed;
});
}, this);
return changed;
});
}

@@ -218,14 +192,4 @@ async send(sender, message) {

}
// eslint-disable-next-line @typescript-eslint/promise-function-async
receive(receiver, filter, timeout) {
let _resolve;
let _reject;
let timer;
const promise = new Promise((resolve, reject) => {
_resolve = resolve;
_reject = reject;
});
if (timeout !== undefined && timeout !== null) {
timer = setTimeout(() => _reject(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout })), timeout);
}
async receive(receiver, { filter, timeout, signal } = {}) {
let _cb;
const processor = (message) => {

@@ -235,50 +199,51 @@ if (isSuccess(message) && message.channelIdBase64 === receiver.idBase64) {

if (typeof filter !== 'function' || filter(body)) {
_resolve(body);
_cb(null, body);
}
}
};
const afterSubscribe = new Promise((resolve, reject) => {
_cb = toCallback(resolve, reject);
});
this.processors.add(processor);
const clear = async () => {
this.processors.delete(processor);
if (timer !== undefined) {
clearTimeout(timer);
_cb = interceptCallback(_cb, () => {
return async () => {
this.processors.delete(processor);
await this.unsubscribe([receiver]);
};
});
try {
await this.subscribe([receiver], { signal });
if (timeout !== undefined && timeout !== null) {
_cb = interceptCallback(_cb, () => {
const timer = setTimeout(() => _cb(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout }), null), timeout);
return () => clearTimeout(timer);
});
}
await this.unsubscribe([receiver]);
if (signal !== undefined && signal !== null) {
_cb = interceptCallback(_cb, () => {
const listener = () => _cb(new crypto_1.AbortError(), null);
signal.addEventListener('abort', listener);
return () => signal.removeEventListener('abort', listener);
});
}
}
catch (err) {
_cb(err, null);
}
return {
afterSubscribe
};
// eslint-disable-next-line @typescript-eslint/return-await
return crypto_1.cancelable(function* (child) {
yield child(this.subscribe([receiver]));
return {
afterSubscribe: crypto_1.cancelable(function* () {
return (yield promise);
}).finally(clear)
};
}, this)
.catch(async (err) => {
await clear();
throw err;
});
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
sendAndReceive(connection, message, filter, timeout) {
let _continue = false;
return crypto_1.cancelable(function* (child) {
const { afterSubscribe: receivePromise } = (yield child(this.receive(connection.receiver, filter, timeout)));
const next = crypto_1.cancelable(function* (child) {
// @ts-ignore TS2339
receivePromise.name = 'RECEIVED';
// eslint-disable-next-line @typescript-eslint/no-floating-promises
child(receivePromise);
// wait for another instance to be able to receive "afterSubscribe"
// before the submission happens
yield untilTrue(() => _continue);
yield this.send(connection.sender, message);
return yield receivePromise;
}, this);
return {
afterSubscribe: next
};
}, this).finally(() => {
_continue = true;
});
async sendAndReceive(connection, message, opts) {
const { afterSubscribe: receivePromise } = await this.receive(connection.receiver, opts);
return {
afterSubscribe: (async () => {
await Promise.race([
this.send(connection.sender, message),
receivePromise
]);
return await receivePromise;
})()
};
}

@@ -285,0 +250,0 @@ }

import { EDecryptionError } from '@consento/crypto/core/types';
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage, ICancelable } from '@consento/crypto/types';
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage } from '@consento/crypto/types';
export * from '@consento/crypto/types';
export interface INotificationsTransport {
subscribe(receivers: IReceiver[]): Promise<boolean[]>;
unsubscribe(receivers: IReceiver[]): Promise<boolean[]>;
reset(receivers: IReceiver[]): Promise<boolean[]>;
subscribe(receivers: Iterable<IReceiver>, opts?: {
signal?: AbortSignal;
}): Promise<boolean[]>;
unsubscribe(receivers: Iterable<IReceiver>, opts?: {
signal?: AbortSignal;
}): Promise<boolean[]>;
reset(receivers: Iterable<IReceiver>, opts?: {
signal?: AbortSignal;
}): Promise<boolean[]>;
send(channel: IAnnonymous, message: IEncryptedMessage): Promise<any[]>;

@@ -63,19 +69,25 @@ on(event: 'error', handler: (error: Error) => void): this;

export interface INotifications {
subscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>;
unsubscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>;
reset(receivers: IReceiver[]): ICancelable<boolean[]>;
subscribe(receivers: IReceiver[], opts?: {
force?: boolean;
signal?: AbortSignal;
}): Promise<boolean[]>;
unsubscribe(receivers: IReceiver[], opts?: {
force?: boolean;
signal?: AbortSignal;
}): Promise<boolean[]>;
reset(receivers: IReceiver[]): Promise<boolean[]>;
processors: Set<INotificationProcessor>;
send(sender: ISender, message: IEncodable): Promise<string[]>;
receive(receiver: IReceiver): ICancelable<{
afterSubscribe: ICancelable<IEncodable>;
receive<T extends IEncodable>(receiver: IReceiver, opts?: {
filter?: IBodyFilter<T>;
signal?: AbortSignal;
}): Promise<{
afterSubscribe: Promise<IEncodable>;
}>;
receive<T extends IEncodable>(receiver: IReceiver, filter: IBodyFilter<T>): ICancelable<{
afterSubscribe: ICancelable<T>;
sendAndReceive<T extends IEncodable>(connection: IConnection, message: IEncodable, opts?: {
filter: IBodyFilter<T>;
signal?: AbortSignal;
}): Promise<{
afterSubscribe: Promise<T>;
}>;
sendAndReceive(connection: IConnection, message: IEncodable): ICancelable<{
afterSubscribe: ICancelable<IEncodable>;
}>;
sendAndReceive<T extends IEncodable>(connection: IConnection, message: IEncodable, filter: IBodyFilter<T>): ICancelable<{
afterSubscribe: ICancelable<T>;
}>;
}
"use strict";
function __export(m) {
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p];
}
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } });
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p);
};
Object.defineProperty(exports, "__esModule", { value: true });
__export(require("@consento/crypto/types"));
exports.EErrorCode = exports.ENotificationType = void 0;
__exportStar(require("@consento/crypto/types"), exports);
var ENotificationType;

@@ -8,0 +16,0 @@ (function (ENotificationType) {

{
"name": "@consento/api",
"version": "0.1.3",
"version": "0.2.0",
"description": "JavasScript API for building things with consento.",

@@ -10,3 +10,3 @@ "main": "index.js",

"dependencies": {
"@consento/crypto": "^0.1.2",
"@consento/crypto": "^0.2.0",
"buffer": "^5.6.0",

@@ -13,0 +13,0 @@ "events": "^3.0.0"

@@ -0,5 +1,7 @@

/* eslint-disable @typescript-eslint/method-signature-style */
/* eslint-disable @typescript-eslint/no-throw-literal */
import { ISender, IReceiver, IEncodable, IEncryptedMessage, cancelable, ICancelable } from '@consento/crypto'
import { ISender, IReceiver, IEncodable, IEncryptedMessage, AbortError } from '@consento/crypto'
import { INotifications, INotificationsTransport, INotificationsOptions, IConnection, INotificationProcessor, EErrorCode, INotification, ISuccessNotification, INotificationError, IBodyFilter, ENotificationType, IDecryptionError } from './types'
import { EventEmitter } from 'events'
import { mapOutputToInput } from './mapOutputToInput'

@@ -14,42 +16,2 @@ export function isSuccess (input: INotification): input is ISuccessNotification {

const wait: (time: number) => Promise<void> = async (time: number) => await new Promise<void>(resolve => setTimeout(resolve, time))
async function untilTrue (op: () => boolean): Promise<void> {
while (!op()) {
await wait(0)
}
}
// TODO: move to @consento/crypto
function isCancelable <T> (promise: Promise<T>): promise is ICancelable<T> {
// @ts-ignore TS2339
return typeof promise.cancel === 'function'
}
// TODO: move to @consento/crypto
async function maybeChild <T> (child: <TChild>(cancelable: ICancelable<TChild>) => ICancelable<TChild>, promise: Promise<T>): Promise<T> {
if (isCancelable(promise)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
child(promise)
}
return await promise
}
async function mapOutputToInput <Input, Output> ({ input: inputData, op }: { input: Input[], op: (inputData: Input[]) => Promise<Output[]> }): Promise<Map<Input, Output>> {
if (!Array.isArray(inputData)) {
throw new Error(`Expected input to be list but was ${typeof inputData}`)
}
const outputData = await op(inputData)
if (!Array.isArray(outputData)) {
throw new Error(`Expected list response from ${op.toString()}`)
}
const received = new Map <Input, Output>()
for (let i = 0; i < inputData.length; i++) {
const inputEntry = inputData[i]
const outputEntry = outputData[i]
received.set(inputEntry, outputEntry)
}
return received
}
class EmptyTransport extends EventEmitter implements INotificationsTransport {

@@ -73,2 +35,24 @@ async subscribe (receivers: IReceiver[]): Promise<boolean[]> {

type Callback<T> = (error: Error, t: T) => any
function interceptCallback <T> (callback: Callback<T>, setup: () => () => any): Callback<T> {
const tearDown = setup()
return (error: Error, t: T) => {
const p = tearDown()
if (p instanceof Promise) {
p.then(
() => callback(error, t),
err => callback(error ?? err, t)
)
} else {
callback(error, t)
}
}
}
function toCallback <T> (resolve: (data: T) => any, reject: (error: Error) => any): Callback<T> {
return (error: Error, t: T) =>
(error !== null && error !== undefined) ? reject(error) : resolve(t)
}
export class Notifications implements INotifications {

@@ -156,63 +140,54 @@ _transport: INotificationsTransport

// eslint-disable-next-line @typescript-eslint/promise-function-async
reset (receivers: IReceiver[]): ICancelable<boolean[]> {
return cancelable <boolean[], Notifications>(function * (child) {
const received: Map<IReceiver, boolean> = yield mapOutputToInput({
input: receivers,
op: async input => await maybeChild(child, this._transport.reset(input))
})
this._receivers = {}
return receivers.map(receiver => {
const changed = received.get(receiver)
if (changed) {
this._receivers[receiver.idBase64] = receiver
}
return changed
})
}, this)
async reset (receivers: IReceiver[], opts?: { signal?: AbortSignal }): Promise<boolean[]> {
const received: Map<IReceiver, boolean> = await mapOutputToInput({
input: receivers,
op: async input => await this._transport.reset(input, opts)
})
this._receivers = {}
return receivers.map(receiver => {
const changed = received.get(receiver)
if (changed) {
this._receivers[receiver.idBase64] = receiver
}
return changed
})
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
subscribe (receivers: IReceiver[], force: boolean = false): ICancelable<boolean[]> {
return cancelable <boolean[], Notifications>(function * (child) {
if (receivers.length === 0) {
return []
}
async subscribe (receivers: IReceiver[], { force, signal }: { force?: boolean, signal?: AbortSignal } = {}): Promise<boolean[]> {
if (receivers.length === 0) {
return []
}
const received: Map<IReceiver, boolean> = yield mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined),
op: async input => await maybeChild(child, this._transport.subscribe(input))
})
const received: Map<IReceiver, boolean> = await mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined),
op: async input => await this._transport.subscribe(input, { signal })
})
return receivers.map(receiver => {
const changed = received.get(receiver) || false
if (changed) {
this._receivers[receiver.idBase64] = receiver
}
return changed
})
}, this)
return receivers.map(receiver => {
const changed = received.get(receiver) || false
if (changed) {
this._receivers[receiver.idBase64] = receiver
}
return changed
})
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
unsubscribe (receivers: IReceiver[], force: boolean = false): ICancelable<boolean[]> {
return cancelable <boolean[], Notifications>(function * (child) {
if (receivers.length === 0) {
return []
}
async unsubscribe (receivers: IReceiver[], { force, signal }: { force: boolean, signal?: AbortSignal } = { force: false }): Promise<boolean[]> {
if (receivers.length === 0) {
return []
}
const received: Map<IReceiver, boolean> = yield mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined),
op: async input => await maybeChild(child, this._transport.unsubscribe(input))
})
const received: Map<IReceiver, boolean> = await mapOutputToInput({
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined),
op: async input => await this._transport.unsubscribe(input, { signal })
})
return receivers.map(receiver => {
const changed = received.get(receiver) || false
if (changed) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete this._receivers[receiver.idBase64]
}
return changed
})
}, this)
return receivers.map(receiver => {
const changed = received.get(receiver) || false
if (changed) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete this._receivers[receiver.idBase64]
}
return changed
})
}

@@ -238,14 +213,4 @@

// eslint-disable-next-line @typescript-eslint/promise-function-async
receive <T extends IEncodable> (receiver: IReceiver, filter?: IBodyFilter<T>, timeout?: number): ICancelable<{ afterSubscribe: ICancelable<T> }> {
let _resolve: (t: T) => any
let _reject: (error: Error) => void
let timer: any
const promise = new Promise<T>((resolve, reject) => {
_resolve = resolve
_reject = reject
})
if (timeout !== undefined && timeout !== null) {
timer = setTimeout(() => _reject(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout })), timeout)
}
async receive <T extends IEncodable> (receiver: IReceiver, { filter, timeout, signal }: { filter?: IBodyFilter<T>, timeout?: number, signal?: AbortSignal } = {}): Promise<{ afterSubscribe: Promise<T> }> {
let _cb: Callback<T>
const processor = (message: INotification): void => {

@@ -255,57 +220,63 @@ if (isSuccess(message) && message.channelIdBase64 === receiver.idBase64) {

if (typeof filter !== 'function' || filter(body)) {
_resolve(body as T)
_cb(null, body as T)
}
}
}
const afterSubscribe = new Promise<T>((resolve, reject) => {
_cb = toCallback(resolve, reject)
})
this.processors.add(processor)
const clear = async (): Promise<void> => {
this.processors.delete(processor)
if (timer !== undefined) {
clearTimeout(timer)
_cb = interceptCallback(_cb, () => {
return async (): Promise<void> => {
this.processors.delete(processor)
await this.unsubscribe([receiver])
}
await this.unsubscribe([receiver])
})
try {
await this.subscribe([receiver], { signal })
if (timeout !== undefined && timeout !== null) {
_cb = interceptCallback(_cb, () => {
const timer = setTimeout(
() => _cb(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout }), null),
timeout
)
return () => clearTimeout(timer)
})
}
if (signal !== undefined && signal !== null) {
_cb = interceptCallback(_cb, () => {
const listener = (): void => _cb(new AbortError(), null)
signal.addEventListener('abort', listener)
return () => signal.removeEventListener('abort', listener)
})
}
} catch (err) {
_cb(err, null)
}
// eslint-disable-next-line @typescript-eslint/return-await
return cancelable<{ afterSubscribe: ICancelable<T> }, Notifications>(function * (child) {
yield child(this.subscribe([receiver]))
return {
afterSubscribe: cancelable<{ afterSubscribe: ICancelable }>(function * () {
return (yield promise) as T
}).finally(clear)
}
}, this)
.catch(async (err: Error): Promise<{ afterSubscribe: ICancelable<T> }> => {
await clear()
throw err
})
return {
afterSubscribe
}
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
sendAndReceive <T extends IEncodable = IEncodable> (
async sendAndReceive <T extends IEncodable = IEncodable> (
connection: IConnection,
message: IEncodable,
filter?: IBodyFilter<T>,
timeout?: number
): ICancelable<{ afterSubscribe: ICancelable<T> }> {
let _continue = false
return cancelable<{ afterSubscribe: ICancelable<T> }, Notifications>(function * (child) {
const { afterSubscribe: receivePromise } = (yield child(this.receive(connection.receiver, filter, timeout))) as { afterSubscribe: ICancelable<T> }
const next = cancelable(function * (child) {
// @ts-ignore TS2339
receivePromise.name = 'RECEIVED'
// eslint-disable-next-line @typescript-eslint/no-floating-promises
child(receivePromise)
// wait for another instance to be able to receive "afterSubscribe"
// before the submission happens
yield untilTrue(() => _continue)
yield this.send(connection.sender, message)
return yield receivePromise
}, this)
return {
afterSubscribe: next
}
}, this).finally(() => {
_continue = true
})
opts?: {
filter?: IBodyFilter<T>
timeout?: number
signal?: AbortSignal
}
): Promise<{ afterSubscribe: Promise<T> }> {
const { afterSubscribe: receivePromise } = await this.receive(connection.receiver, opts)
return {
afterSubscribe: (async () => {
await Promise.race([
this.send(connection.sender, message),
receivePromise
])
return await receivePromise
})()
}
}
}

@@ -0,3 +1,4 @@

/* eslint-disable @typescript-eslint/method-signature-style */
import { EDecryptionError } from '@consento/crypto/core/types'
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage, ICancelable } from '@consento/crypto/types'
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage } from '@consento/crypto/types'

@@ -7,5 +8,5 @@ export * from '@consento/crypto/types'

export interface INotificationsTransport {
subscribe (receivers: IReceiver[]): Promise<boolean[]>
unsubscribe (receivers: IReceiver[]): Promise<boolean[]>
reset (receivers: IReceiver[]): Promise<boolean[]>
subscribe (receivers: Iterable<IReceiver>, opts?: { signal?: AbortSignal }): Promise<boolean[]>
unsubscribe (receivers: Iterable<IReceiver>, opts?: { signal?: AbortSignal }): Promise<boolean[]>
reset (receivers: Iterable<IReceiver>, opts?: { signal?: AbortSignal }): Promise<boolean[]>
send (channel: IAnnonymous, message: IEncryptedMessage): Promise<any[]>

@@ -78,11 +79,9 @@ on (event: 'error', handler: (error: Error) => void): this

export interface INotifications {
subscribe (receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>
unsubscribe (receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>
reset (receivers: IReceiver[]): ICancelable<boolean[]>
subscribe (receivers: IReceiver[], opts?: { force?: boolean, signal?: AbortSignal }): Promise<boolean[]>
unsubscribe (receivers: IReceiver[], opts?: { force?: boolean, signal?: AbortSignal }): Promise<boolean[]>
reset (receivers: IReceiver[]): Promise<boolean[]>
processors: Set<INotificationProcessor>
send (sender: ISender, message: IEncodable): Promise<string[]>
receive (receiver: IReceiver): ICancelable<{ afterSubscribe: ICancelable<IEncodable> }>
receive <T extends IEncodable>(receiver: IReceiver, filter: IBodyFilter<T>): ICancelable<{ afterSubscribe: ICancelable<T> }>
sendAndReceive (connection: IConnection, message: IEncodable): ICancelable<{ afterSubscribe: ICancelable<IEncodable> }>
sendAndReceive <T extends IEncodable>(connection: IConnection, message: IEncodable, filter: IBodyFilter<T>): ICancelable<{ afterSubscribe: ICancelable<T> }>
receive <T extends IEncodable>(receiver: IReceiver, opts?: { filter?: IBodyFilter<T>, signal?: AbortSignal }): Promise<{ afterSubscribe: Promise<IEncodable> }>
sendAndReceive <T extends IEncodable>(connection: IConnection, message: IEncodable, opts?: { filter: IBodyFilter<T>, signal?: AbortSignal }): Promise<{ afterSubscribe: Promise<T> }>
}
"use strict";
function __export(m) {
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p];
}
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } });
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p);
};
Object.defineProperty(exports, "__esModule", { value: true });
__export(require("./notifications/types"));
__exportStar(require("./notifications/types"), exports);
//# sourceMappingURL=types.js.map

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc