@agoric/eventual-send
Advanced tools
Comparing version 0.1.0 to 0.1.1
'use strict'; | ||
/** | ||
* Create an EPromise class that supports eventual send (infix-bang) operations. | ||
* This is a class that extends the BasePromise class argument (which may be platform | ||
* Promises, or some other implementation). Only the `new BasePromise(executor)` | ||
* constructor form is used directly by EPromise. | ||
* Modify a Promise class to have it support eventual send (infix-bang) operations. | ||
* | ||
@@ -14,208 +11,203 @@ * Based heavily on nanoq https://github.com/drses/nanoq/blob/master/src/nanoq.js | ||
* | ||
* @param {typeof Promise} BasePromise ES6 Promise contstructor | ||
* @returns {typeof EPromise} EPromise class | ||
* @param {typeof Promise} Promise ES6 Promise class to shim | ||
* @return {typeof EPromise} Extended promise | ||
*/ | ||
function makeEPromiseClass(BasePromise) { | ||
const presenceToResolvedRelay = new WeakMap(); | ||
const promiseToRelay = new WeakMap(); | ||
function maybeExtendPromise(Promise) { | ||
// Make idempotent, so we don't layer on top of a BasePromise that is adequate. | ||
let needsShim = false; | ||
for (const method of [ | ||
'get', | ||
'put', | ||
'post', | ||
'delete', | ||
'invoke', | ||
'fapply', | ||
'fcall', | ||
]) { | ||
if (typeof Promise.prototype[method] !== 'function') { | ||
needsShim = true; | ||
break; | ||
} | ||
} | ||
if (!needsShim) { | ||
// Already supports all the methods. | ||
return Promise; | ||
} | ||
// This special relay accepts Promises, and forwards | ||
// the remote to its corresponding resolvedRelay. | ||
const presenceToHandler = new WeakMap(); | ||
const presenceToPromise = new WeakMap(); | ||
const promiseToHandler = new WeakMap(); | ||
// This special handler accepts Promises, and forwards | ||
// handled Promises to their corresponding resolvedHandler. | ||
// | ||
// If passed a Promise that is not remote, perform | ||
// If passed a Promise that is not handled, perform | ||
// the corresponding local operation. | ||
let forwardingRelay; | ||
let forwardingHandler; | ||
function relay(p) { | ||
return promiseToRelay.get(p) || forwardingRelay; | ||
return promiseToHandler.get(p) || forwardingHandler; | ||
} | ||
class EPromise extends BasePromise { | ||
static makeRemote(executor, unresolvedRelay) { | ||
let remoteResolve; | ||
let remoteReject; | ||
let relayResolve; | ||
const remoteP = new EPromise((resolve, reject) => { | ||
remoteResolve = resolve; | ||
remoteReject = reject; | ||
}); | ||
Object.defineProperties( | ||
Promise.prototype, | ||
Object.getOwnPropertyDescriptors({ | ||
get(key) { | ||
return relay(this).GET(this, key); | ||
}, | ||
if (!unresolvedRelay) { | ||
// Create a simple unresolvedRelay that just postpones until the | ||
// resolvedRelay is set. | ||
// | ||
// This is insufficient for actual remote Promises (too many round-trips), | ||
// but is an easy way to create a local Remote. | ||
const relayP = new EPromise(resolve => { | ||
relayResolve = resolve; | ||
}); | ||
put(key, val) { | ||
return relay(this).PUT(this, key, val); | ||
}, | ||
const postpone = forwardedOperation => { | ||
// Just wait until the relay is resolved/rejected. | ||
return async (p, ...args) => { | ||
// console.log(`forwarding ${forwardedOperation}`); | ||
await relayP; | ||
return p[forwardedOperation](args); | ||
}; | ||
}; | ||
delete(key) { | ||
return relay(this).DELETE(this, key); | ||
}, | ||
unresolvedRelay = { | ||
GET: postpone('get'), | ||
PUT: postpone('put'), | ||
DELETE: postpone('delete'), | ||
POST: postpone('post'), | ||
}; | ||
} | ||
post(optKey, args) { | ||
return relay(this).POST(this, optKey, args); | ||
}, | ||
// Until the remote is resolved, we use the unresolvedRelay. | ||
promiseToRelay.set(remoteP, unresolvedRelay); | ||
invoke(optKey, ...args) { | ||
return relay(this).POST(this, optKey, args); | ||
}, | ||
function rejectRemote(reason) { | ||
if (relayResolve) { | ||
relayResolve(null); | ||
} | ||
remoteReject(reason); | ||
} | ||
fapply(args) { | ||
return relay(this).POST(this, undefined, args); | ||
}, | ||
function resolveRemote(presence, resolvedRelay) { | ||
try { | ||
if (resolvedRelay) { | ||
// Sanity checks. | ||
if (Object(resolvedRelay) !== resolvedRelay) { | ||
throw TypeError( | ||
`Resolved relay ${resolvedRelay} cannot be a primitive`, | ||
); | ||
} | ||
for (const method of ['GET', 'PUT', 'DELETE', 'POST']) { | ||
if (typeof resolvedRelay[method] !== 'function') { | ||
throw TypeError( | ||
`Resolved relay ${resolvedRelay} requires a ${method} method`, | ||
); | ||
} | ||
} | ||
if (Object(presence) !== presence) { | ||
throw TypeError(`Presence ${presence} cannot be a primitive`); | ||
} | ||
if (presence === null) { | ||
throw TypeError(`Presence ${presence} cannot be null`); | ||
} | ||
if (presenceToResolvedRelay.has(presence)) { | ||
throw TypeError(`Presence ${presence} is already mapped`); | ||
} | ||
if (presence && typeof presence.then === 'function') { | ||
throw TypeError( | ||
`Presence ${presence} cannot be a Promise or other thenable`, | ||
); | ||
} | ||
fcall(...args) { | ||
return relay(this).POST(this, undefined, args); | ||
}, | ||
}), | ||
); | ||
// Create a table entry for the presence mapped to the resolvedRelay. | ||
presenceToResolvedRelay.set(presence, resolvedRelay); | ||
} | ||
const baseResolve = Promise.resolve.bind(Promise); | ||
// Remove the mapping, as our resolvedRelay should be used instead. | ||
promiseToRelay.delete(remoteP); | ||
// Resolve with the new presence or other value. | ||
remoteResolve(presence); | ||
if (relayResolve) { | ||
// Activate the default unresolvedRelay. | ||
relayResolve(resolvedRelay); | ||
} | ||
} catch (e) { | ||
rejectRemote(e); | ||
// Add Promise.makeHandled and update Promise.resolve. | ||
Object.defineProperties( | ||
Promise, | ||
Object.getOwnPropertyDescriptors({ | ||
resolve(value) { | ||
// Resolving a Presence returns the pre-registered handled promise. | ||
const handledPromise = presenceToPromise.get(value); | ||
if (handledPromise) { | ||
return handledPromise; | ||
} | ||
} | ||
return baseResolve(value); | ||
}, | ||
// Invoke the callback to let the user resolve/reject. | ||
executor(resolveRemote, rejectRemote); | ||
makeHandled(executor, unresolvedHandler) { | ||
let handledResolve; | ||
let handledReject; | ||
let resolveTheHandler; | ||
const handledP = new Promise((resolve, reject) => { | ||
handledResolve = resolve; | ||
handledReject = reject; | ||
}); | ||
// Return a remote EPromise, which wil be resolved/rejected | ||
// by the executor. | ||
return remoteP; | ||
} | ||
if (!unresolvedHandler) { | ||
// Create a simple unresolvedHandler that just postpones until the | ||
// resolvedHandler is set. | ||
// | ||
// This is insufficient for actual remote handled Promises | ||
// (too many round-trips), but is an easy way to create a local handled Promise. | ||
const handlerP = new Promise(resolve => { | ||
resolveTheHandler = resolve; | ||
}); | ||
static resolve(value) { | ||
return new EPromise(resolve => resolve(value)); | ||
} | ||
const postpone = forwardedOperation => { | ||
// Just wait until the handler is resolved/rejected. | ||
return async (p, ...args) => { | ||
// console.log(`forwarding ${forwardedOperation}`); | ||
await handlerP; | ||
return p[forwardedOperation](args); | ||
}; | ||
}; | ||
static reject(reason) { | ||
return new EPromise((_resolve, reject) => reject(reason)); | ||
} | ||
unresolvedHandler = { | ||
GET: postpone('get'), | ||
PUT: postpone('put'), | ||
DELETE: postpone('delete'), | ||
POST: postpone('post'), | ||
}; | ||
} | ||
get(key) { | ||
return relay(this).GET(this, key); | ||
} | ||
// Until the handled promise is resolved, we use the unresolvedHandler. | ||
promiseToHandler.set(handledP, unresolvedHandler); | ||
put(key, val) { | ||
return relay(this).PUT(this, key, val); | ||
} | ||
function rejectHandled(reason) { | ||
if (resolveTheHandler) { | ||
resolveTheHandler(null); | ||
} | ||
handledReject(reason); | ||
} | ||
delete(key) { | ||
return relay(this).DELETE(this, key); | ||
} | ||
function resolveHandled(presence, resolvedHandler) { | ||
try { | ||
if (resolvedHandler) { | ||
// Sanity checks. | ||
if (Object(resolvedHandler) !== resolvedHandler) { | ||
throw TypeError( | ||
`Resolved handler ${resolvedHandler} cannot be a primitive`, | ||
); | ||
} | ||
for (const method of ['GET', 'PUT', 'DELETE', 'POST']) { | ||
if (typeof resolvedHandler[method] !== 'function') { | ||
throw TypeError( | ||
`Resolved handler ${resolvedHandler} requires a ${method} method`, | ||
); | ||
} | ||
} | ||
if (Object(presence) !== presence) { | ||
throw TypeError(`Presence ${presence} cannot be a primitive`); | ||
} | ||
if (presence === null) { | ||
throw TypeError(`Presence ${presence} cannot be null`); | ||
} | ||
if (presenceToHandler.has(presence)) { | ||
throw TypeError(`Presence ${presence} is already mapped`); | ||
} | ||
if (presence && typeof presence.then === 'function') { | ||
throw TypeError( | ||
`Presence ${presence} cannot be a Promise or other thenable`, | ||
); | ||
} | ||
post(optKey, args) { | ||
return relay(this).POST(this, optKey, args); | ||
} | ||
// Create table entries for the presence mapped to the resolvedHandler. | ||
presenceToPromise.set(presence, handledP); | ||
presenceToHandler.set(presence, resolvedHandler); | ||
} | ||
invoke(optKey, ...args) { | ||
return relay(this).POST(this, optKey, args); | ||
} | ||
// Remove the mapping, as our resolvedHandler should be used instead. | ||
promiseToHandler.delete(handledP); | ||
fapply(args) { | ||
return relay(this).POST(this, undefined, args); | ||
} | ||
// Resolve with the new presence or other value. | ||
handledResolve(presence); | ||
fcall(...args) { | ||
return relay(this).POST(this, undefined, args); | ||
} | ||
// *********************************************************** | ||
// The rest of these static methods ensure we use the correct | ||
// EPromise.resolve and EPromise.reject, no matter what the | ||
// implementation of the inherited BasePromise is. | ||
static all(iterable) { | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (res, item, index) => { | ||
// Reject if any reject. | ||
if (item.status === 'rejected') { | ||
throw item.reason; | ||
if (resolveTheHandler) { | ||
// Activate the default unresolvedHandler. | ||
resolveTheHandler(resolvedHandler); | ||
} | ||
} catch (e) { | ||
rejectHandled(e); | ||
} | ||
} | ||
// Add the resolved value to the array. | ||
res[index] = item.value; | ||
// Invoke the callback to let the user resolve/reject. | ||
executor(resolveHandled, rejectHandled); | ||
// Continue combining promise results. | ||
return { status: 'continue', result: res }; | ||
}); | ||
} | ||
// Return a handled Promise, which wil be resolved/rejected | ||
// by the executor. | ||
return handledP; | ||
}, | ||
}), | ||
); | ||
static allSettled(iterable) { | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (res, item, index) => { | ||
// Add the reified promise result to the array. | ||
res[index] = item; | ||
// Continue combining promise results. | ||
return { status: 'continue', result: res }; | ||
}); | ||
} | ||
// TODO: Implement any(iterable) according to spec. | ||
// Also add it to the SES/Jessie whitelists. | ||
static race(iterable) { | ||
// Just return the first reified promise result, whether fulfilled or rejected. | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (_, item) => item); | ||
} | ||
} | ||
function makeForwarder(operation, localImpl) { | ||
return async (ep, ...args) => { | ||
const o = await ep; | ||
const resolvedRelay = presenceToResolvedRelay.get(o); | ||
if (resolvedRelay) { | ||
// The relay was resolved, so give it a naked object. | ||
return resolvedRelay[operation](o, ...args); | ||
const resolvedHandler = presenceToHandler.get(o); | ||
if (resolvedHandler) { | ||
// The handler was resolved, so give it a naked object. | ||
return resolvedHandler[operation](o, ...args); | ||
} | ||
@@ -229,3 +221,3 @@ | ||
forwardingRelay = { | ||
forwardingHandler = { | ||
GET: makeForwarder('GET', (o, key) => o[key]), | ||
@@ -241,154 +233,5 @@ PUT: makeForwarder('PUT', (o, key, val) => (o[key] = val)), | ||
}; | ||
/** | ||
* Reduce-like helper function to support iterable values mapped to Promise.resolve, | ||
* and combine them asynchronously. | ||
* | ||
* The combiner may be called in any order, and the collection is not necessarily | ||
* done iterating by the time it's called. | ||
* | ||
* The notable difference from reduce is that the combiner gets a reified | ||
* settled promise as its `item` argument, and returns a combiner action | ||
* with a `status` field of "rejected", "fulfilled", or "continue". | ||
* | ||
* @param {*} initValue first value of result | ||
* @param {Iterable} iterable values to EPromise.resolve | ||
* @param {Combiner} combiner synchronously reduce each item | ||
* @returns {EPromise<*>} | ||
*/ | ||
function combinePromises(initValue, iterable, combiner) { | ||
let result = initValue; | ||
// We use the platform async keyword here to simplify | ||
// the executor function. | ||
return new EPromise(async (resolve, reject) => { | ||
// We start at 1 to prevent the iterator from resolving | ||
// the EPromise until the loop is complete and all items | ||
// have been reduced. | ||
let countDown = 1; | ||
let alreadySettled = false; | ||
function rejectOnce(e) { | ||
if (!alreadySettled) { | ||
alreadySettled = true; | ||
reject(e); | ||
} | ||
} | ||
function resolveOnce(value) { | ||
if (!alreadySettled) { | ||
alreadySettled = true; | ||
resolve(value); | ||
} | ||
} | ||
function doCountDown() { | ||
countDown -= 1; | ||
if (countDown === 0) { | ||
// Resolve the outer promise. | ||
resolveOnce(result); | ||
} | ||
} | ||
async function doCombine(mapped, index) { | ||
if (alreadySettled) { | ||
// Short-circuit out of here, since we already | ||
// rejected or resolved. | ||
return; | ||
} | ||
// Either update the result or throw an exception. | ||
const action = await combiner(result, mapped, index); | ||
switch (action.status) { | ||
case 'continue': | ||
// eslint-disable-next-line prefer-destructuring | ||
result = action.result; | ||
break; | ||
case 'rejected': | ||
rejectOnce(action.reason); | ||
break; | ||
case 'fulfilled': | ||
// Resolve the outer promise. | ||
result = action.value; | ||
resolveOnce(result); | ||
break; | ||
default: | ||
throw TypeError(`Not a valid combiner return value: ${action}`); | ||
} | ||
doCountDown(); | ||
} | ||
try { | ||
let i = 0; | ||
for (const item of iterable) { | ||
const index = i; | ||
i += 1; | ||
// Say that we have one more to wait for. | ||
countDown += 1; | ||
EPromise.resolve(item) | ||
.then( | ||
value => doCombine({ status: 'fulfilled', value }, index), // Successful resolve. | ||
reason => doCombine({ status: 'rejected', reason }, index), // Failed resolve. | ||
) | ||
.catch(rejectOnce); | ||
} | ||
// If we had no items or they all settled before the | ||
// loop ended, this will count down to zero and resolve | ||
// the result. | ||
doCountDown(); | ||
} catch (e) { | ||
rejectOnce(e); | ||
} | ||
}); | ||
} | ||
return EPromise; | ||
return Promise; | ||
} | ||
/** | ||
* Return a new value based on a reified promise result. | ||
* | ||
* @callback Combiner | ||
* @param {*} previousValue last value passed with CombinerContinue | ||
* @param {SettledStatus} currentStatus current reified promise result | ||
* @param {number} currentIndex current index in the input iterable | ||
* @returns {CombinerContinue|SettledStatus} what to do next | ||
*/ | ||
/** | ||
* A reified settled promise. | ||
* @typedef {FulfilledStatus | RejectedStatus} SettledStatus | ||
*/ | ||
/** | ||
* A reified fulfilled promise. | ||
* | ||
* @typedef {Object} FulfilledStatus | ||
* @property {'fulfilled'} status the promise was fulfilled | ||
* @property {*} [value] the value of the promise resolution | ||
*/ | ||
/** | ||
* A reified rejected promise. | ||
* | ||
* @typedef {Object} RejectedStatus | ||
* @property {'rejected'} status the promise was rejected | ||
* @property {*} [reason] the value of the promise rejection | ||
*/ | ||
/** | ||
* Tell combinePromises to continue with a new value for the result. | ||
* | ||
* @typedef {Object} CombinerContinue | ||
* @property {'continue'} status continue with combining | ||
* @property {*} result the new result to use as `currentStatus` | ||
*/ | ||
module.exports = makeEPromiseClass; | ||
module.exports = maybeExtendPromise; |
/** | ||
* Create an EPromise class that supports eventual send (infix-bang) operations. | ||
* This is a class that extends the BasePromise class argument (which may be platform | ||
* Promises, or some other implementation). Only the `new BasePromise(executor)` | ||
* constructor form is used directly by EPromise. | ||
* Modify a Promise class to have it support eventual send (infix-bang) operations. | ||
* | ||
@@ -12,208 +9,203 @@ * Based heavily on nanoq https://github.com/drses/nanoq/blob/master/src/nanoq.js | ||
* | ||
* @param {typeof Promise} BasePromise ES6 Promise contstructor | ||
* @returns {typeof EPromise} EPromise class | ||
* @param {typeof Promise} Promise ES6 Promise class to shim | ||
* @return {typeof EPromise} Extended promise | ||
*/ | ||
function makeEPromiseClass(BasePromise) { | ||
const presenceToResolvedRelay = new WeakMap(); | ||
const promiseToRelay = new WeakMap(); | ||
function maybeExtendPromise(Promise) { | ||
// Make idempotent, so we don't layer on top of a BasePromise that is adequate. | ||
let needsShim = false; | ||
for (const method of [ | ||
'get', | ||
'put', | ||
'post', | ||
'delete', | ||
'invoke', | ||
'fapply', | ||
'fcall', | ||
]) { | ||
if (typeof Promise.prototype[method] !== 'function') { | ||
needsShim = true; | ||
break; | ||
} | ||
} | ||
if (!needsShim) { | ||
// Already supports all the methods. | ||
return Promise; | ||
} | ||
// This special relay accepts Promises, and forwards | ||
// the remote to its corresponding resolvedRelay. | ||
const presenceToHandler = new WeakMap(); | ||
const presenceToPromise = new WeakMap(); | ||
const promiseToHandler = new WeakMap(); | ||
// This special handler accepts Promises, and forwards | ||
// handled Promises to their corresponding resolvedHandler. | ||
// | ||
// If passed a Promise that is not remote, perform | ||
// If passed a Promise that is not handled, perform | ||
// the corresponding local operation. | ||
let forwardingRelay; | ||
let forwardingHandler; | ||
function relay(p) { | ||
return promiseToRelay.get(p) || forwardingRelay; | ||
return promiseToHandler.get(p) || forwardingHandler; | ||
} | ||
class EPromise extends BasePromise { | ||
static makeRemote(executor, unresolvedRelay) { | ||
let remoteResolve; | ||
let remoteReject; | ||
let relayResolve; | ||
const remoteP = new EPromise((resolve, reject) => { | ||
remoteResolve = resolve; | ||
remoteReject = reject; | ||
}); | ||
Object.defineProperties( | ||
Promise.prototype, | ||
Object.getOwnPropertyDescriptors({ | ||
get(key) { | ||
return relay(this).GET(this, key); | ||
}, | ||
if (!unresolvedRelay) { | ||
// Create a simple unresolvedRelay that just postpones until the | ||
// resolvedRelay is set. | ||
// | ||
// This is insufficient for actual remote Promises (too many round-trips), | ||
// but is an easy way to create a local Remote. | ||
const relayP = new EPromise(resolve => { | ||
relayResolve = resolve; | ||
}); | ||
put(key, val) { | ||
return relay(this).PUT(this, key, val); | ||
}, | ||
const postpone = forwardedOperation => { | ||
// Just wait until the relay is resolved/rejected. | ||
return async (p, ...args) => { | ||
// console.log(`forwarding ${forwardedOperation}`); | ||
await relayP; | ||
return p[forwardedOperation](args); | ||
}; | ||
}; | ||
delete(key) { | ||
return relay(this).DELETE(this, key); | ||
}, | ||
unresolvedRelay = { | ||
GET: postpone('get'), | ||
PUT: postpone('put'), | ||
DELETE: postpone('delete'), | ||
POST: postpone('post'), | ||
}; | ||
} | ||
post(optKey, args) { | ||
return relay(this).POST(this, optKey, args); | ||
}, | ||
// Until the remote is resolved, we use the unresolvedRelay. | ||
promiseToRelay.set(remoteP, unresolvedRelay); | ||
invoke(optKey, ...args) { | ||
return relay(this).POST(this, optKey, args); | ||
}, | ||
function rejectRemote(reason) { | ||
if (relayResolve) { | ||
relayResolve(null); | ||
} | ||
remoteReject(reason); | ||
} | ||
fapply(args) { | ||
return relay(this).POST(this, undefined, args); | ||
}, | ||
function resolveRemote(presence, resolvedRelay) { | ||
try { | ||
if (resolvedRelay) { | ||
// Sanity checks. | ||
if (Object(resolvedRelay) !== resolvedRelay) { | ||
throw TypeError( | ||
`Resolved relay ${resolvedRelay} cannot be a primitive`, | ||
); | ||
} | ||
for (const method of ['GET', 'PUT', 'DELETE', 'POST']) { | ||
if (typeof resolvedRelay[method] !== 'function') { | ||
throw TypeError( | ||
`Resolved relay ${resolvedRelay} requires a ${method} method`, | ||
); | ||
} | ||
} | ||
if (Object(presence) !== presence) { | ||
throw TypeError(`Presence ${presence} cannot be a primitive`); | ||
} | ||
if (presence === null) { | ||
throw TypeError(`Presence ${presence} cannot be null`); | ||
} | ||
if (presenceToResolvedRelay.has(presence)) { | ||
throw TypeError(`Presence ${presence} is already mapped`); | ||
} | ||
if (presence && typeof presence.then === 'function') { | ||
throw TypeError( | ||
`Presence ${presence} cannot be a Promise or other thenable`, | ||
); | ||
} | ||
fcall(...args) { | ||
return relay(this).POST(this, undefined, args); | ||
}, | ||
}), | ||
); | ||
// Create a table entry for the presence mapped to the resolvedRelay. | ||
presenceToResolvedRelay.set(presence, resolvedRelay); | ||
} | ||
const baseResolve = Promise.resolve.bind(Promise); | ||
// Remove the mapping, as our resolvedRelay should be used instead. | ||
promiseToRelay.delete(remoteP); | ||
// Resolve with the new presence or other value. | ||
remoteResolve(presence); | ||
if (relayResolve) { | ||
// Activate the default unresolvedRelay. | ||
relayResolve(resolvedRelay); | ||
} | ||
} catch (e) { | ||
rejectRemote(e); | ||
// Add Promise.makeHandled and update Promise.resolve. | ||
Object.defineProperties( | ||
Promise, | ||
Object.getOwnPropertyDescriptors({ | ||
resolve(value) { | ||
// Resolving a Presence returns the pre-registered handled promise. | ||
const handledPromise = presenceToPromise.get(value); | ||
if (handledPromise) { | ||
return handledPromise; | ||
} | ||
} | ||
return baseResolve(value); | ||
}, | ||
// Invoke the callback to let the user resolve/reject. | ||
executor(resolveRemote, rejectRemote); | ||
makeHandled(executor, unresolvedHandler) { | ||
let handledResolve; | ||
let handledReject; | ||
let resolveTheHandler; | ||
const handledP = new Promise((resolve, reject) => { | ||
handledResolve = resolve; | ||
handledReject = reject; | ||
}); | ||
// Return a remote EPromise, which wil be resolved/rejected | ||
// by the executor. | ||
return remoteP; | ||
} | ||
if (!unresolvedHandler) { | ||
// Create a simple unresolvedHandler that just postpones until the | ||
// resolvedHandler is set. | ||
// | ||
// This is insufficient for actual remote handled Promises | ||
// (too many round-trips), but is an easy way to create a local handled Promise. | ||
const handlerP = new Promise(resolve => { | ||
resolveTheHandler = resolve; | ||
}); | ||
static resolve(value) { | ||
return new EPromise(resolve => resolve(value)); | ||
} | ||
const postpone = forwardedOperation => { | ||
// Just wait until the handler is resolved/rejected. | ||
return async (p, ...args) => { | ||
// console.log(`forwarding ${forwardedOperation}`); | ||
await handlerP; | ||
return p[forwardedOperation](args); | ||
}; | ||
}; | ||
static reject(reason) { | ||
return new EPromise((_resolve, reject) => reject(reason)); | ||
} | ||
unresolvedHandler = { | ||
GET: postpone('get'), | ||
PUT: postpone('put'), | ||
DELETE: postpone('delete'), | ||
POST: postpone('post'), | ||
}; | ||
} | ||
get(key) { | ||
return relay(this).GET(this, key); | ||
} | ||
// Until the handled promise is resolved, we use the unresolvedHandler. | ||
promiseToHandler.set(handledP, unresolvedHandler); | ||
put(key, val) { | ||
return relay(this).PUT(this, key, val); | ||
} | ||
function rejectHandled(reason) { | ||
if (resolveTheHandler) { | ||
resolveTheHandler(null); | ||
} | ||
handledReject(reason); | ||
} | ||
delete(key) { | ||
return relay(this).DELETE(this, key); | ||
} | ||
function resolveHandled(presence, resolvedHandler) { | ||
try { | ||
if (resolvedHandler) { | ||
// Sanity checks. | ||
if (Object(resolvedHandler) !== resolvedHandler) { | ||
throw TypeError( | ||
`Resolved handler ${resolvedHandler} cannot be a primitive`, | ||
); | ||
} | ||
for (const method of ['GET', 'PUT', 'DELETE', 'POST']) { | ||
if (typeof resolvedHandler[method] !== 'function') { | ||
throw TypeError( | ||
`Resolved handler ${resolvedHandler} requires a ${method} method`, | ||
); | ||
} | ||
} | ||
if (Object(presence) !== presence) { | ||
throw TypeError(`Presence ${presence} cannot be a primitive`); | ||
} | ||
if (presence === null) { | ||
throw TypeError(`Presence ${presence} cannot be null`); | ||
} | ||
if (presenceToHandler.has(presence)) { | ||
throw TypeError(`Presence ${presence} is already mapped`); | ||
} | ||
if (presence && typeof presence.then === 'function') { | ||
throw TypeError( | ||
`Presence ${presence} cannot be a Promise or other thenable`, | ||
); | ||
} | ||
post(optKey, args) { | ||
return relay(this).POST(this, optKey, args); | ||
} | ||
// Create table entries for the presence mapped to the resolvedHandler. | ||
presenceToPromise.set(presence, handledP); | ||
presenceToHandler.set(presence, resolvedHandler); | ||
} | ||
invoke(optKey, ...args) { | ||
return relay(this).POST(this, optKey, args); | ||
} | ||
// Remove the mapping, as our resolvedHandler should be used instead. | ||
promiseToHandler.delete(handledP); | ||
fapply(args) { | ||
return relay(this).POST(this, undefined, args); | ||
} | ||
// Resolve with the new presence or other value. | ||
handledResolve(presence); | ||
fcall(...args) { | ||
return relay(this).POST(this, undefined, args); | ||
} | ||
// *********************************************************** | ||
// The rest of these static methods ensure we use the correct | ||
// EPromise.resolve and EPromise.reject, no matter what the | ||
// implementation of the inherited BasePromise is. | ||
static all(iterable) { | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (res, item, index) => { | ||
// Reject if any reject. | ||
if (item.status === 'rejected') { | ||
throw item.reason; | ||
if (resolveTheHandler) { | ||
// Activate the default unresolvedHandler. | ||
resolveTheHandler(resolvedHandler); | ||
} | ||
} catch (e) { | ||
rejectHandled(e); | ||
} | ||
} | ||
// Add the resolved value to the array. | ||
res[index] = item.value; | ||
// Invoke the callback to let the user resolve/reject. | ||
executor(resolveHandled, rejectHandled); | ||
// Continue combining promise results. | ||
return { status: 'continue', result: res }; | ||
}); | ||
} | ||
// Return a handled Promise, which wil be resolved/rejected | ||
// by the executor. | ||
return handledP; | ||
}, | ||
}), | ||
); | ||
static allSettled(iterable) { | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (res, item, index) => { | ||
// Add the reified promise result to the array. | ||
res[index] = item; | ||
// Continue combining promise results. | ||
return { status: 'continue', result: res }; | ||
}); | ||
} | ||
// TODO: Implement any(iterable) according to spec. | ||
// Also add it to the SES/Jessie whitelists. | ||
static race(iterable) { | ||
// Just return the first reified promise result, whether fulfilled or rejected. | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (_, item) => item); | ||
} | ||
} | ||
function makeForwarder(operation, localImpl) { | ||
return async (ep, ...args) => { | ||
const o = await ep; | ||
const resolvedRelay = presenceToResolvedRelay.get(o); | ||
if (resolvedRelay) { | ||
// The relay was resolved, so give it a naked object. | ||
return resolvedRelay[operation](o, ...args); | ||
const resolvedHandler = presenceToHandler.get(o); | ||
if (resolvedHandler) { | ||
// The handler was resolved, so give it a naked object. | ||
return resolvedHandler[operation](o, ...args); | ||
} | ||
@@ -227,3 +219,3 @@ | ||
forwardingRelay = { | ||
forwardingHandler = { | ||
GET: makeForwarder('GET', (o, key) => o[key]), | ||
@@ -239,154 +231,5 @@ PUT: makeForwarder('PUT', (o, key, val) => (o[key] = val)), | ||
}; | ||
/** | ||
* Reduce-like helper function to support iterable values mapped to Promise.resolve, | ||
* and combine them asynchronously. | ||
* | ||
* The combiner may be called in any order, and the collection is not necessarily | ||
* done iterating by the time it's called. | ||
* | ||
* The notable difference from reduce is that the combiner gets a reified | ||
* settled promise as its `item` argument, and returns a combiner action | ||
* with a `status` field of "rejected", "fulfilled", or "continue". | ||
* | ||
* @param {*} initValue first value of result | ||
* @param {Iterable} iterable values to EPromise.resolve | ||
* @param {Combiner} combiner synchronously reduce each item | ||
* @returns {EPromise<*>} | ||
*/ | ||
function combinePromises(initValue, iterable, combiner) { | ||
let result = initValue; | ||
// We use the platform async keyword here to simplify | ||
// the executor function. | ||
return new EPromise(async (resolve, reject) => { | ||
// We start at 1 to prevent the iterator from resolving | ||
// the EPromise until the loop is complete and all items | ||
// have been reduced. | ||
let countDown = 1; | ||
let alreadySettled = false; | ||
function rejectOnce(e) { | ||
if (!alreadySettled) { | ||
alreadySettled = true; | ||
reject(e); | ||
} | ||
} | ||
function resolveOnce(value) { | ||
if (!alreadySettled) { | ||
alreadySettled = true; | ||
resolve(value); | ||
} | ||
} | ||
function doCountDown() { | ||
countDown -= 1; | ||
if (countDown === 0) { | ||
// Resolve the outer promise. | ||
resolveOnce(result); | ||
} | ||
} | ||
async function doCombine(mapped, index) { | ||
if (alreadySettled) { | ||
// Short-circuit out of here, since we already | ||
// rejected or resolved. | ||
return; | ||
} | ||
// Either update the result or throw an exception. | ||
const action = await combiner(result, mapped, index); | ||
switch (action.status) { | ||
case 'continue': | ||
// eslint-disable-next-line prefer-destructuring | ||
result = action.result; | ||
break; | ||
case 'rejected': | ||
rejectOnce(action.reason); | ||
break; | ||
case 'fulfilled': | ||
// Resolve the outer promise. | ||
result = action.value; | ||
resolveOnce(result); | ||
break; | ||
default: | ||
throw TypeError(`Not a valid combiner return value: ${action}`); | ||
} | ||
doCountDown(); | ||
} | ||
try { | ||
let i = 0; | ||
for (const item of iterable) { | ||
const index = i; | ||
i += 1; | ||
// Say that we have one more to wait for. | ||
countDown += 1; | ||
EPromise.resolve(item) | ||
.then( | ||
value => doCombine({ status: 'fulfilled', value }, index), // Successful resolve. | ||
reason => doCombine({ status: 'rejected', reason }, index), // Failed resolve. | ||
) | ||
.catch(rejectOnce); | ||
} | ||
// If we had no items or they all settled before the | ||
// loop ended, this will count down to zero and resolve | ||
// the result. | ||
doCountDown(); | ||
} catch (e) { | ||
rejectOnce(e); | ||
} | ||
}); | ||
} | ||
return EPromise; | ||
return Promise; | ||
} | ||
/** | ||
* Return a new value based on a reified promise result. | ||
* | ||
* @callback Combiner | ||
* @param {*} previousValue last value passed with CombinerContinue | ||
* @param {SettledStatus} currentStatus current reified promise result | ||
* @param {number} currentIndex current index in the input iterable | ||
* @returns {CombinerContinue|SettledStatus} what to do next | ||
*/ | ||
/** | ||
* A reified settled promise. | ||
* @typedef {FulfilledStatus | RejectedStatus} SettledStatus | ||
*/ | ||
/** | ||
* A reified fulfilled promise. | ||
* | ||
* @typedef {Object} FulfilledStatus | ||
* @property {'fulfilled'} status the promise was fulfilled | ||
* @property {*} [value] the value of the promise resolution | ||
*/ | ||
/** | ||
* A reified rejected promise. | ||
* | ||
* @typedef {Object} RejectedStatus | ||
* @property {'rejected'} status the promise was rejected | ||
* @property {*} [reason] the value of the promise rejection | ||
*/ | ||
/** | ||
* Tell combinePromises to continue with a new value for the result. | ||
* | ||
* @typedef {Object} CombinerContinue | ||
* @property {'continue'} status continue with combining | ||
* @property {*} result the new result to use as `currentStatus` | ||
*/ | ||
export default makeEPromiseClass; | ||
export default maybeExtendPromise; |
@@ -8,6 +8,3 @@ (function (global, factory) { | ||
/** | ||
* Create an EPromise class that supports eventual send (infix-bang) operations. | ||
* This is a class that extends the BasePromise class argument (which may be platform | ||
* Promises, or some other implementation). Only the `new BasePromise(executor)` | ||
* constructor form is used directly by EPromise. | ||
* Modify a Promise class to have it support eventual send (infix-bang) operations. | ||
* | ||
@@ -19,208 +16,203 @@ * Based heavily on nanoq https://github.com/drses/nanoq/blob/master/src/nanoq.js | ||
* | ||
* @param {typeof Promise} BasePromise ES6 Promise contstructor | ||
* @returns {typeof EPromise} EPromise class | ||
* @param {typeof Promise} Promise ES6 Promise class to shim | ||
* @return {typeof EPromise} Extended promise | ||
*/ | ||
function makeEPromiseClass(BasePromise) { | ||
const presenceToResolvedRelay = new WeakMap(); | ||
const promiseToRelay = new WeakMap(); | ||
function maybeExtendPromise(Promise) { | ||
// Make idempotent, so we don't layer on top of a BasePromise that is adequate. | ||
let needsShim = false; | ||
for (const method of [ | ||
'get', | ||
'put', | ||
'post', | ||
'delete', | ||
'invoke', | ||
'fapply', | ||
'fcall', | ||
]) { | ||
if (typeof Promise.prototype[method] !== 'function') { | ||
needsShim = true; | ||
break; | ||
} | ||
} | ||
if (!needsShim) { | ||
// Already supports all the methods. | ||
return Promise; | ||
} | ||
// This special relay accepts Promises, and forwards | ||
// the remote to its corresponding resolvedRelay. | ||
const presenceToHandler = new WeakMap(); | ||
const presenceToPromise = new WeakMap(); | ||
const promiseToHandler = new WeakMap(); | ||
// This special handler accepts Promises, and forwards | ||
// handled Promises to their corresponding resolvedHandler. | ||
// | ||
// If passed a Promise that is not remote, perform | ||
// If passed a Promise that is not handled, perform | ||
// the corresponding local operation. | ||
let forwardingRelay; | ||
let forwardingHandler; | ||
function relay(p) { | ||
return promiseToRelay.get(p) || forwardingRelay; | ||
return promiseToHandler.get(p) || forwardingHandler; | ||
} | ||
class EPromise extends BasePromise { | ||
static makeRemote(executor, unresolvedRelay) { | ||
let remoteResolve; | ||
let remoteReject; | ||
let relayResolve; | ||
const remoteP = new EPromise((resolve, reject) => { | ||
remoteResolve = resolve; | ||
remoteReject = reject; | ||
}); | ||
Object.defineProperties( | ||
Promise.prototype, | ||
Object.getOwnPropertyDescriptors({ | ||
get(key) { | ||
return relay(this).GET(this, key); | ||
}, | ||
if (!unresolvedRelay) { | ||
// Create a simple unresolvedRelay that just postpones until the | ||
// resolvedRelay is set. | ||
// | ||
// This is insufficient for actual remote Promises (too many round-trips), | ||
// but is an easy way to create a local Remote. | ||
const relayP = new EPromise(resolve => { | ||
relayResolve = resolve; | ||
}); | ||
put(key, val) { | ||
return relay(this).PUT(this, key, val); | ||
}, | ||
const postpone = forwardedOperation => { | ||
// Just wait until the relay is resolved/rejected. | ||
return async (p, ...args) => { | ||
// console.log(`forwarding ${forwardedOperation}`); | ||
await relayP; | ||
return p[forwardedOperation](args); | ||
}; | ||
}; | ||
delete(key) { | ||
return relay(this).DELETE(this, key); | ||
}, | ||
unresolvedRelay = { | ||
GET: postpone('get'), | ||
PUT: postpone('put'), | ||
DELETE: postpone('delete'), | ||
POST: postpone('post'), | ||
}; | ||
} | ||
post(optKey, args) { | ||
return relay(this).POST(this, optKey, args); | ||
}, | ||
// Until the remote is resolved, we use the unresolvedRelay. | ||
promiseToRelay.set(remoteP, unresolvedRelay); | ||
invoke(optKey, ...args) { | ||
return relay(this).POST(this, optKey, args); | ||
}, | ||
function rejectRemote(reason) { | ||
if (relayResolve) { | ||
relayResolve(null); | ||
} | ||
remoteReject(reason); | ||
} | ||
fapply(args) { | ||
return relay(this).POST(this, undefined, args); | ||
}, | ||
function resolveRemote(presence, resolvedRelay) { | ||
try { | ||
if (resolvedRelay) { | ||
// Sanity checks. | ||
if (Object(resolvedRelay) !== resolvedRelay) { | ||
throw TypeError( | ||
`Resolved relay ${resolvedRelay} cannot be a primitive`, | ||
); | ||
} | ||
for (const method of ['GET', 'PUT', 'DELETE', 'POST']) { | ||
if (typeof resolvedRelay[method] !== 'function') { | ||
throw TypeError( | ||
`Resolved relay ${resolvedRelay} requires a ${method} method`, | ||
); | ||
} | ||
} | ||
if (Object(presence) !== presence) { | ||
throw TypeError(`Presence ${presence} cannot be a primitive`); | ||
} | ||
if (presence === null) { | ||
throw TypeError(`Presence ${presence} cannot be null`); | ||
} | ||
if (presenceToResolvedRelay.has(presence)) { | ||
throw TypeError(`Presence ${presence} is already mapped`); | ||
} | ||
if (presence && typeof presence.then === 'function') { | ||
throw TypeError( | ||
`Presence ${presence} cannot be a Promise or other thenable`, | ||
); | ||
} | ||
fcall(...args) { | ||
return relay(this).POST(this, undefined, args); | ||
}, | ||
}), | ||
); | ||
// Create a table entry for the presence mapped to the resolvedRelay. | ||
presenceToResolvedRelay.set(presence, resolvedRelay); | ||
} | ||
const baseResolve = Promise.resolve.bind(Promise); | ||
// Remove the mapping, as our resolvedRelay should be used instead. | ||
promiseToRelay.delete(remoteP); | ||
// Resolve with the new presence or other value. | ||
remoteResolve(presence); | ||
if (relayResolve) { | ||
// Activate the default unresolvedRelay. | ||
relayResolve(resolvedRelay); | ||
} | ||
} catch (e) { | ||
rejectRemote(e); | ||
// Add Promise.makeHandled and update Promise.resolve. | ||
Object.defineProperties( | ||
Promise, | ||
Object.getOwnPropertyDescriptors({ | ||
resolve(value) { | ||
// Resolving a Presence returns the pre-registered handled promise. | ||
const handledPromise = presenceToPromise.get(value); | ||
if (handledPromise) { | ||
return handledPromise; | ||
} | ||
} | ||
return baseResolve(value); | ||
}, | ||
// Invoke the callback to let the user resolve/reject. | ||
executor(resolveRemote, rejectRemote); | ||
makeHandled(executor, unresolvedHandler) { | ||
let handledResolve; | ||
let handledReject; | ||
let resolveTheHandler; | ||
const handledP = new Promise((resolve, reject) => { | ||
handledResolve = resolve; | ||
handledReject = reject; | ||
}); | ||
// Return a remote EPromise, which wil be resolved/rejected | ||
// by the executor. | ||
return remoteP; | ||
} | ||
if (!unresolvedHandler) { | ||
// Create a simple unresolvedHandler that just postpones until the | ||
// resolvedHandler is set. | ||
// | ||
// This is insufficient for actual remote handled Promises | ||
// (too many round-trips), but is an easy way to create a local handled Promise. | ||
const handlerP = new Promise(resolve => { | ||
resolveTheHandler = resolve; | ||
}); | ||
static resolve(value) { | ||
return new EPromise(resolve => resolve(value)); | ||
} | ||
const postpone = forwardedOperation => { | ||
// Just wait until the handler is resolved/rejected. | ||
return async (p, ...args) => { | ||
// console.log(`forwarding ${forwardedOperation}`); | ||
await handlerP; | ||
return p[forwardedOperation](args); | ||
}; | ||
}; | ||
static reject(reason) { | ||
return new EPromise((_resolve, reject) => reject(reason)); | ||
} | ||
unresolvedHandler = { | ||
GET: postpone('get'), | ||
PUT: postpone('put'), | ||
DELETE: postpone('delete'), | ||
POST: postpone('post'), | ||
}; | ||
} | ||
get(key) { | ||
return relay(this).GET(this, key); | ||
} | ||
// Until the handled promise is resolved, we use the unresolvedHandler. | ||
promiseToHandler.set(handledP, unresolvedHandler); | ||
put(key, val) { | ||
return relay(this).PUT(this, key, val); | ||
} | ||
function rejectHandled(reason) { | ||
if (resolveTheHandler) { | ||
resolveTheHandler(null); | ||
} | ||
handledReject(reason); | ||
} | ||
delete(key) { | ||
return relay(this).DELETE(this, key); | ||
} | ||
function resolveHandled(presence, resolvedHandler) { | ||
try { | ||
if (resolvedHandler) { | ||
// Sanity checks. | ||
if (Object(resolvedHandler) !== resolvedHandler) { | ||
throw TypeError( | ||
`Resolved handler ${resolvedHandler} cannot be a primitive`, | ||
); | ||
} | ||
for (const method of ['GET', 'PUT', 'DELETE', 'POST']) { | ||
if (typeof resolvedHandler[method] !== 'function') { | ||
throw TypeError( | ||
`Resolved handler ${resolvedHandler} requires a ${method} method`, | ||
); | ||
} | ||
} | ||
if (Object(presence) !== presence) { | ||
throw TypeError(`Presence ${presence} cannot be a primitive`); | ||
} | ||
if (presence === null) { | ||
throw TypeError(`Presence ${presence} cannot be null`); | ||
} | ||
if (presenceToHandler.has(presence)) { | ||
throw TypeError(`Presence ${presence} is already mapped`); | ||
} | ||
if (presence && typeof presence.then === 'function') { | ||
throw TypeError( | ||
`Presence ${presence} cannot be a Promise or other thenable`, | ||
); | ||
} | ||
post(optKey, args) { | ||
return relay(this).POST(this, optKey, args); | ||
} | ||
// Create table entries for the presence mapped to the resolvedHandler. | ||
presenceToPromise.set(presence, handledP); | ||
presenceToHandler.set(presence, resolvedHandler); | ||
} | ||
invoke(optKey, ...args) { | ||
return relay(this).POST(this, optKey, args); | ||
} | ||
// Remove the mapping, as our resolvedHandler should be used instead. | ||
promiseToHandler.delete(handledP); | ||
fapply(args) { | ||
return relay(this).POST(this, undefined, args); | ||
} | ||
// Resolve with the new presence or other value. | ||
handledResolve(presence); | ||
fcall(...args) { | ||
return relay(this).POST(this, undefined, args); | ||
} | ||
// *********************************************************** | ||
// The rest of these static methods ensure we use the correct | ||
// EPromise.resolve and EPromise.reject, no matter what the | ||
// implementation of the inherited BasePromise is. | ||
static all(iterable) { | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (res, item, index) => { | ||
// Reject if any reject. | ||
if (item.status === 'rejected') { | ||
throw item.reason; | ||
if (resolveTheHandler) { | ||
// Activate the default unresolvedHandler. | ||
resolveTheHandler(resolvedHandler); | ||
} | ||
} catch (e) { | ||
rejectHandled(e); | ||
} | ||
} | ||
// Add the resolved value to the array. | ||
res[index] = item.value; | ||
// Invoke the callback to let the user resolve/reject. | ||
executor(resolveHandled, rejectHandled); | ||
// Continue combining promise results. | ||
return { status: 'continue', result: res }; | ||
}); | ||
} | ||
// Return a handled Promise, which wil be resolved/rejected | ||
// by the executor. | ||
return handledP; | ||
}, | ||
}), | ||
); | ||
static allSettled(iterable) { | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (res, item, index) => { | ||
// Add the reified promise result to the array. | ||
res[index] = item; | ||
// Continue combining promise results. | ||
return { status: 'continue', result: res }; | ||
}); | ||
} | ||
// TODO: Implement any(iterable) according to spec. | ||
// Also add it to the SES/Jessie whitelists. | ||
static race(iterable) { | ||
// Just return the first reified promise result, whether fulfilled or rejected. | ||
// eslint-disable-next-line no-use-before-define | ||
return combinePromises([], iterable, (_, item) => item); | ||
} | ||
} | ||
function makeForwarder(operation, localImpl) { | ||
return async (ep, ...args) => { | ||
const o = await ep; | ||
const resolvedRelay = presenceToResolvedRelay.get(o); | ||
if (resolvedRelay) { | ||
// The relay was resolved, so give it a naked object. | ||
return resolvedRelay[operation](o, ...args); | ||
const resolvedHandler = presenceToHandler.get(o); | ||
if (resolvedHandler) { | ||
// The handler was resolved, so give it a naked object. | ||
return resolvedHandler[operation](o, ...args); | ||
} | ||
@@ -234,3 +226,3 @@ | ||
forwardingRelay = { | ||
forwardingHandler = { | ||
GET: makeForwarder('GET', (o, key) => o[key]), | ||
@@ -246,156 +238,7 @@ PUT: makeForwarder('PUT', (o, key, val) => (o[key] = val)), | ||
}; | ||
/** | ||
* Reduce-like helper function to support iterable values mapped to Promise.resolve, | ||
* and combine them asynchronously. | ||
* | ||
* The combiner may be called in any order, and the collection is not necessarily | ||
* done iterating by the time it's called. | ||
* | ||
* The notable difference from reduce is that the combiner gets a reified | ||
* settled promise as its `item` argument, and returns a combiner action | ||
* with a `status` field of "rejected", "fulfilled", or "continue". | ||
* | ||
* @param {*} initValue first value of result | ||
* @param {Iterable} iterable values to EPromise.resolve | ||
* @param {Combiner} combiner synchronously reduce each item | ||
* @returns {EPromise<*>} | ||
*/ | ||
function combinePromises(initValue, iterable, combiner) { | ||
let result = initValue; | ||
// We use the platform async keyword here to simplify | ||
// the executor function. | ||
return new EPromise(async (resolve, reject) => { | ||
// We start at 1 to prevent the iterator from resolving | ||
// the EPromise until the loop is complete and all items | ||
// have been reduced. | ||
let countDown = 1; | ||
let alreadySettled = false; | ||
function rejectOnce(e) { | ||
if (!alreadySettled) { | ||
alreadySettled = true; | ||
reject(e); | ||
} | ||
} | ||
function resolveOnce(value) { | ||
if (!alreadySettled) { | ||
alreadySettled = true; | ||
resolve(value); | ||
} | ||
} | ||
function doCountDown() { | ||
countDown -= 1; | ||
if (countDown === 0) { | ||
// Resolve the outer promise. | ||
resolveOnce(result); | ||
} | ||
} | ||
async function doCombine(mapped, index) { | ||
if (alreadySettled) { | ||
// Short-circuit out of here, since we already | ||
// rejected or resolved. | ||
return; | ||
} | ||
// Either update the result or throw an exception. | ||
const action = await combiner(result, mapped, index); | ||
switch (action.status) { | ||
case 'continue': | ||
// eslint-disable-next-line prefer-destructuring | ||
result = action.result; | ||
break; | ||
case 'rejected': | ||
rejectOnce(action.reason); | ||
break; | ||
case 'fulfilled': | ||
// Resolve the outer promise. | ||
result = action.value; | ||
resolveOnce(result); | ||
break; | ||
default: | ||
throw TypeError(`Not a valid combiner return value: ${action}`); | ||
} | ||
doCountDown(); | ||
} | ||
try { | ||
let i = 0; | ||
for (const item of iterable) { | ||
const index = i; | ||
i += 1; | ||
// Say that we have one more to wait for. | ||
countDown += 1; | ||
EPromise.resolve(item) | ||
.then( | ||
value => doCombine({ status: 'fulfilled', value }, index), // Successful resolve. | ||
reason => doCombine({ status: 'rejected', reason }, index), // Failed resolve. | ||
) | ||
.catch(rejectOnce); | ||
} | ||
// If we had no items or they all settled before the | ||
// loop ended, this will count down to zero and resolve | ||
// the result. | ||
doCountDown(); | ||
} catch (e) { | ||
rejectOnce(e); | ||
} | ||
}); | ||
} | ||
return EPromise; | ||
return Promise; | ||
} | ||
/** | ||
* Return a new value based on a reified promise result. | ||
* | ||
* @callback Combiner | ||
* @param {*} previousValue last value passed with CombinerContinue | ||
* @param {SettledStatus} currentStatus current reified promise result | ||
* @param {number} currentIndex current index in the input iterable | ||
* @returns {CombinerContinue|SettledStatus} what to do next | ||
*/ | ||
return maybeExtendPromise; | ||
/** | ||
* A reified settled promise. | ||
* @typedef {FulfilledStatus | RejectedStatus} SettledStatus | ||
*/ | ||
/** | ||
* A reified fulfilled promise. | ||
* | ||
* @typedef {Object} FulfilledStatus | ||
* @property {'fulfilled'} status the promise was fulfilled | ||
* @property {*} [value] the value of the promise resolution | ||
*/ | ||
/** | ||
* A reified rejected promise. | ||
* | ||
* @typedef {Object} RejectedStatus | ||
* @property {'rejected'} status the promise was rejected | ||
* @property {*} [reason] the value of the promise rejection | ||
*/ | ||
/** | ||
* Tell combinePromises to continue with a new value for the result. | ||
* | ||
* @typedef {Object} CombinerContinue | ||
* @property {'continue'} status continue with combining | ||
* @property {*} result the new result to use as `currentStatus` | ||
*/ | ||
return makeEPromiseClass; | ||
})); |
{ | ||
"name": "@agoric/eventual-send", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"description": "Extend a Promise class to implement the eventual-send API", | ||
@@ -5,0 +5,0 @@ "main": "dist/eventual-send.cjs.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
36846
619
1