New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

demux

Package Overview
Dependencies
Maintainers
3
Versions
77
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

demux - npm Package Compare versions

Comparing version 3.1.4-29827da.0 to 3.1.4-431ae7e.0

dist/ExpressActionWatcher.d.ts

36

dist/AbstractActionHandler.d.ts
import * as Logger from "bunyan";
import { Action, Block, HandlerVersion, IndexState } from "./interfaces";
import { Block, HandlerInfo, HandlerVersion, IndexState, NextBlock, VersionedAction } from "./interfaces";
/**

@@ -11,6 +11,7 @@ * Takes `block`s output from implementations of `AbstractActionReader` and processes their actions through the

export declare abstract class AbstractActionHandler {
protected lastProcessedBlockNumber: number;
protected lastProcessedBlockHash: string;
protected handlerVersionName: string;
lastProcessedBlockNumber: number;
lastProcessedBlockHash: string;
handlerVersionName: string;
protected log: Logger;
private deferredEffects;
private handlerVersionMap;

@@ -25,4 +26,8 @@ /**

*/
handleBlock(block: Block, isRollback: boolean, isFirstBlock: boolean, isReplay?: boolean): Promise<[boolean, number]>;
handleBlock(nextBlock: NextBlock, isReplay: boolean): Promise<number | null>;
/**
* Information about the current state of the Action Handler
*/
readonly info: HandlerInfo;
/**
* Updates the `lastProcessedBlockNumber` and `lastProcessedBlockHash` meta state, coinciding with the block

@@ -46,2 +51,13 @@ * that has just been processed. These are the same values read by `updateIndexState()`.

/**
* This method is used when matching the types of incoming actions against the types the `Updater`s and `Effect`s are
* subscribed to. When this returns true, their corresponding functions will run.
*
* By default, this method tests for direct equivalence between the incoming candidate type and the type that is
* subscribed. Override this method to extend this functionality (e.g. wildcards).
*
* @param candidateType The incoming action's type
* @param subscribedType The type the Updater of Effect is subscribed to
*/
protected matchActionType(candidateType: string, subscribedType: string): boolean;
/**
* Process actions against deterministically accumulating `Updater` functions. Returns a promise of versioned actions

@@ -52,7 +68,7 @@ * for consumption by `runEffects`, to make sure the correct effects are run on blocks that include a `HandlerVersion`

*/
protected applyUpdaters(state: any, block: Block, isReplay: boolean, context: any): Promise<Array<[Action, string]>>;
protected applyUpdaters(state: any, block: Block, context: any, isReplay: boolean): Promise<VersionedAction[]>;
/**
* Process versioned actions against asynchronous side effects.
*/
protected runEffects(versionedActions: Array<[Action, string]>, block: Block, context: any): void;
protected runEffects(versionedActions: VersionedAction[], context: any, nextBlock: NextBlock): void;
/**

@@ -67,3 +83,7 @@ * Will run when a rollback block number is passed to handleActions. Implement this method to

*/
protected handleActions(state: any, block: Block, context: any, isReplay: boolean): Promise<void>;
protected handleActions(state: any, context: any, nextBlock: NextBlock, isReplay: boolean): Promise<void>;
private range;
private runOrDeferEffect;
private runDeferredEffects;
private getNextDeferredBlockNumber;
private initHandlerVersions;

@@ -70,0 +90,0 @@ private refreshIndexState;

@@ -35,2 +35,3 @@ "use strict";

this.handlerVersionName = "v1";
this.deferredEffects = {};
this.handlerVersionMap = {};

@@ -43,6 +44,8 @@ this.initHandlerVersions(handlerVersions);

