ethereumjs-blockstream
Advanced tools
Comparing version 2.0.7 to 3.0.0
import { Block } from "./models/block"; | ||
import { Log } from "./models/log"; | ||
import { Filter, FilterOptions } from "./models/filters"; | ||
export declare class BlockAndLogStreamer { | ||
export declare class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> { | ||
private blockHistory; | ||
@@ -16,10 +16,10 @@ private logHistory; | ||
private readonly onLogRemovedSubscribers; | ||
constructor(getBlockByHash: (hash: string) => Promise<Block | null>, getLogs: (filterOptions: FilterOptions) => Promise<Log[]>, configuration?: { | ||
constructor(getBlockByHash: (hash: string) => Promise<TBlock | null>, getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>, configuration?: { | ||
blockRetention?: number; | ||
}); | ||
static createCallbackStyle: (getBlockByHash: (hash: string, callback: (error?: Error | undefined, block?: Block | null | undefined) => void) => void, getLogs: (filterOptions: FilterOptions, callback: (error?: Error | undefined, logs?: Log[] | undefined) => void) => void, configuration?: { | ||
static createCallbackStyle: <TBlock extends Block, TLog extends Log>(getBlockByHash: (hash: string, callback: (error?: Error | undefined, block?: TBlock | null | undefined) => void) => void, getLogs: (filterOptions: FilterOptions, callback: (error?: Error | undefined, logs?: TLog[] | undefined) => void) => void, configuration?: { | ||
blockRetention?: number | undefined; | ||
} | undefined) => BlockAndLogStreamer; | ||
readonly reconcileNewBlock: (block: Block) => Promise<void>; | ||
readonly reconcileNewBlockCallbackStyle: (block: Block, callback: (error?: Error | undefined) => void) => Promise<void>; | ||
} | undefined) => BlockAndLogStreamer<TBlock, TLog>; | ||
readonly reconcileNewBlock: (block: TBlock) => Promise<void>; | ||
readonly reconcileNewBlockCallbackStyle: (block: TBlock, callback: (error?: Error | undefined) => void) => Promise<void>; | ||
private readonly onBlockAdded; | ||
@@ -29,13 +29,13 @@ private readonly onBlockRemoved; | ||
private readonly onLogRemoved; | ||
readonly getLatestReconciledBlock: () => Block | null; | ||
readonly getLatestReconciledBlock: () => TBlock | null; | ||
readonly addLogFilter: (filter: Filter) => string; | ||
readonly removeLogFilter: (token: string) => void; | ||
readonly subscribeToOnBlockAdded: (onBlockAdded: (block: Block) => void) => string; | ||
readonly subscribeToOnBlockAdded: (onBlockAdded: (block: TBlock) => void) => string; | ||
readonly unsubscribeFromOnBlockAdded: (token: string) => void; | ||
readonly subscribeToOnBlockRemoved: (onBlockRemoved: (block: Block) => void) => string; | ||
readonly subscribeToOnBlockRemoved: (onBlockRemoved: (block: TBlock) => void) => string; | ||
readonly unsubscribeFromOnBlockRemoved: (token: string) => void; | ||
readonly subscribeToOnLogAdded: (onLogAdded: (log: Log) => void) => string; | ||
readonly subscribeToOnLogAdded: (onLogAdded: (log: TLog) => void) => string; | ||
readonly unsubscribeFromOnLogAdded: (token: string) => void; | ||
readonly subscribeToOnLogRemoved: (onLogRemoved: (log: Log) => void) => string; | ||
readonly subscribeToOnLogRemoved: (onLogRemoved: (log: TLog) => void) => string; | ||
readonly unsubscribeFromOnLogRemoved: (token: string) => void; | ||
} |
@@ -189,10 +189,12 @@ "use strict"; | ||
BlockAndLogStreamer.createCallbackStyle = function (getBlockByHash, getLogs, configuration) { | ||
var wrappedGetBlockByHash = function (hash) { return new Promise(function (resolve, reject) { | ||
getBlockByHash(hash, function (error, block) { | ||
if (error) | ||
throw error; | ||
else | ||
resolve(block); | ||
var wrappedGetBlockByHash = function (hash) { | ||
return new Promise(function (resolve, reject) { | ||
getBlockByHash(hash, function (error, block) { | ||
if (error) | ||
throw error; | ||
else | ||
resolve(block); | ||
}); | ||
}); | ||
}); }; | ||
}; | ||
var wrappedGetLogs = function (filterOptions) { return new Promise(function (resolve, reject) { | ||
@@ -199,0 +201,0 @@ getLogs(filterOptions, function (error, logs) { |
import { Block } from "./models/block"; | ||
import { List as ImmutableList } from "immutable"; | ||
export declare const reconcileBlockHistory: (getBlockByHash: (hash: string) => Promise<Block | null>, blockHistory: ImmutableList<Block> | Promise<ImmutableList<Block>>, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, onBlockRemoved: (block: Block) => Promise<void>, blockRetention?: number) => Promise<ImmutableList<Block>>; | ||
export declare const reconcileBlockHistory: <TBlock extends Block>(getBlockByHash: (hash: string) => Promise<TBlock | null>, blockHistory: ImmutableList<TBlock> | Promise<ImmutableList<TBlock>>, newBlock: TBlock, onBlockAdded: (block: TBlock) => Promise<void>, onBlockRemoved: (block: TBlock) => Promise<void>, blockRetention?: number) => Promise<ImmutableList<TBlock>>; |
@@ -116,6 +116,6 @@ "use strict"; | ||
throw new Error("New head block's parent isn't our current head."); | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback be cause it may be trying to signal to use that the block has become invalid and is un-processable | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback because it may be trying to signal to use that the block has become invalid and is un-processable | ||
return [4 /*yield*/, onBlockAdded(newBlock)]; | ||
case 1: | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback be cause it may be trying to signal to use that the block has become invalid and is un-processable | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback because it may be trying to signal to use that the block has become invalid and is un-processable | ||
_a.sent(); | ||
@@ -122,0 +122,0 @@ blockHistory = blockHistory.push(newBlock); |
import { Block } from "./models/block"; | ||
import { Log } from "./models/log"; | ||
import { Filter, FilterOptions } from "./models/filters"; | ||
import { LogHistory } from "./models/log-history"; | ||
export declare function reconcileLogHistoryWithAddedBlock(getLogs: (filterOptions: FilterOptions) => Promise<Log[]>, logHistory: LogHistory | Promise<LogHistory>, newBlock: Block, onLogAdded: (log: Log) => Promise<void>, filters?: Filter[], historyBlockLength?: number): Promise<LogHistory>; | ||
export declare function reconcileLogHistoryWithRemovedBlock(logHistory: LogHistory | Promise<LogHistory>, removedBlock: Block, onLogRemoved: (log: Log) => Promise<void>): Promise<LogHistory>; | ||
import { List as ImmutableList } from "immutable"; | ||
export declare const reconcileLogHistoryWithAddedBlock: <TBlock extends Block, TLog extends Log>(getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>, logHistory: ImmutableList<TLog> | Promise<ImmutableList<TLog>>, newBlock: TBlock, onLogAdded: (log: TLog) => Promise<void>, filters?: Filter[], historyBlockLength?: number) => Promise<ImmutableList<TLog>>; | ||
export declare const reconcileLogHistoryWithRemovedBlock: <TBlock extends Block, TLog extends Log>(logHistory: ImmutableList<TLog> | Promise<ImmutableList<TLog>>, removedBlock: TBlock, onLogRemoved: (log: TLog) => Promise<void>) => Promise<ImmutableList<TLog>>; |
@@ -37,7 +37,8 @@ "use strict"; | ||
}; | ||
var _this = this; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
function reconcileLogHistoryWithAddedBlock(getLogs, logHistory, newBlock, onLogAdded, filters, historyBlockLength) { | ||
exports.reconcileLogHistoryWithAddedBlock = function (getLogs, logHistory, newBlock, onLogAdded, filters, historyBlockLength) { | ||
if (filters === void 0) { filters = []; } | ||
if (historyBlockLength === void 0) { historyBlockLength = 100; } | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __awaiter(_this, void 0, void 0, function () { | ||
var logs; | ||
@@ -62,70 +63,61 @@ return __generator(this, function (_a) { | ||
}); | ||
} | ||
exports.reconcileLogHistoryWithAddedBlock = reconcileLogHistoryWithAddedBlock; | ||
function getFilteredLogs(getLogs, newBlock, filters) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var logPromises; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
logPromises = filters | ||
.map(function (filter) { return ({ fromBlock: newBlock.number, toBlock: newBlock.number, address: filter.address, topics: filter.topics, }); }) | ||
.map(function (filter) { return getLogs(filter); }); | ||
return [4 /*yield*/, Promise.all(logPromises) | ||
.then(function (nestedLogs) { return nestedLogs.reduce(function (allLogs, logs) { return allLogs.concat(logs); }, []); })]; | ||
case 1: return [2 /*return*/, _a.sent()]; | ||
} | ||
}); | ||
}; | ||
var getFilteredLogs = function (getLogs, newBlock, filters) { return __awaiter(_this, void 0, void 0, function () { | ||
var logPromises; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
logPromises = filters | ||
.map(function (filter) { return ({ fromBlock: newBlock.number, toBlock: newBlock.number, address: filter.address, topics: filter.topics, }); }) | ||
.map(function (filter) { return getLogs(filter); }); | ||
return [4 /*yield*/, Promise.all(logPromises) | ||
.then(function (nestedLogs) { return nestedLogs.reduce(function (allLogs, logs) { return allLogs.concat(logs); }, []); })]; | ||
case 1: return [2 /*return*/, _a.sent()]; | ||
} | ||
}); | ||
} | ||
function addNewLogsToHead(logHistory, newLogs, onLogAdded) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var sortedLogs, _i, sortedLogs_1, log; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
sortedLogs = newLogs.sort(function (logA, logB) { return parseInt(logA.logIndex, 16) - parseInt(logB.logIndex, 16); }); | ||
_i = 0, sortedLogs_1 = sortedLogs; | ||
_a.label = 1; | ||
case 1: | ||
if (!(_i < sortedLogs_1.length)) return [3 /*break*/, 4]; | ||
log = sortedLogs_1[_i]; | ||
ensureOrder(logHistory.last(), log); | ||
return [4 /*yield*/, addNewLogToHead(logHistory, log, onLogAdded)]; | ||
case 2: | ||
logHistory = _a.sent(); | ||
_a.label = 3; | ||
case 3: | ||
_i++; | ||
return [3 /*break*/, 1]; | ||
case 4: return [2 /*return*/, logHistory]; | ||
} | ||
}); | ||
}); }; | ||
var addNewLogsToHead = function (logHistory, newLogs, onLogAdded) { return __awaiter(_this, void 0, void 0, function () { | ||
var sortedLogs, _i, sortedLogs_1, log; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
sortedLogs = newLogs.sort(function (logA, logB) { return parseInt(logA.logIndex, 16) - parseInt(logB.logIndex, 16); }); | ||
_i = 0, sortedLogs_1 = sortedLogs; | ||
_a.label = 1; | ||
case 1: | ||
if (!(_i < sortedLogs_1.length)) return [3 /*break*/, 4]; | ||
log = sortedLogs_1[_i]; | ||
ensureOrder(logHistory.last(), log); | ||
return [4 /*yield*/, addNewLogToHead(logHistory, log, onLogAdded)]; | ||
case 2: | ||
logHistory = _a.sent(); | ||
_a.label = 3; | ||
case 3: | ||
_i++; | ||
return [3 /*break*/, 1]; | ||
case 4: return [2 /*return*/, logHistory]; | ||
} | ||
}); | ||
} | ||
function pruneOldLogs(logHistory, newBlock, historyBlockLength) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
// `logBlock!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions | ||
return [2 /*return*/, logHistory.skipUntil(function (log) { return parseInt(newBlock.number, 16) - parseInt(log.blockNumber, 16) < historyBlockLength; }).toList()]; | ||
}); | ||
}); }; | ||
var pruneOldLogs = function (logHistory, newBlock, historyBlockLength) { return __awaiter(_this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
// `log!` is required until the next major version of `immutable` is published to NPM (current version 3.8.2) which improves the type definitions | ||
return [2 /*return*/, logHistory.skipUntil(function (log) { return parseInt(newBlock.number, 16) - parseInt(log.blockNumber, 16) < historyBlockLength; }).toList()]; | ||
}); | ||
} | ||
function addNewLogToHead(logHistory, newLog, onLogAdded) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
logHistory = logHistory.push(newLog); | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs? | ||
return [4 /*yield*/, onLogAdded(newLog)]; | ||
case 1: | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs? | ||
_a.sent(); | ||
return [2 /*return*/, logHistory]; | ||
} | ||
}); | ||
}); }; | ||
var addNewLogToHead = function (logHistory, newLog, onLogAdded) { return __awaiter(_this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
logHistory = logHistory.push(newLog); | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs? | ||
return [4 /*yield*/, onLogAdded(newLog)]; | ||
case 1: | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs? | ||
_a.sent(); | ||
return [2 /*return*/, logHistory]; | ||
} | ||
}); | ||
} | ||
function ensureOrder(headLog, newLog) { | ||
}); }; | ||
var ensureOrder = function (headLog, newLog) { | ||
if (headLog === undefined) | ||
@@ -143,28 +135,25 @@ return; | ||
throw new Error("received log with same block number but index newer than previous index"); | ||
} | ||
function reconcileLogHistoryWithRemovedBlock(logHistory, removedBlock, onLogRemoved) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, logHistory]; | ||
case 1: | ||
logHistory = _a.sent(); | ||
_a.label = 2; | ||
case 2: | ||
if (!(!logHistory.isEmpty() && logHistory.last().blockHash === removedBlock.hash)) return [3 /*break*/, 4]; | ||
return [4 /*yield*/, onLogRemoved(logHistory.last())]; | ||
case 3: | ||
_a.sent(); | ||
logHistory = logHistory.pop(); | ||
return [3 /*break*/, 2]; | ||
case 4: | ||
// sanity check, no known way to trigger the error | ||
if (logHistory.some(function (log) { return log.blockHash === removedBlock.hash; })) | ||
throw new Error("found logs for removed block not at head of log history"); | ||
return [2 /*return*/, logHistory]; | ||
} | ||
}); | ||
}; | ||
exports.reconcileLogHistoryWithRemovedBlock = function (logHistory, removedBlock, onLogRemoved) { return __awaiter(_this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, logHistory]; | ||
case 1: | ||
logHistory = _a.sent(); | ||
_a.label = 2; | ||
case 2: | ||
if (!(!logHistory.isEmpty() && logHistory.last().blockHash === removedBlock.hash)) return [3 /*break*/, 4]; | ||
return [4 /*yield*/, onLogRemoved(logHistory.last())]; | ||
case 3: | ||
_a.sent(); | ||
logHistory = logHistory.pop(); | ||
return [3 /*break*/, 2]; | ||
case 4: | ||
// sanity check, no known way to trigger the error | ||
if (logHistory.some(function (log) { return log.blockHash === removedBlock.hash; })) | ||
throw new Error("found logs for removed block not at head of log history"); | ||
return [2 /*return*/, logHistory]; | ||
} | ||
}); | ||
} | ||
exports.reconcileLogHistoryWithRemovedBlock = reconcileLogHistoryWithRemovedBlock; | ||
}); }; | ||
//# sourceMappingURL=log-reconciler.js.map |
import { Block } from "./block"; | ||
import { List as ImmutableList } from "immutable"; | ||
export declare type BlockHistory = ImmutableList<Block>; | ||
export declare type BlockHistory<TBlock extends Block> = ImmutableList<TBlock>; |
@@ -1,2 +0,1 @@ | ||
import { Transaction } from "./transaction"; | ||
export interface Block { | ||
@@ -6,17 +5,2 @@ readonly number: string; | ||
readonly parentHash: string; | ||
readonly nonce: string; | ||
readonly sha3Uncles: string; | ||
readonly logsBloom: string; | ||
readonly transactionRoot: string; | ||
readonly stateRoot: string; | ||
readonly receiptsRoot: string; | ||
readonly miner: string; | ||
readonly difficulty: string; | ||
readonly totalDifficulty: string; | ||
readonly size: string; | ||
readonly gasLimit: string; | ||
readonly gasUsed: string; | ||
readonly timestamp: string; | ||
readonly transactions: string[] | Transaction[]; | ||
readonly uncles: string[]; | ||
} |
import { Log } from "./log"; | ||
import { List as ImmutableList } from "immutable"; | ||
export declare type LogHistory = ImmutableList<Log>; | ||
export declare type LogHistory<TLog extends Log> = ImmutableList<TLog>; |
@@ -5,7 +5,2 @@ export interface Log { | ||
readonly blockHash: string; | ||
readonly transactionHash: string; | ||
readonly transactionIndex: string; | ||
readonly address: string; | ||
readonly data: string; | ||
readonly topics: string[]; | ||
} |
{ | ||
"name": "ethereumjs-blockstream", | ||
"version": "2.0.7", | ||
"version": "3.0.0", | ||
"description": "A library to turn an unreliable remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.", | ||
@@ -38,3 +38,3 @@ "main": "output/source/index.js", | ||
"dependencies": { | ||
"immutable": "3.8.1", | ||
"immutable": "3.8.2", | ||
"source-map-support": "0.4.14", | ||
@@ -41,0 +41,0 @@ "uuid": "3.0.1" |
@@ -12,21 +12,21 @@ import { Block } from "./models/block"; | ||
export class BlockAndLogStreamer { | ||
private blockHistory: Promise<BlockHistory> = Promise.resolve(ImmutableList<Block>()); | ||
private logHistory: Promise<LogHistory> = Promise.resolve(ImmutableList<Log>()); | ||
private latestBlock: Block | null = null; | ||
export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> { | ||
private blockHistory: Promise<BlockHistory<TBlock>> = Promise.resolve(ImmutableList<TBlock>()); | ||
private logHistory: Promise<LogHistory<TLog>> = Promise.resolve(ImmutableList<TLog>()); | ||
private latestBlock: TBlock | null = null; | ||
private readonly blockRetention: number; | ||
private readonly getBlockByHash: (hash: string) => Promise<Block | null>; | ||
private readonly getLogs: (filterOptions: FilterOptions) => Promise<Log[]>; | ||
private readonly getBlockByHash: (hash: string) => Promise<TBlock | null>; | ||
private readonly getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>; | ||
private readonly logFilters: { [propName: string]: Filter } = {} | ||
private readonly onBlockAddedSubscribers: { [propName: string]: (block: Block) => void } = {}; | ||
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: Block) => void } = {}; | ||
private readonly onLogAddedSubscribers: { [propName: string]: (log: Log) => void } = {}; | ||
private readonly onLogRemovedSubscribers: { [propName: string]: (log: Log) => void } = {}; | ||
private readonly onBlockAddedSubscribers: { [propName: string]: (block: TBlock) => void } = {}; | ||
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: TBlock) => void } = {}; | ||
private readonly onLogAddedSubscribers: { [propName: string]: (log: TLog) => void } = {}; | ||
private readonly onLogRemovedSubscribers: { [propName: string]: (log: TLog) => void } = {}; | ||
constructor( | ||
getBlockByHash: (hash: string) => Promise<Block | null>, | ||
getLogs: (filterOptions: FilterOptions) => Promise<Log[]>, | ||
getBlockByHash: (hash: string) => Promise<TBlock | null>, | ||
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>, | ||
configuration?: { blockRetention?: number }, | ||
@@ -39,14 +39,16 @@ ) { | ||
static createCallbackStyle = ( | ||
getBlockByHash: (hash: string, callback: (error?: Error, block?: Block | null) => void) => void, | ||
getLogs: (filterOptions: FilterOptions, callback: (error?: Error, logs?: Log[]) => void) => void, | ||
static createCallbackStyle = <TBlock extends Block, TLog extends Log>( | ||
getBlockByHash: (hash: string, callback: (error?: Error, block?: TBlock | null) => void) => void, | ||
getLogs: (filterOptions: FilterOptions, callback: (error?: Error, logs?: TLog[]) => void) => void, | ||
configuration?: { blockRetention?: number }, | ||
): BlockAndLogStreamer => { | ||
const wrappedGetBlockByHash = (hash: string): Promise<Block | null> => new Promise<Block | null>((resolve, reject) => { | ||
getBlockByHash(hash, (error, block) => { | ||
if (error) throw error; | ||
else resolve(block); | ||
): BlockAndLogStreamer<TBlock, TLog> => { | ||
const wrappedGetBlockByHash = (hash: string): Promise<TBlock | null> => { | ||
return new Promise<TBlock | null>((resolve, reject) => { | ||
getBlockByHash(hash, (error, block) => { | ||
if (error) throw error; | ||
else resolve(block); | ||
}); | ||
}); | ||
}); | ||
const wrappedGetLogs = (filterOptions: FilterOptions): Promise<Log[]> => new Promise<Log[]>((resolve, reject) => { | ||
}; | ||
const wrappedGetLogs = (filterOptions: FilterOptions): Promise<Array<TLog>> => new Promise<Array<TLog>>((resolve, reject) => { | ||
getLogs(filterOptions, (error, logs) => { | ||
@@ -58,6 +60,6 @@ if (error) throw error; | ||
}); | ||
return new BlockAndLogStreamer(wrappedGetBlockByHash, wrappedGetLogs, configuration); | ||
return new BlockAndLogStreamer<TBlock, TLog>(wrappedGetBlockByHash, wrappedGetLogs, configuration); | ||
} | ||
public readonly reconcileNewBlock = async (block: Block): Promise<void> => { | ||
public readonly reconcileNewBlock = async (block: TBlock): Promise<void> => { | ||
this.blockHistory = reconcileBlockHistory(this.getBlockByHash, this.blockHistory, block, this.onBlockAdded, this.onBlockRemoved, this.blockRetention); | ||
@@ -68,3 +70,3 @@ const blockHistory = await this.blockHistory; | ||
public readonly reconcileNewBlockCallbackStyle = async (block: Block, callback: (error?: Error) => void): Promise<void> => { | ||
public readonly reconcileNewBlockCallbackStyle = async (block: TBlock, callback: (error?: Error) => void): Promise<void> => { | ||
this.reconcileNewBlock(block) | ||
@@ -75,3 +77,3 @@ .then(() => callback(undefined)) | ||
private readonly onBlockAdded = async (block: Block): Promise<void> => { | ||
private readonly onBlockAdded = async (block: TBlock): Promise<void> => { | ||
const logFilters = Object.keys(this.logFilters).map(key => this.logFilters[key]); | ||
@@ -87,3 +89,3 @@ this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogAdded, logFilters, this.blockRetention); | ||
private readonly onBlockRemoved = async (block: Block): Promise<void> => { | ||
private readonly onBlockRemoved = async (block: TBlock): Promise<void> => { | ||
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogRemoved); | ||
@@ -98,3 +100,3 @@ | ||
private readonly onLogAdded = async (log: Log): Promise<void> => { | ||
private readonly onLogAdded = async (log: TLog): Promise<void> => { | ||
Object.keys(this.onLogAddedSubscribers) | ||
@@ -106,3 +108,3 @@ .map((key: string) => this.onLogAddedSubscribers[key]) | ||
private readonly onLogRemoved = async (log: Log): Promise<void> => { | ||
private readonly onLogRemoved = async (log: TLog): Promise<void> => { | ||
Object.keys(this.onLogRemovedSubscribers) | ||
@@ -115,3 +117,3 @@ .map((key: string) => this.onLogRemovedSubscribers[key]) | ||
public readonly getLatestReconciledBlock = (): Block | null => { | ||
public readonly getLatestReconciledBlock = (): TBlock | null => { | ||
return this.latestBlock; | ||
@@ -133,3 +135,3 @@ }; | ||
public readonly subscribeToOnBlockAdded = (onBlockAdded: (block: Block) => void): string => { | ||
public readonly subscribeToOnBlockAdded = (onBlockAdded: (block: TBlock) => void): string => { | ||
const uuid = `on block added token ${createUuid()}`; | ||
@@ -146,3 +148,3 @@ this.onBlockAddedSubscribers[uuid] = onBlockAdded; | ||
public readonly subscribeToOnBlockRemoved = (onBlockRemoved: (block: Block) => void): string => { | ||
public readonly subscribeToOnBlockRemoved = (onBlockRemoved: (block: TBlock) => void): string => { | ||
const uuid = `on block removed token ${createUuid()}`; | ||
@@ -159,3 +161,3 @@ this.onBlockRemovedSubscribers[uuid] = onBlockRemoved; | ||
public readonly subscribeToOnLogAdded = (onLogAdded: (log: Log) => void): string => { | ||
public readonly subscribeToOnLogAdded = (onLogAdded: (log: TLog) => void): string => { | ||
const uuid = `on log added token ${createUuid()}`; | ||
@@ -172,3 +174,3 @@ this.onLogAddedSubscribers[uuid] = onLogAdded; | ||
public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: Log) => void): string => { | ||
public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: TLog) => void): string => { | ||
const uuid = `on log removed token ${createUuid()}`; | ||
@@ -175,0 +177,0 @@ this.onLogRemovedSubscribers[uuid] = onLogRemoved; |
@@ -5,10 +5,12 @@ import { Block } from "./models/block"; | ||
export const reconcileBlockHistory = async ( | ||
getBlockByHash: (hash: string) => Promise<Block|null>, | ||
blockHistory: BlockHistory|Promise<BlockHistory>, | ||
newBlock: Block, | ||
onBlockAdded: (block: Block) => Promise<void>, | ||
onBlockRemoved: (block: Block) => Promise<void>, | ||
type GetBlockByHash<TBlock> = (hash: string) => Promise<TBlock|null>; | ||
export const reconcileBlockHistory = async <TBlock extends Block>( | ||
getBlockByHash: GetBlockByHash<TBlock>, | ||
blockHistory: BlockHistory<TBlock>|Promise<BlockHistory<TBlock>>, | ||
newBlock: TBlock, | ||
onBlockAdded: (block: TBlock) => Promise<void>, | ||
onBlockRemoved: (block: TBlock) => Promise<void>, | ||
blockRetention: number = 100, | ||
): Promise<BlockHistory> => { | ||
): Promise<BlockHistory<TBlock>> => { | ||
blockHistory = await blockHistory; | ||
@@ -34,3 +36,3 @@ if (isFirstBlock(blockHistory)) | ||
const rollback = async (blockHistory: BlockHistory, onBlockRemoved: (block: Block) => Promise<void>): Promise<BlockHistory> => { | ||
const rollback = async <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, onBlockRemoved: (block: TBlock) => Promise<void>): Promise<BlockHistory<TBlock>> => { | ||
while (!blockHistory.isEmpty()) { | ||
@@ -43,3 +45,3 @@ // CONSIDER: if this throws an exception, removals may have been announced that are actually still in history since throwing will result in no history update. we can't catch errors here because there isn't a clear way to recover from them, the failure may be a downstream system telling us that the block removal isn't possible because they are in a bad state. we could try re-announcing the successfully added blocks, but there would still be a problem with the failed block (should it be re-announced?) and the addition announcements may also fail | ||
const backfill = async (getBlockByHash: (hash: string) => Promise<Block|null>, blockHistory: BlockHistory, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, onBlockRemoved: (block: Block) => Promise<void>, blockRetention: number) => { | ||
const backfill = async <TBlock extends Block>(getBlockByHash: GetBlockByHash<TBlock>, blockHistory: BlockHistory<TBlock>, newBlock: TBlock, onBlockAdded: (block: TBlock) => Promise<void>, onBlockRemoved: (block: TBlock) => Promise<void>, blockRetention: number) => { | ||
if (newBlock.parentHash === "0x0000000000000000000000000000000000000000000000000000000000000000") | ||
@@ -55,6 +57,6 @@ return rollback(blockHistory, onBlockRemoved); | ||
const addNewHeadBlock = async (blockHistory: BlockHistory, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, blockRetention: number): Promise<BlockHistory> => { | ||
const addNewHeadBlock = async <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock, onBlockAdded: (block: TBlock) => Promise<void>, blockRetention: number): Promise<BlockHistory<TBlock>> => { | ||
// this is here as a final sanity check, in case we somehow got into an unexpected state, there are no known (and should never be) ways to reach this exception | ||
if (!blockHistory.isEmpty() && blockHistory.last().hash !== newBlock.parentHash) throw new Error("New head block's parent isn't our current head."); | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback be cause it may be trying to signal to use that the block has become invalid and is un-processable | ||
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback because it may be trying to signal to use that the block has become invalid and is un-processable | ||
await onBlockAdded(newBlock); | ||
@@ -65,3 +67,3 @@ blockHistory = blockHistory.push(newBlock); | ||
const removeHeadBlock = async (blockHistory: BlockHistory, onBlockRemoved: (block: Block) => Promise<void>): Promise<BlockHistory> => { | ||
const removeHeadBlock = async <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, onBlockRemoved: (block: TBlock) => Promise<void>): Promise<BlockHistory<TBlock>> => { | ||
let removedBlock = blockHistory.last(); | ||
@@ -73,7 +75,7 @@ blockHistory = blockHistory.pop(); | ||
const isFirstBlock = (blockHistory: BlockHistory, ): boolean => { | ||
const isFirstBlock = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, ): boolean => { | ||
return blockHistory.isEmpty(); | ||
} | ||
const isAlreadyInHistory = (blockHistory: BlockHistory, newBlock: Block): boolean => { | ||
const isAlreadyInHistory = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => { | ||
// `block!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions | ||
@@ -83,9 +85,9 @@ return blockHistory.some(block => block!.hash === newBlock.hash); | ||
const isNewHeadBlock = (blockHistory: BlockHistory, newBlock: Block): boolean => { | ||
const isNewHeadBlock = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => { | ||
return blockHistory.last().hash === newBlock.parentHash; | ||
} | ||
const parentHashIsInHistory = (blockHistory: BlockHistory, newBlock: Block): boolean => { | ||
const parentHashIsInHistory = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => { | ||
// `block!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions | ||
return blockHistory.some(block => block!.hash === newBlock.parentHash); | ||
} |
@@ -8,10 +8,10 @@ import { Block } from "./models/block"; | ||
export async function reconcileLogHistoryWithAddedBlock( | ||
getLogs: (filterOptions: FilterOptions) => Promise<Log[]>, | ||
logHistory: LogHistory | Promise<LogHistory>, | ||
newBlock: Block, | ||
onLogAdded: (log: Log) => Promise<void>, | ||
export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TLog extends Log>( | ||
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>, | ||
logHistory: LogHistory<TLog> | Promise<LogHistory<TLog>>, | ||
newBlock: TBlock, | ||
onLogAdded: (log: TLog) => Promise<void>, | ||
filters: Filter[] = [], | ||
historyBlockLength: number = 100, | ||
): Promise<LogHistory> { | ||
): Promise<LogHistory<TLog>> => { | ||
logHistory = await logHistory; | ||
@@ -25,3 +25,3 @@ const logs = await getFilteredLogs(getLogs, newBlock, filters); | ||
async function getFilteredLogs(getLogs: (filterOptions: FilterOptions) => Promise<Log[]>, newBlock: Block, filters: Filter[]): Promise<Log[]> { | ||
const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs: (filterOptions: FilterOptions) => Promise<Array<TLog>>, newBlock: TBlock, filters: Array<Filter>): Promise<Array<TLog>> => { | ||
const logPromises = filters | ||
@@ -34,3 +34,3 @@ .map(filter => ({ fromBlock: newBlock.number, toBlock: newBlock.number, address: filter.address, topics: filter.topics, })) | ||
async function addNewLogsToHead(logHistory: LogHistory, newLogs: Log[], onLogAdded: (log: Log) => Promise<void>): Promise<LogHistory> { | ||
const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => { | ||
const sortedLogs = newLogs.sort((logA, logB) => parseInt(logA.logIndex, 16) - parseInt(logB.logIndex, 16)); | ||
@@ -44,8 +44,8 @@ for (const log of sortedLogs) { | ||
async function pruneOldLogs(logHistory: LogHistory, newBlock: Block, historyBlockLength: number): Promise<LogHistory> { | ||
// `logBlock!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions | ||
return logHistory.skipUntil(log => parseInt(newBlock!.number, 16) - parseInt(log!.blockNumber, 16) < historyBlockLength).toList(); | ||
const pruneOldLogs = async <TBlock extends Block, TLog extends Log>(logHistory: LogHistory<TLog>, newBlock: TBlock, historyBlockLength: number): Promise<LogHistory<TLog>> => { | ||
// `log!` is required until the next major version of `immutable` is published to NPM (current version 3.8.2) which improves the type definitions | ||
return logHistory.skipUntil(log => parseInt(newBlock.number, 16) - parseInt(log!.blockNumber, 16) < historyBlockLength).toList(); | ||
} | ||
async function addNewLogToHead(logHistory: LogHistory, newLog: Log, onLogAdded: (log: Log) => Promise<void>): Promise<LogHistory> { | ||
const addNewLogToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLog: TLog, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => { | ||
logHistory = logHistory.push(newLog); | ||
@@ -57,3 +57,3 @@ // CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs? | ||
function ensureOrder(headLog: Log | undefined, newLog: Log) { | ||
const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog) => { | ||
if (headLog === undefined) return; | ||
@@ -69,7 +69,7 @@ const headBlockNumber = parseInt(headLog.blockNumber, 16); | ||
export async function reconcileLogHistoryWithRemovedBlock( | ||
logHistory: LogHistory|Promise<LogHistory>, | ||
removedBlock: Block, | ||
onLogRemoved: (log: Log) => Promise<void>, | ||
): Promise<LogHistory> { | ||
export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>( | ||
logHistory: LogHistory<TLog>|Promise<LogHistory<TLog>>, | ||
removedBlock: TBlock, | ||
onLogRemoved: (log: TLog) => Promise<void>, | ||
): Promise<LogHistory<TLog>> => { | ||
logHistory = await logHistory; | ||
@@ -76,0 +76,0 @@ |
import { Block } from "./block"; | ||
import { List as ImmutableList } from "immutable"; | ||
export type BlockHistory = ImmutableList<Block>; | ||
export type BlockHistory<TBlock extends Block> = ImmutableList<TBlock>; |
@@ -7,17 +7,2 @@ import { Transaction } from "./transaction"; | ||
readonly parentHash: string; | ||
readonly nonce: string; | ||
readonly sha3Uncles: string; | ||
readonly logsBloom: string; | ||
readonly transactionRoot: string; | ||
readonly stateRoot: string; | ||
readonly receiptsRoot: string; | ||
readonly miner: string; | ||
readonly difficulty: string; | ||
readonly totalDifficulty: string; | ||
readonly size: string; | ||
readonly gasLimit: string; | ||
readonly gasUsed: string; | ||
readonly timestamp: string; | ||
readonly transactions: string[] | Transaction[]; | ||
readonly uncles: string[]; | ||
} |
import { Log } from "./log"; | ||
import { List as ImmutableList } from "immutable"; | ||
export type LogHistory = ImmutableList<Log>; | ||
export type LogHistory<TLog extends Log> = ImmutableList<TLog>; |
@@ -5,7 +5,2 @@ export interface Log { | ||
readonly blockHash: string, | ||
readonly transactionHash: string, | ||
readonly transactionIndex: string, | ||
readonly address: string, | ||
readonly data: string, | ||
readonly topics: string[], | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
79246
1002
+ Addedimmutable@3.8.2(transitive)
- Removedimmutable@3.8.1(transitive)
Updatedimmutable@3.8.2