@bitski/provider-engine
Advanced tools
Comparing version 0.2.0 to 0.3.0
@@ -20,18 +20,24 @@ /// <reference types="node" /> | ||
} | ||
export interface ProviderEngineOptions { | ||
blockTracker?: any; | ||
blockTrackerProvider?: any; | ||
pollingInterval?: number; | ||
} | ||
export declare type JSONRPCResponseHandler = (error: null | Error, response: JSONRPCResponse) => void; | ||
export default class Web3ProviderEngine extends EventEmitter { | ||
_blockTracker: PollingBlockTracker; | ||
_ready: Stoplight; | ||
currentBlock: any; | ||
currentBlockNumber: any; | ||
_providers: Subprovider[]; | ||
constructor(opts?: any); | ||
protected _blockTracker: PollingBlockTracker; | ||
protected _ready: Stoplight; | ||
protected _providers: Subprovider[]; | ||
protected _running: boolean; | ||
constructor(opts?: ProviderEngineOptions); | ||
isRunning(): boolean; | ||
start(): void; | ||
stop(): void; | ||
addProvider(source: Subprovider): void; | ||
send(payload: JSONRPCRequest): void; | ||
send(method: string, params: any[]): Promise<any>; | ||
sendAsync(payload: JSONRPCRequest, cb: JSONRPCResponseHandler): void; | ||
_handleAsync(payload: JSONRPCRequest, finished: JSONRPCResponseHandler): void; | ||
_setCurrentBlockNumber(blockNumber: any): void; | ||
_setCurrentBlock(block: any): void; | ||
protected _handleAsync(payload: JSONRPCRequest, finished: JSONRPCResponseHandler): void; | ||
protected _getBlockByNumber(blockNumber: any, cb: any): void; | ||
protected _setCurrentBlock(block: any): void; | ||
} |
@@ -30,2 +30,3 @@ "use strict"; | ||
var _this = _super.call(this) || this; | ||
_this._running = false; | ||
_this.setMaxListeners(30); | ||
@@ -36,6 +37,3 @@ // parse options | ||
var directProvider = { | ||
sendAsync: function (payload, callback) { | ||
payload.skipCache = true; | ||
_this._handleAsync(payload, callback); | ||
}, | ||
sendAsync: _this._handleAsync.bind(_this), | ||
}; | ||
@@ -46,2 +44,3 @@ var blockTrackerProvider = opts.blockTrackerProvider || directProvider; | ||
pollingInterval: opts.pollingInterval || 4000, | ||
setSkipCacheFlag: true, | ||
}); | ||
@@ -55,19 +54,40 @@ // set initialization blocker | ||
} | ||
Web3ProviderEngine.prototype.isRunning = function () { | ||
return this._running; | ||
}; | ||
Web3ProviderEngine.prototype.start = function () { | ||
var _this = this; | ||
// handle new block | ||
// trigger start | ||
this._ready.go(); | ||
// on new block, request block body and emit as events | ||
this._blockTracker.on('latest', function (blockNumber) { | ||
_this._setCurrentBlockNumber(blockNumber); | ||
// get block body | ||
_this._getBlockByNumber(blockNumber, function (err, block) { | ||
if (err) { | ||
_this.emit('error', err); | ||
return; | ||
} | ||
var bufferBlock = toBufferBlock(block); | ||
// set current + emit "block" event | ||
_this._setCurrentBlock(bufferBlock); | ||
// emit other events | ||
_this.emit('rawBlock', block); | ||
_this.emit('latest', block); | ||
}); | ||
}); | ||
// emit block events from the block tracker | ||
// forward other events | ||
this._blockTracker.on('sync', this.emit.bind(this, 'sync')); | ||
this._blockTracker.on('latest', this.emit.bind(this, 'latest')); | ||
// unblock initialization after first block | ||
this._blockTracker.once('latest', function () { | ||
_this._ready.go(); | ||
}); | ||
this._blockTracker.on('error', this.emit.bind(this, 'error')); | ||
// update state | ||
this._running = true; | ||
// signal that we started | ||
this.emit('start'); | ||
}; | ||
Web3ProviderEngine.prototype.stop = function () { | ||
// stop block polling | ||
// stop block polling by removing event listeners | ||
this._blockTracker.removeAllListeners(); | ||
// update state | ||
this._running = false; | ||
// signal that we stopped | ||
this.emit('stop'); | ||
}; | ||
@@ -78,18 +98,35 @@ Web3ProviderEngine.prototype.addProvider = function (source) { | ||
}; | ||
Web3ProviderEngine.prototype.send = function (payload) { | ||
throw new Error('Web3ProviderEngine does not support synchronous requests.'); | ||
}; | ||
Web3ProviderEngine.prototype.sendAsync = function (payload, cb) { | ||
// New send method | ||
Web3ProviderEngine.prototype.send = function (method, params) { | ||
var _this = this; | ||
this._ready.await(function () { | ||
if (Array.isArray(payload)) { | ||
// handle batch | ||
map_1.default(payload, _this._handleAsync.bind(_this), cb); | ||
} | ||
else { | ||
// handle single | ||
_this._handleAsync(payload, cb); | ||
} | ||
var payload = { | ||
id: 0, | ||
jsonrpc: '2.0', | ||
method: method, | ||
params: params, | ||
}; | ||
return new Promise(function (fulfill, reject) { | ||
_this._ready.await(function () { | ||
_this._handleAsync(payload, function (error, result) { | ||
if (error) { | ||
reject(error); | ||
} | ||
else { | ||
fulfill(result); | ||
} | ||
}); | ||
}); | ||
}); | ||
}; | ||
// Legacy sendAsync method | ||
Web3ProviderEngine.prototype.sendAsync = function (payload, cb) { | ||
if (Array.isArray(payload)) { | ||
// handle batch | ||
map_1.default(payload, this._handleAsync.bind(this), cb); | ||
} | ||
else { | ||
// handle single | ||
this._handleAsync(payload, cb); | ||
} | ||
}; | ||
Web3ProviderEngine.prototype._handleAsync = function (payload, finished) { | ||
@@ -155,20 +192,14 @@ var _this = this; | ||
}; | ||
// Once we detect a new block number, load the block data | ||
Web3ProviderEngine.prototype._setCurrentBlockNumber = function (blockNumber) { | ||
var self = this; | ||
self.currentBlockNumber = blockNumber; | ||
// Make sure we skip the cache for this request | ||
var payload = create_payload_1.createPayload({ method: 'eth_getBlockByNumber', params: [blockNumber, false], skipCache: true }); | ||
self.sendAsync(payload, function (err, result) { | ||
Web3ProviderEngine.prototype._getBlockByNumber = function (blockNumber, cb) { | ||
var req = create_payload_1.createPayload({ method: 'eth_getBlockByNumber', params: [blockNumber, false], skipCache: true }); | ||
this._handleAsync(req, function (err, res) { | ||
if (err) { | ||
return; | ||
return cb(err); | ||
} | ||
var bufferBlock = toBufferBlock(result.result); | ||
self._setCurrentBlock(bufferBlock); | ||
return cb(null, res.result); | ||
}); | ||
}; | ||
Web3ProviderEngine.prototype._setCurrentBlock = function (block) { | ||
var self = this; | ||
self.currentBlock = block; | ||
self.emit('block', block); | ||
this.currentBlock = block; | ||
this.emit('block', block); | ||
}; | ||
@@ -175,0 +206,0 @@ return Web3ProviderEngine; |
@@ -118,23 +118,18 @@ "use strict"; | ||
var _this = this; | ||
this._getBlockNumber(function (error, blockNumber) { | ||
if (error) { | ||
return done(error); | ||
} | ||
var filter = new log_filter_1.default(opts); | ||
var newLogHandler = filter.update.bind(filter); | ||
var blockHandler = function (block, cb) { | ||
_this._logsForBlock(block, function (err, logs) { | ||
if (err) { | ||
return cb(err); | ||
} | ||
newLogHandler(logs); | ||
cb(); | ||
}); | ||
}; | ||
_this.filterIndex++; | ||
_this.asyncBlockHandlers[_this.filterIndex] = blockHandler; | ||
_this.filters[_this.filterIndex] = filter; | ||
var hexFilterIndex = eth_util_1.intToHex(_this.filterIndex); | ||
done(null, hexFilterIndex); | ||
}); | ||
var filter = new log_filter_1.default(opts); | ||
var newLogHandler = filter.update.bind(filter); | ||
var blockHandler = function (block, cb) { | ||
_this._logsForBlock(block, function (err, logs) { | ||
if (err) { | ||
return cb(err); | ||
} | ||
newLogHandler(logs); | ||
cb(); | ||
}); | ||
}; | ||
this.filterIndex++; | ||
this.asyncBlockHandlers[this.filterIndex] = blockHandler; | ||
this.filters[this.filterIndex] = filter; | ||
var hexFilterIndex = eth_util_1.intToHex(this.filterIndex); | ||
done(null, hexFilterIndex); | ||
}; | ||
@@ -141,0 +136,0 @@ FilterSubprovider.prototype.newPendingTransactionFilter = function (done) { |
@@ -36,3 +36,2 @@ "use strict"; | ||
BlockFilter.prototype.update = function (block) { | ||
// console.log('BlockFilter - update') | ||
var blockHash = bufferToHex(block.hash); | ||
@@ -39,0 +38,0 @@ this.updates.push(blockHash); |
@@ -41,3 +41,2 @@ "use strict"; | ||
var _this = this; | ||
// console.log('LogFilter - update') | ||
// validate filter match | ||
@@ -44,0 +43,0 @@ var validLogs = []; |
@@ -51,2 +51,3 @@ "use strict"; | ||
_this.filters[id].on('data', function (results) { | ||
_this.filters[id].clearChanges(); | ||
if (!Array.isArray(results)) { | ||
@@ -56,3 +57,2 @@ results = [results]; | ||
results.forEach(function (r) { return _this._notificationHandler(hexId, subscriptionType, r); }); | ||
_this.filters[id].clearChanges(); | ||
}); | ||
@@ -59,0 +59,0 @@ if (subscriptionType === 'newPendingTransactions') { |
{ | ||
"name": "@bitski/provider-engine", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "repository": "https://github.com/BitskiCo/provider-engine", |
@@ -26,2 +26,8 @@ import eachSeries from 'async/eachSeries'; | ||
export interface ProviderEngineOptions { | ||
blockTracker?: any; | ||
blockTrackerProvider?: any; | ||
pollingInterval?: number; | ||
} | ||
export type JSONRPCResponseHandler = (error: null | Error, response: JSONRPCResponse) => void; | ||
@@ -31,9 +37,10 @@ | ||
public _blockTracker: PollingBlockTracker; | ||
public _ready: Stoplight; | ||
public currentBlock: any; | ||
public currentBlockNumber: any; | ||
public _providers: Subprovider[]; | ||
constructor(opts?) { | ||
protected _blockTracker: PollingBlockTracker; | ||
protected _ready: Stoplight; | ||
protected _providers: Subprovider[]; | ||
protected _running: boolean = false; | ||
constructor(opts?: ProviderEngineOptions) { | ||
super(); | ||
@@ -45,6 +52,3 @@ this.setMaxListeners(30); | ||
const directProvider = { | ||
sendAsync: (payload, callback) => { | ||
payload.skipCache = true; | ||
this._handleAsync(payload, callback); | ||
}, | ||
sendAsync: this._handleAsync.bind(this), | ||
}; | ||
@@ -55,2 +59,3 @@ const blockTrackerProvider = opts.blockTrackerProvider || directProvider; | ||
pollingInterval: opts.pollingInterval || 4000, | ||
setSkipCacheFlag: true, | ||
}); | ||
@@ -66,21 +71,44 @@ | ||
public isRunning(): boolean { | ||
return this._running; | ||
} | ||
public start() { | ||
// handle new block | ||
// trigger start | ||
this._ready.go(); | ||
// on new block, request block body and emit as events | ||
this._blockTracker.on('latest', (blockNumber) => { | ||
this._setCurrentBlockNumber(blockNumber); | ||
// get block body | ||
this._getBlockByNumber(blockNumber, (err, block) => { | ||
if (err) { | ||
this.emit('error', err); | ||
return; | ||
} | ||
const bufferBlock = toBufferBlock(block); | ||
// set current + emit "block" event | ||
this._setCurrentBlock(bufferBlock); | ||
// emit other events | ||
this.emit('rawBlock', block); | ||
this.emit('latest', block); | ||
}); | ||
}); | ||
// emit block events from the block tracker | ||
// forward other events | ||
this._blockTracker.on('sync', this.emit.bind(this, 'sync')); | ||
this._blockTracker.on('latest', this.emit.bind(this, 'latest')); | ||
this._blockTracker.on('error', this.emit.bind(this, 'error')); | ||
// unblock initialization after first block | ||
this._blockTracker.once('latest', () => { | ||
this._ready.go(); | ||
}); | ||
// update state | ||
this._running = true; | ||
// signal that we started | ||
this.emit('start'); | ||
} | ||
public stop() { | ||
// stop block polling | ||
// stop block polling by removing event listeners | ||
this._blockTracker.removeAllListeners(); | ||
// update state | ||
this._running = false; | ||
// signal that we stopped | ||
this.emit('stop'); | ||
} | ||
@@ -93,21 +121,35 @@ | ||
public send(payload: JSONRPCRequest) { | ||
throw new Error('Web3ProviderEngine does not support synchronous requests.'); | ||
// New send method | ||
public send(method: string, params: any[]): Promise<any> { | ||
const payload = { | ||
id: 0, | ||
jsonrpc: '2.0', | ||
method, | ||
params, | ||
}; | ||
return new Promise((fulfill, reject) => { | ||
this._ready.await(() => { | ||
this._handleAsync(payload, (error, result) => { | ||
if (error) { | ||
reject(error); | ||
} else { | ||
fulfill(result); | ||
} | ||
}); | ||
}); | ||
}); | ||
} | ||
// Legacy sendAsync method | ||
public sendAsync(payload: JSONRPCRequest, cb: JSONRPCResponseHandler) { | ||
this._ready.await(() => { | ||
if (Array.isArray(payload)) { | ||
// handle batch | ||
map(payload, this._handleAsync.bind(this), cb); | ||
} else { | ||
// handle single | ||
this._handleAsync(payload, cb); | ||
} | ||
}); | ||
if (Array.isArray(payload)) { | ||
// handle batch | ||
map(payload, this._handleAsync.bind(this), cb); | ||
} else { | ||
// handle single | ||
this._handleAsync(payload, cb); | ||
} | ||
} | ||
public _handleAsync(payload: JSONRPCRequest, finished: JSONRPCResponseHandler) { | ||
protected _handleAsync(payload: JSONRPCRequest, finished: JSONRPCResponseHandler) { | ||
let currentProvider = -1; | ||
@@ -174,19 +216,13 @@ let result = null; | ||
// Once we detect a new block number, load the block data | ||
public _setCurrentBlockNumber(blockNumber) { | ||
const self = this; | ||
self.currentBlockNumber = blockNumber; | ||
// Make sure we skip the cache for this request | ||
const payload = createPayload({ method: 'eth_getBlockByNumber', params: [blockNumber, false], skipCache: true }); | ||
self.sendAsync(payload, (err, result) => { | ||
if (err) { return; } | ||
const bufferBlock = toBufferBlock(result.result); | ||
self._setCurrentBlock(bufferBlock); | ||
protected _getBlockByNumber(blockNumber, cb) { | ||
const req = createPayload({ method: 'eth_getBlockByNumber', params: [blockNumber, false], skipCache: true }); | ||
this._handleAsync(req, (err, res) => { | ||
if (err) { return cb(err); } | ||
return cb(null, res.result); | ||
}); | ||
} | ||
public _setCurrentBlock(block) { | ||
const self = this; | ||
self.currentBlock = block; | ||
self.emit('block', block); | ||
protected _setCurrentBlock(block) { | ||
this.currentBlock = block; | ||
this.emit('block', block); | ||
} | ||
@@ -193,0 +229,0 @@ |
@@ -136,22 +136,18 @@ import parallel from 'async/parallel'; | ||
public newLogFilter(opts, done) { | ||
this._getBlockNumber((error, blockNumber) => { | ||
if (error) { return done(error); } | ||
const filter = new LogFilter(opts); | ||
const newLogHandler = filter.update.bind(filter); | ||
const blockHandler = (block, cb) => { | ||
this._logsForBlock(block, (err, logs) => { | ||
if (err) { return cb(err); } | ||
newLogHandler(logs); | ||
cb(); | ||
}); | ||
}; | ||
const filter = new LogFilter(opts); | ||
const newLogHandler = filter.update.bind(filter); | ||
const blockHandler = (block, cb) => { | ||
this._logsForBlock(block, (err, logs) => { | ||
if (err) { return cb(err); } | ||
newLogHandler(logs); | ||
cb(); | ||
}); | ||
}; | ||
this.filterIndex++; | ||
this.asyncBlockHandlers[this.filterIndex] = blockHandler; | ||
this.filters[this.filterIndex] = filter; | ||
this.filterIndex++; | ||
this.asyncBlockHandlers[this.filterIndex] = blockHandler; | ||
this.filters[this.filterIndex] = filter; | ||
const hexFilterIndex = intToHex(this.filterIndex); | ||
done(null, hexFilterIndex); | ||
}); | ||
const hexFilterIndex = intToHex(this.filterIndex); | ||
done(null, hexFilterIndex); | ||
} | ||
@@ -224,2 +220,3 @@ | ||
const destroyHandler = this.filterDestroyHandlers[filterId]; | ||
delete this.filters[filterId]; | ||
@@ -226,0 +223,0 @@ delete this.asyncBlockHandlers[filterId]; |
@@ -23,3 +23,2 @@ import Web3ProviderEngine from '../..'; | ||
public update(block) { | ||
// console.log('BlockFilter - update') | ||
const blockHash = bufferToHex(block.hash); | ||
@@ -26,0 +25,0 @@ this.updates.push(blockHash); |
@@ -28,3 +28,2 @@ import Filter from './filter'; | ||
public update(logs) { | ||
// console.log('LogFilter - update') | ||
// validate filter match | ||
@@ -31,0 +30,0 @@ const validLogs = []; |
@@ -40,4 +40,4 @@ import { JSONRPCRequest } from '../provider-engine'; | ||
this.subscriptions[id] = subscriptionType; | ||
this.filters[id].on('data', (results) => { | ||
this.filters[id].clearChanges(); | ||
if (!Array.isArray(results)) { | ||
@@ -47,3 +47,2 @@ results = [results]; | ||
results.forEach((r) => this._notificationHandler(hexId, subscriptionType, r)); | ||
this.filters[id].clearChanges(); | ||
}); | ||
@@ -50,0 +49,0 @@ |
@@ -212,3 +212,2 @@ // tslint:disable: max-line-length | ||
pollingInterval: 20, | ||
pollingShouldUnref: true, | ||
}); | ||
@@ -215,0 +214,0 @@ |
@@ -42,3 +42,3 @@ // tslint:disable: max-line-length | ||
}); | ||
const block = testMeta.block = testMeta.blockProvider.nextBlock(); | ||
testMeta.block = testMeta.blockProvider.nextBlock(); | ||
cb(); | ||
@@ -297,3 +297,2 @@ }, | ||
pollingInterval: 20, | ||
pollingShouldUnref: false, | ||
}); | ||
@@ -300,0 +299,0 @@ engine.addProvider(filterProvider); |
@@ -185,4 +185,3 @@ // tslint:disable: max-line-length | ||
const engine = testMeta.engine = new ProviderEngine({ | ||
pollingInterval: 20, | ||
pollingShouldUnref: false, | ||
pollingInterval: 100, | ||
}); | ||
@@ -189,0 +188,0 @@ engine.addProvider(subscriptionSubprovider); |
@@ -29,3 +29,4 @@ import extend from 'xtend'; | ||
eth_getLogs: (payload, next, end) => { | ||
const transactions = this._currentBlock.transactions; | ||
const { fromBlock } = payload.params[0]; | ||
const transactions = this._blockChain[fromBlock].transactions; | ||
// return result asynchronously | ||
@@ -32,0 +33,0 @@ setTimeout(() => end(null, transactions)); |
226724
5834