databento
Advanced tools
| import datetime as dt | ||
| import logging | ||
| import pathlib | ||
| import warnings | ||
| from collections.abc import Callable | ||
| from os import PathLike | ||
| import pathlib | ||
| from typing import IO | ||
| from typing import Generic | ||
| from typing import IO | ||
| from typing import TypedDict | ||
| from typing import TypeVar | ||
| import warnings | ||
@@ -17,2 +17,3 @@ import databento_dbn | ||
| logger = logging.getLogger(__name__) | ||
@@ -192,3 +193,3 @@ | ||
| self._warn( | ||
| f"stream '{self.stream_name}' encountered an exception without an exception handler: {repr(exc)}", | ||
| f"stream '{self.stream_name}' encountered an exception without an exception handler: {exc!r}", | ||
| ) | ||
@@ -200,3 +201,3 @@ else: | ||
| self._warn( | ||
| f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}", | ||
| f"exception callback '{self.exc_callback_name}' encountered an exception: {inner_exc!r}", | ||
| ) | ||
@@ -264,3 +265,3 @@ raise inner_exc from exc | ||
| self._warn( | ||
| f"callback '{self.callback_name}' encountered an exception without an exception callback: {repr(exc)}", | ||
| f"callback '{self.callback_name}' encountered an exception without an exception callback: {exc!r}", | ||
| ) | ||
@@ -272,3 +273,3 @@ else: | ||
| self._warn( | ||
| f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}", | ||
| f"exception callback '{self.exc_callback_name}' encountered an exception: {inner_exc!r}", | ||
| ) | ||
@@ -275,0 +276,0 @@ raise inner_exc from exc |
@@ -74,3 +74,3 @@ from __future__ import annotations | ||
| pretty_ts: bool = False, | ||
| map_symbols: bool = False, | ||
| map_symbols: bool | None = None, | ||
| split_symbols: bool = False, | ||
@@ -120,5 +120,6 @@ split_duration: SplitDuration | str = "day", | ||
| Only applicable for 'csv' or 'json' encodings. | ||
| map_symbols : bool, default False | ||
| If the requested symbol should be appended to every text encoded record. | ||
| Only applicable for 'csv' or 'json' encodings. | ||
| map_symbols : bool, optional | ||
| If a symbol field should be included with every text encoded record. | ||
| If `None`, will default to `True` for `csv` and `json` encodings and `False` for | ||
| `dbn`. | ||
| split_symbols : bool, default False | ||
@@ -154,3 +155,7 @@ If files should be split by raw symbol. Cannot be requested with `'ALL_SYMBOLS'`. | ||
| symbols_list = symbols_list_to_list(symbols, stype_in_valid) | ||
| encoding_valid = validate_enum(encoding, Encoding, "encoding") | ||
| if map_symbols is None: | ||
| map_symbols = encoding_valid != Encoding.DBN | ||
| data: dict[str, object | None] = { | ||
@@ -164,3 +169,3 @@ "dataset": validate_semantic_string(dataset, "dataset"), | ||
| "stype_out": str(validate_enum(stype_out, SType, "stype_out")), | ||
| "encoding": str(validate_enum(encoding, Encoding, "encoding")), | ||
| "encoding": str(encoding_valid), | ||
| "compression": ( | ||
@@ -299,3 +304,5 @@ str(validate_enum(compression, Compression, "compression")) if compression else None | ||
| if keep_zip and filename_to_download: | ||
| raise ValueError("Cannot specify an individual file to download when `keep_zip=True`") | ||
| raise ValueError( | ||
| "Cannot specify an individual file to download when `keep_zip=True`", | ||
| ) | ||
@@ -377,3 +384,5 @@ batch_download = _BatchJob( | ||
| if keep_zip and filename_to_download: | ||
| raise ValueError("Cannot specify an individual file to download when `keep_zip=True`") | ||
| raise ValueError( | ||
| "Cannot specify an individual file to download when `keep_zip=True`", | ||
| ) | ||
@@ -467,3 +476,5 @@ batch_download = _BatchJob( | ||
| with open(output_path, mode=mode) as f: | ||
| for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE): | ||
| for chunk in response.iter_content( | ||
| chunk_size=HTTP_STREAMING_READ_SIZE, | ||
| ): | ||
| f.write(chunk) | ||
@@ -558,3 +569,5 @@ | ||
| with open(output_path, mode="wb") as f: | ||
| for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE): | ||
| for chunk in response.iter_content( | ||
| chunk_size=HTTP_STREAMING_READ_SIZE, | ||
| ): | ||
| f.write(chunk) | ||
@@ -626,3 +639,5 @@ except BentoHttpError as exc: | ||
| missing_key = exc.args[0] | ||
| raise BentoError(f"Batch job manifest missing key '{missing_key}'") from None | ||
| raise BentoError( | ||
| f"Batch job manifest missing key '{missing_key}'", | ||
| ) from None | ||
| except TypeError: | ||
@@ -629,0 +644,0 @@ raise BentoError("Error parsing job manifest") from None |
@@ -34,2 +34,3 @@ from __future__ import annotations | ||
| from databento.common.validation import validate_semantic_string | ||
| from databento.live.gateway import SubscriptionRequest | ||
| from databento.live.session import DEFAULT_REMOTE_PORT | ||
@@ -233,2 +234,34 @@ from databento.live.session import LiveSession | ||
| @property | ||
| def subscription_requests( | ||
| self, | ||
| ) -> list[tuple[SubscriptionRequest, ...]]: | ||
| """ | ||
| Return a list of tuples containing every `SubscriptionRequest` message | ||
| sent for the session. The list is in order of the subscriptions made | ||
| and can be indexed using the value returned by each call to | ||
| `Live.subscribe()`. | ||
| Subscriptions which contain a large | ||
| list of symbols are batched. Because of this, a single `subscription_id` may have | ||
| more than one associated `SubscriptionRequest`. | ||
| Returns | ||
| ------- | ||
| list[tuple[SubscriptionRequest, ...]] | ||
| A list of tuples containing every subscription request. | ||
| Each entry in the list corresponds to a single subscription. | ||
| Raises | ||
| ------ | ||
| IndexError | ||
| If the subscription ID is invalid. | ||
| See Also | ||
| -------- | ||
| Live.subscribe() | ||
| """ | ||
| return self._session._subscriptions | ||
| @property | ||
| def symbology_map(self) -> dict[int, str | int]: | ||
@@ -451,3 +484,3 @@ """ | ||
| snapshot: bool = False, | ||
| ) -> None: | ||
| ) -> int: | ||
| """ | ||
@@ -482,2 +515,7 @@ Add a new subscription to the session. | ||
| Returns | ||
| ------- | ||
| int | ||
| The numeric identifier for this subscription request. | ||
| Raises | ||
@@ -501,3 +539,3 @@ ------ | ||
| logger.info( | ||
| "subscribing to %s:%s %s start=%s snapshot=%s", | ||
| "subscribing to schema=%s stype_in=%s symbols='%s' start=%s snapshot=%s", | ||
| schema, | ||
@@ -517,3 +555,3 @@ stype_in, | ||
| self._session.subscribe( | ||
| return self._session.subscribe( | ||
| dataset=dataset, | ||
@@ -520,0 +558,0 @@ schema=schema, |
@@ -8,3 +8,2 @@ from __future__ import annotations | ||
| from typing import SupportsBytes | ||
| from typing import TypeVar | ||
@@ -14,2 +13,3 @@ from databento_dbn import Encoding | ||
| from databento_dbn import SType | ||
| from typing_extensions import Self | ||
@@ -22,5 +22,3 @@ from databento.common.publishers import Dataset | ||
| T = TypeVar("T", bound="GatewayControl") | ||
| @dataclasses.dataclass | ||
@@ -33,5 +31,5 @@ class GatewayControl(SupportsBytes): | ||
| @classmethod | ||
| def parse(cls: type[T], line: str | bytes) -> T: | ||
| def parse(cls: type[Self], line: str | bytes) -> Self: | ||
| """ | ||
| Parse a message of type `T` from a string. | ||
| Parse a `GatewayControl` message from a string. | ||
@@ -38,0 +36,0 @@ Parameters |
@@ -10,7 +10,5 @@ from __future__ import annotations | ||
| import databento_dbn | ||
| from databento_dbn import DBNError | ||
| from databento_dbn import Metadata | ||
| from databento_dbn import Schema | ||
| from databento_dbn import SType | ||
| from databento_dbn import SystemCode | ||
| from databento_dbn import VersionUpgradePolicy | ||
@@ -317,4 +315,4 @@ | ||
| """ | ||
| logger.info( | ||
| "sending subscription to %s:%s %s start=%s snapshot=%s", | ||
| logger.debug( | ||
| "sending subscription request schema=%s stype_in=%s symbols='%s' start='%s' snapshot=%s id=%s", | ||
| schema, | ||
@@ -325,2 +323,3 @@ stype_in, | ||
| snapshot, | ||
| subscription_id, | ||
| ) | ||
@@ -347,2 +346,8 @@ | ||
| if len(subscriptions) > 1: | ||
| logger.debug( | ||
| "batched subscription into %d requests id=%s", | ||
| len(subscriptions), | ||
| subscription_id, | ||
| ) | ||
| self.transport.writelines(map(bytes, subscriptions)) | ||
@@ -381,3 +386,4 @@ return subscriptions | ||
| logger.error( | ||
| "gateway error: %s", | ||
| "gateway error code=%s err='%s'", | ||
| record.code, | ||
| record.err, | ||
@@ -390,15 +396,7 @@ ) | ||
| else: | ||
| try: | ||
| msg_code = record.code | ||
| except DBNError: | ||
| msg_code = None | ||
| if msg_code == SystemCode.SLOW_READER_WARNING: | ||
| logger.warning( | ||
| record.msg, | ||
| ) | ||
| else: | ||
| logger.debug( | ||
| "gateway message: %s", | ||
| record.msg, | ||
| ) | ||
| logger.info( | ||
| "system message code=%s msg='%s'", | ||
| record.code, | ||
| record.msg, | ||
| ) | ||
| self.received_record(record) | ||
@@ -432,7 +430,10 @@ | ||
| def _(self, message: Greeting) -> None: | ||
| logger.debug("greeting received by remote gateway v%s", message.lsg_version) | ||
| logger.debug( | ||
| "greeting received by remote gateway version='%s'", | ||
| message.lsg_version, | ||
| ) | ||
| @_handle_gateway_message.register(ChallengeRequest) | ||
| def _(self, message: ChallengeRequest) -> None: | ||
| logger.debug("received CRAM challenge: %s", message.cram) | ||
| logger.debug("received CRAM challenge cram='%s'", message.cram) | ||
| response = cram.get_challenge_response(message.cram, self.__api_key) | ||
@@ -445,3 +446,11 @@ auth_request = AuthenticationRequest( | ||
| ) | ||
| logger.debug("sending CRAM challenge response: %s", str(auth_request).strip()) | ||
| logger.debug( | ||
| "sending CRAM challenge response auth='%s' dataset=%s encoding=%s ts_out=%s heartbeat_interval_s=%s client='%s'", | ||
| auth_request.auth, | ||
| auth_request.dataset, | ||
| auth_request.encoding, | ||
| auth_request.ts_out, | ||
| auth_request.heartbeat_interval_s, | ||
| auth_request.client, | ||
| ) | ||
| self.transport.write(bytes(auth_request)) | ||
@@ -452,5 +461,5 @@ | ||
| if message.success == "0": | ||
| logger.error("CRAM authentication failed: %s", message.error) | ||
| logger.error("CRAM authentication error: %s", message.error) | ||
| self.authenticated.set_exception( | ||
| BentoError(f"User authentication failed: {message.error}"), | ||
| BentoError(message.error), | ||
| ) | ||
@@ -462,5 +471,4 @@ self.transport.close() | ||
| logger.debug( | ||
| "CRAM authenticated session id assigned `%s`", | ||
| session_id, | ||
| "CRAM authentication successful", | ||
| ) | ||
| self.authenticated.set_result(session_id) |
@@ -5,3 +5,5 @@ from __future__ import annotations | ||
| import dataclasses | ||
| import itertools | ||
| import logging | ||
| import math | ||
| import queue | ||
@@ -38,2 +40,3 @@ import struct | ||
| DEFAULT_REMOTE_PORT: Final = 13000 | ||
| CLIENT_TIMEOUT_MARGIN_SECONDS: Final = 10 | ||
@@ -216,2 +219,3 @@ | ||
| self._last_ts_event: int | None = None | ||
| self._last_msg_loop_time: float = math.inf | ||
@@ -240,2 +244,3 @@ def received_metadata(self, metadata: databento_dbn.Metadata) -> None: | ||
| self._last_ts_event = record.ts_event | ||
| self._last_msg_loop_time = self._loop.time() | ||
@@ -330,3 +335,3 @@ return super().received_record(record) | ||
| self._ts_out = ts_out | ||
| self._heartbeat_interval_s = heartbeat_interval_s | ||
| self._heartbeat_interval_s = heartbeat_interval_s or 30 | ||
@@ -337,6 +342,6 @@ self._protocol: _SessionProtocol | None = None | ||
| self._subscription_counter = 0 | ||
| self._subscriptions: list[SubscriptionRequest] = [] | ||
| self._subscriptions: list[tuple[SubscriptionRequest, ...]] = [] | ||
| self._reconnect_policy = ReconnectPolicy(reconnect_policy) | ||
| self._reconnect_task: asyncio.Task[None] | None = None | ||
| self._heartbeat_monitor_task: asyncio.Task[None] | None = None | ||
@@ -445,4 +450,2 @@ self._dataset = "" | ||
| return | ||
| if self._protocol is not None: | ||
| self._protocol.disconnected.add_done_callback(lambda _: self._cleanup()) | ||
| self._loop.call_soon_threadsafe(self._transport.close) | ||
@@ -464,2 +467,5 @@ | ||
| self._protocol.start() | ||
| self._heartbeat_monitor_task = self._loop.create_task( | ||
| self._heartbeat_monitor(), | ||
| ) | ||
@@ -474,3 +480,3 @@ def subscribe( | ||
| snapshot: bool = False, | ||
| ) -> None: | ||
| ) -> int: | ||
| """ | ||
@@ -510,13 +516,16 @@ Send a subscription request on the current connection. This will create | ||
| self._subscription_counter += 1 | ||
| self._subscriptions.extend( | ||
| self._protocol.subscribe( | ||
| schema=schema, | ||
| symbols=symbols, | ||
| stype_in=stype_in, | ||
| start=start, | ||
| snapshot=snapshot, | ||
| subscription_id=self._subscription_counter, | ||
| subscription_id = len(self._subscriptions) | ||
| self._subscriptions.append( | ||
| tuple( | ||
| self._protocol.subscribe( | ||
| schema=schema, | ||
| symbols=symbols, | ||
| stype_in=stype_in, | ||
| start=start, | ||
| snapshot=snapshot, | ||
| subscription_id=subscription_id, | ||
| ), | ||
| ), | ||
| ) | ||
| return subscription_id | ||
@@ -555,3 +564,3 @@ def terminate(self) -> None: | ||
| def _cleanup(self) -> None: | ||
| logger.debug("cleaning up session_id=%s", self.session_id) | ||
| logger.debug("cleaning up session_id='%s'", self.session_id) | ||
| self._user_callbacks.clear() | ||
@@ -564,2 +573,4 @@ for stream in self._user_streams: | ||
| if self._heartbeat_monitor_task is not None: | ||
| self._heartbeat_monitor_task.cancel() | ||
| self._user_callbacks.clear() | ||
@@ -611,3 +622,3 @@ self._user_streams.clear() | ||
| gateway = self._user_gateway | ||
| logger.debug("using user specified gateway: %s", gateway) | ||
| logger.debug("user gateway override gateway='%s'", gateway) | ||
@@ -654,3 +665,3 @@ logger.info("connecting to remote gateway") | ||
| logger.info( | ||
| "authenticated session %s", | ||
| "authenticated session_id='%s'", | ||
| self.session_id, | ||
@@ -661,2 +672,17 @@ ) | ||
| async def _heartbeat_monitor(self) -> None: | ||
| while not self._protocol.disconnected.done(): | ||
| await asyncio.sleep(1) | ||
| gap = self._loop.time() - self._protocol._last_msg_loop_time | ||
| if gap > (self._heartbeat_interval_s + CLIENT_TIMEOUT_MARGIN_SECONDS): | ||
| logger.error( | ||
| "disconnecting client due to timeout, no data received for %d second(s)", | ||
| int(gap), | ||
| ) | ||
| self._protocol.disconnected.set_exception( | ||
| BentoError( | ||
| f"Gateway timeout: {gap:.0f} second(s) since last message", | ||
| ), | ||
| ) | ||
| async def _reconnect(self) -> None: | ||
@@ -687,3 +713,3 @@ while True: | ||
| for sub in self._subscriptions: | ||
| for sub in itertools.chain(*self._subscriptions): | ||
| self._protocol.subscribe( | ||
@@ -690,0 +716,0 @@ schema=sub.schema, |
@@ -1,1 +0,1 @@ | ||
| __version__ = "0.66.0" | ||
| __version__ = "0.67.0" |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: databento | ||
| Version: 0.66.0 | ||
| Version: 0.67.0 | ||
| Summary: Official Python client library for Databento | ||
@@ -5,0 +5,0 @@ License-Expression: Apache-2.0 |
+2
-1
| [project] | ||
| name = "databento" | ||
| version = "0.66.0" | ||
| version = "0.67.0" | ||
| description = "Official Python client library for Databento" | ||
@@ -84,2 +84,3 @@ readme = "README.md" | ||
| [tool.ruff] | ||
| extend = "../ruff.toml" | ||
| target-version = "py310" |
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
375747
1.04%9600
1.14%