Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

chainbench

Package Overview
Dependencies
Maintainers
1
Versions
37
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

chainbench - npm Package Compare versions

Comparing version
0.7.5
to
0.8.0
+16
chainbench/profile/ethereum/subscriptions.py
from chainbench.user import WssJrpcUser
from chainbench.user.protocol.ethereum import EthSubscribe
class EthSubscriptions(WssJrpcUser):
    """Load profile that subscribes to three Ethereum websocket streams."""

    subscriptions = [
        EthSubscribe(["newHeads"]),
        # logs filtered on the ERC-20 Approval event topic (keccak of
        # "Approval(address,address,uint256)")
        EthSubscribe(["logs", {"topics": ["0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"]}]),
        EthSubscribe(["newPendingTransactions"]),
    ]

    def get_notification_name(self, parsed_response: dict):
        """Report notifications under the subscription type (e.g. "newHeads")."""
        subscription_id = parsed_response["params"]["subscription"]
        subscription = self.get_subscription(subscription_id=subscription_id)
        return subscription.subscribe_rpc_call.params[0]
import random
import typing as t
from locust import tag, task
from locust.contrib.fasthttp import ResponseContextManager
from chainbench.user.http import HttpUser
from chainbench.util.jsonrpc import (
RpcCall,
expand_rpc_calls,
generate_batch_request_body,
)
class JrpcHttpUser(HttpUser):
    """Extension of HttpUser to provide JsonRPC support."""

    abstract = True
    rpc_path = ""
    # JSON-RPC error codes that should NOT be counted as failures.
    rpc_error_code_exclusions: list[int] = []
    rpc_calls: dict[t.Callable, int] = {}  # To be populated in the subclass load profile
    calls_per_batch = 10  # default requests to include in a batch request

    def __init__(self, environment: t.Any):
        # The --batch-size CLI option overrides the class default.
        self.calls_per_batch = environment.parsed_options.batch_size
        super().__init__(environment)

    @tag("single")
    @task
    def rpc_call_task(self) -> None:
        """Run the task function for the RPC method given via --method."""
        self.method_to_task_function(self.environment.parsed_options.method)(self)

    @tag("batch")
    @task
    def batch_rpc_call_task(self) -> None:
        """Send one batch sampled from the weighted rpc_calls profile."""
        rpc_calls = {getattr(self, method.__name__): weight for method, weight in self.rpc_calls.items()}
        self.make_random_batch_rpc_call(
            rpc_calls,
            calls_per_batch=self.calls_per_batch,
        )

    @tag("batch_single")
    @task
    def batch_single_rpc_call_task(self) -> None:
        """Send a batch repeating the --method call `calls_per_batch` times."""
        rpc_call: RpcCall = self.method_to_rpc_call(self.environment.parsed_options.method)(self)
        rpc_calls = [rpc_call for _ in range(self.calls_per_batch)]
        self.make_batch_rpc_call(
            rpc_calls,
        )

    @classmethod
    def method_to_rpc_call(cls, method: str) -> t.Callable:
        """Resolve an RPC method name to the class function building its RpcCall."""
        method_name = cls.method_to_function_name(method)
        return getattr(cls, method_name)

    def check_json_rpc_response(self, response: ResponseContextManager, name: str) -> None:
        """Validate a JSON-RPC response body and mark failures on `response`.

        Only the first kilobyte is scanned for the cheap substring checks; the
        body is fully parsed only when an error marker is present.
        """
        CHUNK_SIZE = 1024
        if response.text is None:
            self.logger.error(f"Response for {name} is empty")
            response.failure(f"Response for {name} is empty")
            return
        data = response.text[:CHUNK_SIZE]
        if "jsonrpc" not in data:
            self.logger.error(f"Response for {name} is not a JSON-RPC: {response.text}")
            response.failure(f"Response for {name} is not a JSON-RPC")
            return
        if "error" in data:
            response_js: list | dict = response.json()
            if isinstance(response_js, dict):
                # Normalize a single response to the batch (list) shape.
                response_js = [response_js]
            if isinstance(response_js, list):
                for response_js_item in response_js:
                    if "error" in response_js_item:
                        if "code" in response_js_item["error"]:
                            self.logger.error(f"Response for {name} has a JSON-RPC error: {response.text}")
                            if response_js_item["error"]["code"] not in self.rpc_error_code_exclusions:
                                response.failure(
                                    f"Response for {name} has a JSON-RPC error {response_js_item['error']['code']} - "
                                    f"{response_js_item['error']['message']}"
                                )
                            return
                        response.failure("Unspecified JSON-RPC error")
                        self.logger.error(f"Unspecified JSON-RPC error: {response.text}")
                        return
            # TODO: handle multiple errors in batch response properly
        if "result" not in data:
            response.failure(f"Response for {name} call has no result")
            self.logger.error(f"Response for {name} call has no result: {response.text}")

    def make_rpc_call(
        self,
        rpc_call: RpcCall | None = None,
        method: str | None = None,
        params: list[t.Any] | dict | None = None,
        name: str = "",
        path: str = "",
    ) -> None:
        """Make a JSON-RPC call.

        Either `rpc_call` or `method` (with optional `params`) must be given.
        A caller-supplied `name` is kept; otherwise the method name is used.
        """
        if rpc_call is None:
            if method is None:
                raise ValueError("Either rpc_call or method must be provided")
            rpc_call = RpcCall(method, params)
        if not name:
            # Fix: previously a caller-supplied `name` was always overwritten.
            name = rpc_call.method
        with self.client.request(
            "POST", self.rpc_path + path, json=rpc_call.request_body(), name=name, catch_response=True
        ) as response:
            self.check_http_error(response)
            self.check_json_rpc_response(response, name=name)

    def make_batch_rpc_call(self, rpc_calls: list[RpcCall], name: str = "", path: str = "") -> None:
        """Make a Batch JSON-RPC call."""
        if name == "":
            name = f"Batch RPC ({len(rpc_calls)})"
        headers = {"Content-Type": "application/json", "accept": "application/json"}
        with self.client.request(
            "POST",
            self.rpc_path + path,
            data=generate_batch_request_body(rpc_calls),
            name=name,
            catch_response=True,
            headers=headers,
        ) as response:
            self.check_http_error(response)
            self.check_json_rpc_response(response, name=name)

    def make_random_batch_rpc_call(
        self,
        weighted_rpc_calls: dict[t.Callable[[], RpcCall], int],
        calls_per_batch: int,
        name: str = "",
        path: str = "",
    ) -> None:
        """Make a batch of `calls_per_batch` calls sampled (with replacement,
        weight-proportionally) from `weighted_rpc_calls`."""
        rpc_calls: list[RpcCall] = expand_rpc_calls(weighted_rpc_calls)
        random_rpc_calls: list[RpcCall] = random.choices(rpc_calls, k=calls_per_batch)
        self.make_batch_rpc_call(random_rpc_calls, name=name, path=path)