*/
handleBlock(block, isRollback, isFirstBlock, isReplay = false) {
handleBlock(nextBlock, isReplay) {
return __awaiter(this, void 0, void 0, function* () {
const { block, blockMeta } = nextBlock;
const { blockInfo } = block;
if (isRollback || (isReplay && isFirstBlock)) {
const { isRollback, isEarliestBlock } = blockMeta;
if (isRollback || (isReplay && isEarliestBlock)) {
const rollbackBlockNumber = blockInfo.blockNumber - 1;

@@ -61,12 +64,12 @@ const rollbackCount = this.lastProcessedBlockNumber - rollbackBlockNumber;

&& blockInfo.blockHash === this.lastProcessedBlockHash) {
return [false, 0];
return null;
}
// If it's the first block but we've already processed blocks, seek to next block
if (isFirstBlock && this.lastProcessedBlockHash) {
return [true, nextBlockNeeded];
if (isEarliestBlock && this.lastProcessedBlockHash) {
return nextBlockNeeded;
}
// Only check if this is the block we need if it's not the first block
if (!isFirstBlock) {
if (!isEarliestBlock) {
if (blockInfo.blockNumber !== nextBlockNeeded) {
return [true, nextBlockNeeded];
return nextBlockNeeded;
}

@@ -79,9 +82,32 @@ // Block sequence consistency should be handled by the ActionReader instance

const handleWithArgs = (state, context = {}) => __awaiter(this, void 0, void 0, function* () {
yield this.handleActions(state, block, context, isReplay);
yield this.handleActions(state, context, nextBlock, isReplay);
});
yield this.handleWithState(handleWithArgs);
return [false, 0];
return null;
});
}
/**
* Information about the current state of the Action Handler
*/
get info() {
return {
lastProcessedBlockNumber: this.lastProcessedBlockNumber,
lastProcessedBlockHash: this.lastProcessedBlockHash,
handlerVersionName: this.handlerVersionName,
};
}
/**
* This method is used when matching the types of incoming actions against the types the `Updater`s and `Effect`s are
* subscribed to. When this returns true, their corresponding functions will run.
*
* By default, this method tests for direct equivalence between the incoming candidate type and the type that is
* subscribed. Override this method to extend this functionality (e.g. wildcards).
*
* @param candidateType The incoming action's type
* @param subscribedType The type the Updater of Effect is subscribed to
*/
matchActionType(candidateType, subscribedType) {
return candidateType === subscribedType;
}
/**
* Process actions against deterministically accumulating `Updater` functions. Returns a promise of versioned actions

@@ -92,3 +118,3 @@ * for consumption by `runEffects`, to make sure the correct effects are run on blocks that include a `HandlerVersion`

*/
applyUpdaters(state, block, isReplay, context) {
applyUpdaters(state, block, context, isReplay) {
return __awaiter(this, void 0, void 0, function* () {

@@ -101,3 +127,3 @@ const versionedActions = [];

updaterIndex += 1;
if (action.type === updater.actionType) {
if (this.matchActionType(action.type, updater.actionType)) {
const { payload } = action;

@@ -117,3 +143,6 @@ const newVersion = yield updater.apply(state, payload, blockInfo, context);

}
versionedActions.push([action, this.handlerVersionName]);
versionedActions.push({
action,
handlerVersionName: this.handlerVersionName,
});
}

@@ -126,8 +155,8 @@ return versionedActions;

*/
runEffects(versionedActions, block, context) {
for (const [action, handlerVersionName] of versionedActions) {
runEffects(versionedActions, context, nextBlock) {
this.runDeferredEffects(nextBlock.lastIrreversibleBlockNumber);
for (const { action, handlerVersionName } of versionedActions) {
for (const effect of this.handlerVersionMap[handlerVersionName].effects) {
if (action.type === effect.actionType) {
const { payload } = action;
effect.run(payload, block, context);
if (this.matchActionType(action.type, effect.actionType)) {
this.runOrDeferEffect(effect, action.payload, nextBlock, context);
}

@@ -140,8 +169,9 @@ }

*/
handleActions(state, block, context, isReplay) {
handleActions(state, context, nextBlock, isReplay) {
return __awaiter(this, void 0, void 0, function* () {
const { block } = nextBlock;
const { blockInfo } = block;
const versionedActions = yield this.applyUpdaters(state, block, isReplay, context);
const versionedActions = yield this.applyUpdaters(state, block, context, isReplay);
if (!isReplay) {
this.runEffects(versionedActions, block, context);
this.runEffects(versionedActions, context, nextBlock);
}

@@ -153,2 +183,39 @@ yield this.updateIndexState(state, block, isReplay, this.handlerVersionName, context);

}
range(start, end) {
return Array(end - start).fill(0).map((_, i) => i + start);
}
runOrDeferEffect(effect, payload, nextBlock, context) {
const { block, lastIrreversibleBlockNumber } = nextBlock;
const shouldRunImmediately = (!effect.deferUntilIrreversible || block.blockInfo.blockNumber <= lastIrreversibleBlockNumber);
if (shouldRunImmediately) {
effect.run(payload, block, context);
}
else if (!this.deferredEffects[block.blockInfo.blockNumber]) {
this.deferredEffects[block.blockInfo.blockNumber] = [() => effect.run(payload, block, context)];
}
else {
this.deferredEffects[block.blockInfo.blockNumber].push(() => effect.run(payload, block, context));
}
}
runDeferredEffects(lastIrreversibleBlockNumber) {
const nextDeferredBlockNumber = this.getNextDeferredBlockNumber();
if (!nextDeferredBlockNumber) {
return;
}
for (const blockNumber of this.range(nextDeferredBlockNumber, lastIrreversibleBlockNumber + 1)) {
if (this.deferredEffects[blockNumber]) {
for (const deferredEffect of this.deferredEffects[blockNumber]) {
deferredEffect();
}
delete this.deferredEffects[blockNumber];
}
}
}
getNextDeferredBlockNumber() {
const blockNumbers = Object.keys(this.deferredEffects).map((num) => parseInt(num, 10));
if (blockNumbers.length === 0) {
return 0;
}
return Math.min(...blockNumbers);
}
initHandlerVersions(handlerVersions) {

@@ -155,0 +222,0 @@ if (handlerVersions.length === 0) {

import * as Logger from "bunyan";
import { Block } from "./interfaces";
import { ActionReaderOptions, Block, NextBlock, ReaderInfo } from "./interfaces";
/**

@@ -8,27 +8,12 @@ * Reads blocks from a blockchain, outputting normalized `Block` objects.

startAtBlock: number;
protected onlyIrreversible: boolean;
protected maxHistoryLength: number;
headBlockNumber: number;
currentBlockNumber: number;
isFirstBlock: boolean;
protected currentBlockData: Block | null;
protected onlyIrreversible: boolean;
protected currentBlockData: Block;
protected lastIrreversibleBlockNumber: number;
protected blockHistory: Block[];
protected log: Logger;
private isFirstRun;
private initialized;
constructor(options?: ActionReaderOptions);
/**
* @param startAtBlock For positive values, this sets the first block that this will start at. For negative
* values, this will start at (most recent block + startAtBlock), effectively tailing the
* chain. Be careful when using this feature, as this will make your starting block dynamic.
*
* @param onlyIrreversible When false (default), `getHeadBlockNumber` will load the most recent block number. When
* true, `getHeadBlockNumber` will return the block number of the most recent irreversible
* block. Keep in mind that `getHeadBlockNumber` is an abstract method and this functionality
* is the responsibility of the implementing class.
*
* @param maxHistoryLength This determines how many blocks in the past are cached. This is used for determining
* block validity during both normal operation and when rolling back.
*/
constructor(startAtBlock?: number, onlyIrreversible?: boolean, maxHistoryLength?: number);
/**
* Loads the number of the latest block.

@@ -54,3 +39,3 @@ */

*/
nextBlock(): Promise<[Block, boolean, boolean]>;
getNextBlock(): Promise<NextBlock>;
/**

@@ -65,2 +50,6 @@ * Changes the state of the `AbstractActionReader` instance to have just processed the block at the given block

/**
* Information about the current state of the Action Reader
*/
readonly info: ReaderInfo;
/**
* Incrementally rolls back reader state one block at a time, comparing the blockHistory with

@@ -71,3 +60,8 @@ * newly fetched blocks. Fork resolution is finished when either the current block's previous hash

protected resolveFork(): Promise<void>;
private initBlockState;
private getLatestNeededBlockNumber;
private acceptBlock;
private range;
private pruneHistory;
private reloadHistory;
private addPreviousBlockToHistory;

@@ -74,0 +68,0 @@ private logForkDetected;

@@ -19,2 +19,11 @@ "use strict";

const Logger = __importStar(require("bunyan"));
const defaultBlock = {
blockInfo: {
blockNumber: 0,
blockHash: "",
previousBlockHash: "",
timestamp: new Date(0),
},
actions: [],
};
/**

@@ -24,26 +33,12 @@ * Reads blocks from a blockchain, outputting normalized `Block` objects.

class AbstractActionReader {
/**
* @param startAtBlock For positive values, this sets the first block that this will start at. For negative
* values, this will start at (most recent block + startAtBlock), effectively tailing the
* chain. Be careful when using this feature, as this will make your starting block dynamic.
*
* @param onlyIrreversible When false (default), `getHeadBlockNumber` will load the most recent block number. When
* true, `getHeadBlockNumber` will return the block number of the most recent irreversible
* block. Keep in mind that `getHeadBlockNumber` is an abstract method and this functionality
* is the responsibility of the implementing class.
*
* @param maxHistoryLength This determines how many blocks in the past are cached. This is used for determining
* block validity during both normal operation and when rolling back.
*/
constructor(startAtBlock = 1, onlyIrreversible = false, maxHistoryLength = 600) {
this.startAtBlock = startAtBlock;
this.onlyIrreversible = onlyIrreversible;
this.maxHistoryLength = maxHistoryLength;
constructor(options = {}) {
this.headBlockNumber = 0;
this.isFirstBlock = true;
this.currentBlockData = null;
this.currentBlockData = defaultBlock;
this.lastIrreversibleBlockNumber = 0;
this.blockHistory = [];
this.isFirstRun = true;
this.currentBlockNumber = startAtBlock - 1;
this.initialized = false;
const optionsWithDefaults = Object.assign({ startAtBlock: 1, onlyIrreversible: false }, options);
this.startAtBlock = optionsWithDefaults.startAtBlock;
this.currentBlockNumber = optionsWithDefaults.startAtBlock - 1;
this.onlyIrreversible = optionsWithDefaults.onlyIrreversible;
this.log = Logger.createLogger({ name: "demux" });

@@ -58,42 +53,32 @@ }

*/
nextBlock() {
getNextBlock() {
return __awaiter(this, void 0, void 0, function* () {
let blockData = null;
let isRollback = false;
let isNewBlock = false;
// If we're on the head block, refresh current head block
if (this.currentBlockNumber === this.headBlockNumber || !this.headBlockNumber) {
const blockMeta = {
isRollback: false,
isNewBlock: false,
isEarliestBlock: false,
};
// TODO: Should this only be called when updating headBlockNumber?
this.lastIrreversibleBlockNumber = yield this.getLastIrreversibleBlockNumber();
if (!this.initialized) {
yield this.initBlockState();
}
if (this.currentBlockNumber === this.headBlockNumber) {
this.headBlockNumber = yield this.getLatestNeededBlockNumber();
}
// If currentBlockNumber is negative, it means we wrap to the end of the chain (most recent blocks)
if (this.currentBlockNumber < 0 && this.isFirstRun) {
this.currentBlockNumber = this.headBlockNumber + this.currentBlockNumber;
this.startAtBlock = this.currentBlockNumber + 1;
}
else if (this.isFirstRun) {
this.isFirstRun = false;
}
// If we're now behind one or more new blocks, process them
if (this.currentBlockNumber < this.headBlockNumber) {
const unvalidatedBlockData = yield this.getBlock(this.currentBlockNumber + 1);
const expectedHash = this.currentBlockData !== null ? this.currentBlockData.blockInfo.blockHash : "INVALID";
const actualHash = unvalidatedBlockData.blockInfo.previousBlockHash;
// Continue if the new block is on the same chain as our history, or if we've just started
if (expectedHash === actualHash || this.blockHistory.length === 0) {
blockData = unvalidatedBlockData; // Block is now validated
if (this.currentBlockData) {
this.blockHistory.push(this.currentBlockData); // No longer current, belongs on history
}
this.blockHistory.splice(0, this.blockHistory.length - this.maxHistoryLength); // Trim history
this.currentBlockData = blockData; // Replaced with the real current block
isNewBlock = true;
this.currentBlockNumber = this.currentBlockData.blockInfo.blockNumber;
const expectedHash = this.currentBlockData.blockInfo.blockHash;
const actualHash = this.currentBlockNumber ?
unvalidatedBlockData.blockInfo.previousBlockHash :
defaultBlock.blockInfo.blockHash;
if (expectedHash === actualHash) {
this.acceptBlock(unvalidatedBlockData);
blockMeta.isNewBlock = true;
}
else {
// Since the new block did not match our history, we can assume our history is wrong
// and need to roll back
this.logForkDetected(unvalidatedBlockData, expectedHash, actualHash);
yield this.resolveFork();
isNewBlock = true;
isRollback = true; // Signal action handler that we must roll back
blockMeta.isNewBlock = true;
blockMeta.isRollback = true;
// Reset for safety, as new fork could have less blocks than the previous fork

@@ -103,8 +88,8 @@ this.headBlockNumber = yield this.getLatestNeededBlockNumber();

}
// Let handler know if this is the earliest block we'll send
this.isFirstBlock = this.currentBlockNumber === this.startAtBlock;
if (this.currentBlockData === null) {
throw Error("currentBlockData must not be null.");
}
return [this.currentBlockData, isRollback, isNewBlock];
blockMeta.isEarliestBlock = this.currentBlockNumber === this.startAtBlock;
return {
block: this.currentBlockData,
blockMeta,
lastIrreversibleBlockNumber: this.lastIrreversibleBlockNumber,
};
});

@@ -121,40 +106,26 @@ }

return __awaiter(this, void 0, void 0, function* () {
// Clear current block data
this.currentBlockData = null;
this.headBlockNumber = 0;
this.headBlockNumber = yield this.getLatestNeededBlockNumber();
if (blockNumber < this.startAtBlock) {
throw Error("Cannot seek to block before configured startAtBlock.");
throw new Error("Cannot seek to block before configured `startAtBlock` number.");
}
// If we're going back to the first block, we don't want to get the preceding block
if (blockNumber === 1) {
this.blockHistory = [];
this.currentBlockNumber = 0;
return;
if (blockNumber > this.headBlockNumber) {
throw new Error(`Cannot seek to block number ${blockNumber} as it does not exist yet.`);
}
// Check if block exists in history
let toDelete = -1;
for (let i = this.blockHistory.length - 1; i >= 0; i--) {
if (this.blockHistory[i].blockInfo.blockNumber === blockNumber) {
break;
}
else {
toDelete += 1;
}
}
if (toDelete >= 0) {
this.blockHistory.splice(toDelete);
this.currentBlockData = this.blockHistory.pop() || null;
}
// Load current block
this.currentBlockNumber = blockNumber - 1;
if (!this.currentBlockData) {
this.currentBlockData = yield this.getBlock(this.currentBlockNumber);
}
// Fetch block if there is no history
if (this.blockHistory.length === 0) {
yield this.addPreviousBlockToHistory(false);
}
yield this.reloadHistory();
});
}
/**
* Information about the current state of the Action Reader
*/
get info() {
return {
currentBlockNumber: this.currentBlockNumber,
startAtBlock: this.startAtBlock,
headBlockNumber: this.headBlockNumber,
onlyIrreversible: this.onlyIrreversible,
lastIrreversibleBlockNumber: this.lastIrreversibleBlockNumber,
};
}
/**
* Incrementally rolls back reader state one block at a time, comparing the blockHistory with

@@ -166,5 +137,2 @@ * newly fetched blocks. Fork resolution is finished when either the current block's previous hash

return __awaiter(this, void 0, void 0, function* () {
if (this.currentBlockData === null) {
throw Error("`currentBlockData` must not be null when initiating fork resolution.");
}
if (this.blockHistory.length === 0) {

@@ -181,20 +149,31 @@ yield this.addPreviousBlockToHistory();

this.currentBlockData = yield this.getBlock(this.currentBlockData.blockInfo.blockNumber);
if (this.currentBlockData !== null) {
const { blockInfo: currentBlockInfo } = this.currentBlockData;
const { blockInfo: previousBlockInfo } = previousBlockData;
if (currentBlockInfo.previousBlockHash === previousBlockInfo.blockHash) {
this.logForkResolved(currentBlockInfo, previousBlockInfo);
break;
}
this.logForkMismatch(currentBlockInfo, previousBlockInfo);
const { blockInfo: currentBlockInfo } = this.currentBlockData;
const { blockInfo: previousBlockInfo } = previousBlockData;
if (currentBlockInfo.previousBlockHash === previousBlockInfo.blockHash) {
this.logForkResolved(currentBlockInfo, previousBlockInfo);
break;
}
this.logForkMismatch(currentBlockInfo, previousBlockInfo);
this.currentBlockData = previousBlockData;
this.blockHistory.pop();
}
if (this.blockHistory.length === 0) {
yield this.addPreviousBlockToHistory();
}
this.currentBlockNumber = this.blockHistory[this.blockHistory.length - 1].blockInfo.blockNumber + 1;
});
}
initBlockState() {
return __awaiter(this, void 0, void 0, function* () {
this.headBlockNumber = yield this.getLatestNeededBlockNumber();
if (this.currentBlockNumber < 0) {
this.currentBlockNumber = this.headBlockNumber + this.currentBlockNumber;
this.startAtBlock = this.currentBlockNumber + 1;
}
yield this.reloadHistory();
this.initialized = true;
});
}
getLatestNeededBlockNumber() {
return __awaiter(this, void 0, void 0, function* () {
this.lastIrreversibleBlockNumber = yield this.getLastIrreversibleBlockNumber();
if (this.onlyIrreversible) {

@@ -208,8 +187,75 @@ return this.lastIrreversibleBlockNumber;

}
acceptBlock(blockData) {
this.blockHistory.push(this.currentBlockData);
this.pruneHistory();
this.currentBlockData = blockData;
this.currentBlockNumber = this.currentBlockData.blockInfo.blockNumber;
}
range(start, end) {
if (start > end) {
return [];
}
return Array(end - start).fill(0).map((_, i) => i + start);
}
pruneHistory() {
let toDelete = 0;
for (const block of this.blockHistory) {
if (block.blockInfo.blockNumber < this.lastIrreversibleBlockNumber) {
toDelete += 1;
}
else {
break;
}
}
if (toDelete === this.blockHistory.length) {
this.blockHistory = [this.blockHistory[this.blockHistory.length - 1]];
return;
}
this.blockHistory.splice(0, toDelete);
}
reloadHistory(maxTries = 10) {
return __awaiter(this, void 0, void 0, function* () {
if (this.currentBlockNumber === 0) {
this.blockHistory = [];
this.currentBlockData = defaultBlock;
return;
}
if (this.currentBlockNumber === 1) {
this.blockHistory = [defaultBlock];
this.currentBlockData = yield this.getBlock(1);
return;
}
let historyRange = this.range(this.lastIrreversibleBlockNumber, this.currentBlockNumber + 1);
if (historyRange.length <= 1) {
historyRange = [this.currentBlockNumber - 1, this.currentBlockNumber];
}
let microForked = true;
let tryCount = 0;
while (microForked) {
microForked = false;
this.blockHistory = [];
for (const blockNumber of historyRange) {
const historyBlock = yield this.getBlock(blockNumber);
if (this.blockHistory.length === 0) {
this.blockHistory.push(historyBlock);
continue;
}
const latestHistoryBlockHash = this.blockHistory[this.blockHistory.length - 1].blockInfo.blockHash;
if (latestHistoryBlockHash !== historyBlock.blockInfo.previousBlockHash) {
microForked = true;
break;
}
this.blockHistory.push(historyBlock);
}
tryCount += 1;
if (tryCount === maxTries) {
throw new Error("Could not reload history.");
}
}
this.currentBlockData = this.blockHistory.pop();
});
}
addPreviousBlockToHistory(checkIrreversiblility = true) {
return __awaiter(this, void 0, void 0, function* () {
if (!this.currentBlockData) {
throw Error("`currentBlockData` must not be null when initiating fork resolution.");
}
if (this.currentBlockData.blockInfo.blockNumber <= this.lastIrreversibleBlockNumber && checkIrreversiblility) {
if (this.currentBlockData.blockInfo.blockNumber < this.lastIrreversibleBlockNumber && checkIrreversiblility) {
throw new Error("Last irreversible block has been passed without resolving fork");

@@ -216,0 +262,0 @@ }

@@ -0,3 +1,5 @@

import * as Logger from "bunyan";
import { AbstractActionHandler } from "./AbstractActionHandler";
import { AbstractActionReader } from "./AbstractActionReader";
import { DemuxInfo } from "./interfaces";
/**

@@ -16,2 +18,6 @@ * Coordinates implementations of `AbstractActionReader`s and `AbstractActionHandler`s in

*/
protected log: Logger;
private running;
private shouldPause;
private error;
constructor(actionReader: AbstractActionReader, actionHandler: AbstractActionHandler, pollInterval: number);

@@ -24,8 +30,24 @@ /**

* Start a polling loop
*
* @param isReplay Set to true to disable Effects from running until caught up with head block.
*/
watch(isReplay?: boolean): Promise<void>;
/**
* Start or resume indexing.
*/
start(): boolean;
/**
* Suspend indexing. Will go into effect after the currently-processing block.
*/
pause(): boolean;
/**
* Information about the current state of Demux
*/
readonly info: DemuxInfo;
/**
* Use the actionReader and actionHandler to process new blocks.
*
* @param isReplay Set to true to disable Effects from running until caught up with head block.
*/
protected checkForBlocks(isReplay?: boolean): Promise<void>;
}

@@ -10,3 +10,11 @@ "use strict";

};
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (Object.hasOwnProperty.call(mod, k)) result[k] = mod[k];
result["default"] = mod;
return result;
};
Object.defineProperty(exports, "__esModule", { value: true });
const Logger = __importStar(require("bunyan"));
/**

@@ -17,7 +25,2 @@ * Coordinates implementations of `AbstractActionReader`s and `AbstractActionHandler`s in

class BaseActionWatcher {
/**
* @param actionReader An instance of an implemented `AbstractActionReader`
* @param actionHandler An instance of an implemented `AbstractActionHandler`
* @param pollInterval Number of milliseconds between each polling loop iteration
*/
constructor(actionReader, actionHandler, pollInterval) {

@@ -27,2 +30,6 @@ this.actionReader = actionReader;

this.pollInterval = pollInterval;
this.running = false;
this.shouldPause = false;
this.error = null;
this.log = Logger.createLogger({ name: "demux" });
}

@@ -39,8 +46,28 @@ /**

* Start a polling loop
*
* @param isReplay Set to true to disable Effects from running until caught up with head block.
*/
watch(isReplay = false) {
return __awaiter(this, void 0, void 0, function* () {
const startTime = new Date().getTime();
yield this.checkForBlocks(isReplay);
const endTime = new Date().getTime();
if (this.shouldPause) {
this.running = false;
this.shouldPause = false;
this.log.info("Indexing paused.");
return;
}
this.running = true;
this.error = null;
const startTime = Date.now();
try {
yield this.checkForBlocks(isReplay);
}
catch (err) {
this.running = false;
this.shouldPause = false;
this.log.error(err);
this.error = err;
this.log.info("Indexing unexpectedly paused due to an error.");
return;
}
const endTime = Date.now();
const duration = endTime - startTime;

@@ -55,3 +82,53 @@ let waitTime = this.pollInterval - duration;

/**
* Start or resume indexing.
*/
start() {
if (this.running) {
this.log.info("Cannot start; already indexing.");
return false;
}
this.log.info("Starting indexing.");
this.watch();
return true;
}
/**
* Suspend indexing. Will go into effect after the currently-processing block.
*/
pause() {
if (!this.running) {
this.log.info("Cannot pause; not currently indexing.");
return false;
}
this.log.info("Pausing indexing.");
this.shouldPause = true;
return true;
}
/**
* Information about the current state of Demux
*/
get info() {
let status;
if (this.running && !this.shouldPause) {
status = "indexing";
}
else if (this.running && this.shouldPause) {
status = "pausing";
}
else {
status = "paused";
}
const info = {
handler: this.actionHandler.info,
reader: this.actionReader.info,
status,
};
if (this.error) {
info.error = this.error;
}
return info;
}
/**
* Use the actionReader and actionHandler to process new blocks.
*
* @param isReplay Set to true to disable Effects from running until caught up with head block.
*/

@@ -62,14 +139,13 @@ checkForBlocks(isReplay = false) {

while (!headBlockNumber || this.actionReader.currentBlockNumber < headBlockNumber) {
const [blockData, isRollback, isNewBlock] = yield this.actionReader.nextBlock();
if (!isNewBlock) {
if (this.shouldPause) {
return;
}
const nextBlock = yield this.actionReader.getNextBlock();
if (!nextBlock.blockMeta.isNewBlock) {
break;
}
let needToSeek = false;
let seekBlockNum = 0;
if (blockData) {
[needToSeek, seekBlockNum] = yield this.actionHandler.handleBlock(blockData, isRollback, this.actionReader.isFirstBlock, isReplay);
const nextBlockNumberNeeded = yield this.actionHandler.handleBlock(nextBlock, isReplay);
if (nextBlockNumberNeeded) {
yield this.actionReader.seekToBlock(nextBlockNumberNeeded - 1);
}
if (needToSeek) {
yield this.actionReader.seekToBlock(seekBlockNum - 1);
}
headBlockNumber = this.actionReader.headBlockNumber;

@@ -76,0 +152,0 @@ }

@@ -5,1 +5,2 @@ export { Action, Block, BlockInfo, Effect, HandlerVersion, IndexState, Updater } from "./interfaces";

export { BaseActionWatcher } from "./BaseActionWatcher";
export { ExpressActionWatcher } from "./ExpressActionWatcher";

@@ -9,1 +9,3 @@ "use strict";

exports.BaseActionWatcher = BaseActionWatcher_1.BaseActionWatcher;
var ExpressActionWatcher_1 = require("./ExpressActionWatcher");
exports.ExpressActionWatcher = ExpressActionWatcher_1.ExpressActionWatcher;

@@ -0,1 +1,21 @@

export interface ActionReaderOptions {
/**
* For positive values, this sets the first block that this will start at. For negative
* values, this will start at (most recent block + startAtBlock), effectively tailing the
* chain. Be careful when using this feature, as this will make your starting block dynamic.
*/
startAtBlock?: number;
/**
* When false (default), `getHeadBlockNumber` will load the most recent block number. When
* true, `getHeadBlockNumber` will return the block number of the most recent irreversible
* block. Keep in mind that `getHeadBlockNumber` is an abstract method and this functionality
* is the responsibility of the implementing class.
*/
onlyIrreversible?: boolean;
/**
* This determines how many blocks in the past are cached. This is used for determining
* block validity during both normal operation and when rolling back.
*/
maxHistoryLength?: number;
}
export interface Block {

@@ -5,2 +25,7 @@ actions: Action[];

}
export interface BlockMeta {
isRollback: boolean;
isEarliestBlock: boolean;
isNewBlock: boolean;
}
export interface IndexState {

@@ -18,2 +43,7 @@ blockNumber: number;

}
export interface NextBlock {
block: Block;
blockMeta: BlockMeta;
lastIrreversibleBlockNumber: number;
}
export interface Action {

@@ -34,2 +64,3 @@ type: string;

run: StatelessActionCallback;
deferUntilIrreversible?: boolean;
onRollback?: StatelessActionCallback;

@@ -42,1 +73,27 @@ }

}
export interface VersionedAction {
action: Action;
handlerVersionName: string;
}
export declare type CurriedEffectRun = (() => void | Promise<void>);
export interface DeferredEffects {
[key: number]: CurriedEffectRun[];
}
export interface HandlerInfo {
lastProcessedBlockNumber: number;
lastProcessedBlockHash: string;
handlerVersionName: string;
}
export interface ReaderInfo {
currentBlockNumber: number;
startAtBlock: number;
headBlockNumber: number;
onlyIrreversible: boolean;
lastIrreversibleBlockNumber: number;
}
export interface DemuxInfo {
status: string;
error?: Error;
handler: HandlerInfo;
reader: ReaderInfo;
}
{
"name": "demux",
"version": "3.1.4-29827da.0",
"version": "3.1.4-431ae7e.0",
"author": {

@@ -21,4 +21,5 @@ "name": "Julien Heller",

"eslint-plugin-import": "^2.7.0",
"jest": "^22.4.3",
"jest": "^23.6.0",
"release-it": "^7.5.0",
"supertest": "^3.4.1",
"ts-jest": "^23.0.0",

@@ -36,4 +37,5 @@ "tslint": "^5.10.0",

"lint": "tslint -c tslint.json -p tsconfig.json",
"test": "jest",
"build-docs": "./build-docs.sh"
"test": "jest --detectOpenHandles --maxWorkers=2",
"build-docs": "./build-docs.sh",
"current-version": "echo $npm_package_version"
},

@@ -58,4 +60,7 @@ "jest": {

"dependencies": {
"bunyan": "^1.8.12"
"@types/express": "^4.16.0",
"@types/supertest": "^2.0.7",
"bunyan": "^1.8.12",
"express": "^4.16.4"
}
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc