@consento/api
Advanced tools
Comparing version 0.1.3 to 0.2.0
22
index.js
"use strict"; | ||
function __export(m) { | ||
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p]; | ||
} | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } }); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __exportStar = (this && this.__exportStar) || function(m, exports) { | ||
for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.setup = void 0; | ||
const notifications_1 = require("./notifications"); | ||
const crypto_1 = require("@consento/crypto"); | ||
__export(require("./types")); | ||
__export(require("@consento/crypto")); | ||
__exportStar(require("./types"), exports); | ||
__exportStar(require("@consento/crypto"), exports); | ||
var notifications_2 = require("./notifications"); | ||
exports.isSuccessNotification = notifications_2.isSuccess; | ||
exports.isErrorNotification = notifications_2.isError; | ||
Object.defineProperty(exports, "isSuccessNotification", { enumerable: true, get: function () { return notifications_2.isSuccess; } }); | ||
Object.defineProperty(exports, "isErrorNotification", { enumerable: true, get: function () { return notifications_2.isError; } }); | ||
function setup({ cryptoCore, notificationTransport }) { | ||
@@ -14,0 +22,0 @@ return { |
@@ -1,2 +0,2 @@ | ||
import { ISender, IReceiver, IEncodable, ICancelable } from '@consento/crypto'; | ||
import { ISender, IReceiver, IEncodable } from '@consento/crypto'; | ||
import { INotifications, INotificationsTransport, INotificationsOptions, IConnection, INotificationProcessor, INotification, ISuccessNotification, INotificationError, IBodyFilter } from './types'; | ||
@@ -12,12 +12,28 @@ export declare function isSuccess(input: INotification): input is ISuccessNotification; | ||
constructor({ transport }: INotificationsOptions); | ||
reset(receivers: IReceiver[]): ICancelable<boolean[]>; | ||
subscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>; | ||
unsubscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>; | ||
reset(receivers: IReceiver[], opts?: { | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
subscribe(receivers: IReceiver[], { force, signal }?: { | ||
force?: boolean; | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
unsubscribe(receivers: IReceiver[], { force, signal }?: { | ||
force: boolean; | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
send(sender: ISender, message: IEncodable): Promise<string[]>; | ||
receive<T extends IEncodable>(receiver: IReceiver, filter?: IBodyFilter<T>, timeout?: number): ICancelable<{ | ||
afterSubscribe: ICancelable<T>; | ||
receive<T extends IEncodable>(receiver: IReceiver, { filter, timeout, signal }?: { | ||
filter?: IBodyFilter<T>; | ||
timeout?: number; | ||
signal?: AbortSignal; | ||
}): Promise<{ | ||
afterSubscribe: Promise<T>; | ||
}>; | ||
sendAndReceive<T extends IEncodable = IEncodable>(connection: IConnection, message: IEncodable, filter?: IBodyFilter<T>, timeout?: number): ICancelable<{ | ||
afterSubscribe: ICancelable<T>; | ||
sendAndReceive<T extends IEncodable = IEncodable>(connection: IConnection, message: IEncodable, opts?: { | ||
filter?: IBodyFilter<T>; | ||
timeout?: number; | ||
signal?: AbortSignal; | ||
}): Promise<{ | ||
afterSubscribe: Promise<T>; | ||
}>; | ||
} |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Notifications = exports.isError = exports.isSuccess = void 0; | ||
/* eslint-disable @typescript-eslint/method-signature-style */ | ||
/* eslint-disable @typescript-eslint/no-throw-literal */ | ||
@@ -7,2 +9,3 @@ const crypto_1 = require("@consento/crypto"); | ||
const events_1 = require("events"); | ||
const mapOutputToInput_1 = require("./mapOutputToInput"); | ||
function isSuccess(input) { | ||
@@ -16,37 +19,2 @@ return input.type === types_1.ENotificationType.success; | ||
exports.isError = isError; | ||
const wait = async (time) => await new Promise(resolve => setTimeout(resolve, time)); | ||
async function untilTrue(op) { | ||
while (!op()) { | ||
await wait(0); | ||
} | ||
} | ||
// TODO: move to @consento/crypto | ||
function isCancelable(promise) { | ||
// @ts-ignore TS2339 | ||
return typeof promise.cancel === 'function'; | ||
} | ||
// TODO: move to @consento/crypto | ||
async function maybeChild(child, promise) { | ||
if (isCancelable(promise)) { | ||
// eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
child(promise); | ||
} | ||
return await promise; | ||
} | ||
async function mapOutputToInput({ input: inputData, op }) { | ||
if (!Array.isArray(inputData)) { | ||
throw new Error(`Expected input to be list but was ${typeof inputData}`); | ||
} | ||
const outputData = await op(inputData); | ||
if (!Array.isArray(outputData)) { | ||
throw new Error(`Expected list response from ${op.toString()}`); | ||
} | ||
const received = new Map(); | ||
for (let i = 0; i < inputData.length; i++) { | ||
const inputEntry = inputData[i]; | ||
const outputEntry = outputData[i]; | ||
received.set(inputEntry, outputEntry); | ||
} | ||
return received; | ||
} | ||
class EmptyTransport extends events_1.EventEmitter { | ||
@@ -66,2 +34,17 @@ async subscribe(receivers) { | ||
} | ||
function interceptCallback(callback, setup) { | ||
const tearDown = setup(); | ||
return (error, t) => { | ||
const p = tearDown(); | ||
if (p instanceof Promise) { | ||
p.then(() => callback(error, t), err => callback(error !== null && error !== void 0 ? error : err, t)); | ||
} | ||
else { | ||
callback(error, t); | ||
} | ||
}; | ||
} | ||
function toCallback(resolve, reject) { | ||
return (error, t) => (error !== null && error !== undefined) ? reject(error) : resolve(t); | ||
} | ||
class Notifications { | ||
@@ -144,57 +127,48 @@ constructor({ transport }) { | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
reset(receivers) { | ||
return crypto_1.cancelable(function* (child) { | ||
const received = yield mapOutputToInput({ | ||
input: receivers, | ||
op: async (input) => await maybeChild(child, this._transport.reset(input)) | ||
}); | ||
this._receivers = {}; | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver); | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver; | ||
} | ||
return changed; | ||
}); | ||
}, this); | ||
async reset(receivers, opts) { | ||
const received = await mapOutputToInput_1.mapOutputToInput({ | ||
input: receivers, | ||
op: async (input) => await this._transport.reset(input, opts) | ||
}); | ||
this._receivers = {}; | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver); | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver; | ||
} | ||
return changed; | ||
}); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
subscribe(receivers, force = false) { | ||
return crypto_1.cancelable(function* (child) { | ||
if (receivers.length === 0) { | ||
return []; | ||
async subscribe(receivers, { force, signal } = {}) { | ||
if (receivers.length === 0) { | ||
return []; | ||
} | ||
const received = await mapOutputToInput_1.mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined), | ||
op: async (input) => await this._transport.subscribe(input, { signal }) | ||
}); | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false; | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver; | ||
} | ||
const received = yield mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined), | ||
op: async (input) => await maybeChild(child, this._transport.subscribe(input)) | ||
}); | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false; | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver; | ||
} | ||
return changed; | ||
}); | ||
}, this); | ||
return changed; | ||
}); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
unsubscribe(receivers, force = false) { | ||
return crypto_1.cancelable(function* (child) { | ||
if (receivers.length === 0) { | ||
return []; | ||
async unsubscribe(receivers, { force, signal } = { force: false }) { | ||
if (receivers.length === 0) { | ||
return []; | ||
} | ||
const received = await mapOutputToInput_1.mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined), | ||
op: async (input) => await this._transport.unsubscribe(input, { signal }) | ||
}); | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false; | ||
if (changed) { | ||
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete | ||
delete this._receivers[receiver.idBase64]; | ||
} | ||
const received = yield mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined), | ||
op: async (input) => await maybeChild(child, this._transport.unsubscribe(input)) | ||
}); | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false; | ||
if (changed) { | ||
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete | ||
delete this._receivers[receiver.idBase64]; | ||
} | ||
return changed; | ||
}); | ||
}, this); | ||
return changed; | ||
}); | ||
} | ||
@@ -218,14 +192,4 @@ async send(sender, message) { | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
receive(receiver, filter, timeout) { | ||
let _resolve; | ||
let _reject; | ||
let timer; | ||
const promise = new Promise((resolve, reject) => { | ||
_resolve = resolve; | ||
_reject = reject; | ||
}); | ||
if (timeout !== undefined && timeout !== null) { | ||
timer = setTimeout(() => _reject(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout })), timeout); | ||
} | ||
async receive(receiver, { filter, timeout, signal } = {}) { | ||
let _cb; | ||
const processor = (message) => { | ||
@@ -235,50 +199,51 @@ if (isSuccess(message) && message.channelIdBase64 === receiver.idBase64) { | ||
if (typeof filter !== 'function' || filter(body)) { | ||
_resolve(body); | ||
_cb(null, body); | ||
} | ||
} | ||
}; | ||
const afterSubscribe = new Promise((resolve, reject) => { | ||
_cb = toCallback(resolve, reject); | ||
}); | ||
this.processors.add(processor); | ||
const clear = async () => { | ||
this.processors.delete(processor); | ||
if (timer !== undefined) { | ||
clearTimeout(timer); | ||
_cb = interceptCallback(_cb, () => { | ||
return async () => { | ||
this.processors.delete(processor); | ||
await this.unsubscribe([receiver]); | ||
}; | ||
}); | ||
try { | ||
await this.subscribe([receiver], { signal }); | ||
if (timeout !== undefined && timeout !== null) { | ||
_cb = interceptCallback(_cb, () => { | ||
const timer = setTimeout(() => _cb(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout }), null), timeout); | ||
return () => clearTimeout(timer); | ||
}); | ||
} | ||
await this.unsubscribe([receiver]); | ||
if (signal !== undefined && signal !== null) { | ||
_cb = interceptCallback(_cb, () => { | ||
const listener = () => _cb(new crypto_1.AbortError(), null); | ||
signal.addEventListener('abort', listener); | ||
return () => signal.removeEventListener('abort', listener); | ||
}); | ||
} | ||
} | ||
catch (err) { | ||
_cb(err, null); | ||
} | ||
return { | ||
afterSubscribe | ||
}; | ||
// eslint-disable-next-line @typescript-eslint/return-await | ||
return crypto_1.cancelable(function* (child) { | ||
yield child(this.subscribe([receiver])); | ||
return { | ||
afterSubscribe: crypto_1.cancelable(function* () { | ||
return (yield promise); | ||
}).finally(clear) | ||
}; | ||
}, this) | ||
.catch(async (err) => { | ||
await clear(); | ||
throw err; | ||
}); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
sendAndReceive(connection, message, filter, timeout) { | ||
let _continue = false; | ||
return crypto_1.cancelable(function* (child) { | ||
const { afterSubscribe: receivePromise } = (yield child(this.receive(connection.receiver, filter, timeout))); | ||
const next = crypto_1.cancelable(function* (child) { | ||
// @ts-ignore TS2339 | ||
receivePromise.name = 'RECEIVED'; | ||
// eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
child(receivePromise); | ||
// wait for another instance to be able to receive "afterSubscribe" | ||
// before the submission happens | ||
yield untilTrue(() => _continue); | ||
yield this.send(connection.sender, message); | ||
return yield receivePromise; | ||
}, this); | ||
return { | ||
afterSubscribe: next | ||
}; | ||
}, this).finally(() => { | ||
_continue = true; | ||
}); | ||
async sendAndReceive(connection, message, opts) { | ||
const { afterSubscribe: receivePromise } = await this.receive(connection.receiver, opts); | ||
return { | ||
afterSubscribe: (async () => { | ||
await Promise.race([ | ||
this.send(connection.sender, message), | ||
receivePromise | ||
]); | ||
return await receivePromise; | ||
})() | ||
}; | ||
} | ||
@@ -285,0 +250,0 @@ } |
import { EDecryptionError } from '@consento/crypto/core/types'; | ||
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage, ICancelable } from '@consento/crypto/types'; | ||
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage } from '@consento/crypto/types'; | ||
export * from '@consento/crypto/types'; | ||
export interface INotificationsTransport { | ||
subscribe(receivers: IReceiver[]): Promise<boolean[]>; | ||
unsubscribe(receivers: IReceiver[]): Promise<boolean[]>; | ||
reset(receivers: IReceiver[]): Promise<boolean[]>; | ||
subscribe(receivers: Iterable<IReceiver>, opts?: { | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
unsubscribe(receivers: Iterable<IReceiver>, opts?: { | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
reset(receivers: Iterable<IReceiver>, opts?: { | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
send(channel: IAnnonymous, message: IEncryptedMessage): Promise<any[]>; | ||
@@ -63,19 +69,25 @@ on(event: 'error', handler: (error: Error) => void): this; | ||
export interface INotifications { | ||
subscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>; | ||
unsubscribe(receivers: IReceiver[], force?: boolean): ICancelable<boolean[]>; | ||
reset(receivers: IReceiver[]): ICancelable<boolean[]>; | ||
subscribe(receivers: IReceiver[], opts?: { | ||
force?: boolean; | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
unsubscribe(receivers: IReceiver[], opts?: { | ||
force?: boolean; | ||
signal?: AbortSignal; | ||
}): Promise<boolean[]>; | ||
reset(receivers: IReceiver[]): Promise<boolean[]>; | ||
processors: Set<INotificationProcessor>; | ||
send(sender: ISender, message: IEncodable): Promise<string[]>; | ||
receive(receiver: IReceiver): ICancelable<{ | ||
afterSubscribe: ICancelable<IEncodable>; | ||
receive<T extends IEncodable>(receiver: IReceiver, opts?: { | ||
filter?: IBodyFilter<T>; | ||
signal?: AbortSignal; | ||
}): Promise<{ | ||
afterSubscribe: Promise<IEncodable>; | ||
}>; | ||
receive<T extends IEncodable>(receiver: IReceiver, filter: IBodyFilter<T>): ICancelable<{ | ||
afterSubscribe: ICancelable<T>; | ||
sendAndReceive<T extends IEncodable>(connection: IConnection, message: IEncodable, opts?: { | ||
filter: IBodyFilter<T>; | ||
signal?: AbortSignal; | ||
}): Promise<{ | ||
afterSubscribe: Promise<T>; | ||
}>; | ||
sendAndReceive(connection: IConnection, message: IEncodable): ICancelable<{ | ||
afterSubscribe: ICancelable<IEncodable>; | ||
}>; | ||
sendAndReceive<T extends IEncodable>(connection: IConnection, message: IEncodable, filter: IBodyFilter<T>): ICancelable<{ | ||
afterSubscribe: ICancelable<T>; | ||
}>; | ||
} |
"use strict"; | ||
function __export(m) { | ||
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p]; | ||
} | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } }); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __exportStar = (this && this.__exportStar) || function(m, exports) { | ||
for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
__export(require("@consento/crypto/types")); | ||
exports.EErrorCode = exports.ENotificationType = void 0; | ||
__exportStar(require("@consento/crypto/types"), exports); | ||
var ENotificationType; | ||
@@ -8,0 +16,0 @@ (function (ENotificationType) { |
{ | ||
"name": "@consento/api", | ||
"version": "0.1.3", | ||
"version": "0.2.0", | ||
"description": "JavasScript API for building things with consento.", | ||
@@ -10,3 +10,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"@consento/crypto": "^0.1.2", | ||
"@consento/crypto": "^0.2.0", | ||
"buffer": "^5.6.0", | ||
@@ -13,0 +13,0 @@ "events": "^3.0.0" |
@@ -0,5 +1,7 @@ | ||
/* eslint-disable @typescript-eslint/method-signature-style */ | ||
/* eslint-disable @typescript-eslint/no-throw-literal */ | ||
import { ISender, IReceiver, IEncodable, IEncryptedMessage, cancelable, ICancelable } from '@consento/crypto' | ||
import { ISender, IReceiver, IEncodable, IEncryptedMessage, AbortError } from '@consento/crypto' | ||
import { INotifications, INotificationsTransport, INotificationsOptions, IConnection, INotificationProcessor, EErrorCode, INotification, ISuccessNotification, INotificationError, IBodyFilter, ENotificationType, IDecryptionError } from './types' | ||
import { EventEmitter } from 'events' | ||
import { mapOutputToInput } from './mapOutputToInput' | ||
@@ -14,42 +16,2 @@ export function isSuccess (input: INotification): input is ISuccessNotification { | ||
const wait: (time: number) => Promise<void> = async (time: number) => await new Promise<void>(resolve => setTimeout(resolve, time)) | ||
async function untilTrue (op: () => boolean): Promise<void> { | ||
while (!op()) { | ||
await wait(0) | ||
} | ||
} | ||
// TODO: move to @consento/crypto | ||
function isCancelable <T> (promise: Promise<T>): promise is ICancelable<T> { | ||
// @ts-ignore TS2339 | ||
return typeof promise.cancel === 'function' | ||
} | ||
// TODO: move to @consento/crypto | ||
async function maybeChild <T> (child: <TChild>(cancelable: ICancelable<TChild>) => ICancelable<TChild>, promise: Promise<T>): Promise<T> { | ||
if (isCancelable(promise)) { | ||
// eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
child(promise) | ||
} | ||
return await promise | ||
} | ||
async function mapOutputToInput <Input, Output> ({ input: inputData, op }: { input: Input[], op: (inputData: Input[]) => Promise<Output[]> }): Promise<Map<Input, Output>> { | ||
if (!Array.isArray(inputData)) { | ||
throw new Error(`Expected input to be list but was ${typeof inputData}`) | ||
} | ||
const outputData = await op(inputData) | ||
if (!Array.isArray(outputData)) { | ||
throw new Error(`Expected list response from ${op.toString()}`) | ||
} | ||
const received = new Map <Input, Output>() | ||
for (let i = 0; i < inputData.length; i++) { | ||
const inputEntry = inputData[i] | ||
const outputEntry = outputData[i] | ||
received.set(inputEntry, outputEntry) | ||
} | ||
return received | ||
} | ||
class EmptyTransport extends EventEmitter implements INotificationsTransport { | ||
@@ -73,2 +35,24 @@ async subscribe (receivers: IReceiver[]): Promise<boolean[]> { | ||
type Callback<T> = (error: Error, t: T) => any | ||
function interceptCallback <T> (callback: Callback<T>, setup: () => () => any): Callback<T> { | ||
const tearDown = setup() | ||
return (error: Error, t: T) => { | ||
const p = tearDown() | ||
if (p instanceof Promise) { | ||
p.then( | ||
() => callback(error, t), | ||
err => callback(error ?? err, t) | ||
) | ||
} else { | ||
callback(error, t) | ||
} | ||
} | ||
} | ||
function toCallback <T> (resolve: (data: T) => any, reject: (error: Error) => any): Callback<T> { | ||
return (error: Error, t: T) => | ||
(error !== null && error !== undefined) ? reject(error) : resolve(t) | ||
} | ||
export class Notifications implements INotifications { | ||
@@ -156,63 +140,54 @@ _transport: INotificationsTransport | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
reset (receivers: IReceiver[]): ICancelable<boolean[]> { | ||
return cancelable <boolean[], Notifications>(function * (child) { | ||
const received: Map<IReceiver, boolean> = yield mapOutputToInput({ | ||
input: receivers, | ||
op: async input => await maybeChild(child, this._transport.reset(input)) | ||
}) | ||
this._receivers = {} | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver | ||
} | ||
return changed | ||
}) | ||
}, this) | ||
async reset (receivers: IReceiver[], opts?: { signal?: AbortSignal }): Promise<boolean[]> { | ||
const received: Map<IReceiver, boolean> = await mapOutputToInput({ | ||
input: receivers, | ||
op: async input => await this._transport.reset(input, opts) | ||
}) | ||
this._receivers = {} | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver | ||
} | ||
return changed | ||
}) | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
subscribe (receivers: IReceiver[], force: boolean = false): ICancelable<boolean[]> { | ||
return cancelable <boolean[], Notifications>(function * (child) { | ||
if (receivers.length === 0) { | ||
return [] | ||
} | ||
async subscribe (receivers: IReceiver[], { force, signal }: { force?: boolean, signal?: AbortSignal } = {}): Promise<boolean[]> { | ||
if (receivers.length === 0) { | ||
return [] | ||
} | ||
const received: Map<IReceiver, boolean> = yield mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined), | ||
op: async input => await maybeChild(child, this._transport.subscribe(input)) | ||
}) | ||
const received: Map<IReceiver, boolean> = await mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] === undefined), | ||
op: async input => await this._transport.subscribe(input, { signal }) | ||
}) | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver | ||
} | ||
return changed | ||
}) | ||
}, this) | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false | ||
if (changed) { | ||
this._receivers[receiver.idBase64] = receiver | ||
} | ||
return changed | ||
}) | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
unsubscribe (receivers: IReceiver[], force: boolean = false): ICancelable<boolean[]> { | ||
return cancelable <boolean[], Notifications>(function * (child) { | ||
if (receivers.length === 0) { | ||
return [] | ||
} | ||
async unsubscribe (receivers: IReceiver[], { force, signal }: { force: boolean, signal?: AbortSignal } = { force: false }): Promise<boolean[]> { | ||
if (receivers.length === 0) { | ||
return [] | ||
} | ||
const received: Map<IReceiver, boolean> = yield mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined), | ||
op: async input => await maybeChild(child, this._transport.unsubscribe(input)) | ||
}) | ||
const received: Map<IReceiver, boolean> = await mapOutputToInput({ | ||
input: force ? receivers : receivers.filter(receiver => this._receivers[receiver.idBase64] !== undefined), | ||
op: async input => await this._transport.unsubscribe(input, { signal }) | ||
}) | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false | ||
if (changed) { | ||
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete | ||
delete this._receivers[receiver.idBase64] | ||
} | ||
return changed | ||
}) | ||
}, this) | ||
return receivers.map(receiver => { | ||
const changed = received.get(receiver) || false | ||
if (changed) { | ||
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete | ||
delete this._receivers[receiver.idBase64] | ||
} | ||
return changed | ||
}) | ||
} | ||
@@ -238,14 +213,4 @@ | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
receive <T extends IEncodable> (receiver: IReceiver, filter?: IBodyFilter<T>, timeout?: number): ICancelable<{ afterSubscribe: ICancelable<T> }> { | ||
let _resolve: (t: T) => any | ||
let _reject: (error: Error) => void | ||
let timer: any | ||
const promise = new Promise<T>((resolve, reject) => { | ||
_resolve = resolve | ||
_reject = reject | ||
}) | ||
if (timeout !== undefined && timeout !== null) { | ||
timer = setTimeout(() => _reject(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout })), timeout) | ||
} | ||
async receive <T extends IEncodable> (receiver: IReceiver, { filter, timeout, signal }: { filter?: IBodyFilter<T>, timeout?: number, signal?: AbortSignal } = {}): Promise<{ afterSubscribe: Promise<T> }> { | ||
let _cb: Callback<T> | ||
const processor = (message: INotification): void => { | ||
@@ -255,57 +220,63 @@ if (isSuccess(message) && message.channelIdBase64 === receiver.idBase64) { | ||
if (typeof filter !== 'function' || filter(body)) { | ||
_resolve(body as T) | ||
_cb(null, body as T) | ||
} | ||
} | ||
} | ||
const afterSubscribe = new Promise<T>((resolve, reject) => { | ||
_cb = toCallback(resolve, reject) | ||
}) | ||
this.processors.add(processor) | ||
const clear = async (): Promise<void> => { | ||
this.processors.delete(processor) | ||
if (timer !== undefined) { | ||
clearTimeout(timer) | ||
_cb = interceptCallback(_cb, () => { | ||
return async (): Promise<void> => { | ||
this.processors.delete(processor) | ||
await this.unsubscribe([receiver]) | ||
} | ||
await this.unsubscribe([receiver]) | ||
}) | ||
try { | ||
await this.subscribe([receiver], { signal }) | ||
if (timeout !== undefined && timeout !== null) { | ||
_cb = interceptCallback(_cb, () => { | ||
const timer = setTimeout( | ||
() => _cb(Object.assign(new Error(`Not received within ${timeout} milliseconds`), { code: 'timeout', timeout }), null), | ||
timeout | ||
) | ||
return () => clearTimeout(timer) | ||
}) | ||
} | ||
if (signal !== undefined && signal !== null) { | ||
_cb = interceptCallback(_cb, () => { | ||
const listener = (): void => _cb(new AbortError(), null) | ||
signal.addEventListener('abort', listener) | ||
return () => signal.removeEventListener('abort', listener) | ||
}) | ||
} | ||
} catch (err) { | ||
_cb(err, null) | ||
} | ||
// eslint-disable-next-line @typescript-eslint/return-await | ||
return cancelable<{ afterSubscribe: ICancelable<T> }, Notifications>(function * (child) { | ||
yield child(this.subscribe([receiver])) | ||
return { | ||
afterSubscribe: cancelable<{ afterSubscribe: ICancelable }>(function * () { | ||
return (yield promise) as T | ||
}).finally(clear) | ||
} | ||
}, this) | ||
.catch(async (err: Error): Promise<{ afterSubscribe: ICancelable<T> }> => { | ||
await clear() | ||
throw err | ||
}) | ||
return { | ||
afterSubscribe | ||
} | ||
} | ||
// eslint-disable-next-line @typescript-eslint/promise-function-async | ||
sendAndReceive <T extends IEncodable = IEncodable> ( | ||
async sendAndReceive <T extends IEncodable = IEncodable> ( | ||
connection: IConnection, | ||
message: IEncodable, | ||
filter?: IBodyFilter<T>, | ||
timeout?: number | ||
): ICancelable<{ afterSubscribe: ICancelable<T> }> { | ||
let _continue = false | ||
return cancelable<{ afterSubscribe: ICancelable<T> }, Notifications>(function * (child) { | ||
const { afterSubscribe: receivePromise } = (yield child(this.receive(connection.receiver, filter, timeout))) as { afterSubscribe: ICancelable<T> } | ||
const next = cancelable(function * (child) { | ||
// @ts-ignore TS2339 | ||
receivePromise.name = 'RECEIVED' | ||
// eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
child(receivePromise) | ||
// wait for another instance to be able to receive "afterSubscribe" | ||
// before the submission happens | ||
yield untilTrue(() => _continue) | ||
yield this.send(connection.sender, message) | ||
return yield receivePromise | ||
}, this) | ||
return { | ||
afterSubscribe: next | ||
} | ||
}, this).finally(() => { | ||
_continue = true | ||
}) | ||
opts?: { | ||
filter?: IBodyFilter<T> | ||
timeout?: number | ||
signal?: AbortSignal | ||
} | ||
): Promise<{ afterSubscribe: Promise<T> }> { | ||
const { afterSubscribe: receivePromise } = await this.receive(connection.receiver, opts) | ||
return { | ||
afterSubscribe: (async () => { | ||
await Promise.race([ | ||
this.send(connection.sender, message), | ||
receivePromise | ||
]) | ||
return await receivePromise | ||
})() | ||
} | ||
} | ||
} |
@@ -0,3 +1,4 @@ | ||
/* eslint-disable @typescript-eslint/method-signature-style */ | ||
import { EDecryptionError } from '@consento/crypto/core/types' | ||
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage, ICancelable } from '@consento/crypto/types' | ||
import { ISender, IReceiver, IAnnonymous, IEncodable, IEncryptedMessage } from '@consento/crypto/types' | ||
@@ -7,5 +8,5 @@ export * from '@consento/crypto/types' | ||
export interface INotificationsTransport { | ||
subscribe (receivers: IReceiver[]): Promise<boolean[]> | ||
unsubscribe (receivers: IReceiver[]): Promise<boolean[]> | ||
reset (receivers: IReceiver[]): Promise<boolean[]> | ||
subscribe (receivers: Iterable<IReceiver>, opts?: { signal?: AbortSignal }): Promise<boolean[]> | ||
unsubscribe (receivers: Iterable<IReceiver>, opts?: { signal?: AbortSignal }): Promise<boolean[]> | ||
reset (receivers: Iterable<IReceiver>, opts?: { signal?: AbortSignal }): Promise<boolean[]> | ||
send (channel: IAnnonymous, message: IEncryptedMessage): Promise<any[]> | ||
@@ -78,11 +79,9 @@ on (event: 'error', handler: (error: Error) => void): this | ||
export interface INotifications { | ||
subscribe (receivers: IReceiver[], force?: boolean): ICancelable<boolean[]> | ||
unsubscribe (receivers: IReceiver[], force?: boolean): ICancelable<boolean[]> | ||
reset (receivers: IReceiver[]): ICancelable<boolean[]> | ||
subscribe (receivers: IReceiver[], opts?: { force?: boolean, signal?: AbortSignal }): Promise<boolean[]> | ||
unsubscribe (receivers: IReceiver[], opts?: { force?: boolean, signal?: AbortSignal }): Promise<boolean[]> | ||
reset (receivers: IReceiver[]): Promise<boolean[]> | ||
processors: Set<INotificationProcessor> | ||
send (sender: ISender, message: IEncodable): Promise<string[]> | ||
receive (receiver: IReceiver): ICancelable<{ afterSubscribe: ICancelable<IEncodable> }> | ||
receive <T extends IEncodable>(receiver: IReceiver, filter: IBodyFilter<T>): ICancelable<{ afterSubscribe: ICancelable<T> }> | ||
sendAndReceive (connection: IConnection, message: IEncodable): ICancelable<{ afterSubscribe: ICancelable<IEncodable> }> | ||
sendAndReceive <T extends IEncodable>(connection: IConnection, message: IEncodable, filter: IBodyFilter<T>): ICancelable<{ afterSubscribe: ICancelable<T> }> | ||
receive <T extends IEncodable>(receiver: IReceiver, opts?: { filter?: IBodyFilter<T>, signal?: AbortSignal }): Promise<{ afterSubscribe: Promise<IEncodable> }> | ||
sendAndReceive <T extends IEncodable>(connection: IConnection, message: IEncodable, opts?: { filter: IBodyFilter<T>, signal?: AbortSignal }): Promise<{ afterSubscribe: Promise<T> }> | ||
} |
15
types.js
"use strict"; | ||
function __export(m) { | ||
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p]; | ||
} | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } }); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __exportStar = (this && this.__exportStar) || function(m, exports) { | ||
for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
__export(require("./notifications/types")); | ||
__exportStar(require("./notifications/types"), exports); | ||
//# sourceMappingURL=types.js.map |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
49641
23
873
+ Added@consento/crypto@0.2.0(transitive)
+ Addedchacha20-universal@1.0.4(transitive)
+ Addedfunction-bind@1.1.2(transitive)
+ Addedhasown@2.0.2(transitive)
+ Addedis-core-module@2.16.1(transitive)
+ Addedpath-parse@1.0.7(transitive)
+ Addedresolve@1.22.10(transitive)
+ Addedsha256-universal@1.2.1(transitive)
+ Addedsha256-wasm@2.2.2(transitive)
+ Addedsha512-universal@1.2.1(transitive)
+ Addedsha512-wasm@2.3.4(transitive)
+ Addedsodium-javascript@0.8.0(transitive)
+ Addedsodium-native@3.4.1(transitive)
+ Addedsodium-universal@3.1.0(transitive)
+ Addedsupports-preserve-symlinks-flag@1.0.0(transitive)
- Removed@consento/crypto@0.1.3(transitive)
- Removedini@1.3.8(transitive)
- Removednan@2.22.2(transitive)
- Removednanoassert@1.1.0(transitive)
- Removedsodium-javascript@0.5.6(transitive)
- Removedsodium-native@2.4.9(transitive)
- Removedsodium-universal@2.0.0(transitive)
Updated@consento/crypto@^0.2.0