Research
Security News
Kill Switch Hidden in npm Packages Typosquatting Chalk and Chokidar
Socket researchers found several malicious npm packages typosquatting Chalk and Chokidar, targeting Node.js developers with kill switches and data theft.
A robust, async-based Python API designed to interface seamlessly with the Rithmic Protocol Buffer API. This package is built to provide an efficient and reliable connection to Rithmic's trading infrastructure, catering to advanced trading strategies and real-time data handling.
This was originally a fork of pyrithmic, but the code has been completely rewritten.
This repo introduces several key improvements and new features over the original repository, ensuring compatibility with modern Python environments and providing additional functionality:
The most significant upgrade is the transition to an async architecture, providing superior performance and responsiveness when dealing with real-time trading and market data.
pip install async_rithmic
Here's an example to get the front month contract for ES and stream market data:
import asyncio
from async_rithmic import RithmicClient, Gateway, DataType, LastTradePresenceBits
async def callback(data: dict):
if data["presence_bits"] & LastTradePresenceBits.LAST_TRADE:
print("received", data)
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
# Request front month contract
symbol, exchange = "ES", "CME"
security_code = await client.get_front_month_contract(symbol, exchange)
# Stream market data
print(f"Streaming market data for {security_code}")
data_type = DataType.LAST_TRADE
client.on_tick += callback
await client.subscribe_to_market_data(security_code, exchange, data_type)
# Wait 10 seconds, unsubscribe and disconnect
await asyncio.sleep(10)
await client.unsubscribe_from_market_data(security_code, exchange, data_type)
await client.disconnect()
asyncio.run(main())
import asyncio
from async_rithmic import RithmicClient, Gateway, TimeBarType
async def callback(data: dict):
print("received", data)
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
# Request front month contract
symbol, exchange = "ES", "CME"
security_code = await client.get_front_month_contract(symbol, exchange)
# Stream market data
print(f"Streaming market data for {security_code}")
client.on_time_bar += callback
await client.subscribe_to_time_bar_data(security_code, exchange, TimeBarType.SECOND_BAR, 6)
# Wait 10 seconds, unsubscribe and disconnect
await asyncio.sleep(20)
await client.unsubscribe_from_time_bar_data(security_code, exchange, TimeBarType.SECOND_BAR, 6)
await client.disconnect()
asyncio.run(main())
As a market order will be filled immediately, this script will submit the order and receive a fill straight away:
import asyncio
from datetime import datetime
from async_rithmic import RithmicClient, Gateway, OrderType, ExchangeOrderNotificationType, TransactionType
async def callback(notification):
if notification.notify_type == ExchangeOrderNotificationType.FILL:
print("order filled", notification)
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
# Request front month contract
symbol, exchange = "ES", "CME"
security_code = await client.get_front_month_contract(symbol, exchange)
# Submit order
client.on_exchange_order_notification += callback
order_id = '{0}_order'.format(datetime.now().strftime('%Y%m%d_%H%M%S'))
await client.submit_order(
order_id,
security_code,
exchange,
qty=1,
order_type=OrderType.MARKET,
transaction_type=TransactionType.SELL,
#account_id="ABCD" # Mandatory if you have multiple accounts
#stop_ticks=20, # Optional: you can specify a stop loss and profit target in ticks
#target_ticks=40,
#cancel_at=datetime.now() + timedelta(minutes=2), # Optional: cancellation datetime
)
await asyncio.sleep(1)
await client.disconnect()
asyncio.run(main())
import asyncio
from datetime import datetime
from async_rithmic import RithmicClient, Gateway, OrderType, TransactionType
async def exchange_order_notification_callback(notification):
print("exchange order notification", notification)
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
# Request front month contract
symbol, exchange = "ES", "CME"
security_code = await client.get_front_month_contract(symbol, exchange)
# Submit order
client.on_exchange_order_notification += exchange_order_notification_callback
order_id = '{0}_order'.format(datetime.now().strftime('%Y%m%d_%H%M%S'))
await client.submit_order(
order_id,
security_code,
exchange,
qty=1,
order_type=OrderType.LIMIT,
transaction_type=TransactionType.BUY,
price=5300.,
)
await asyncio.sleep(1)
await client.cancel_order(order_id=order_id)
await asyncio.sleep(1)
await client.disconnect()
asyncio.run(main())
The following example will fetch historical data, in a streaming fashion:
import asyncio
from datetime import datetime
from async_rithmic import RithmicClient, Gateway
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
# Fetch historical tick data
ticks = await client.get_historical_tick_data(
"ESZ4",
"CME",
datetime(2024, 10, 15, 15, 30),
datetime(2024, 10, 15, 15, 31),
)
print(f"Received {len(ticks)} ticks")
print(f"Last tick timestamp: {ticks[-1]['datetime']}")
await client.disconnect()
asyncio.run(main())
import asyncio
from datetime import datetime
from async_rithmic import RithmicClient, Gateway, TimeBarType
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
# Fetch historical time bar data
bars = await client.get_historical_time_bars(
"ESZ4",
"CME",
datetime(2024, 10, 15, 15, 30),
datetime(2024, 10, 15, 15, 31),
TimeBarType.SECOND_BAR,
6
)
print(f"Received {len(bars)} bars")
print(f"Last bar timestamp: {bars[-1]['bar_end_datetime']}")
await client.disconnect()
asyncio.run(main())
This code snippet will list your account summary, session orders and positions:
import asyncio
from async_rithmic import RithmicClient, Gateway, InstrumentType
async def main():
client = RithmicClient(user="", password="", system_name="Rithmic Test", app_name="my_test_app", app_version="1.0", gateway=Gateway.TEST)
await client.connect()
account_id = "MY_ACCOUNT"
result = await client.search_symbols("MCL", instrument_type=InstrumentType.FUTURE)
print("Search result:", result)
summary = await client.list_account_summary(account_id=account_id)
print("Account summary:", summary[0])
orders = await client.list_orders(account_id=account_id)
print("Orders:", orders)
positions = await client.list_positions(account_id=account_id)
print("Positions:", positions)
await client.disconnect()
asyncio.run(main())
To execute the tests, use the following command: make tests
FAQs
Python API Integration with Rithmic Protocol Buffer API
We found that async-rithmic demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket researchers found several malicious npm packages typosquatting Chalk and Chokidar, targeting Node.js developers with kill switches and data theft.
Security News
pnpm 10 blocks lifecycle scripts by default to improve security, addressing supply chain attack risks but sparking debate over compatibility and workflow changes.
Product
Socket now supports uv.lock files to ensure consistent, secure dependency resolution for Python projects and enhance supply chain security.