Blocknative Python SDK
Install
VirtualEnv QuickStart
$ virtualenv bn
$ . ./bn/bin/activate
(bn) $ pip3 install --upgrade pip
(bn) $ pip3 install blocknative-sdk
(bn) $ curl 'https://raw.githubusercontent.com/blocknative/python-sdk/main/examples/subscribe.py' > subscribe.py
(bn) $ python3 subscribe.py myapikey.key
Manual Install
python3 setup.py install
API Key
To get started using the Blocknative Python SDK you must first obtain an API Key. You can do so by heading over to Blocknative.com!
Usage
Basic usage
# Basic usage: subscribe to one address and print every event the handler receives.
import json

from blocknative.stream import Stream

uniswap_v2_address = '0x7a250d5630b4cf539739df2c5dacb4c659f2488d'


async def txn_handler(txn, unsubscribe):
    """Print the received transaction as JSON indented by four spaces.

    Args:
        txn: The transaction payload passed to the handler.
        unsubscribe: Callable passed alongside the payload; unused here.
    """
    print(json.dumps(txn, indent=4))


stream = Stream('<API_KEY>')
stream.subscribe_address(uniswap_v2_address, txn_handler)
stream.connect()
Unsubscribing
# Unsubscribing: print a confirmed transaction, then cancel the subscription.
import json

from blocknative.stream import Stream

uniswap_v2_address = '0x7a250d5630b4cf539739df2c5dacb4c659f2488d'


async def txn_handler(txn, unsubscribe):
    """Print a confirmed transaction and then call ``unsubscribe``.

    Args:
        txn: The transaction payload; its ``'status'`` key is inspected.
        unsubscribe: Zero-argument callable invoked once a confirmed
            transaction has been printed.
    """
    # Ignore every event whose status is not "confirmed".
    if txn['status'] != "confirmed":
        return

    print(json.dumps(txn, indent=4))
    unsubscribe()


stream = Stream('<API_KEY>')
stream.subscribe_address(uniswap_v2_address, txn_handler)
stream.connect()
Using Filters
# Using filters: pass a list of filter dicts with a single subscription.
import json

from blocknative.stream import Stream

stream = Stream('<API_KEY>')


async def txn_handler(txn, unsubscribe):
    """Print the received transaction as JSON indented by four spaces.

    Args:
        txn: The transaction payload passed to the handler.
        unsubscribe: Callable passed alongside the payload; unused here.
    """
    print(json.dumps(txn, indent=4))


uniswap_v2_address = '0x7a250d5630b4cf539739df2c5dacb4c659f2488d'

filters = [{
    'status': 'confirmed'
}]

# Subscribe with the address defined above. The example previously passed
# `curve_fi_address`, which is never defined in this snippet (NameError).
# NOTE(review): the keyword name `filter` is kept from the original example;
# confirm it matches the `Stream.subscribe_address` signature.
stream.subscribe_address(uniswap_v2_address, txn_handler, filter=filters)
stream.connect()
Using Global Filters
Similar to the above, but this time we use global filters, which apply to all subscriptions.
# Global filters: the filter list is given to the Stream constructor instead
# of to an individual subscription.
import json

from blocknative.stream import Stream

uniswap_v2_address = '0x7a250d5630b4cf539739df2c5dacb4c659f2488d'
curve_fi_address = '0xdf5e0e81dff6faf3a7e52ba697820c5e32d806a8'
sushi_swap_address = '0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f'

global_filters = [{
    'status': 'confirmed'
}]


async def txn_handler(txn, unsubscribe):
    """Print the received transaction as JSON indented by four spaces.

    Args:
        txn: The transaction payload passed to the handler.
        unsubscribe: Callable passed alongside the payload; unused here.
    """
    print(json.dumps(txn, indent=4))


stream = Stream('<API_KEY>', global_filters=global_filters)

# Register the same handler for each address, in the original order.
for address in (curve_fi_address, uniswap_v2_address, sushi_swap_address):
    stream.subscribe_address(address, txn_handler)

stream.connect()
Connecting to Binance Smart Chain
# Binance Smart Chain: select the network by passing `network_id` to Stream.
import json

from blocknative.stream import Stream

BSC_NETWORK_ID = 56
pancakeswap_v2_address = '0x10ed43c718714eb63d5aa57b78b54704e256024e'


async def txn_handler(txn, unsubscribe):
    """Print the received transaction as JSON indented by four spaces.

    Args:
        txn: The transaction payload passed to the handler.
        unsubscribe: Callable passed alongside the payload; unused here.
    """
    print(json.dumps(txn, indent=4))


stream = Stream('<API_KEY>', network_id=BSC_NETWORK_ID)
stream.subscribe_address(pancakeswap_v2_address, txn_handler)
stream.connect()