Comparing version 3.1.4-29827da.0 to 3.1.4-431ae7e.0
import * as Logger from "bunyan"; | ||
import { Action, Block, HandlerVersion, IndexState } from "./interfaces"; | ||
import { Block, HandlerInfo, HandlerVersion, IndexState, NextBlock, VersionedAction } from "./interfaces"; | ||
/** | ||
@@ -11,6 +11,7 @@ * Takes `block`s output from implementations of `AbstractActionReader` and processes their actions through the | ||
export declare abstract class AbstractActionHandler { | ||
protected lastProcessedBlockNumber: number; | ||
protected lastProcessedBlockHash: string; | ||
protected handlerVersionName: string; | ||
lastProcessedBlockNumber: number; | ||
lastProcessedBlockHash: string; | ||
handlerVersionName: string; | ||
protected log: Logger; | ||
private deferredEffects; | ||
private handlerVersionMap; | ||
@@ -25,4 +26,8 @@ /** | ||
*/ | ||
handleBlock(block: Block, isRollback: boolean, isFirstBlock: boolean, isReplay?: boolean): Promise<[boolean, number]>; | ||
handleBlock(nextBlock: NextBlock, isReplay: boolean): Promise<number | null>; | ||
/** | ||
* Information about the current state of the Action Handler | ||
*/ | ||
readonly info: HandlerInfo; | ||
/** | ||
* Updates the `lastProcessedBlockNumber` and `lastProcessedBlockHash` meta state, coinciding with the block | ||
@@ -46,2 +51,13 @@ * that has just been processed. These are the same values read by `updateIndexState()`. | ||
/** | ||
* This method is used when matching the types of incoming actions against the types the `Updater`s and `Effect`s are | ||
* subscribed to. When this returns true, their corresponding functions will run. | ||
* | ||
* By default, this method tests for direct equivalence between the incoming candidate type and the type that is | ||
* subscribed. Override this method to extend this functionality (e.g. wildcards). | ||
* | ||
* @param candidateType The incoming action's type | ||
* @param subscribedType The type the Updater of Effect is subscribed to | ||
*/ | ||
protected matchActionType(candidateType: string, subscribedType: string): boolean; | ||
/** | ||
* Process actions against deterministically accumulating `Updater` functions. Returns a promise of versioned actions | ||
@@ -52,7 +68,7 @@ * for consumption by `runEffects`, to make sure the correct effects are run on blocks that include a `HandlerVersion` | ||
*/ | ||
protected applyUpdaters(state: any, block: Block, isReplay: boolean, context: any): Promise<Array<[Action, string]>>; | ||
protected applyUpdaters(state: any, block: Block, context: any, isReplay: boolean): Promise<VersionedAction[]>; | ||
/** | ||
* Process versioned actions against asynchronous side effects. | ||
*/ | ||
protected runEffects(versionedActions: Array<[Action, string]>, block: Block, context: any): void; | ||
protected runEffects(versionedActions: VersionedAction[], context: any, nextBlock: NextBlock): void; | ||
/** | ||
@@ -67,3 +83,7 @@ * Will run when a rollback block number is passed to handleActions. Implement this method to | ||
*/ | ||
protected handleActions(state: any, block: Block, context: any, isReplay: boolean): Promise<void>; | ||
protected handleActions(state: any, context: any, nextBlock: NextBlock, isReplay: boolean): Promise<void>; | ||
private range; | ||
private runOrDeferEffect; | ||
private runDeferredEffects; | ||
private getNextDeferredBlockNumber; | ||
private initHandlerVersions; | ||
@@ -70,0 +90,0 @@ private refreshIndexState; |
@@ -35,2 +35,3 @@ "use strict"; | ||
this.handlerVersionName = "v1"; | ||
this.deferredEffects = {}; | ||
this.handlerVersionMap = {}; | ||
@@ -43,6 +44,8 @@ this.initHandlerVersions(handlerVersions); | ||
*/ | ||
handleBlock(block, isRollback, isFirstBlock, isReplay = false) { | ||
handleBlock(nextBlock, isReplay) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const { block, blockMeta } = nextBlock; | ||
const { blockInfo } = block; | ||
if (isRollback || (isReplay && isFirstBlock)) { | ||
const { isRollback, isEarliestBlock } = blockMeta; | ||
if (isRollback || (isReplay && isEarliestBlock)) { | ||
const rollbackBlockNumber = blockInfo.blockNumber - 1; | ||
@@ -61,12 +64,12 @@ const rollbackCount = this.lastProcessedBlockNumber - rollbackBlockNumber; | ||
&& blockInfo.blockHash === this.lastProcessedBlockHash) { | ||
return [false, 0]; | ||
return null; | ||
} | ||
// If it's the first block but we've already processed blocks, seek to next block | ||
if (isFirstBlock && this.lastProcessedBlockHash) { | ||
return [true, nextBlockNeeded]; | ||
if (isEarliestBlock && this.lastProcessedBlockHash) { | ||
return nextBlockNeeded; | ||
} | ||
// Only check if this is the block we need if it's not the first block | ||
if (!isFirstBlock) { | ||
if (!isEarliestBlock) { | ||
if (blockInfo.blockNumber !== nextBlockNeeded) { | ||
return [true, nextBlockNeeded]; | ||
return nextBlockNeeded; | ||
} | ||
@@ -79,9 +82,32 @@ // Block sequence consistency should be handled by the ActionReader instance | ||
const handleWithArgs = (state, context = {}) => __awaiter(this, void 0, void 0, function* () { | ||
yield this.handleActions(state, block, context, isReplay); | ||
yield this.handleActions(state, context, nextBlock, isReplay); | ||
}); | ||
yield this.handleWithState(handleWithArgs); | ||
return [false, 0]; | ||
return null; | ||
}); | ||
} | ||
/** | ||
* Information about the current state of the Action Handler | ||
*/ | ||
get info() { | ||
return { | ||
lastProcessedBlockNumber: this.lastProcessedBlockNumber, | ||
lastProcessedBlockHash: this.lastProcessedBlockHash, | ||
handlerVersionName: this.handlerVersionName, | ||
}; | ||
} | ||
/** | ||
* This method is used when matching the types of incoming actions against the types the `Updater`s and `Effect`s are | ||
* subscribed to. When this returns true, their corresponding functions will run. | ||
* | ||
* By default, this method tests for direct equivalence between the incoming candidate type and the type that is | ||
* subscribed. Override this method to extend this functionality (e.g. wildcards). | ||
* | ||
* @param candidateType The incoming action's type | ||
* @param subscribedType The type the Updater of Effect is subscribed to | ||
*/ | ||
matchActionType(candidateType, subscribedType) { | ||
return candidateType === subscribedType; | ||
} | ||
/** | ||
* Process actions against deterministically accumulating `Updater` functions. Returns a promise of versioned actions | ||
@@ -92,3 +118,3 @@ * for consumption by `runEffects`, to make sure the correct effects are run on blocks that include a `HandlerVersion` | ||
*/ | ||
applyUpdaters(state, block, isReplay, context) { | ||
applyUpdaters(state, block, context, isReplay) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
@@ -101,3 +127,3 @@ const versionedActions = []; | ||
updaterIndex += 1; | ||
if (action.type === updater.actionType) { | ||
if (this.matchActionType(action.type, updater.actionType)) { | ||
const { payload } = action; | ||
@@ -117,3 +143,6 @@ const newVersion = yield updater.apply(state, payload, blockInfo, context); | ||
} | ||
versionedActions.push([action, this.handlerVersionName]); | ||
versionedActions.push({ | ||
action, | ||
handlerVersionName: this.handlerVersionName, | ||
}); | ||
} | ||
@@ -126,8 +155,8 @@ return versionedActions; | ||
*/ | ||
runEffects(versionedActions, block, context) { | ||
for (const [action, handlerVersionName] of versionedActions) { | ||
runEffects(versionedActions, context, nextBlock) { | ||
this.runDeferredEffects(nextBlock.lastIrreversibleBlockNumber); | ||
for (const { action, handlerVersionName } of versionedActions) { | ||
for (const effect of this.handlerVersionMap[handlerVersionName].effects) { | ||
if (action.type === effect.actionType) { | ||
const { payload } = action; | ||
effect.run(payload, block, context); | ||
if (this.matchActionType(action.type, effect.actionType)) { | ||
this.runOrDeferEffect(effect, action.payload, nextBlock, context); | ||
} | ||
@@ -140,8 +169,9 @@ } | ||
*/ | ||
handleActions(state, block, context, isReplay) { | ||
handleActions(state, context, nextBlock, isReplay) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const { block } = nextBlock; | ||
const { blockInfo } = block; | ||
const versionedActions = yield this.applyUpdaters(state, block, isReplay, context); | ||
const versionedActions = yield this.applyUpdaters(state, block, context, isReplay); | ||
if (!isReplay) { | ||
this.runEffects(versionedActions, block, context); | ||
this.runEffects(versionedActions, context, nextBlock); | ||
} | ||
@@ -153,2 +183,39 @@ yield this.updateIndexState(state, block, isReplay, this.handlerVersionName, context); | ||
} | ||
range(start, end) { | ||
return Array(end - start).fill(0).map((_, i) => i + start); | ||
} | ||
runOrDeferEffect(effect, payload, nextBlock, context) { | ||
const { block, lastIrreversibleBlockNumber } = nextBlock; | ||
const shouldRunImmediately = (!effect.deferUntilIrreversible || block.blockInfo.blockNumber <= lastIrreversibleBlockNumber); | ||
if (shouldRunImmediately) { | ||
effect.run(payload, block, context); | ||
} | ||
else if (!this.deferredEffects[block.blockInfo.blockNumber]) { | ||
this.deferredEffects[block.blockInfo.blockNumber] = [() => effect.run(payload, block, context)]; | ||
} | ||
else { | ||
this.deferredEffects[block.blockInfo.blockNumber].push(() => effect.run(payload, block, context)); | ||
} | ||
} | ||
runDeferredEffects(lastIrreversibleBlockNumber) { | ||
const nextDeferredBlockNumber = this.getNextDeferredBlockNumber(); | ||
if (!nextDeferredBlockNumber) { | ||
return; | ||
} | ||
for (const blockNumber of this.range(nextDeferredBlockNumber, lastIrreversibleBlockNumber + 1)) { | ||
if (this.deferredEffects[blockNumber]) { | ||
for (const deferredEffect of this.deferredEffects[blockNumber]) { | ||
deferredEffect(); | ||
} | ||
delete this.deferredEffects[blockNumber]; | ||
} | ||
} | ||
} | ||
getNextDeferredBlockNumber() { | ||
const blockNumbers = Object.keys(this.deferredEffects).map((num) => parseInt(num, 10)); | ||
if (blockNumbers.length === 0) { | ||
return 0; | ||
} | ||
return Math.min(...blockNumbers); | ||
} | ||
initHandlerVersions(handlerVersions) { | ||
@@ -155,0 +222,0 @@ if (handlerVersions.length === 0) { |
import * as Logger from "bunyan"; | ||
import { Block } from "./interfaces"; | ||
import { ActionReaderOptions, Block, NextBlock, ReaderInfo } from "./interfaces"; | ||
/** | ||
@@ -8,27 +8,12 @@ * Reads blocks from a blockchain, outputting normalized `Block` objects. | ||
startAtBlock: number; | ||
protected onlyIrreversible: boolean; | ||
protected maxHistoryLength: number; | ||
headBlockNumber: number; | ||
currentBlockNumber: number; | ||
isFirstBlock: boolean; | ||
protected currentBlockData: Block | null; | ||
protected onlyIrreversible: boolean; | ||
protected currentBlockData: Block; | ||
protected lastIrreversibleBlockNumber: number; | ||
protected blockHistory: Block[]; | ||
protected log: Logger; | ||
private isFirstRun; | ||
private initialized; | ||
constructor(options?: ActionReaderOptions); | ||
/** | ||
* @param startAtBlock For positive values, this sets the first block that this will start at. For negative | ||
* values, this will start at (most recent block + startAtBlock), effectively tailing the | ||
* chain. Be careful when using this feature, as this will make your starting block dynamic. | ||
* | ||
* @param onlyIrreversible When false (default), `getHeadBlockNumber` will load the most recent block number. When | ||
* true, `getHeadBlockNumber` will return the block number of the most recent irreversible | ||
* block. Keep in mind that `getHeadBlockNumber` is an abstract method and this functionality | ||
* is the responsibility of the implementing class. | ||
* | ||
* @param maxHistoryLength This determines how many blocks in the past are cached. This is used for determining | ||
* block validity during both normal operation and when rolling back. | ||
*/ | ||
constructor(startAtBlock?: number, onlyIrreversible?: boolean, maxHistoryLength?: number); | ||
/** | ||
* Loads the number of the latest block. | ||
@@ -54,3 +39,3 @@ */ | ||
*/ | ||
nextBlock(): Promise<[Block, boolean, boolean]>; | ||
getNextBlock(): Promise<NextBlock>; | ||
/** | ||
@@ -65,2 +50,6 @@ * Changes the state of the `AbstractActionReader` instance to have just processed the block at the given block | ||
/** | ||
* Information about the current state of the Action Reader | ||
*/ | ||
readonly info: ReaderInfo; | ||
/** | ||
* Incrementally rolls back reader state one block at a time, comparing the blockHistory with | ||
@@ -71,3 +60,8 @@ * newly fetched blocks. Fork resolution is finished when either the current block's previous hash | ||
protected resolveFork(): Promise<void>; | ||
private initBlockState; | ||
private getLatestNeededBlockNumber; | ||
private acceptBlock; | ||
private range; | ||
private pruneHistory; | ||
private reloadHistory; | ||
private addPreviousBlockToHistory; | ||
@@ -74,0 +68,0 @@ private logForkDetected; |
@@ -19,2 +19,11 @@ "use strict"; | ||
const Logger = __importStar(require("bunyan")); | ||
const defaultBlock = { | ||
blockInfo: { | ||
blockNumber: 0, | ||
blockHash: "", | ||
previousBlockHash: "", | ||
timestamp: new Date(0), | ||
}, | ||
actions: [], | ||
}; | ||
/** | ||
@@ -24,26 +33,12 @@ * Reads blocks from a blockchain, outputting normalized `Block` objects. | ||
class AbstractActionReader { | ||
/** | ||
* @param startAtBlock For positive values, this sets the first block that this will start at. For negative | ||
* values, this will start at (most recent block + startAtBlock), effectively tailing the | ||
* chain. Be careful when using this feature, as this will make your starting block dynamic. | ||
* | ||
* @param onlyIrreversible When false (default), `getHeadBlockNumber` will load the most recent block number. When | ||
* true, `getHeadBlockNumber` will return the block number of the most recent irreversible | ||
* block. Keep in mind that `getHeadBlockNumber` is an abstract method and this functionality | ||
* is the responsibility of the implementing class. | ||
* | ||
* @param maxHistoryLength This determines how many blocks in the past are cached. This is used for determining | ||
* block validity during both normal operation and when rolling back. | ||
*/ | ||
constructor(startAtBlock = 1, onlyIrreversible = false, maxHistoryLength = 600) { | ||
this.startAtBlock = startAtBlock; | ||
this.onlyIrreversible = onlyIrreversible; | ||
this.maxHistoryLength = maxHistoryLength; | ||
constructor(options = {}) { | ||
this.headBlockNumber = 0; | ||
this.isFirstBlock = true; | ||
this.currentBlockData = null; | ||
this.currentBlockData = defaultBlock; | ||
this.lastIrreversibleBlockNumber = 0; | ||
this.blockHistory = []; | ||
this.isFirstRun = true; | ||
this.currentBlockNumber = startAtBlock - 1; | ||
this.initialized = false; | ||
const optionsWithDefaults = Object.assign({ startAtBlock: 1, onlyIrreversible: false }, options); | ||
this.startAtBlock = optionsWithDefaults.startAtBlock; | ||
this.currentBlockNumber = optionsWithDefaults.startAtBlock - 1; | ||
this.onlyIrreversible = optionsWithDefaults.onlyIrreversible; | ||
this.log = Logger.createLogger({ name: "demux" }); | ||
@@ -58,42 +53,32 @@ } | ||
*/ | ||
nextBlock() { | ||
getNextBlock() { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let blockData = null; | ||
let isRollback = false; | ||
let isNewBlock = false; | ||
// If we're on the head block, refresh current head block | ||
if (this.currentBlockNumber === this.headBlockNumber || !this.headBlockNumber) { | ||
const blockMeta = { | ||
isRollback: false, | ||
isNewBlock: false, | ||
isEarliestBlock: false, | ||
}; | ||
// TODO: Should this only be called when updating headBlockNumber? | ||
this.lastIrreversibleBlockNumber = yield this.getLastIrreversibleBlockNumber(); | ||
if (!this.initialized) { | ||
yield this.initBlockState(); | ||
} | ||
if (this.currentBlockNumber === this.headBlockNumber) { | ||
this.headBlockNumber = yield this.getLatestNeededBlockNumber(); | ||
} | ||
// If currentBlockNumber is negative, it means we wrap to the end of the chain (most recent blocks) | ||
if (this.currentBlockNumber < 0 && this.isFirstRun) { | ||
this.currentBlockNumber = this.headBlockNumber + this.currentBlockNumber; | ||
this.startAtBlock = this.currentBlockNumber + 1; | ||
} | ||
else if (this.isFirstRun) { | ||
this.isFirstRun = false; | ||
} | ||
// If we're now behind one or more new blocks, process them | ||
if (this.currentBlockNumber < this.headBlockNumber) { | ||
const unvalidatedBlockData = yield this.getBlock(this.currentBlockNumber + 1); | ||
const expectedHash = this.currentBlockData !== null ? this.currentBlockData.blockInfo.blockHash : "INVALID"; | ||
const actualHash = unvalidatedBlockData.blockInfo.previousBlockHash; | ||
// Continue if the new block is on the same chain as our history, or if we've just started | ||
if (expectedHash === actualHash || this.blockHistory.length === 0) { | ||
blockData = unvalidatedBlockData; // Block is now validated | ||
if (this.currentBlockData) { | ||
this.blockHistory.push(this.currentBlockData); // No longer current, belongs on history | ||
} | ||
this.blockHistory.splice(0, this.blockHistory.length - this.maxHistoryLength); // Trim history | ||
this.currentBlockData = blockData; // Replaced with the real current block | ||
isNewBlock = true; | ||
this.currentBlockNumber = this.currentBlockData.blockInfo.blockNumber; | ||
const expectedHash = this.currentBlockData.blockInfo.blockHash; | ||
const actualHash = this.currentBlockNumber ? | ||
unvalidatedBlockData.blockInfo.previousBlockHash : | ||
defaultBlock.blockInfo.blockHash; | ||
if (expectedHash === actualHash) { | ||
this.acceptBlock(unvalidatedBlockData); | ||
blockMeta.isNewBlock = true; | ||
} | ||
else { | ||
// Since the new block did not match our history, we can assume our history is wrong | ||
// and need to roll back | ||
this.logForkDetected(unvalidatedBlockData, expectedHash, actualHash); | ||
yield this.resolveFork(); | ||
isNewBlock = true; | ||
isRollback = true; // Signal action handler that we must roll back | ||
blockMeta.isNewBlock = true; | ||
blockMeta.isRollback = true; | ||
// Reset for safety, as new fork could have less blocks than the previous fork | ||
@@ -103,8 +88,8 @@ this.headBlockNumber = yield this.getLatestNeededBlockNumber(); | ||
} | ||
// Let handler know if this is the earliest block we'll send | ||
this.isFirstBlock = this.currentBlockNumber === this.startAtBlock; | ||
if (this.currentBlockData === null) { | ||
throw Error("currentBlockData must not be null."); | ||
} | ||
return [this.currentBlockData, isRollback, isNewBlock]; | ||
blockMeta.isEarliestBlock = this.currentBlockNumber === this.startAtBlock; | ||
return { | ||
block: this.currentBlockData, | ||
blockMeta, | ||
lastIrreversibleBlockNumber: this.lastIrreversibleBlockNumber, | ||
}; | ||
}); | ||
@@ -121,40 +106,26 @@ } | ||
return __awaiter(this, void 0, void 0, function* () { | ||
// Clear current block data | ||
this.currentBlockData = null; | ||
this.headBlockNumber = 0; | ||
this.headBlockNumber = yield this.getLatestNeededBlockNumber(); | ||
if (blockNumber < this.startAtBlock) { | ||
throw Error("Cannot seek to block before configured startAtBlock."); | ||
throw new Error("Cannot seek to block before configured `startAtBlock` number."); | ||
} | ||
// If we're going back to the first block, we don't want to get the preceding block | ||
if (blockNumber === 1) { | ||
this.blockHistory = []; | ||
this.currentBlockNumber = 0; | ||
return; | ||
if (blockNumber > this.headBlockNumber) { | ||
throw new Error(`Cannot seek to block number ${blockNumber} as it does not exist yet.`); | ||
} | ||
// Check if block exists in history | ||
let toDelete = -1; | ||
for (let i = this.blockHistory.length - 1; i >= 0; i--) { | ||
if (this.blockHistory[i].blockInfo.blockNumber === blockNumber) { | ||
break; | ||
} | ||
else { | ||
toDelete += 1; | ||
} | ||
} | ||
if (toDelete >= 0) { | ||
this.blockHistory.splice(toDelete); | ||
this.currentBlockData = this.blockHistory.pop() || null; | ||
} | ||
// Load current block | ||
this.currentBlockNumber = blockNumber - 1; | ||
if (!this.currentBlockData) { | ||
this.currentBlockData = yield this.getBlock(this.currentBlockNumber); | ||
} | ||
// Fetch block if there is no history | ||
if (this.blockHistory.length === 0) { | ||
yield this.addPreviousBlockToHistory(false); | ||
} | ||
yield this.reloadHistory(); | ||
}); | ||
} | ||
/** | ||
* Information about the current state of the Action Reader | ||
*/ | ||
get info() { | ||
return { | ||
currentBlockNumber: this.currentBlockNumber, | ||
startAtBlock: this.startAtBlock, | ||
headBlockNumber: this.headBlockNumber, | ||
onlyIrreversible: this.onlyIrreversible, | ||
lastIrreversibleBlockNumber: this.lastIrreversibleBlockNumber, | ||
}; | ||
} | ||
/** | ||
* Incrementally rolls back reader state one block at a time, comparing the blockHistory with | ||
@@ -166,5 +137,2 @@ * newly fetched blocks. Fork resolution is finished when either the current block's previous hash | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (this.currentBlockData === null) { | ||
throw Error("`currentBlockData` must not be null when initiating fork resolution."); | ||
} | ||
if (this.blockHistory.length === 0) { | ||
@@ -181,20 +149,31 @@ yield this.addPreviousBlockToHistory(); | ||
this.currentBlockData = yield this.getBlock(this.currentBlockData.blockInfo.blockNumber); | ||
if (this.currentBlockData !== null) { | ||
const { blockInfo: currentBlockInfo } = this.currentBlockData; | ||
const { blockInfo: previousBlockInfo } = previousBlockData; | ||
if (currentBlockInfo.previousBlockHash === previousBlockInfo.blockHash) { | ||
this.logForkResolved(currentBlockInfo, previousBlockInfo); | ||
break; | ||
} | ||
this.logForkMismatch(currentBlockInfo, previousBlockInfo); | ||
const { blockInfo: currentBlockInfo } = this.currentBlockData; | ||
const { blockInfo: previousBlockInfo } = previousBlockData; | ||
if (currentBlockInfo.previousBlockHash === previousBlockInfo.blockHash) { | ||
this.logForkResolved(currentBlockInfo, previousBlockInfo); | ||
break; | ||
} | ||
this.logForkMismatch(currentBlockInfo, previousBlockInfo); | ||
this.currentBlockData = previousBlockData; | ||
this.blockHistory.pop(); | ||
} | ||
if (this.blockHistory.length === 0) { | ||
yield this.addPreviousBlockToHistory(); | ||
} | ||
this.currentBlockNumber = this.blockHistory[this.blockHistory.length - 1].blockInfo.blockNumber + 1; | ||
}); | ||
} | ||
initBlockState() { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
this.headBlockNumber = yield this.getLatestNeededBlockNumber(); | ||
if (this.currentBlockNumber < 0) { | ||
this.currentBlockNumber = this.headBlockNumber + this.currentBlockNumber; | ||
this.startAtBlock = this.currentBlockNumber + 1; | ||
} | ||
yield this.reloadHistory(); | ||
this.initialized = true; | ||
}); | ||
} | ||
getLatestNeededBlockNumber() { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
this.lastIrreversibleBlockNumber = yield this.getLastIrreversibleBlockNumber(); | ||
if (this.onlyIrreversible) { | ||
@@ -208,8 +187,75 @@ return this.lastIrreversibleBlockNumber; | ||
} | ||
acceptBlock(blockData) { | ||
this.blockHistory.push(this.currentBlockData); | ||
this.pruneHistory(); | ||
this.currentBlockData = blockData; | ||
this.currentBlockNumber = this.currentBlockData.blockInfo.blockNumber; | ||
} | ||
range(start, end) { | ||
if (start > end) { | ||
return []; | ||
} | ||
return Array(end - start).fill(0).map((_, i) => i + start); | ||
} | ||
pruneHistory() { | ||
let toDelete = 0; | ||
for (const block of this.blockHistory) { | ||
if (block.blockInfo.blockNumber < this.lastIrreversibleBlockNumber) { | ||
toDelete += 1; | ||
} | ||
else { | ||
break; | ||
} | ||
} | ||
if (toDelete === this.blockHistory.length) { | ||
this.blockHistory = [this.blockHistory[this.blockHistory.length - 1]]; | ||
return; | ||
} | ||
this.blockHistory.splice(0, toDelete); | ||
} | ||
reloadHistory(maxTries = 10) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (this.currentBlockNumber === 0) { | ||
this.blockHistory = []; | ||
this.currentBlockData = defaultBlock; | ||
return; | ||
} | ||
if (this.currentBlockNumber === 1) { | ||
this.blockHistory = [defaultBlock]; | ||
this.currentBlockData = yield this.getBlock(1); | ||
return; | ||
} | ||
let historyRange = this.range(this.lastIrreversibleBlockNumber, this.currentBlockNumber + 1); | ||
if (historyRange.length <= 1) { | ||
historyRange = [this.currentBlockNumber - 1, this.currentBlockNumber]; | ||
} | ||
let microForked = true; | ||
let tryCount = 0; | ||
while (microForked) { | ||
microForked = false; | ||
this.blockHistory = []; | ||
for (const blockNumber of historyRange) { | ||
const historyBlock = yield this.getBlock(blockNumber); | ||
if (this.blockHistory.length === 0) { | ||
this.blockHistory.push(historyBlock); | ||
continue; | ||
} | ||
const latestHistoryBlockHash = this.blockHistory[this.blockHistory.length - 1].blockInfo.blockHash; | ||
if (latestHistoryBlockHash !== historyBlock.blockInfo.previousBlockHash) { | ||
microForked = true; | ||
break; | ||
} | ||
this.blockHistory.push(historyBlock); | ||
} | ||
tryCount += 1; | ||
if (tryCount === maxTries) { | ||
throw new Error("Could not reload history."); | ||
} | ||
} | ||
this.currentBlockData = this.blockHistory.pop(); | ||
}); | ||
} | ||
addPreviousBlockToHistory(checkIrreversiblility = true) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (!this.currentBlockData) { | ||
throw Error("`currentBlockData` must not be null when initiating fork resolution."); | ||
} | ||
if (this.currentBlockData.blockInfo.blockNumber <= this.lastIrreversibleBlockNumber && checkIrreversiblility) { | ||
if (this.currentBlockData.blockInfo.blockNumber < this.lastIrreversibleBlockNumber && checkIrreversiblility) { | ||
throw new Error("Last irreversible block has been passed without resolving fork"); | ||
@@ -216,0 +262,0 @@ } |
@@ -0,3 +1,5 @@ | ||
import * as Logger from "bunyan"; | ||
import { AbstractActionHandler } from "./AbstractActionHandler"; | ||
import { AbstractActionReader } from "./AbstractActionReader"; | ||
import { DemuxInfo } from "./interfaces"; | ||
/** | ||
@@ -16,2 +18,6 @@ * Coordinates implementations of `AbstractActionReader`s and `AbstractActionHandler`s in | ||
*/ | ||
protected log: Logger; | ||
private running; | ||
private shouldPause; | ||
private error; | ||
constructor(actionReader: AbstractActionReader, actionHandler: AbstractActionHandler, pollInterval: number); | ||
@@ -24,8 +30,24 @@ /** | ||
* Start a polling loop | ||
* | ||
* @param isReplay Set to true to disable Effects from running until caught up with head block. | ||
*/ | ||
watch(isReplay?: boolean): Promise<void>; | ||
/** | ||
* Start or resume indexing. | ||
*/ | ||
start(): boolean; | ||
/** | ||
* Suspend indexing. Will go into effect after the currently-processing block. | ||
*/ | ||
pause(): boolean; | ||
/** | ||
* Information about the current state of Demux | ||
*/ | ||
readonly info: DemuxInfo; | ||
/** | ||
* Use the actionReader and actionHandler to process new blocks. | ||
* | ||
* @param isReplay Set to true to disable Effects from running until caught up with head block. | ||
*/ | ||
protected checkForBlocks(isReplay?: boolean): Promise<void>; | ||
} |
@@ -10,3 +10,11 @@ "use strict"; | ||
}; | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (Object.hasOwnProperty.call(mod, k)) result[k] = mod[k]; | ||
result["default"] = mod; | ||
return result; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const Logger = __importStar(require("bunyan")); | ||
/** | ||
@@ -17,7 +25,2 @@ * Coordinates implementations of `AbstractActionReader`s and `AbstractActionHandler`s in | ||
class BaseActionWatcher { | ||
/** | ||
* @param actionReader An instance of an implemented `AbstractActionReader` | ||
* @param actionHandler An instance of an implemented `AbstractActionHandler` | ||
* @param pollInterval Number of milliseconds between each polling loop iteration | ||
*/ | ||
constructor(actionReader, actionHandler, pollInterval) { | ||
@@ -27,2 +30,6 @@ this.actionReader = actionReader; | ||
this.pollInterval = pollInterval; | ||
this.running = false; | ||
this.shouldPause = false; | ||
this.error = null; | ||
this.log = Logger.createLogger({ name: "demux" }); | ||
} | ||
@@ -39,8 +46,28 @@ /** | ||
* Start a polling loop | ||
* | ||
* @param isReplay Set to true to disable Effects from running until caught up with head block. | ||
*/ | ||
watch(isReplay = false) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const startTime = new Date().getTime(); | ||
yield this.checkForBlocks(isReplay); | ||
const endTime = new Date().getTime(); | ||
if (this.shouldPause) { | ||
this.running = false; | ||
this.shouldPause = false; | ||
this.log.info("Indexing paused."); | ||
return; | ||
} | ||
this.running = true; | ||
this.error = null; | ||
const startTime = Date.now(); | ||
try { | ||
yield this.checkForBlocks(isReplay); | ||
} | ||
catch (err) { | ||
this.running = false; | ||
this.shouldPause = false; | ||
this.log.error(err); | ||
this.error = err; | ||
this.log.info("Indexing unexpectedly paused due to an error."); | ||
return; | ||
} | ||
const endTime = Date.now(); | ||
const duration = endTime - startTime; | ||
@@ -55,3 +82,53 @@ let waitTime = this.pollInterval - duration; | ||
/** | ||
* Start or resume indexing. | ||
*/ | ||
start() { | ||
if (this.running) { | ||
this.log.info("Cannot start; already indexing."); | ||
return false; | ||
} | ||
this.log.info("Starting indexing."); | ||
this.watch(); | ||
return true; | ||
} | ||
/** | ||
* Suspend indexing. Will go into effect after the currently-processing block. | ||
*/ | ||
pause() { | ||
if (!this.running) { | ||
this.log.info("Cannot pause; not currently indexing."); | ||
return false; | ||
} | ||
this.log.info("Pausing indexing."); | ||
this.shouldPause = true; | ||
return true; | ||
} | ||
/** | ||
* Information about the current state of Demux | ||
*/ | ||
get info() { | ||
let status; | ||
if (this.running && !this.shouldPause) { | ||
status = "indexing"; | ||
} | ||
else if (this.running && this.shouldPause) { | ||
status = "pausing"; | ||
} | ||
else { | ||
status = "paused"; | ||
} | ||
const info = { | ||
handler: this.actionHandler.info, | ||
reader: this.actionReader.info, | ||
status, | ||
}; | ||
if (this.error) { | ||
info.error = this.error; | ||
} | ||
return info; | ||
} | ||
/** | ||
* Use the actionReader and actionHandler to process new blocks. | ||
* | ||
* @param isReplay Set to true to disable Effects from running until caught up with head block. | ||
*/ | ||
@@ -62,14 +139,13 @@ checkForBlocks(isReplay = false) { | ||
while (!headBlockNumber || this.actionReader.currentBlockNumber < headBlockNumber) { | ||
const [blockData, isRollback, isNewBlock] = yield this.actionReader.nextBlock(); | ||
if (!isNewBlock) { | ||
if (this.shouldPause) { | ||
return; | ||
} | ||
const nextBlock = yield this.actionReader.getNextBlock(); | ||
if (!nextBlock.blockMeta.isNewBlock) { | ||
break; | ||
} | ||
let needToSeek = false; | ||
let seekBlockNum = 0; | ||
if (blockData) { | ||
[needToSeek, seekBlockNum] = yield this.actionHandler.handleBlock(blockData, isRollback, this.actionReader.isFirstBlock, isReplay); | ||
const nextBlockNumberNeeded = yield this.actionHandler.handleBlock(nextBlock, isReplay); | ||
if (nextBlockNumberNeeded) { | ||
yield this.actionReader.seekToBlock(nextBlockNumberNeeded - 1); | ||
} | ||
if (needToSeek) { | ||
yield this.actionReader.seekToBlock(seekBlockNum - 1); | ||
} | ||
headBlockNumber = this.actionReader.headBlockNumber; | ||
@@ -76,0 +152,0 @@ } |
@@ -5,1 +5,2 @@ export { Action, Block, BlockInfo, Effect, HandlerVersion, IndexState, Updater } from "./interfaces"; | ||
export { BaseActionWatcher } from "./BaseActionWatcher"; | ||
export { ExpressActionWatcher } from "./ExpressActionWatcher"; |
@@ -9,1 +9,3 @@ "use strict"; | ||
exports.BaseActionWatcher = BaseActionWatcher_1.BaseActionWatcher; | ||
var ExpressActionWatcher_1 = require("./ExpressActionWatcher"); | ||
exports.ExpressActionWatcher = ExpressActionWatcher_1.ExpressActionWatcher; |
@@ -0,1 +1,21 @@ | ||
export interface ActionReaderOptions { | ||
/** | ||
* For positive values, this sets the first block that this will start at. For negative | ||
* values, this will start at (most recent block + startAtBlock), effectively tailing the | ||
* chain. Be careful when using this feature, as this will make your starting block dynamic. | ||
*/ | ||
startAtBlock?: number; | ||
/** | ||
* When false (default), `getHeadBlockNumber` will load the most recent block number. When | ||
* true, `getHeadBlockNumber` will return the block number of the most recent irreversible | ||
* block. Keep in mind that `getHeadBlockNumber` is an abstract method and this functionality | ||
* is the responsibility of the implementing class. | ||
*/ | ||
onlyIrreversible?: boolean; | ||
/** | ||
* This determines how many blocks in the past are cached. This is used for determining | ||
* block validity during both normal operation and when rolling back. | ||
*/ | ||
maxHistoryLength?: number; | ||
} | ||
export interface Block { | ||
@@ -5,2 +25,7 @@ actions: Action[]; | ||
} | ||
export interface BlockMeta { | ||
isRollback: boolean; | ||
isEarliestBlock: boolean; | ||
isNewBlock: boolean; | ||
} | ||
export interface IndexState { | ||
@@ -18,2 +43,7 @@ blockNumber: number; | ||
} | ||
export interface NextBlock { | ||
block: Block; | ||
blockMeta: BlockMeta; | ||
lastIrreversibleBlockNumber: number; | ||
} | ||
export interface Action { | ||
@@ -34,2 +64,3 @@ type: string; | ||
run: StatelessActionCallback; | ||
deferUntilIrreversible?: boolean; | ||
onRollback?: StatelessActionCallback; | ||
@@ -42,1 +73,27 @@ } | ||
} | ||
export interface VersionedAction { | ||
action: Action; | ||
handlerVersionName: string; | ||
} | ||
export declare type CurriedEffectRun = (() => void | Promise<void>); | ||
export interface DeferredEffects { | ||
[key: number]: CurriedEffectRun[]; | ||
} | ||
export interface HandlerInfo { | ||
lastProcessedBlockNumber: number; | ||
lastProcessedBlockHash: string; | ||
handlerVersionName: string; | ||
} | ||
export interface ReaderInfo { | ||
currentBlockNumber: number; | ||
startAtBlock: number; | ||
headBlockNumber: number; | ||
onlyIrreversible: boolean; | ||
lastIrreversibleBlockNumber: number; | ||
} | ||
export interface DemuxInfo { | ||
status: string; | ||
error?: Error; | ||
handler: HandlerInfo; | ||
reader: ReaderInfo; | ||
} |
{ | ||
"name": "demux", | ||
"version": "3.1.4-29827da.0", | ||
"version": "3.1.4-431ae7e.0", | ||
"author": { | ||
@@ -21,4 +21,5 @@ "name": "Julien Heller", | ||
"eslint-plugin-import": "^2.7.0", | ||
"jest": "^22.4.3", | ||
"jest": "^23.6.0", | ||
"release-it": "^7.5.0", | ||
"supertest": "^3.4.1", | ||
"ts-jest": "^23.0.0", | ||
@@ -36,4 +37,5 @@ "tslint": "^5.10.0", | ||
"lint": "tslint -c tslint.json -p tsconfig.json", | ||
"test": "jest", | ||
"build-docs": "./build-docs.sh" | ||
"test": "jest --detectOpenHandles --maxWorkers=2", | ||
"build-docs": "./build-docs.sh", | ||
"current-version": "echo $npm_package_version" | ||
}, | ||
@@ -58,4 +60,7 @@ "jest": { | ||
"dependencies": { | ||
"bunyan": "^1.8.12" | ||
"@types/express": "^4.16.0", | ||
"@types/supertest": "^2.0.7", | ||
"bunyan": "^1.8.12", | ||
"express": "^4.16.4" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
327440
19
1106
0
4
15
+ Added@types/express@^4.16.0
+ Added@types/supertest@^2.0.7
+ Addedexpress@^4.16.4
+ Added@types/body-parser@1.19.5(transitive)
+ Added@types/connect@3.4.38(transitive)
+ Added@types/cookiejar@2.1.5(transitive)
+ Added@types/express@4.17.21(transitive)
+ Added@types/express-serve-static-core@4.19.6(transitive)
+ Added@types/http-errors@2.0.4(transitive)
+ Added@types/methods@1.1.4(transitive)
+ Added@types/mime@1.3.5(transitive)
+ Added@types/node@22.13.4(transitive)
+ Added@types/qs@6.9.18(transitive)
+ Added@types/range-parser@1.2.7(transitive)
+ Added@types/send@0.17.4(transitive)
+ Added@types/serve-static@1.15.7(transitive)
+ Added@types/superagent@8.1.9(transitive)
+ Added@types/supertest@2.0.16(transitive)
+ Addedaccepts@1.3.8(transitive)
+ Addedarray-flatten@1.1.1(transitive)
+ Addedasynckit@0.4.0(transitive)
+ Addedbody-parser@1.20.3(transitive)
+ Addedbytes@3.1.2(transitive)
+ Addedcall-bind-apply-helpers@1.0.2(transitive)
+ Addedcall-bound@1.0.3(transitive)
+ Addedcombined-stream@1.0.8(transitive)
+ Addedcontent-disposition@0.5.4(transitive)
+ Addedcontent-type@1.0.5(transitive)
+ Addedcookie@0.7.1(transitive)
+ Addedcookie-signature@1.0.6(transitive)
+ Addeddebug@2.6.9(transitive)
+ Addeddelayed-stream@1.0.0(transitive)
+ Addeddepd@2.0.0(transitive)
+ Addeddestroy@1.2.0(transitive)
+ Addeddunder-proto@1.0.1(transitive)
+ Addedee-first@1.1.1(transitive)
+ Addedencodeurl@1.0.22.0.0(transitive)
+ Addedes-define-property@1.0.1(transitive)
+ Addedes-errors@1.3.0(transitive)
+ Addedes-object-atoms@1.1.1(transitive)
+ Addedes-set-tostringtag@2.1.0(transitive)
+ Addedescape-html@1.0.3(transitive)
+ Addedetag@1.8.1(transitive)
+ Addedexpress@4.21.2(transitive)
+ Addedfinalhandler@1.3.1(transitive)
+ Addedform-data@4.0.2(transitive)
+ Addedforwarded@0.2.0(transitive)
+ Addedfresh@0.5.2(transitive)
+ Addedfunction-bind@1.1.2(transitive)
+ Addedget-intrinsic@1.2.7(transitive)
+ Addedget-proto@1.0.1(transitive)
+ Addedgopd@1.2.0(transitive)
+ Addedhas-symbols@1.1.0(transitive)
+ Addedhas-tostringtag@1.0.2(transitive)
+ Addedhasown@2.0.2(transitive)
+ Addedhttp-errors@2.0.0(transitive)
+ Addediconv-lite@0.4.24(transitive)
+ Addedipaddr.js@1.9.1(transitive)
+ Addedmath-intrinsics@1.1.0(transitive)
+ Addedmedia-typer@0.3.0(transitive)
+ Addedmerge-descriptors@1.0.3(transitive)
+ Addedmethods@1.1.2(transitive)
+ Addedmime@1.6.0(transitive)
+ Addedmime-db@1.52.0(transitive)
+ Addedmime-types@2.1.35(transitive)
+ Addedms@2.0.02.1.3(transitive)
+ Addednegotiator@0.6.3(transitive)
+ Addedobject-inspect@1.13.4(transitive)
+ Addedon-finished@2.4.1(transitive)
+ Addedparseurl@1.3.3(transitive)
+ Addedpath-to-regexp@0.1.12(transitive)
+ Addedproxy-addr@2.0.7(transitive)
+ Addedqs@6.13.0(transitive)
+ Addedrange-parser@1.2.1(transitive)
+ Addedraw-body@2.5.2(transitive)
+ Addedsafe-buffer@5.2.1(transitive)
+ Addedsafer-buffer@2.1.2(transitive)
+ Addedsend@0.19.0(transitive)
+ Addedserve-static@1.16.2(transitive)
+ Addedsetprototypeof@1.2.0(transitive)
+ Addedside-channel@1.1.0(transitive)
+ Addedside-channel-list@1.0.0(transitive)
+ Addedside-channel-map@1.0.1(transitive)
+ Addedside-channel-weakmap@1.0.2(transitive)
+ Addedstatuses@2.0.1(transitive)
+ Addedtoidentifier@1.0.1(transitive)
+ Addedtype-is@1.6.18(transitive)
+ Addedundici-types@6.20.0(transitive)
+ Addedunpipe@1.0.0(transitive)
+ Addedutils-merge@1.0.1(transitive)
+ Addedvary@1.1.2(transitive)