import logging
import time
import gevent
import orjson as json
from gevent import Greenlet, Timeout
from locust import User, task
from locust.env import Environment
from orjson import JSONDecodeError
from websocket import WebSocket, WebSocketConnectionClosedException, create_connection
from chainbench.util.jsonrpc import RpcCall
class WSSubscription:
    """Tracks the lifecycle of one JSON-RPC websocket subscription."""

    def __init__(self, subscribe_method: str, subscribe_params: dict | list, unsubscribe_method: str):
        self.subscribe_rpc_call: RpcCall = RpcCall(subscribe_method, subscribe_params)
        self.unsubscribe_method: str = unsubscribe_method
        # `subscribed` is kept in sync with the id via the property below.
        self.subscribed: bool = False
        self._subscription_id: int | str | None = None

    @property
    def subscription_id(self):
        """Server-assigned subscription id (None until the subscribe reply)."""
        return self._subscription_id

    @subscription_id.setter
    def subscription_id(self, value: int | str):
        # Receiving an id means the server accepted the subscription.
        self.subscribed = True
        self._subscription_id = value

    @subscription_id.deleter
    def subscription_id(self):
        self.subscribed = False
        self._subscription_id = None
class WSRequest:
    """An in-flight JSON-RPC request awaiting its websocket response."""

    def __init__(self, rpc_call: RpcCall, start_time: int, subscription_index: int | None = None):
        # `subscription_index` links a subscribe reply back to its entry in
        # WssJrpcUser.subscriptions (None for plain requests).
        self.subscription_index = subscription_index
        self.start_time = start_time
        self.rpc_call = rpc_call
