chainbench
Advanced tools
| from chainbench.user import WssJrpcUser | ||
| from chainbench.user.protocol.ethereum import EthSubscribe | ||
| class EthSubscriptions(WssJrpcUser): | ||
| subscriptions = [ | ||
| EthSubscribe(["newHeads"]), | ||
| # logs subscription for approve method signature | ||
| EthSubscribe(["logs", {"topics": ["0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"]}]), | ||
| EthSubscribe(["newPendingTransactions"]), | ||
| ] | ||
| def get_notification_name(self, parsed_response: dict): | ||
| return self.get_subscription( | ||
| subscription_id=parsed_response["params"]["subscription"] | ||
| ).subscribe_rpc_call.params[0] |
| import random | ||
| import typing as t | ||
| from locust import tag, task | ||
| from locust.contrib.fasthttp import ResponseContextManager | ||
| from chainbench.user.http import HttpUser | ||
| from chainbench.util.jsonrpc import ( | ||
| RpcCall, | ||
| expand_rpc_calls, | ||
| generate_batch_request_body, | ||
| ) | ||
| class JrpcHttpUser(HttpUser): | ||
| """Extension of HttpUser to provide JsonRPC support.""" | ||
| abstract = True | ||
| rpc_path = "" | ||
| rpc_error_code_exclusions: list[int] = [] | ||
| rpc_calls: dict[t.Callable, int] = {} # To be populated in the subclass load profile | ||
| calls_per_batch = 10 # default requests to include in a batch request | ||
| def __init__(self, environment: t.Any): | ||
| self.calls_per_batch = environment.parsed_options.batch_size | ||
| super().__init__(environment) | ||
| @tag("single") | ||
| @task | ||
| def rpc_call_task(self) -> None: | ||
| self.method_to_task_function(self.environment.parsed_options.method)(self) | ||
| @tag("batch") | ||
| @task | ||
| def batch_rpc_call_task(self) -> None: | ||
| rpc_calls = {getattr(self, method.__name__): weight for method, weight in self.rpc_calls.items()} | ||
| self.make_random_batch_rpc_call( | ||
| rpc_calls, | ||
| calls_per_batch=self.calls_per_batch, | ||
| ) | ||
| @tag("batch_single") | ||
| @task | ||
| def batch_single_rpc_call_task(self) -> None: | ||
| rpc_call: RpcCall = self.method_to_rpc_call(self.environment.parsed_options.method)(self) | ||
| rpc_calls = [rpc_call for _ in range(self.calls_per_batch)] | ||
| self.make_batch_rpc_call( | ||
| rpc_calls, | ||
| ) | ||
| @classmethod | ||
| def method_to_rpc_call(cls, method: str) -> t.Callable: | ||
| method_name = cls.method_to_function_name(method) | ||
| return getattr(cls, method_name) | ||
| def check_json_rpc_response(self, response: ResponseContextManager, name: str) -> None: | ||
| CHUNK_SIZE = 1024 | ||
| if response.text is None: | ||
| self.logger.error(f"Response for {name} is empty") | ||
| response.failure(f"Response for {name} is empty") | ||
| return | ||
| data = response.text[:CHUNK_SIZE] | ||
| if "jsonrpc" not in data: | ||
| self.logger.error(f"Response for {name} is not a JSON-RPC: {response.text}") | ||
| response.failure(f"Response for {name} is not a JSON-RPC") | ||
| return | ||
| if "error" in data: | ||
| response_js: list | dict = response.json() | ||
| if isinstance(response_js, dict): | ||
| response_js = [response_js] | ||
| if isinstance(response_js, list): | ||
| for response_js_item in response_js: | ||
| if "error" in response_js_item: | ||
| if "code" in response_js_item["error"]: | ||
| self.logger.error(f"Response for {name} has a JSON-RPC error: {response.text}") | ||
| if response_js_item["error"]["code"] not in self.rpc_error_code_exclusions: | ||
| response.failure( | ||
| f"Response for {name} has a JSON-RPC error {response_js_item['error']['code']} - " | ||
| f"{response_js_item['error']['message']}" | ||
| ) | ||
| return | ||
| response.failure("Unspecified JSON-RPC error") | ||
| self.logger.error(f"Unspecified JSON-RPC error: {response.text}") | ||
| return | ||
| # TODO: handle multiple errors in batch response properly | ||
| if "result" not in data: | ||
| response.failure(f"Response for {name} call has no result") | ||
| self.logger.error(f"Response for {name} call has no result: {response.text}") | ||
| def make_rpc_call( | ||
| self, | ||
| rpc_call: RpcCall | None = None, | ||
| method: str | None = None, | ||
| params: list[t.Any] | dict | None = None, | ||
| name: str = "", | ||
| path: str = "", | ||
| ) -> None: | ||
| """Make a JSON-RPC call.""" | ||
| if rpc_call is None: | ||
| if method is None: | ||
| raise ValueError("Either rpc_call or method must be provided") | ||
| else: | ||
| rpc_call = RpcCall(method, params) | ||
| name = method | ||
| else: | ||
| name = rpc_call.method | ||
| with self.client.request( | ||
| "POST", self.rpc_path + path, json=rpc_call.request_body(), name=name, catch_response=True | ||
| ) as response: | ||
| self.check_http_error(response) | ||
| self.check_json_rpc_response(response, name=name) | ||
| def make_batch_rpc_call(self, rpc_calls: list[RpcCall], name: str = "", path: str = "") -> None: | ||
| """Make a Batch JSON-RPC call.""" | ||
| if name == "": | ||
| name = f"Batch RPC ({len(rpc_calls)})" | ||
| headers = {"Content-Type": "application/json", "accept": "application/json"} | ||
| with self.client.request( | ||
| "POST", | ||
| self.rpc_path + path, | ||
| data=generate_batch_request_body(rpc_calls), | ||
| name=name, | ||
| catch_response=True, | ||
| headers=headers, | ||
| ) as response: | ||
| self.check_http_error(response) | ||
| self.check_json_rpc_response(response, name=name) | ||
| def make_random_batch_rpc_call( | ||
| self, | ||
| weighted_rpc_calls: dict[t.Callable[[], RpcCall], int], | ||
| calls_per_batch: int, | ||
| name: str = "", | ||
| path: str = "", | ||
| ) -> None: | ||
| """Make a Batch JSON-RPC call.""" | ||
| rpc_calls: list[RpcCall] = expand_rpc_calls(weighted_rpc_calls) | ||
| random_rpc_calls: list[RpcCall] = random.choices(rpc_calls, k=calls_per_batch) | ||
| self.make_batch_rpc_call(random_rpc_calls, name=name, path=path) |
| import logging | ||
| import time | ||
| import gevent | ||
| import orjson as json | ||
| from gevent import Greenlet, Timeout | ||
| from locust import User, task | ||
| from locust.env import Environment | ||
| from orjson import JSONDecodeError | ||
| from websocket import WebSocket, WebSocketConnectionClosedException, create_connection | ||
| from chainbench.util.jsonrpc import RpcCall | ||
| class WSSubscription: | ||
| def __init__(self, subscribe_method: str, subscribe_params: dict | list, unsubscribe_method: str): | ||
| self.subscribe_rpc_call: RpcCall = RpcCall(subscribe_method, subscribe_params) | ||
| self.unsubscribe_method: str = unsubscribe_method | ||
| self.subscribed: bool = False | ||
| self._subscription_id: int | str | None = None | ||
| @property | ||
| def subscription_id(self): | ||
| return self._subscription_id | ||
| @subscription_id.setter | ||
| def subscription_id(self, value: int | str): | ||
| self._subscription_id = value | ||
| self.subscribed = True | ||
| @subscription_id.deleter | ||
| def subscription_id(self): | ||
| self._subscription_id = None | ||
| self.subscribed = False | ||
| class WSRequest: | ||
| def __init__(self, rpc_call: RpcCall, start_time: int, subscription_index: int | None = None): | ||
| self.rpc_call = rpc_call | ||
| self.start_time = start_time | ||
| self.subscription_index = subscription_index | ||
| class WssJrpcUser(User): | ||
| abstract = True | ||
| logger = logging.getLogger(__name__) | ||
| # To be populated by subclass | ||
| subscriptions: list[WSSubscription] = [] | ||
| subscription_ids_to_index: dict[str | int, int] = {} | ||
| def __init__(self, environment: Environment): | ||
| super().__init__(environment) | ||
| self._ws: WebSocket | None = None | ||
| self._ws_greenlet: Greenlet | None = None | ||
| self._requests: dict[int, WSRequest] = {} | ||
| self._running: bool = False | ||
| def get_subscription(self, subscription_id: str | int): | ||
| return self.subscriptions[self.subscription_ids_to_index[subscription_id]] | ||
| @task | ||
| def dummy_task(self): | ||
| gevent.sleep(3600) | ||
| def on_start(self) -> None: | ||
| self._running = True | ||
| host: str = self.environment.parsed_options.host | ||
| if host.startswith("ws") or host.startswith("wss"): | ||
| self.connect(host) | ||
| else: | ||
| raise ValueError("Invalid host provided. Expected ws or wss protocol") | ||
| self.subscribe_all() | ||
| def on_stop(self) -> None: | ||
| self.unsubscribe_all() | ||
| timeout = Timeout(30) | ||
| timeout.start() | ||
| try: | ||
| while self._requests: | ||
| gevent.sleep(1) | ||
| except Timeout: | ||
| self.logger.error("Timeout 30s - Failed to unsubscribe from all subscriptions") | ||
| timeout.close() | ||
| self._running = False | ||
| self.logger.debug("Unsubscribed from all subscriptions") | ||
| def connect(self, host: str): | ||
| self._ws = create_connection(host, skip_utf8_validation=True) | ||
| self._ws_greenlet = gevent.spawn(self.receive_loop) | ||
| def subscribe_all(self): | ||
| for i in range(len(self.subscriptions)): | ||
| self.subscribe(self.subscriptions[i], i) | ||
| def subscribe(self, subscription: WSSubscription, index: int): | ||
| self.send(rpc_call=subscription.subscribe_rpc_call, subscription_index=index) | ||
| def unsubscribe_all(self): | ||
| for i in range(len(self.subscriptions)): | ||
| self.unsubscribe(self.subscriptions[i], i) | ||
| def unsubscribe(self, subscription: WSSubscription, index: int): | ||
| params = [subscription.subscription_id] | ||
| self.send(method=subscription.unsubscribe_method, params=params, subscription_index=index) | ||
| def get_notification_name(self, parsed_response: dict): | ||
| # Override this method to return the name of the notification if this is not correct | ||
| return parsed_response["method"] | ||
| def on_message(self, message: str | bytes): | ||
| try: | ||
| parsed_json: dict = json.loads(message) | ||
| if "error" in parsed_json: | ||
| self.environment.events.request.fire( | ||
| request_type="WSJrpcErr", | ||
| name=f"JsonRPC Error {parsed_json['error']['code']}", | ||
| response_time=None, | ||
| response_length=len(message), | ||
| exception=None, | ||
| response=message, | ||
| ) | ||
| return | ||
| if "id" not in parsed_json: | ||
| self.environment.events.request.fire( | ||
| request_type="WSNotif", | ||
| name=self.get_notification_name(parsed_json), | ||
| response_time=None, | ||
| response_length=len(message), | ||
| exception=None, | ||
| ) | ||
| return | ||
| if request := self.get_request(parsed_json): | ||
| if request.subscription_index is not None: | ||
| self.subscriptions[request.subscription_index].subscription_id = parsed_json["result"] | ||
| self.subscriptions[request.subscription_index].subscribed = "subscribed" | ||
| self.subscription_ids_to_index.update({parsed_json["result"]: request.subscription_index}) | ||
| self.environment.events.request.fire( | ||
| request_type="WSJrpc", | ||
| name=request.rpc_call.method, | ||
| response_time=((time.time_ns() - request.start_time) / 1_000_000).__round__(), | ||
| response_length=len(message), | ||
| exception=None, | ||
| ) | ||
| else: | ||
| self.logger.error("Received message with unknown id") | ||
| except JSONDecodeError: | ||
| self.environment.events.request.fire( | ||
| request_type="WSErr", | ||
| name="JSONDecodeError", | ||
| response_time=None, | ||
| response_length=len(message), | ||
| exception=JSONDecodeError, | ||
| response=message, | ||
| ) | ||
| def get_request(self, json_response: dict): | ||
| if json_response["id"] not in self._requests: | ||
| self.logger.error("Received message with unknown id") | ||
| self.logger.error(json_response) | ||
| return None | ||
| return self._requests.pop(json_response["id"]) | ||
| def receive_loop(self): | ||
| try: | ||
| while self._running: | ||
| message = self._ws.recv() | ||
| self.logger.debug(f"WSResp: {message.strip()}") | ||
| self.on_message(message) | ||
| else: | ||
| self._ws.close() | ||
| except WebSocketConnectionClosedException: | ||
| self.environment.events.request.fire( | ||
| request_type="WSerr", | ||
| name="WebSocket Connection", | ||
| response_time=None, | ||
| response_length=0, | ||
| exception=WebSocketConnectionClosedException, | ||
| ) | ||
| self._running = False | ||
| self.logger.error("Connection closed by server, trying to reconnect...") | ||
| self.on_start() | ||
| def send( | ||
| self, | ||
| rpc_call: RpcCall | None = None, | ||
| method: str | None = None, | ||
| params: dict | list | None = None, | ||
| subscription_index: int | None = None, | ||
| ): | ||
| def _get_args(): | ||
| if rpc_call: | ||
| return rpc_call | ||
| elif method: | ||
| return RpcCall(method, params) | ||
| else: | ||
| raise ValueError("Either rpc_call or method must be provided") | ||
| rpc_call = _get_args() | ||
| self.logger.debug(f"Sending: {rpc_call or method}") | ||
| if rpc_call is None: | ||
| raise ValueError("Either rpc_call or method must be provided") | ||
| if rpc_call is None and (method is not None): | ||
| rpc_call = RpcCall(method, params) | ||
| elif rpc_call is None and (method is None): | ||
| raise ValueError("Either rpc_call or method must be provided") | ||
| self._requests.update( | ||
| {rpc_call.request_id: WSRequest(rpc_call, start_time=time.time_ns(), subscription_index=subscription_index)} | ||
| ) | ||
| json_body = json.dumps(rpc_call.request_body()) | ||
| self.logger.debug(f"WSReq: {json_body.decode('utf-8')}") | ||
| if self._ws: | ||
| self._ws.send(json_body) |
| import random | ||
| import typing as t | ||
| import orjson as json | ||
| class RpcCall: | ||
| def __init__(self, method: str, params: list[t.Any] | dict | None = None, request_id: int | None = None) -> None: | ||
| self._request_id = request_id | ||
| self.method = method | ||
| self.params = params | ||
| @property | ||
| def request_id(self) -> int: | ||
| if self._request_id is None: | ||
| self._request_id = random.Random().randint(1, 100000000) | ||
| return self._request_id | ||
| def request_body(self, request_id: int | None = None) -> dict: | ||
| """Generate a JSON-RPC request body.""" | ||
| if self.params is None: | ||
| self.params = [] | ||
| if type(self.params) is dict: | ||
| self.params = [self.params] | ||
| if request_id: | ||
| self._request_id = request_id | ||
| return { | ||
| "jsonrpc": "2.0", | ||
| "method": self.method, | ||
| "params": self.params, | ||
| "id": self.request_id, | ||
| } | ||
| def generate_batch_request_body(rpc_calls: list[RpcCall]) -> str: | ||
| """Generate a batch JSON-RPC request body.""" | ||
| return json.dumps([rpc_calls[i].request_body(i) for i in range(1, len(rpc_calls))]).decode("utf-8") | ||
| def expand_rpc_calls(rpc_calls_weighted: dict[t.Callable[[], RpcCall], int]) -> list[RpcCall]: | ||
| rpc_call_methods_weighted: dict[RpcCall, int] = {} | ||
| for rpc_call_method, weight in rpc_calls_weighted.items(): | ||
| rpc_call_methods_weighted[rpc_call_method()] = weight | ||
| expanded_rpc_calls: list[RpcCall] = expand_to_list(rpc_call_methods_weighted) | ||
| return expanded_rpc_calls | ||
| def expand_to_list(items_weighted: dict[t.Any, int] | list[t.Any | tuple[t.Any, int]]) -> list[t.Any]: | ||
| expanded_items_list: list[t.Any] = [] | ||
| if isinstance(items_weighted, dict): | ||
| items_weighted = list(items_weighted.items()) | ||
| if isinstance(items_weighted, list): | ||
| for rpc_call in items_weighted: | ||
| if isinstance(rpc_call, tuple): | ||
| rpc_call, count = rpc_call | ||
| for _ in range(count): | ||
| expanded_items_list.append(rpc_call) | ||
| else: | ||
| expanded_items_list.append(rpc_call) | ||
| return expanded_items_list |
| from locust import task | ||
| from chainbench.user.http import RpcCall | ||
| from chainbench.user.jsonrpc import RpcCall | ||
| from chainbench.user.protocol.solana import SolanaUser | ||
@@ -5,0 +5,0 @@ |
| from locust import task | ||
| from chainbench.user.http import RpcCall | ||
| from chainbench.user.jsonrpc import RpcCall | ||
| from chainbench.user.protocol.solana import SolanaUser | ||
@@ -5,0 +5,0 @@ |
@@ -1,2 +0,1 @@ | ||
| import json | ||
| import logging | ||
@@ -7,3 +6,5 @@ import typing as t | ||
| import orjson as json | ||
| from gevent.lock import Semaphore as GeventSemaphore | ||
| from orjson.orjson import OPT_SORT_KEYS | ||
| from tenacity import retry, stop_after_attempt | ||
@@ -50,3 +51,3 @@ | ||
| def to_json(self) -> str: | ||
| return json.dumps(self.__dict__) | ||
| return json.dumps(self.__dict__).decode("utf-8") | ||
@@ -96,3 +97,3 @@ | ||
| def to_json(self) -> str: | ||
| return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True) | ||
| return json.dumps(self, default=lambda o: o.__dict__, option=OPT_SORT_KEYS).decode("utf-8") | ||
@@ -99,0 +100,0 @@ def push_block(self, block: B) -> None: |
@@ -1,2 +0,1 @@ | ||
| import json | ||
| import logging | ||
@@ -7,2 +6,3 @@ import typing as t | ||
| import orjson as json | ||
| from tenacity import retry, stop_after_attempt, wait_fixed | ||
@@ -153,6 +153,8 @@ | ||
| if isinstance(data, str): | ||
| if isinstance(data, (str, bytes)): | ||
| data_dict = json.loads(data) | ||
| else: | ||
| data_dict = data | ||
| logger.debug(f"data_dict type {type(data_dict)}") | ||
| logger.debug("Data: %s", data_dict) | ||
@@ -159,0 +161,0 @@ slot = data_dict["block_number"] |
@@ -1,2 +0,1 @@ | ||
| import json | ||
| import logging | ||
@@ -7,2 +6,4 @@ import typing as t | ||
| import orjson as json | ||
| from chainbench.util.rng import RNG, get_rng | ||
@@ -9,0 +10,0 @@ |
@@ -1,2 +0,1 @@ | ||
| import json | ||
| import logging | ||
@@ -7,2 +6,3 @@ import typing as t | ||
| import orjson as json | ||
| from tenacity import retry, stop_after_attempt | ||
@@ -9,0 +9,0 @@ |
@@ -1,5 +0,6 @@ | ||
| import json | ||
| import logging | ||
| import typing as t | ||
| import orjson as json | ||
| from .blockchain import ( | ||
@@ -6,0 +7,0 @@ Account, |
@@ -1,2 +0,1 @@ | ||
| import json | ||
| import os | ||
@@ -7,2 +6,3 @@ from dataclasses import dataclass | ||
| import orjson as json | ||
| from tenacity import retry, retry_if_exception_type, wait_exponential | ||
@@ -9,0 +9,0 @@ |
@@ -5,3 +5,5 @@ from chainbench.user.protocol import EthBeaconUser, EvmUser, SolanaUser, StarkNetUser | ||
| from .common import get_subclass_tasks | ||
| from .http import HttpUser, JsonRpcUser | ||
| from .http import HttpUser | ||
| from .jsonrpc import JrpcHttpUser | ||
| from .wss import WssJrpcUser | ||
@@ -17,6 +19,7 @@ # importing plugins here as all profiles depend on it | ||
| "HttpUser", | ||
| "JsonRpcUser", | ||
| "JrpcHttpUser", | ||
| "SolanaUser", | ||
| "StarkNetUser", | ||
| "WssJrpcUser", | ||
| "get_subclass_tasks", | ||
| ] |
+2
-198
@@ -1,45 +0,11 @@ | ||
| import json | ||
| import logging | ||
| import random | ||
| import typing as t | ||
| from locust import FastHttpUser, TaskSet, tag, task | ||
| from locust import FastHttpUser, TaskSet | ||
| from locust.contrib.fasthttp import ResponseContextManager | ||
| from chainbench.test_data import TestData | ||
| from chainbench.util.rng import RNGManager | ||
| from chainbench.util.jsonrpc import expand_to_list | ||
| class RpcCall: | ||
| def __init__(self, method: str, params: list[t.Any] | dict | None = None) -> None: | ||
| self.method = method | ||
| self.params = params | ||
| def expand_rpc_calls(rpc_calls_weighted: dict[t.Callable[[], RpcCall], int]) -> list[RpcCall]: | ||
| rpc_call_methods_weighted: dict[RpcCall, int] = {} | ||
| for rpc_call_method, weight in rpc_calls_weighted.items(): | ||
| rpc_call_methods_weighted[rpc_call_method()] = weight | ||
| expanded_rpc_calls: list[RpcCall] = expand_to_list(rpc_call_methods_weighted) | ||
| return expanded_rpc_calls | ||
| def expand_to_list(items_weighted: dict[t.Any, int] | list[t.Any | tuple[t.Any, int]]) -> list[t.Any]: | ||
| expanded_items_list: list[t.Any] = [] | ||
| if isinstance(items_weighted, dict): | ||
| items_weighted = list(items_weighted.items()) | ||
| if isinstance(items_weighted, list): | ||
| for rpc_call in items_weighted: | ||
| if isinstance(rpc_call, tuple): | ||
| rpc_call, count = rpc_call | ||
| for _ in range(count): | ||
| expanded_items_list.append(rpc_call) | ||
| else: | ||
| expanded_items_list.append(rpc_call) | ||
| return expanded_items_list | ||
| class HttpUser(FastHttpUser): | ||
@@ -51,3 +17,2 @@ """Extension of FastHttpUser for Chainbench.""" | ||
| logger = logging.getLogger(__name__) | ||
| rng = RNGManager() | ||
@@ -123,162 +88,1 @@ connection_timeout = 120 | ||
| return response | ||
| class JsonRpcUser(HttpUser): | ||
| """Extension of HttpUser to provide JsonRPC support.""" | ||
| abstract = True | ||
| rpc_path = "" | ||
| rpc_error_code_exclusions: list[int] = [] | ||
| rpc_calls: dict[t.Callable, int] = {} # To be populated in the subclass load profile | ||
| calls_per_batch = 10 # default requests to include in a batch request | ||
| def __init__(self, environment: t.Any): | ||
| self.calls_per_batch = environment.parsed_options.batch_size | ||
| super().__init__(environment) | ||
| @tag("single") | ||
| @task | ||
| def rpc_call_task(self) -> None: | ||
| self.method_to_task_function(self.environment.parsed_options.method)(self) | ||
| @tag("batch") | ||
| @task | ||
| def batch_rpc_call_task(self) -> None: | ||
| rpc_calls = {getattr(self, method.__name__): weight for method, weight in self.rpc_calls.items()} | ||
| self.make_random_batch_rpc_call( | ||
| rpc_calls, | ||
| calls_per_batch=self.calls_per_batch, | ||
| ) | ||
| @tag("batch_single") | ||
| @task | ||
| def batch_single_rpc_call_task(self) -> None: | ||
| rpc_call: RpcCall = self.method_to_rpc_call(self.environment.parsed_options.method)(self) | ||
| rpc_calls = [rpc_call for _ in range(self.calls_per_batch)] | ||
| self.make_batch_rpc_call( | ||
| rpc_calls, | ||
| ) | ||
| @classmethod | ||
| def method_to_rpc_call(cls, method: str) -> t.Callable: | ||
| method_name = cls.method_to_function_name(method) | ||
| return getattr(cls, method_name) | ||
| def check_json_rpc_response(self, response: ResponseContextManager, name: str) -> None: | ||
| CHUNK_SIZE = 1024 | ||
| if response.text is None: | ||
| self.logger.error(f"Response for {name} is empty") | ||
| response.failure(f"Response for {name} is empty") | ||
| return | ||
| data = response.text[:CHUNK_SIZE] | ||
| if "jsonrpc" not in data: | ||
| self.logger.error(f"Response for {name} is not a JSON-RPC: {response.text}") | ||
| response.failure(f"Response for {name} is not a JSON-RPC") | ||
| return | ||
| if "error" in data: | ||
| response_js: list | dict = response.json() | ||
| if isinstance(response_js, dict): | ||
| response_js = [response_js] | ||
| if isinstance(response_js, list): | ||
| for response_js_item in response_js: | ||
| if "error" in response_js_item: | ||
| if "code" in response_js_item["error"]: | ||
| self.logger.error(f"Response for {name} has a JSON-RPC error: {response.text}") | ||
| if response_js_item["error"]["code"] not in self.rpc_error_code_exclusions: | ||
| response.failure( | ||
| f"Response for {name} has a JSON-RPC error {response_js_item['error']['code']} - " | ||
| f"{response_js_item['error']['message']}" | ||
| ) | ||
| return | ||
| response.failure("Unspecified JSON-RPC error") | ||
| self.logger.error(f"Unspecified JSON-RPC error: {response.text}") | ||
| return | ||
| # TODO: handle multiple errors in batch response properly | ||
| if "result" not in data: | ||
| response.failure(f"Response for {name} call has no result") | ||
| self.logger.error(f"Response for {name} call has no result: {response.text}") | ||
| def make_rpc_call( | ||
| self, | ||
| rpc_call: RpcCall | None = None, | ||
| method: str | None = None, | ||
| params: list[t.Any] | dict | None = None, | ||
| name: str = "", | ||
| path: str = "", | ||
| ) -> None: | ||
| """Make a JSON-RPC call.""" | ||
| if rpc_call is not None: | ||
| method = rpc_call.method | ||
| params = rpc_call.params | ||
| if name == "" and method is not None: | ||
| name = method | ||
| with self.client.request( | ||
| "POST", self.rpc_path + path, json=generate_request_body(method, params), name=name, catch_response=True | ||
| ) as response: | ||
| self.check_http_error(response) | ||
| self.check_json_rpc_response(response, name=name) | ||
| def make_batch_rpc_call(self, rpc_calls: list[RpcCall], name: str = "", path: str = "") -> None: | ||
| """Make a Batch JSON-RPC call.""" | ||
| if name == "": | ||
| name = f"Batch RPC ({len(rpc_calls)})" | ||
| headers = {"Content-Type": "application/json", "accept": "application/json"} | ||
| with self.client.request( | ||
| "POST", | ||
| self.rpc_path + path, | ||
| data=generate_batch_request_body(rpc_calls), | ||
| name=name, | ||
| catch_response=True, | ||
| headers=headers, | ||
| ) as response: | ||
| self.check_http_error(response) | ||
| self.check_json_rpc_response(response, name=name) | ||
| def make_random_batch_rpc_call( | ||
| self, | ||
| weighted_rpc_calls: dict[t.Callable[[], RpcCall], int], | ||
| calls_per_batch: int, | ||
| name: str = "", | ||
| path: str = "", | ||
| ) -> None: | ||
| """Make a Batch JSON-RPC call.""" | ||
| rpc_calls: list[RpcCall] = expand_rpc_calls(weighted_rpc_calls) | ||
| random_rpc_calls: list[RpcCall] = random.choices(rpc_calls, k=calls_per_batch) | ||
| self.make_batch_rpc_call(random_rpc_calls, name=name, path=path) | ||
| def generate_request_body( | ||
| method: str | None = None, params: list | dict | None = None, request_id: int | None = None, version: str = "2.0" | ||
| ) -> dict: | ||
| """Generate a JSON-RPC request body.""" | ||
| if params is None: | ||
| params = [] | ||
| if request_id is None: | ||
| request_id = random.randint(1, 100000000) | ||
| return { | ||
| "jsonrpc": version, | ||
| "method": method, | ||
| "params": params, | ||
| "id": request_id, | ||
| } | ||
| def generate_batch_request_body(rpc_calls: list[RpcCall], version: str = "2.0") -> str: | ||
| """Generate a batch JSON-RPC request body.""" | ||
| return json.dumps( | ||
| [ | ||
| generate_request_body(rpc_calls[i].method, rpc_calls[i].params, request_id=i, version=version) | ||
| for i in range(1, len(rpc_calls)) | ||
| ] | ||
| ) |
| import logging | ||
| import typing as t | ||
| from locust import task | ||
| from locust import tag, task | ||
| from chainbench.test_data.ethereum import EthBeaconTestData | ||
| from chainbench.user.http import HttpUser | ||
| from chainbench.user.wss import WSSubscription | ||
| from chainbench.util.rng import RNGManager | ||
@@ -15,2 +17,3 @@ logger = logging.getLogger(__name__) | ||
| test_data = EthBeaconTestData() | ||
| rng = RNGManager() | ||
@@ -330,4 +333,11 @@ def eth_beacon_blocks_request( | ||
| class TestEthMethod(EthBeaconUser): | ||
| @tag("single") | ||
| @task | ||
| def run_task(self) -> None: | ||
| self.method_to_task_function(self.environment.parsed_options.method)() | ||
| self.logger.info(f"Running task {self.environment.parsed_options.method}") | ||
| self.method_to_task_function(self.environment.parsed_options.method)(self) | ||
| class EthSubscribe(WSSubscription): | ||
| def __init__(self, params: dict | list): | ||
| super().__init__("eth_subscribe", params, "eth_unsubscribe") |
@@ -11,10 +11,12 @@ import re | ||
| ) | ||
| from chainbench.user.http import JsonRpcUser, RpcCall | ||
| from chainbench.user.jsonrpc import JrpcHttpUser | ||
| from chainbench.user.tag import tag | ||
| from chainbench.util.rng import RNG | ||
| from chainbench.util.jsonrpc import RpcCall | ||
| from chainbench.util.rng import RNG, RNGManager | ||
| class EvmBaseUser(JsonRpcUser): | ||
| class EvmBaseUser(JrpcHttpUser): | ||
| abstract = True | ||
| test_data = EvmTestData() | ||
| test_data: EvmTestData = EvmTestData() | ||
| rng = RNGManager() | ||
@@ -21,0 +23,0 @@ _default_trace_timeout = "120s" |
@@ -7,9 +7,11 @@ import base64 | ||
| from chainbench.test_data import Account, BlockNumber, SolanaTestData, TxHash | ||
| from chainbench.user.http import JsonRpcUser, RpcCall | ||
| from chainbench.util.rng import RNG | ||
| from chainbench.user.jsonrpc import JrpcHttpUser | ||
| from chainbench.util.jsonrpc import RpcCall | ||
| from chainbench.util.rng import RNG, RNGManager | ||
| class SolanaBaseUser(JsonRpcUser): | ||
| class SolanaBaseUser(JrpcHttpUser): | ||
| abstract = True | ||
| test_data = SolanaTestData() | ||
| rng = RNGManager() | ||
| rpc_error_code_exclusions = [-32007] | ||
@@ -16,0 +18,0 @@ |
@@ -5,7 +5,7 @@ import typing as t | ||
| from chainbench.test_data.blockchain import Account, TxHash | ||
| from chainbench.user.http import JsonRpcUser | ||
| from chainbench.user.jsonrpc import JrpcHttpUser | ||
| from chainbench.util.rng import RNG | ||
| class StarkNetUser(JsonRpcUser): | ||
| class StarkNetUser(JrpcHttpUser): | ||
| abstract = True | ||
@@ -12,0 +12,0 @@ test_data = StarkNetTestData() |
@@ -159,3 +159,3 @@ import subprocess | ||
| f"locust -f {self.profile_path} --worker --master-host {self.host} --master-port {self.port} " | ||
| f"--logfile {self.results_path}/worker_{worker_id}.log --loglevel {self.log_level}" | ||
| f"--logfile {self.results_path}/worker_{worker_id}.log --loglevel {self.log_level} --stop-timeout 30" | ||
| ) | ||
@@ -162,0 +162,0 @@ return self.get_extra_options(command) |
+58
-51
@@ -182,55 +182,62 @@ import logging | ||
| if not hasattr(user, "test_data"): | ||
| raise AttributeError(f"{user} class does not have 'test_data' attribute") | ||
| user_test_data: TestData = getattr(user, "test_data") | ||
| test_data_class_name: str = type(user_test_data).__name__ | ||
| if test_data_class_name in test_data: | ||
| continue | ||
| logger.info(f"Initializing test data for {test_data_class_name}") | ||
| print(f"Initializing test data for {test_data_class_name}") | ||
| if environment.host: | ||
| user_test_data.init_http_client(environment.host) | ||
| if isinstance(user_test_data, EvmTestData): | ||
| chain_id: ChainId = user_test_data.fetch_chain_id() | ||
| user_test_data.init_network(chain_id) | ||
| logger.info(f"Target endpoint network is {user_test_data.network.name}") | ||
| print(f"Target endpoint network is {user_test_data.network.name}") | ||
| test_data["chain_id"] = {test_data_class_name: chain_id} | ||
| if environment.parsed_options: | ||
| user_test_data.init_data(environment.parsed_options) | ||
| test_data[test_data_class_name] = user_test_data.data.to_json() | ||
| send_msg_to_workers(environment.runner, "test_data", test_data) | ||
| print("Fetching blocks...") | ||
| if environment.parsed_options.use_latest_blocks: | ||
| print(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data") | ||
| logger.info(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data") | ||
| for block_number in range( | ||
| user_test_data.data.block_range.start, user_test_data.data.block_range.end + 1 | ||
| ): | ||
| try: | ||
| block = user_test_data.fetch_block(block_number) | ||
| except (BlockNotFoundError, InvalidBlockError): | ||
| block = user_test_data.fetch_latest_block() | ||
| user_test_data.data.push_block(block) | ||
| block_data = {test_data_class_name: block.to_json()} | ||
| send_msg_to_workers(environment.runner, "block_data", block_data) | ||
| print(user_test_data.data.stats(), end="\r") | ||
| else: | ||
| print(user_test_data.data.stats(), end="\r") | ||
| print("\n") # new line after progress display upon exiting loop | ||
| logger.warning(f"{user} class does not have 'test_data' attribute") | ||
| else: | ||
| while user_test_data.data.size.blocks_len > len(user_test_data.data.blocks): | ||
| try: | ||
| block = user_test_data.fetch_random_block(user_test_data.data.block_numbers) | ||
| except (BlockNotFoundError, InvalidBlockError): | ||
| continue | ||
| user_test_data.data.push_block(block) | ||
| block_data = {test_data_class_name: block.to_json()} | ||
| send_msg_to_workers(environment.runner, "block_data", block_data) | ||
| print(user_test_data.data.stats(), end="\r") | ||
| user_test_data: TestData = getattr(user, "test_data") | ||
| test_data_class_name: str = type(user_test_data).__name__ | ||
| if test_data_class_name in test_data: | ||
| continue | ||
| logger.info(f"Initializing test data for {test_data_class_name}") | ||
| print(f"Initializing test data for {test_data_class_name}") | ||
| if environment.host: | ||
| user_test_data.init_http_client(environment.host) | ||
| if isinstance(user_test_data, EvmTestData): | ||
| chain_id: ChainId = user_test_data.fetch_chain_id() | ||
| user_test_data.init_network(chain_id) | ||
| logger.info(f"Target endpoint network is {user_test_data.network.name}") | ||
| print(f"Target endpoint network is {user_test_data.network.name}") | ||
| test_data["chain_id"] = {test_data_class_name: chain_id} | ||
| if environment.parsed_options: | ||
| user_test_data.init_data(environment.parsed_options) | ||
| test_data[test_data_class_name] = user_test_data.data.to_json() | ||
| send_msg_to_workers(environment.runner, "test_data", test_data) | ||
| print("Fetching blocks...") | ||
| if environment.parsed_options.use_latest_blocks: | ||
| print(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data") | ||
| logger.info(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data") | ||
| for block_number in range( | ||
| user_test_data.data.block_range.start, user_test_data.data.block_range.end + 1 | ||
| ): | ||
| block = None | ||
| try: | ||
| block = user_test_data.fetch_block(block_number) | ||
| except (BlockNotFoundError, InvalidBlockError): | ||
| pass | ||
| while block is None: | ||
| try: | ||
| block = user_test_data.fetch_latest_block() | ||
| except (BlockNotFoundError, InvalidBlockError): | ||
| pass | ||
| user_test_data.data.push_block(block) | ||
| block_data = {test_data_class_name: block.to_json()} | ||
| send_msg_to_workers(environment.runner, "block_data", block_data) | ||
| print(user_test_data.data.stats(), end="\r") | ||
| else: | ||
| print(user_test_data.data.stats(), end="\r") | ||
| print("\n") # new line after progress display upon exiting loop | ||
| else: | ||
| print(user_test_data.data.stats(), end="\r") | ||
| print("\n") # new line after progress display upon exiting loop | ||
| logger.info("Test data is ready") | ||
| send_msg_to_workers(environment.runner, "release_lock", {}) | ||
| user_test_data.release_lock() | ||
| while user_test_data.data.size.blocks_len > len(user_test_data.data.blocks): | ||
| try: | ||
| block = user_test_data.fetch_random_block(user_test_data.data.block_numbers) | ||
| except (BlockNotFoundError, InvalidBlockError): | ||
| continue | ||
| user_test_data.data.push_block(block) | ||
| block_data = {test_data_class_name: block.to_json()} | ||
| send_msg_to_workers(environment.runner, "block_data", block_data) | ||
| print(user_test_data.data.stats(), end="\r") | ||
| else: | ||
| print(user_test_data.data.stats(), end="\r") | ||
| print("\n") # new line after progress display upon exiting loop | ||
| logger.info("Test data is ready") | ||
| send_msg_to_workers(environment.runner, "release_lock", {}) | ||
| user_test_data.release_lock() | ||
| except Exception as e: | ||
@@ -237,0 +244,0 @@ logger.error(f"Failed to init test data: {e.__class__.__name__}: {e}. Exiting...") |
@@ -1,2 +0,1 @@ | ||
| import json | ||
| import logging | ||
@@ -7,7 +6,8 @@ import typing as t | ||
| from functools import cached_property | ||
| from json import JSONDecodeError | ||
| from secrets import token_hex | ||
| import orjson as json | ||
| from geventhttpclient import URL, HTTPClient | ||
| from geventhttpclient.response import HTTPSocketPoolResponse | ||
| from orjson import JSONDecodeError | ||
@@ -146,3 +146,3 @@ logger = logging.getLogger(__name__) | ||
| if isinstance(data, dict): | ||
| body = json.dumps(data).encode("utf-8") | ||
| body = json.dumps(data) | ||
| elif isinstance(data, bytes): | ||
@@ -149,0 +149,0 @@ body = data |
| import csv | ||
| import logging | ||
| from datetime import datetime, timedelta | ||
| from json import JSONDecodeError | ||
| from pathlib import Path | ||
@@ -9,2 +8,3 @@ from time import sleep | ||
| from locust.util.timespan import parse_timespan | ||
| from orjson import JSONDecodeError | ||
@@ -11,0 +11,0 @@ from .http import HttpClient |
+4
-1
| Metadata-Version: 2.1 | ||
| Name: chainbench | ||
| Version: 0.7.5 | ||
| Version: 0.8.0 | ||
| Summary: | ||
@@ -17,4 +17,7 @@ Author: Egor Molodik | ||
| Requires-Dist: locust-plugins[dashboards] (>=4.4.2,<5.0.0) | ||
| Requires-Dist: orjson (>=3.10.6,<4.0.0) | ||
| Requires-Dist: solders (>=0.21.0,<0.22.0) | ||
| Requires-Dist: tenacity (>=8.2.2,<9.0.0) | ||
| Requires-Dist: websocket-client (>=1.8.0,<2.0.0) | ||
| Requires-Dist: wsaccel (>=0.6.6,<0.7.0) | ||
| Description-Content-Type: text/markdown | ||
@@ -21,0 +24,0 @@ |
+4
-1
| [tool.poetry] | ||
| name = "chainbench" | ||
| version = "0.7.5" | ||
| version = "0.8.0" | ||
| description = "" | ||
@@ -21,2 +21,5 @@ authors = [ | ||
| solders = "^0.21.0" | ||
| websocket-client = "^1.8.0" | ||
| orjson = "^3.10.6" | ||
| wsaccel = "^0.6.6" | ||
@@ -23,0 +26,0 @@ [tool.poetry.group.dev.dependencies] |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
300813
3.76%68
6.25%8641
2.82%