class WssJrpcUser(User):
    """Locust user driving JSON-RPC subscriptions over a websocket."""

    abstract = True
    logger = logging.getLogger(__name__)
    # To be populated by subclass
    subscriptions: list[WSSubscription] = []
    subscription_ids_to_index: dict[str | int, int] = {}

    def __init__(self, environment: Environment):
        super().__init__(environment)
        self._ws: WebSocket | None = None
        self._ws_greenlet: Greenlet | None = None
        # Pending requests keyed by JSON-RPC request id.
        self._requests: dict[int, WSRequest] = {}
        self._running: bool = False

    def get_subscription(self, subscription_id: str | int):
        """Map a server-assigned subscription id back to its WSSubscription."""
        return self.subscriptions[self.subscription_ids_to_index[subscription_id]]

    @task
    def dummy_task(self):
        # Locust requires at least one task; the real work happens in
        # receive_loop, so this just parks the greenlet.
        gevent.sleep(3600)

    def on_start(self) -> None:
        self._running = True
        host: str = self.environment.parsed_options.host
        # startswith("ws") also matches "wss://..." schemes.
        if host.startswith("ws"):
            self.connect(host)
        else:
            raise ValueError("Invalid host provided. Expected ws or wss protocol")
        self.subscribe_all()

    def on_stop(self) -> None:
        self.unsubscribe_all()
        # Wait up to 30s for all unsubscribe responses to drain.
        timeout = Timeout(30)
        timeout.start()
        try:
            while self._requests:
                gevent.sleep(1)
        except Timeout:
            self.logger.error("Timeout 30s - Failed to unsubscribe from all subscriptions")
        timeout.close()
        self._running = False
        self.logger.debug("Unsubscribed from all subscriptions")

    def connect(self, host: str):
        """Open the websocket and start the background receive loop."""
        self._ws = create_connection(host, skip_utf8_validation=True)
        self._ws_greenlet = gevent.spawn(self.receive_loop)

    def subscribe_all(self):
        for index, subscription in enumerate(self.subscriptions):
            self.subscribe(subscription, index)

    def subscribe(self, subscription: WSSubscription, index: int):
        self.send(rpc_call=subscription.subscribe_rpc_call, subscription_index=index)

    def unsubscribe_all(self):
        for index, subscription in enumerate(self.subscriptions):
            self.unsubscribe(subscription, index)

    def unsubscribe(self, subscription: WSSubscription, index: int):
        params = [subscription.subscription_id]
        self.send(method=subscription.unsubscribe_method, params=params, subscription_index=index)

    def get_notification_name(self, parsed_response: dict):
        # Override this method to return the name of the notification if this is not correct
        return parsed_response["method"]

    def on_message(self, message: str | bytes):
        """Dispatch one incoming frame: error, notification, or request reply."""
        try:
            parsed_json: dict = json.loads(message)
        except JSONDecodeError:
            self.environment.events.request.fire(
                request_type="WSErr",
                name="JSONDecodeError",
                response_time=None,
                response_length=len(message),
                exception=JSONDecodeError,
                response=message,
            )
            return
        if "error" in parsed_json:
            self.environment.events.request.fire(
                request_type="WSJrpcErr",
                name=f"JsonRPC Error {parsed_json['error']['code']}",
                response_time=None,
                response_length=len(message),
                exception=None,
                response=message,
            )
            return
        if "id" not in parsed_json:
            # No id => server-pushed subscription notification.
            self.environment.events.request.fire(
                request_type="WSNotif",
                name=self.get_notification_name(parsed_json),
                response_time=None,
                response_length=len(message),
                exception=None,
            )
            return
        # Unknown ids are logged inside get_request.
        if request := self.get_request(parsed_json):
            if request.subscription_index is not None:
                # Record the server-assigned id; the WSSubscription property
                # setter also flips `subscribed` to True.  (The previous
                # `subscribed = "subscribed"` string assignment was redundant
                # and type-incorrect, and has been removed.)
                self.subscriptions[request.subscription_index].subscription_id = parsed_json["result"]
                self.subscription_ids_to_index.update({parsed_json["result"]: request.subscription_index})
            self.environment.events.request.fire(
                request_type="WSJrpc",
                name=request.rpc_call.method,
                response_time=round((time.time_ns() - request.start_time) / 1_000_000),
                response_length=len(message),
                exception=None,
            )

    def get_request(self, json_response: dict):
        """Pop and return the pending request matching the response id, or None."""
        request_id = json_response["id"]
        if request_id not in self._requests:
            self.logger.error("Received message with unknown id")
            self.logger.error(json_response)
            return None
        return self._requests.pop(request_id)

    def receive_loop(self):
        """Background greenlet: read frames until stopped, reconnect on close."""
        try:
            while self._running:
                message = self._ws.recv()
                self.logger.debug(f"WSResp: {message.strip()}")
                self.on_message(message)
            # Loop exited normally (stop requested): close our side.
            self._ws.close()
        except WebSocketConnectionClosedException:
            self.environment.events.request.fire(
                request_type="WSerr",
                name="WebSocket Connection",
                response_time=None,
                response_length=0,
                exception=WebSocketConnectionClosedException,
            )
            self._running = False
            self.logger.error("Connection closed by server, trying to reconnect...")
            self.on_start()

    def send(
        self,
        rpc_call: RpcCall | None = None,
        method: str | None = None,
        params: dict | list | None = None,
        subscription_index: int | None = None,
    ):
        """Register and transmit a JSON-RPC request over the websocket.

        Either `rpc_call` or `method` (with optional `params`) must be given.
        Raises ValueError when neither is provided.
        """
        if rpc_call is None:
            if method is None:
                raise ValueError("Either rpc_call or method must be provided")
            rpc_call = RpcCall(method, params)
        self.logger.debug(f"Sending: {rpc_call}")
        # (The duplicated post-hoc None checks that followed here were dead
        # code: rpc_call is guaranteed non-None at this point.)
        self._requests.update(
            {rpc_call.request_id: WSRequest(rpc_call, start_time=time.time_ns(), subscription_index=subscription_index)}
        )
        json_body = json.dumps(rpc_call.request_body())
        self.logger.debug(f"WSReq: {json_body.decode('utf-8')}")
        if self._ws:
            self._ws.send(json_body)
import random
import typing as t
import orjson as json
class RpcCall:
def __init__(self, method: str, params: list[t.Any] | dict | None = None, request_id: int | None = None) -> None:
self._request_id = request_id
self.method = method
self.params = params
@property
def request_id(self) -> int:
if self._request_id is None:
self._request_id = random.Random().randint(1, 100000000)
return self._request_id
def request_body(self, request_id: int | None = None) -> dict:
"""Generate a JSON-RPC request body."""
if self.params is None:
self.params = []
if type(self.params) is dict:
self.params = [self.params]
if request_id:
self._request_id = request_id
return {
"jsonrpc": "2.0",
"method": self.method,
"params": self.params,
"id": self.request_id,
}
def generate_batch_request_body(rpc_calls: list[RpcCall]) -> str:
    """Generate a batch JSON-RPC request body.

    Ids are the 1-based positions of the calls (1-based so the id stays
    truthy for RpcCall.request_body's `if request_id:` check).

    Fix: the previous `range(1, len(rpc_calls))` silently dropped the first
    call from every batch; all calls are now included.
    """
    return json.dumps([call.request_body(i + 1) for i, call in enumerate(rpc_calls)]).decode("utf-8")
def expand_rpc_calls(rpc_calls_weighted: dict[t.Callable[[], RpcCall], int]) -> list[RpcCall]:
    """Call each RpcCall factory once and repeat the result by its weight."""
    weighted_calls: dict[RpcCall, int] = {factory(): weight for factory, weight in rpc_calls_weighted.items()}
    return expand_to_list(weighted_calls)
def expand_to_list(items_weighted: dict[t.Any, int] | list[t.Any | tuple[t.Any, int]]) -> list[t.Any]:
expanded_items_list: list[t.Any] = []
if isinstance(items_weighted, dict):
items_weighted = list(items_weighted.items())
if isinstance(items_weighted, list):
for rpc_call in items_weighted:
if isinstance(rpc_call, tuple):
rpc_call, count = rpc_call
for _ in range(count):
expanded_items_list.append(rpc_call)
else:
expanded_items_list.append(rpc_call)
return expanded_items_list
+1
-1
from locust import task
from chainbench.user.http import RpcCall
from chainbench.user.jsonrpc import RpcCall
from chainbench.user.protocol.solana import SolanaUser

@@ -5,0 +5,0 @@

from locust import task
from chainbench.user.http import RpcCall
from chainbench.user.jsonrpc import RpcCall
from chainbench.user.protocol.solana import SolanaUser

@@ -5,0 +5,0 @@

@@ -1,2 +0,1 @@

import json
import logging

@@ -7,3 +6,5 @@ import typing as t

import orjson as json
from gevent.lock import Semaphore as GeventSemaphore
from orjson.orjson import OPT_SORT_KEYS
from tenacity import retry, stop_after_attempt

@@ -50,3 +51,3 @@

def to_json(self) -> str:
return json.dumps(self.__dict__)
return json.dumps(self.__dict__).decode("utf-8")

@@ -96,3 +97,3 @@

def to_json(self) -> str:
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)
return json.dumps(self, default=lambda o: o.__dict__, option=OPT_SORT_KEYS).decode("utf-8")

@@ -99,0 +100,0 @@ def push_block(self, block: B) -> None:

@@ -1,2 +0,1 @@

import json
import logging

@@ -7,2 +6,3 @@ import typing as t

import orjson as json
from tenacity import retry, stop_after_attempt, wait_fixed

@@ -153,6 +153,8 @@

if isinstance(data, str):
if isinstance(data, (str, bytes)):
data_dict = json.loads(data)
else:
data_dict = data
logger.debug(f"data_dict type {type(data_dict)}")
logger.debug("Data: %s", data_dict)

@@ -159,0 +161,0 @@ slot = data_dict["block_number"]

@@ -1,2 +0,1 @@

import json
import logging

@@ -7,2 +6,4 @@ import typing as t

import orjson as json
from chainbench.util.rng import RNG, get_rng

@@ -9,0 +10,0 @@

@@ -1,2 +0,1 @@

import json
import logging

@@ -7,2 +6,3 @@ import typing as t

import orjson as json
from tenacity import retry, stop_after_attempt

@@ -9,0 +9,0 @@

@@ -1,5 +0,6 @@

import json
import logging
import typing as t
import orjson as json
from .blockchain import (

@@ -6,0 +7,0 @@ Account,

@@ -1,2 +0,1 @@

import json
import os

@@ -7,2 +6,3 @@ from dataclasses import dataclass

import orjson as json
from tenacity import retry, retry_if_exception_type, wait_exponential

@@ -9,0 +9,0 @@

@@ -5,3 +5,5 @@ from chainbench.user.protocol import EthBeaconUser, EvmUser, SolanaUser, StarkNetUser

from .common import get_subclass_tasks
from .http import HttpUser, JsonRpcUser
from .http import HttpUser
from .jsonrpc import JrpcHttpUser
from .wss import WssJrpcUser

@@ -17,6 +19,7 @@ # importing plugins here as all profiles depend on it

"HttpUser",
"JsonRpcUser",
"JrpcHttpUser",
"SolanaUser",
"StarkNetUser",
"WssJrpcUser",
"get_subclass_tasks",
]

@@ -1,45 +0,11 @@

import json
import logging
import random
import typing as t
from locust import FastHttpUser, TaskSet, tag, task
from locust import FastHttpUser, TaskSet
from locust.contrib.fasthttp import ResponseContextManager
from chainbench.test_data import TestData
from chainbench.util.rng import RNGManager
from chainbench.util.jsonrpc import expand_to_list
class RpcCall:
def __init__(self, method: str, params: list[t.Any] | dict | None = None) -> None:
self.method = method
self.params = params
def expand_rpc_calls(rpc_calls_weighted: dict[t.Callable[[], RpcCall], int]) -> list[RpcCall]:
rpc_call_methods_weighted: dict[RpcCall, int] = {}
for rpc_call_method, weight in rpc_calls_weighted.items():
rpc_call_methods_weighted[rpc_call_method()] = weight
expanded_rpc_calls: list[RpcCall] = expand_to_list(rpc_call_methods_weighted)
return expanded_rpc_calls
def expand_to_list(items_weighted: dict[t.Any, int] | list[t.Any | tuple[t.Any, int]]) -> list[t.Any]:
expanded_items_list: list[t.Any] = []
if isinstance(items_weighted, dict):
items_weighted = list(items_weighted.items())
if isinstance(items_weighted, list):
for rpc_call in items_weighted:
if isinstance(rpc_call, tuple):
rpc_call, count = rpc_call
for _ in range(count):
expanded_items_list.append(rpc_call)
else:
expanded_items_list.append(rpc_call)
return expanded_items_list
class HttpUser(FastHttpUser):

@@ -51,3 +17,2 @@ """Extension of FastHttpUser for Chainbench."""

logger = logging.getLogger(__name__)
rng = RNGManager()

@@ -123,162 +88,1 @@ connection_timeout = 120

return response
class JsonRpcUser(HttpUser):
"""Extension of HttpUser to provide JsonRPC support."""
abstract = True
rpc_path = ""
rpc_error_code_exclusions: list[int] = []
rpc_calls: dict[t.Callable, int] = {} # To be populated in the subclass load profile
calls_per_batch = 10 # default requests to include in a batch request
def __init__(self, environment: t.Any):
self.calls_per_batch = environment.parsed_options.batch_size
super().__init__(environment)
@tag("single")
@task
def rpc_call_task(self) -> None:
self.method_to_task_function(self.environment.parsed_options.method)(self)
@tag("batch")
@task
def batch_rpc_call_task(self) -> None:
rpc_calls = {getattr(self, method.__name__): weight for method, weight in self.rpc_calls.items()}
self.make_random_batch_rpc_call(
rpc_calls,
calls_per_batch=self.calls_per_batch,
)
@tag("batch_single")
@task
def batch_single_rpc_call_task(self) -> None:
rpc_call: RpcCall = self.method_to_rpc_call(self.environment.parsed_options.method)(self)
rpc_calls = [rpc_call for _ in range(self.calls_per_batch)]
self.make_batch_rpc_call(
rpc_calls,
)
@classmethod
def method_to_rpc_call(cls, method: str) -> t.Callable:
method_name = cls.method_to_function_name(method)
return getattr(cls, method_name)
def check_json_rpc_response(self, response: ResponseContextManager, name: str) -> None:
CHUNK_SIZE = 1024
if response.text is None:
self.logger.error(f"Response for {name} is empty")
response.failure(f"Response for {name} is empty")
return
data = response.text[:CHUNK_SIZE]
if "jsonrpc" not in data:
self.logger.error(f"Response for {name} is not a JSON-RPC: {response.text}")
response.failure(f"Response for {name} is not a JSON-RPC")
return
if "error" in data:
response_js: list | dict = response.json()
if isinstance(response_js, dict):
response_js = [response_js]
if isinstance(response_js, list):
for response_js_item in response_js:
if "error" in response_js_item:
if "code" in response_js_item["error"]:
self.logger.error(f"Response for {name} has a JSON-RPC error: {response.text}")
if response_js_item["error"]["code"] not in self.rpc_error_code_exclusions:
response.failure(
f"Response for {name} has a JSON-RPC error {response_js_item['error']['code']} - "
f"{response_js_item['error']['message']}"
)
return
response.failure("Unspecified JSON-RPC error")
self.logger.error(f"Unspecified JSON-RPC error: {response.text}")
return
# TODO: handle multiple errors in batch response properly
if "result" not in data:
response.failure(f"Response for {name} call has no result")
self.logger.error(f"Response for {name} call has no result: {response.text}")
def make_rpc_call(
self,
rpc_call: RpcCall | None = None,
method: str | None = None,
params: list[t.Any] | dict | None = None,
name: str = "",
path: str = "",
) -> None:
"""Make a JSON-RPC call."""
if rpc_call is not None:
method = rpc_call.method
params = rpc_call.params
if name == "" and method is not None:
name = method
with self.client.request(
"POST", self.rpc_path + path, json=generate_request_body(method, params), name=name, catch_response=True
) as response:
self.check_http_error(response)
self.check_json_rpc_response(response, name=name)
def make_batch_rpc_call(self, rpc_calls: list[RpcCall], name: str = "", path: str = "") -> None:
"""Make a Batch JSON-RPC call."""
if name == "":
name = f"Batch RPC ({len(rpc_calls)})"
headers = {"Content-Type": "application/json", "accept": "application/json"}
with self.client.request(
"POST",
self.rpc_path + path,
data=generate_batch_request_body(rpc_calls),
name=name,
catch_response=True,
headers=headers,
) as response:
self.check_http_error(response)
self.check_json_rpc_response(response, name=name)
def make_random_batch_rpc_call(
self,
weighted_rpc_calls: dict[t.Callable[[], RpcCall], int],
calls_per_batch: int,
name: str = "",
path: str = "",
) -> None:
"""Make a Batch JSON-RPC call."""
rpc_calls: list[RpcCall] = expand_rpc_calls(weighted_rpc_calls)
random_rpc_calls: list[RpcCall] = random.choices(rpc_calls, k=calls_per_batch)
self.make_batch_rpc_call(random_rpc_calls, name=name, path=path)
def generate_request_body(
method: str | None = None, params: list | dict | None = None, request_id: int | None = None, version: str = "2.0"
) -> dict:
"""Generate a JSON-RPC request body."""
if params is None:
params = []
if request_id is None:
request_id = random.randint(1, 100000000)
return {
"jsonrpc": version,
"method": method,
"params": params,
"id": request_id,
}
def generate_batch_request_body(rpc_calls: list[RpcCall], version: str = "2.0") -> str:
"""Generate a batch JSON-RPC request body."""
return json.dumps(
[
generate_request_body(rpc_calls[i].method, rpc_calls[i].params, request_id=i, version=version)
for i in range(1, len(rpc_calls))
]
)
import logging
import typing as t
from locust import task
from locust import tag, task
from chainbench.test_data.ethereum import EthBeaconTestData
from chainbench.user.http import HttpUser
from chainbench.user.wss import WSSubscription
from chainbench.util.rng import RNGManager

@@ -15,2 +17,3 @@ logger = logging.getLogger(__name__)

test_data = EthBeaconTestData()
rng = RNGManager()

@@ -330,4 +333,11 @@ def eth_beacon_blocks_request(

class TestEthMethod(EthBeaconUser):
@tag("single")
@task
def run_task(self) -> None:
self.method_to_task_function(self.environment.parsed_options.method)()
self.logger.info(f"Running task {self.environment.parsed_options.method}")
self.method_to_task_function(self.environment.parsed_options.method)(self)
class EthSubscribe(WSSubscription):
def __init__(self, params: dict | list):
super().__init__("eth_subscribe", params, "eth_unsubscribe")

@@ -11,10 +11,12 @@ import re

)
from chainbench.user.http import JsonRpcUser, RpcCall
from chainbench.user.jsonrpc import JrpcHttpUser
from chainbench.user.tag import tag
from chainbench.util.rng import RNG
from chainbench.util.jsonrpc import RpcCall
from chainbench.util.rng import RNG, RNGManager
class EvmBaseUser(JsonRpcUser):
class EvmBaseUser(JrpcHttpUser):
abstract = True
test_data = EvmTestData()
test_data: EvmTestData = EvmTestData()
rng = RNGManager()

@@ -21,0 +23,0 @@ _default_trace_timeout = "120s"

@@ -7,9 +7,11 @@ import base64

from chainbench.test_data import Account, BlockNumber, SolanaTestData, TxHash
from chainbench.user.http import JsonRpcUser, RpcCall
from chainbench.util.rng import RNG
from chainbench.user.jsonrpc import JrpcHttpUser
from chainbench.util.jsonrpc import RpcCall
from chainbench.util.rng import RNG, RNGManager
class SolanaBaseUser(JsonRpcUser):
class SolanaBaseUser(JrpcHttpUser):
abstract = True
test_data = SolanaTestData()
rng = RNGManager()
rpc_error_code_exclusions = [-32007]

@@ -16,0 +18,0 @@

@@ -5,7 +5,7 @@ import typing as t

from chainbench.test_data.blockchain import Account, TxHash
from chainbench.user.http import JsonRpcUser
from chainbench.user.jsonrpc import JrpcHttpUser
from chainbench.util.rng import RNG
class StarkNetUser(JsonRpcUser):
class StarkNetUser(JrpcHttpUser):
abstract = True

@@ -12,0 +12,0 @@ test_data = StarkNetTestData()

@@ -159,3 +159,3 @@ import subprocess

f"locust -f {self.profile_path} --worker --master-host {self.host} --master-port {self.port} "
f"--logfile {self.results_path}/worker_{worker_id}.log --loglevel {self.log_level}"
f"--logfile {self.results_path}/worker_{worker_id}.log --loglevel {self.log_level} --stop-timeout 30"
)

@@ -162,0 +162,0 @@ return self.get_extra_options(command)

@@ -182,55 +182,62 @@ import logging

if not hasattr(user, "test_data"):
raise AttributeError(f"{user} class does not have 'test_data' attribute")
user_test_data: TestData = getattr(user, "test_data")
test_data_class_name: str = type(user_test_data).__name__
if test_data_class_name in test_data:
continue
logger.info(f"Initializing test data for {test_data_class_name}")
print(f"Initializing test data for {test_data_class_name}")
if environment.host:
user_test_data.init_http_client(environment.host)
if isinstance(user_test_data, EvmTestData):
chain_id: ChainId = user_test_data.fetch_chain_id()
user_test_data.init_network(chain_id)
logger.info(f"Target endpoint network is {user_test_data.network.name}")
print(f"Target endpoint network is {user_test_data.network.name}")
test_data["chain_id"] = {test_data_class_name: chain_id}
if environment.parsed_options:
user_test_data.init_data(environment.parsed_options)
test_data[test_data_class_name] = user_test_data.data.to_json()
send_msg_to_workers(environment.runner, "test_data", test_data)
print("Fetching blocks...")
if environment.parsed_options.use_latest_blocks:
print(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data")
logger.info(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data")
for block_number in range(
user_test_data.data.block_range.start, user_test_data.data.block_range.end + 1
):
try:
block = user_test_data.fetch_block(block_number)
except (BlockNotFoundError, InvalidBlockError):
block = user_test_data.fetch_latest_block()
user_test_data.data.push_block(block)
block_data = {test_data_class_name: block.to_json()}
send_msg_to_workers(environment.runner, "block_data", block_data)
print(user_test_data.data.stats(), end="\r")
else:
print(user_test_data.data.stats(), end="\r")
print("\n") # new line after progress display upon exiting loop
logger.warning(f"{user} class does not have 'test_data' attribute")
else:
while user_test_data.data.size.blocks_len > len(user_test_data.data.blocks):
try:
block = user_test_data.fetch_random_block(user_test_data.data.block_numbers)
except (BlockNotFoundError, InvalidBlockError):
continue
user_test_data.data.push_block(block)
block_data = {test_data_class_name: block.to_json()}
send_msg_to_workers(environment.runner, "block_data", block_data)
print(user_test_data.data.stats(), end="\r")
user_test_data: TestData = getattr(user, "test_data")
test_data_class_name: str = type(user_test_data).__name__
if test_data_class_name in test_data:
continue
logger.info(f"Initializing test data for {test_data_class_name}")
print(f"Initializing test data for {test_data_class_name}")
if environment.host:
user_test_data.init_http_client(environment.host)
if isinstance(user_test_data, EvmTestData):
chain_id: ChainId = user_test_data.fetch_chain_id()
user_test_data.init_network(chain_id)
logger.info(f"Target endpoint network is {user_test_data.network.name}")
print(f"Target endpoint network is {user_test_data.network.name}")
test_data["chain_id"] = {test_data_class_name: chain_id}
if environment.parsed_options:
user_test_data.init_data(environment.parsed_options)
test_data[test_data_class_name] = user_test_data.data.to_json()
send_msg_to_workers(environment.runner, "test_data", test_data)
print("Fetching blocks...")
if environment.parsed_options.use_latest_blocks:
print(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data")
logger.info(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data")
for block_number in range(
user_test_data.data.block_range.start, user_test_data.data.block_range.end + 1
):
block = None
try:
block = user_test_data.fetch_block(block_number)
except (BlockNotFoundError, InvalidBlockError):
pass
while block is None:
try:
block = user_test_data.fetch_latest_block()
except (BlockNotFoundError, InvalidBlockError):
pass
user_test_data.data.push_block(block)
block_data = {test_data_class_name: block.to_json()}
send_msg_to_workers(environment.runner, "block_data", block_data)
print(user_test_data.data.stats(), end="\r")
else:
print(user_test_data.data.stats(), end="\r")
print("\n") # new line after progress display upon exiting loop
else:
print(user_test_data.data.stats(), end="\r")
print("\n") # new line after progress display upon exiting loop
logger.info("Test data is ready")
send_msg_to_workers(environment.runner, "release_lock", {})
user_test_data.release_lock()
while user_test_data.data.size.blocks_len > len(user_test_data.data.blocks):
try:
block = user_test_data.fetch_random_block(user_test_data.data.block_numbers)
except (BlockNotFoundError, InvalidBlockError):
continue
user_test_data.data.push_block(block)
block_data = {test_data_class_name: block.to_json()}
send_msg_to_workers(environment.runner, "block_data", block_data)
print(user_test_data.data.stats(), end="\r")
else:
print(user_test_data.data.stats(), end="\r")
print("\n") # new line after progress display upon exiting loop
logger.info("Test data is ready")
send_msg_to_workers(environment.runner, "release_lock", {})
user_test_data.release_lock()
except Exception as e:

@@ -237,0 +244,0 @@ logger.error(f"Failed to init test data: {e.__class__.__name__}: {e}. Exiting...")

@@ -1,2 +0,1 @@

import json
import logging

@@ -7,7 +6,8 @@ import typing as t

from functools import cached_property
from json import JSONDecodeError
from secrets import token_hex
import orjson as json
from geventhttpclient import URL, HTTPClient
from geventhttpclient.response import HTTPSocketPoolResponse
from orjson import JSONDecodeError

@@ -146,3 +146,3 @@ logger = logging.getLogger(__name__)

if isinstance(data, dict):
body = json.dumps(data).encode("utf-8")
body = json.dumps(data)
elif isinstance(data, bytes):

@@ -149,0 +149,0 @@ body = data

import csv
import logging
from datetime import datetime, timedelta
from json import JSONDecodeError
from pathlib import Path

@@ -9,2 +8,3 @@ from time import sleep

from locust.util.timespan import parse_timespan
from orjson import JSONDecodeError

@@ -11,0 +11,0 @@ from .http import HttpClient

Metadata-Version: 2.1
Name: chainbench
Version: 0.7.5
Version: 0.8.0
Summary:

@@ -17,4 +17,7 @@ Author: Egor Molodik

Requires-Dist: locust-plugins[dashboards] (>=4.4.2,<5.0.0)
Requires-Dist: orjson (>=3.10.6,<4.0.0)
Requires-Dist: solders (>=0.21.0,<0.22.0)
Requires-Dist: tenacity (>=8.2.2,<9.0.0)
Requires-Dist: websocket-client (>=1.8.0,<2.0.0)
Requires-Dist: wsaccel (>=0.6.6,<0.7.0)
Description-Content-Type: text/markdown

@@ -21,0 +24,0 @@

[tool.poetry]
name = "chainbench"
version = "0.7.5"
version = "0.8.0"
description = ""

@@ -21,2 +21,5 @@ authors = [

solders = "^0.21.0"
websocket-client = "^1.8.0"
orjson = "^3.10.6"
wsaccel = "^0.6.6"

@@ -23,0 +26,0 @@ [tool.poetry.group.dev.dependencies]