nats-py
Advanced tools
| Metadata-Version: 2.4 | ||
| Name: nats-py | ||
| Version: 2.10.0 | ||
| Version: 2.9.1 | ||
| Summary: NATS client for Python | ||
@@ -37,2 +37,3 @@ Author-email: Waldemar Quevedo <wally@synadia.com> | ||
| [](https://pypi.org/project/nats-py) | ||
| [](http://travis-ci.com/nats-io/nats.py) | ||
| [](https://pypi.org/project/nats-py) | ||
@@ -258,25 +259,2 @@ [](https://www.apache.org/licenses/LICENSE-2.0) | ||
| ## Updating Docs | ||
| To update the docs, first checkout the `docs` branch under a local copy of the `nats.py` repo | ||
| as follows: | ||
| ```sh | ||
| git clone https://github.com/nats-io/nats.py | ||
| cd nats.py | ||
| git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs | ||
| cd docs | ||
| pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments | ||
| pipenv shell | ||
| make html | ||
| # preview the changes: | ||
| make serve | ||
| ``` | ||
| If you are happy with the changes, make a PR on the docs branch: | ||
| ``` | ||
| make publish | ||
| git add docs | ||
| ``` | ||
| ## License | ||
@@ -283,0 +261,0 @@ |
+0
-1
@@ -17,3 +17,2 @@ # Copyright 2016-2021 The NATS Authors | ||
| from typing import List, Union | ||
| from .aio.client import Client as NATS | ||
@@ -20,0 +19,0 @@ |
+0
-20
@@ -27,3 +27,2 @@ # Copyright 2016-2021 The NATS Authors | ||
| """ | ||
| pass | ||
@@ -39,3 +38,2 @@ | ||
| """ | ||
| pass | ||
@@ -51,3 +49,2 @@ | ||
| """ | ||
| pass | ||
@@ -63,3 +60,2 @@ | ||
| """ | ||
| pass | ||
@@ -75,3 +71,2 @@ | ||
| """ | ||
| pass | ||
@@ -87,3 +82,2 @@ | ||
| """ | ||
| pass | ||
@@ -99,3 +93,2 @@ | ||
| """ | ||
| pass | ||
@@ -111,3 +104,2 @@ | ||
| """ | ||
| pass | ||
@@ -123,3 +115,2 @@ | ||
| """ | ||
| pass | ||
@@ -135,3 +126,2 @@ | ||
| """ | ||
| pass | ||
@@ -147,3 +137,2 @@ | ||
| """ | ||
| pass | ||
@@ -159,3 +148,2 @@ | ||
| """ | ||
| pass | ||
@@ -171,3 +159,2 @@ | ||
| """ | ||
| pass | ||
@@ -183,3 +170,2 @@ | ||
| """ | ||
| pass | ||
@@ -195,3 +181,2 @@ | ||
| """ | ||
| pass | ||
@@ -207,3 +192,2 @@ | ||
| """ | ||
| pass | ||
@@ -219,3 +203,2 @@ | ||
| """ | ||
| pass | ||
@@ -231,3 +214,2 @@ | ||
| """ | ||
| pass | ||
@@ -243,3 +225,2 @@ | ||
| """ | ||
| pass | ||
@@ -255,3 +236,2 @@ | ||
| """ | ||
| pass |
+22
-32
@@ -19,3 +19,3 @@ # Copyright 2016-2021 The NATS Authors | ||
| from dataclasses import dataclass | ||
| from typing import TYPE_CHECKING, Dict, List, Optional, Union | ||
| from typing import TYPE_CHECKING, List, Optional, Dict, Union | ||
@@ -44,7 +44,6 @@ from nats.errors import Error, MsgAlreadyAckdError, NotJSMessageError | ||
| """ | ||
| _client: NATS | ||
| subject: str = "" | ||
| reply: str = "" | ||
| data: bytes = b"" | ||
| subject: str = '' | ||
| reply: str = '' | ||
| data: bytes = b'' | ||
| headers: Optional[Dict[str, str]] = None | ||
@@ -63,4 +62,4 @@ | ||
| # Reply metadata... | ||
| Prefix0 = "$JS" | ||
| Prefix1 = "ACK" | ||
| Prefix0 = '$JS' | ||
| Prefix1 = 'ACK' | ||
| Domain = 2 | ||
@@ -84,9 +83,2 @@ AccHash = 3 | ||
| @property | ||
| def is_acked(self) -> bool: | ||
| """ | ||
| Have we sent a terminal ack message (not in-progress) in response to this original message? | ||
| """ | ||
| return self._ackd | ||
| @property | ||
| def sid(self) -> int: | ||
@@ -97,3 +89,3 @@ """ | ||
| if self._sid is None: | ||
| raise Error("sid not set") | ||
| raise Error('sid not set') | ||
| return self._sid | ||
@@ -106,5 +98,5 @@ | ||
| if not self.reply: | ||
| raise Error("no reply subject available") | ||
| raise Error('no reply subject available') | ||
| if not self._client: | ||
| raise Error("client not set") | ||
| raise Error('client not set') | ||
@@ -139,5 +131,5 @@ await self._client.publish(self.reply, data, headers=self.headers) | ||
| if delay: | ||
| json_args["delay"] = int(delay * 10**9) # from seconds to ns | ||
| json_args['delay'] = int(delay * 10**9) # from seconds to ns | ||
| if json_args: | ||
| payload += b" " + json.dumps(json_args).encode() | ||
| payload += (b' ' + json.dumps(json_args).encode()) | ||
| await self._client.publish(self.reply, payload) | ||
@@ -151,3 +143,3 @@ self._ackd = True | ||
| """ | ||
| if self.reply is None or self.reply == "": | ||
| if self.reply is None or self.reply == '': | ||
| raise NotJSMessageError | ||
@@ -184,3 +176,3 @@ await self._client.publish(self.reply, Msg.Ack.Progress) | ||
| def _check_reply(self) -> None: | ||
| if self.reply is None or self.reply == "": | ||
| if self.reply is None or self.reply == '': | ||
| raise NotJSMessageError | ||
@@ -204,3 +196,2 @@ if self._ackd: | ||
| """ | ||
| sequence: SequencePair | ||
@@ -219,3 +210,2 @@ num_pending: int | ||
| """ | ||
| consumer: int | ||
@@ -228,7 +218,7 @@ stream: int | ||
| raise NotJSMessageError | ||
| tokens = reply.split(".") | ||
| if ((len(tokens) == _V1_TOKEN_COUNT | ||
| or len(tokens) >= _V2_TOKEN_COUNT - 1) | ||
| and tokens[0] == Msg.Ack.Prefix0 | ||
| and tokens[1] == Msg.Ack.Prefix1): | ||
| tokens = reply.split('.') | ||
| if (len(tokens) == _V1_TOKEN_COUNT or | ||
| len(tokens) >= _V2_TOKEN_COUNT-1) and \ | ||
| tokens[0] == Msg.Ack.Prefix0 and \ | ||
| tokens[1] == Msg.Ack.Prefix1: | ||
| return tokens | ||
@@ -239,7 +229,8 @@ raise NotJSMessageError | ||
| def _from_reply(cls, reply: str) -> Msg.Metadata: | ||
| """Construct the metadata from the reply string""" | ||
| """Construct the metadata from the reply string | ||
| """ | ||
| tokens = cls._get_metadata_fields(reply) | ||
| if len(tokens) == _V1_TOKEN_COUNT: | ||
| t = datetime.datetime.fromtimestamp( | ||
| int(tokens[7]) / 1_000_000_000.0, datetime.timezone.utc | ||
| int(tokens[7]) / 1_000_000_000.0 | ||
| ) | ||
@@ -259,4 +250,3 @@ return cls( | ||
| t = datetime.datetime.fromtimestamp( | ||
| int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0, | ||
| datetime.timezone.utc | ||
| int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0 | ||
| ) | ||
@@ -263,0 +253,0 @@ |
@@ -66,4 +66,4 @@ # Copyright 2016-2021 The NATS Authors | ||
| id: int = 0, | ||
| subject: str = "", | ||
| queue: str = "", | ||
| subject: str = '', | ||
| queue: str = '', | ||
| cb: Optional[Callable[[Msg], Awaitable[None]]] = None, | ||
@@ -127,3 +127,3 @@ future: Optional[asyncio.Future] = None, | ||
| :: | ||
| nc = await nats.connect() | ||
@@ -182,3 +182,3 @@ sub = await nc.subscribe('foo') | ||
| raise errors.Error( | ||
| "nats: next_msg cannot be used in async subscriptions" | ||
| 'nats: next_msg cannot be used in async subscriptions' | ||
| ) | ||
@@ -216,5 +216,4 @@ | ||
| if self._cb: | ||
| if not asyncio.iscoroutinefunction(self._cb) and not ( | ||
| hasattr(self._cb, "func") | ||
| and asyncio.iscoroutinefunction(self._cb.func)): | ||
| if not asyncio.iscoroutinefunction(self._cb) and \ | ||
| not (hasattr(self._cb, "func") and asyncio.iscoroutinefunction(self._cb.func)): | ||
| raise errors.Error( | ||
@@ -336,4 +335,3 @@ "nats: must use coroutine for subscriptions" | ||
| # Apply auto unsubscribe checks after having processed last msg. | ||
| if (self._max_msgs > 0 and self._received >= self._max_msgs | ||
| and self._pending_queue.empty): | ||
| if self._max_msgs > 0 and self._received >= self._max_msgs and self._pending_queue.empty: | ||
| self._stop_processing() | ||
@@ -340,0 +338,0 @@ except asyncio.CancelledError: |
@@ -6,4 +6,4 @@ from __future__ import annotations | ||
| import ssl | ||
| from typing import List, Optional, Union | ||
| from urllib.parse import ParseResult | ||
| from typing import List, Union, Optional | ||
@@ -127,4 +127,3 @@ try: | ||
| limit=buffer_size, | ||
| ), | ||
| connect_timeout, | ||
| ), connect_timeout | ||
| ) | ||
@@ -148,3 +147,3 @@ # We keep a reference to the initial transport we used when | ||
| ) -> None: | ||
| assert self._io_writer, f"{type(self).__name__}.connect must be called first" | ||
| assert self._io_writer, f'{type(self).__name__}.connect must be called first' | ||
@@ -159,3 +158,3 @@ # manually recreate the stream reader/writer with a tls upgraded transport | ||
| # hostname here will be passed directly as string | ||
| server_hostname=uri if isinstance(uri, str) else uri.hostname, | ||
| server_hostname=uri if isinstance(uri, str) else uri.hostname | ||
| ) | ||
@@ -175,3 +174,3 @@ transport = await asyncio.wait_for(transport_future, connect_timeout) | ||
| async def read(self, buffer_size: int): | ||
| assert self._io_reader, f"{type(self).__name__}.connect must be called first" | ||
| assert self._io_reader, f'{type(self).__name__}.connect must be called first' | ||
| return await self._io_reader.read(buffer_size) | ||
@@ -235,3 +234,3 @@ | ||
| ssl=ssl_context, | ||
| timeout=connect_timeout, | ||
| timeout=connect_timeout | ||
| ) | ||
@@ -254,3 +253,3 @@ self._using_tls = True | ||
| # if the connection terminated abruptly, return empty binary data to raise unexpected EOF | ||
| return b"" | ||
| return b'' | ||
| return data.data | ||
@@ -257,0 +256,0 @@ |
+2
-4
@@ -110,6 +110,4 @@ # Copyright 2021 The NATS Authors | ||
| def __str__(self) -> str: | ||
| return ( | ||
| "nats: slow consumer, messages dropped subject: " | ||
| f"{self.subject}, sid: {self.sid}, sub: {self.sub}" | ||
| ) | ||
| return "nats: slow consumer, messages dropped subject: " \ | ||
| f"{self.subject}, sid: {self.sid}, sub: {self.sub}" | ||
@@ -116,0 +114,0 @@ |
+56
-70
@@ -19,3 +19,3 @@ # Copyright 2021 The NATS Authors | ||
| from enum import Enum | ||
| from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar | ||
| from typing import Any, Dict, Optional, TypeVar, List, Iterable, Iterator | ||
@@ -40,3 +40,3 @@ _NANOSECOND = 10**9 | ||
| DEFAULT_PREFIX = "$JS.API" | ||
| INBOX_PREFIX = b"_INBOX." | ||
| INBOX_PREFIX = b'_INBOX.' | ||
@@ -63,3 +63,4 @@ | ||
| def _convert(resp: Dict[str, Any], field: str, type: type[Base]) -> None: | ||
| """Convert the field into the given type in place.""" | ||
| """Convert the field into the given type in place. | ||
| """ | ||
| data = resp.get(field, None) | ||
@@ -75,3 +76,4 @@ if data is None: | ||
| def _convert_nanoseconds(resp: Dict[str, Any], field: str) -> None: | ||
| """Convert the given field from nanoseconds to seconds in place.""" | ||
| """Convert the given field from nanoseconds to seconds in place. | ||
| """ | ||
| val = resp.get(field, None) | ||
@@ -84,3 +86,4 @@ if val is not None: | ||
| def _to_nanoseconds(val: Optional[float]) -> Optional[int]: | ||
| """Convert the value from seconds to nanoseconds.""" | ||
| """Convert the value from seconds to nanoseconds. | ||
| """ | ||
| if val is None: | ||
@@ -104,7 +107,9 @@ # We use 0 to avoid sending null to Go servers. | ||
| def evolve(self: _B, **params) -> _B: | ||
| """Return a copy of the instance with the passed values replaced.""" | ||
| """Return a copy of the instance with the passed values replaced. | ||
| """ | ||
| return replace(self, **params) | ||
| def as_dict(self) -> Dict[str, object]: | ||
| """Return the object converted into an API-friendly dict.""" | ||
| """Return the object converted into an API-friendly dict. | ||
| """ | ||
| result = {} | ||
@@ -129,3 +134,2 @@ for field in fields(self): | ||
| """ | ||
| stream: str | ||
@@ -167,4 +171,4 @@ seq: int | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "external", ExternalStream) | ||
| cls._convert(resp, "subject_transforms", SubjectTransform) | ||
| cls._convert(resp, 'external', ExternalStream) | ||
| cls._convert(resp, 'subject_transforms', SubjectTransform) | ||
| return super().from_response(resp) | ||
@@ -175,3 +179,3 @@ | ||
| if self.subject_transforms: | ||
| result["subject_transform"] = [ | ||
| result['subject_transform'] = [ | ||
| tr.as_dict() for tr in self.subject_transforms | ||
@@ -210,3 +214,3 @@ ] | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "lost", LostStreamData) | ||
| cls._convert(resp, 'lost', LostStreamData) | ||
| return super().from_response(resp) | ||
@@ -256,3 +260,2 @@ | ||
| """ | ||
| src: Optional[str] = None | ||
@@ -266,3 +269,2 @@ dest: Optional[str] = None | ||
| """Subject transform to apply to matching messages.""" | ||
| src: str | ||
@@ -281,3 +283,2 @@ dest: str | ||
| """ | ||
| name: Optional[str] = None | ||
@@ -291,3 +292,2 @@ description: Optional[str] = None | ||
| discard: Optional[DiscardPolicy] = DiscardPolicy.OLD | ||
| discard_new_per_subject: bool = False | ||
| max_age: Optional[float] = None # in seconds | ||
@@ -327,9 +327,9 @@ max_msgs_per_subject: int = -1 | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert_nanoseconds(resp, "max_age") | ||
| cls._convert_nanoseconds(resp, "duplicate_window") | ||
| cls._convert(resp, "placement", Placement) | ||
| cls._convert(resp, "mirror", StreamSource) | ||
| cls._convert(resp, "sources", StreamSource) | ||
| cls._convert(resp, "republish", RePublish) | ||
| cls._convert(resp, "subject_transform", SubjectTransform) | ||
| cls._convert_nanoseconds(resp, 'max_age') | ||
| cls._convert_nanoseconds(resp, 'duplicate_window') | ||
| cls._convert(resp, 'placement', Placement) | ||
| cls._convert(resp, 'mirror', StreamSource) | ||
| cls._convert(resp, 'sources', StreamSource) | ||
| cls._convert(resp, 'republish', RePublish) | ||
| cls._convert(resp, 'subject_transform', SubjectTransform) | ||
| return super().from_response(resp) | ||
@@ -339,10 +339,9 @@ | ||
| result = super().as_dict() | ||
| result["duplicate_window"] = self._to_nanoseconds( | ||
| result['duplicate_window'] = self._to_nanoseconds( | ||
| self.duplicate_window | ||
| ) | ||
| result["max_age"] = self._to_nanoseconds(self.max_age) | ||
| result['max_age'] = self._to_nanoseconds(self.max_age) | ||
| if self.sources: | ||
| result["sources"] = [src.as_dict() for src in self.sources] | ||
| if self.compression and (self.compression != StoreCompression.NONE | ||
| and self.compression != StoreCompression.S2): | ||
| result['sources'] = [src.as_dict() for src in self.sources] | ||
| if self.compression and (self.compression != StoreCompression.NONE and self.compression != StoreCompression.S2): | ||
| raise ValueError( | ||
@@ -373,3 +372,3 @@ "nats: invalid store compression type: %s" % self.compression | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "replicas", PeerInfo) | ||
| cls._convert(resp, 'replicas', PeerInfo) | ||
| return super().from_response(resp) | ||
@@ -383,3 +382,2 @@ | ||
| """ | ||
| config: StreamConfig | ||
@@ -394,7 +392,7 @@ state: StreamState | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "config", StreamConfig) | ||
| cls._convert(resp, "state", StreamState) | ||
| cls._convert(resp, "mirror", StreamSourceInfo) | ||
| cls._convert(resp, "sources", StreamSourceInfo) | ||
| cls._convert(resp, "cluster", ClusterInfo) | ||
| cls._convert(resp, 'config', StreamConfig) | ||
| cls._convert(resp, 'state', StreamState) | ||
| cls._convert(resp, 'mirror', StreamSourceInfo) | ||
| cls._convert(resp, 'sources', StreamSourceInfo) | ||
| cls._convert(resp, 'cluster', ClusterInfo) | ||
| return super().from_response(resp) | ||
@@ -408,6 +406,3 @@ | ||
| """ | ||
| def __init__( | ||
| self, offset: int, total: int, streams: List[Dict[str, any]] | ||
| ) -> None: | ||
| def __init__(self, offset: int, total: int, streams: List[Dict[str, any]]) -> None: | ||
| self.offset = offset | ||
@@ -483,3 +478,2 @@ self.total = total | ||
| """ | ||
| name: Optional[str] = None | ||
@@ -526,7 +520,7 @@ durable_name: Optional[str] = None | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert_nanoseconds(resp, "ack_wait") | ||
| cls._convert_nanoseconds(resp, "idle_heartbeat") | ||
| cls._convert_nanoseconds(resp, "inactive_threshold") | ||
| if "backoff" in resp: | ||
| resp["backoff"] = [val / _NANOSECOND for val in resp["backoff"]] | ||
| cls._convert_nanoseconds(resp, 'ack_wait') | ||
| cls._convert_nanoseconds(resp, 'idle_heartbeat') | ||
| cls._convert_nanoseconds(resp, 'inactive_threshold') | ||
| if 'backoff' in resp: | ||
| resp['backoff'] = [val / _NANOSECOND for val in resp['backoff']] | ||
| return super().from_response(resp) | ||
@@ -536,9 +530,9 @@ | ||
| result = super().as_dict() | ||
| result["ack_wait"] = self._to_nanoseconds(self.ack_wait) | ||
| result["idle_heartbeat"] = self._to_nanoseconds(self.idle_heartbeat) | ||
| result["inactive_threshold"] = self._to_nanoseconds( | ||
| result['ack_wait'] = self._to_nanoseconds(self.ack_wait) | ||
| result['idle_heartbeat'] = self._to_nanoseconds(self.idle_heartbeat) | ||
| result['inactive_threshold'] = self._to_nanoseconds( | ||
| self.inactive_threshold | ||
| ) | ||
| if self.backoff: | ||
| result["backoff"] = [self._to_nanoseconds(i) for i in self.backoff] | ||
| result['backoff'] = [self._to_nanoseconds(i) for i in self.backoff] | ||
| return result | ||
@@ -560,3 +554,2 @@ | ||
| """ | ||
| name: str | ||
@@ -578,6 +571,6 @@ stream_name: str | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "delivered", SequenceInfo) | ||
| cls._convert(resp, "ack_floor", SequenceInfo) | ||
| cls._convert(resp, "config", ConsumerConfig) | ||
| cls._convert(resp, "cluster", ClusterInfo) | ||
| cls._convert(resp, 'delivered', SequenceInfo) | ||
| cls._convert(resp, 'ack_floor', SequenceInfo) | ||
| cls._convert(resp, 'config', ConsumerConfig) | ||
| cls._convert(resp, 'cluster', ClusterInfo) | ||
| return super().from_response(resp) | ||
@@ -614,3 +607,3 @@ | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "limits", AccountLimits) | ||
| cls._convert(resp, 'limits', AccountLimits) | ||
| return super().from_response(resp) | ||
@@ -634,3 +627,2 @@ | ||
| """ | ||
| # NOTE: These fields are shared with Tier type as well. | ||
@@ -649,6 +641,6 @@ memory: int | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "limits", AccountLimits) | ||
| cls._convert(resp, "api", APIStats) | ||
| cls._convert(resp, 'limits', AccountLimits) | ||
| cls._convert(resp, 'api', APIStats) | ||
| info = super().from_response(resp) | ||
| tiers = resp.get("tiers", None) | ||
| tiers = resp.get('tiers', None) | ||
| if tiers: | ||
@@ -689,3 +681,2 @@ result = {} | ||
| """ | ||
| bucket: str | ||
@@ -705,3 +696,3 @@ description: Optional[str] = None | ||
| result = super().as_dict() | ||
| result["ttl"] = self._to_nanoseconds(self.ttl) | ||
| result['ttl'] = self._to_nanoseconds(self.ttl) | ||
| return result | ||
@@ -715,3 +706,2 @@ | ||
| """ | ||
| # Purge up to but not including sequence. | ||
@@ -730,3 +720,2 @@ seq: Optional[int] = None | ||
| """ | ||
| bucket: Optional[str] = None | ||
@@ -742,3 +731,3 @@ description: Optional[str] = None | ||
| result = super().as_dict() | ||
| result["ttl"] = self._to_nanoseconds(self.ttl) | ||
| result['ttl'] = self._to_nanoseconds(self.ttl) | ||
| return result | ||
@@ -752,3 +741,2 @@ | ||
| """ | ||
| # Bucket is the name of the other object store. | ||
@@ -772,3 +760,3 @@ bucket: str | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "link", ObjectLink) | ||
| cls._convert(resp, 'link', ObjectLink) | ||
| return super().from_response(resp) | ||
@@ -782,3 +770,2 @@ | ||
| """ | ||
| name: Optional[str] = None | ||
@@ -792,3 +779,3 @@ description: Optional[str] = None | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "options", ObjectMetaOptions) | ||
| cls._convert(resp, 'options', ObjectMetaOptions) | ||
| return super().from_response(resp) | ||
@@ -802,3 +789,2 @@ | ||
| """ | ||
| name: str | ||
@@ -833,3 +819,3 @@ bucket: str | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| cls._convert(resp, "options", ObjectMetaOptions) | ||
| cls._convert(resp, 'options', ObjectMetaOptions) | ||
| return super().from_response(resp) |
+46
-71
@@ -22,11 +22,3 @@ # Copyright 2021-2022 The NATS Authors | ||
| from secrets import token_hex | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Awaitable, | ||
| Callable, | ||
| Dict, | ||
| List, | ||
| Optional, | ||
| ) | ||
| from typing import TYPE_CHECKING, Awaitable, Callable, Optional, List, Dict | ||
@@ -38,17 +30,8 @@ import nats.errors | ||
| from nats.js import api | ||
| from nats.js.errors import ( | ||
| BadBucketError, | ||
| BucketNotFoundError, | ||
| FetchTimeoutError, | ||
| InvalidBucketNameError, | ||
| NotFoundError, | ||
| ) | ||
| from nats.js.errors import BadBucketError, BucketNotFoundError, InvalidBucketNameError, NotFoundError, FetchTimeoutError | ||
| from nats.js.kv import KeyValue | ||
| from nats.js.manager import JetStreamManager | ||
| from nats.js.object_store import ( | ||
| OBJ_ALL_CHUNKS_PRE_TEMPLATE, | ||
| OBJ_ALL_META_PRE_TEMPLATE, | ||
| OBJ_STREAM_TEMPLATE, | ||
| VALID_BUCKET_RE, | ||
| ObjectStore, | ||
| VALID_BUCKET_RE, OBJ_ALL_CHUNKS_PRE_TEMPLATE, OBJ_ALL_META_PRE_TEMPLATE, | ||
| OBJ_STREAM_TEMPLATE, ObjectStore | ||
| ) | ||
@@ -61,9 +44,9 @@ | ||
| NATS_HDR_LINE = bytearray(b"NATS/1.0") | ||
| NATS_HDR_LINE = bytearray(b'NATS/1.0') | ||
| NATS_HDR_LINE_SIZE = len(NATS_HDR_LINE) | ||
| _CRLF_ = b"\r\n" | ||
| _CRLF_ = b'\r\n' | ||
| _CRLF_LEN_ = len(_CRLF_) | ||
| KV_STREAM_TEMPLATE = "KV_{bucket}" | ||
| KV_PRE_TEMPLATE = "$KV.{bucket}." | ||
| Callback = Callable[["Msg"], Awaitable[None]] | ||
| Callback = Callable[['Msg'], Awaitable[None]] | ||
@@ -129,5 +112,3 @@ # For JetStream the default pending limits are larger. | ||
| self._publish_async_pending_semaphore = asyncio.Semaphore( | ||
| publish_async_max_pending | ||
| ) | ||
| self._publish_async_pending_semaphore = asyncio.Semaphore(publish_async_max_pending) | ||
@@ -146,8 +127,8 @@ @property | ||
| self._async_reply_prefix = self._nc._inbox_prefix[:] | ||
| self._async_reply_prefix.extend(b".") | ||
| self._async_reply_prefix.extend(b'.') | ||
| self._async_reply_prefix.extend(self._nc._nuid.next()) | ||
| self._async_reply_prefix.extend(b".") | ||
| self._async_reply_prefix.extend(b'.') | ||
| async_reply_subject = self._async_reply_prefix[:] | ||
| async_reply_subject.extend(b"*") | ||
| async_reply_subject.extend(b'*') | ||
@@ -169,4 +150,3 @@ await self._nc.subscribe( | ||
| # Handle no responders | ||
| if msg.headers and msg.headers.get(api.Header.STATUS | ||
| ) == NO_RESPONDERS_STATUS: | ||
| if msg.headers and msg.headers.get(api.Header.STATUS) == NO_RESPONDERS_STATUS: | ||
| future.set_exception(nats.js.errors.NoStreamResponseError) | ||
@@ -178,4 +158,4 @@ return | ||
| resp = json.loads(msg.data) | ||
| if "error" in resp: | ||
| err = nats.js.errors.APIError.from_error(resp["error"]) | ||
| if 'error' in resp: | ||
| err = nats.js.errors.APIError.from_error(resp['error']) | ||
| future.set_exception(err) | ||
@@ -192,6 +172,6 @@ return | ||
| subject: str, | ||
| payload: bytes = b"", | ||
| payload: bytes = b'', | ||
| timeout: Optional[float] = None, | ||
| stream: Optional[str] = None, | ||
| headers: Optional[Dict[str, Any]] = None, | ||
| headers: Optional[Dict] = None | ||
| ) -> api.PubAck: | ||
@@ -219,4 +199,4 @@ """ | ||
| resp = json.loads(msg.data) | ||
| if "error" in resp: | ||
| raise nats.js.errors.APIError.from_error(resp["error"]) | ||
| if 'error' in resp: | ||
| raise nats.js.errors.APIError.from_error(resp['error']) | ||
| return api.PubAck.from_response(resp) | ||
@@ -227,3 +207,3 @@ | ||
| subject: str, | ||
| payload: bytes = b"", | ||
| payload: bytes = b'', | ||
| wait_stall: Optional[float] = None, | ||
@@ -247,6 +227,3 @@ stream: Optional[str] = None, | ||
| try: | ||
| await asyncio.wait_for( | ||
| self._publish_async_pending_semaphore.acquire(), | ||
| timeout=wait_stall | ||
| ) | ||
| await asyncio.wait_for(self._publish_async_pending_semaphore.acquire(), timeout=wait_stall) | ||
| except (asyncio.TimeoutError, asyncio.CancelledError): | ||
@@ -436,6 +413,5 @@ raise nats.js.errors.TooManyStalledMsgsError | ||
| # Create inbox for push consumer, if deliver_subject is not assigned already. | ||
| if config.deliver_subject is None: | ||
| deliver = self._nc.new_inbox() | ||
| config.deliver_subject = deliver | ||
| # Create inbox for push consumer. | ||
| deliver = self._nc.new_inbox() | ||
| config.deliver_subject = deliver | ||
@@ -492,3 +468,4 @@ # Auto created consumers use the filter subject. | ||
| ) -> PushSubscription: | ||
| """Push-subscribe to an existing consumer.""" | ||
| """Push-subscribe to an existing consumer. | ||
| """ | ||
| # By default, async subscribers wrap the original callback and | ||
@@ -666,3 +643,3 @@ # auto ack the messages as they are delivered. | ||
| pending_msgs_limit=pending_msgs_limit, | ||
| pending_bytes_limit=pending_bytes_limit, | ||
| pending_bytes_limit=pending_bytes_limit | ||
| ) | ||
@@ -704,5 +681,4 @@ consumer_name = None | ||
| def _is_temporary_error(cls, status: Optional[str]) -> bool: | ||
| if (status == api.StatusCode.NO_MESSAGES | ||
| or status == api.StatusCode.CONFLICT | ||
| or status == api.StatusCode.REQUEST_TIMEOUT): | ||
| if status == api.StatusCode.NO_MESSAGES or status == api.StatusCode.CONFLICT \ | ||
| or status == api.StatusCode.REQUEST_TIMEOUT: | ||
| return True | ||
@@ -726,3 +702,3 @@ else: | ||
| class _JSI: | ||
| class _JSI(): | ||
@@ -803,4 +779,3 @@ def __init__( | ||
| if (self._fciseq - | ||
| self._psub._pending_queue.qsize()) >= self._fcd: | ||
| if (self._fciseq - self._psub._pending_queue.qsize()) >= self._fcd: | ||
| fc_reply = self._fcr | ||
@@ -1023,3 +998,3 @@ try: | ||
| prefix = self._js._prefix | ||
| self._nms = f"{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}" | ||
| self._nms = f'{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}' | ||
| self._deliver = deliver.decode() | ||
@@ -1073,3 +1048,3 @@ | ||
| timeout: Optional[float] = 5, | ||
| heartbeat: Optional[float] = None, | ||
| heartbeat: Optional[float] = None | ||
| ) -> List[Msg]: | ||
@@ -1126,3 +1101,3 @@ """ | ||
| timeout: Optional[float], | ||
| heartbeat: Optional[float] = None, | ||
| heartbeat: Optional[float] = None | ||
| ) -> Msg: | ||
@@ -1148,7 +1123,7 @@ queue = self._sub._pending_queue | ||
| next_req = {} | ||
| next_req["batch"] = 1 | ||
| next_req['batch'] = 1 | ||
| if expires: | ||
| next_req["expires"] = int(expires) | ||
| next_req['expires'] = int(expires) | ||
| if heartbeat: | ||
| next_req["idle_heartbeat"] = int( | ||
| next_req['idle_heartbeat'] = int( | ||
| heartbeat * 1_000_000_000 | ||
@@ -1206,3 +1181,3 @@ ) # to nanoseconds | ||
| timeout: Optional[float], | ||
| heartbeat: Optional[float] = None, | ||
| heartbeat: Optional[float] = None | ||
| ) -> List[Msg]: | ||
@@ -1235,10 +1210,10 @@ msgs = [] | ||
| next_req = {} | ||
| next_req["batch"] = needed | ||
| next_req['batch'] = needed | ||
| if expires: | ||
| next_req["expires"] = expires | ||
| next_req['expires'] = expires | ||
| if heartbeat: | ||
| next_req["idle_heartbeat"] = int( | ||
| next_req['idle_heartbeat'] = int( | ||
| heartbeat * 1_000_000_000 | ||
| ) # to nanoseconds | ||
| next_req["no_wait"] = True | ||
| next_req['no_wait'] = True | ||
| await self._nc.publish( | ||
@@ -1279,3 +1254,3 @@ self._nms, | ||
| status = JetStreamContext.is_status_msg(msg) | ||
| if status == api.StatusCode.NO_MESSAGES or status == api.StatusCode.REQUEST_TIMEOUT: | ||
| if status == api.StatusCode.NO_MESSAGES: | ||
| # No more messages after this so fallthrough | ||
@@ -1303,7 +1278,7 @@ # after receiving the rest. | ||
| next_req = {} | ||
| next_req["batch"] = needed | ||
| next_req['batch'] = needed | ||
| if expires: | ||
| next_req["expires"] = expires | ||
| next_req['expires'] = expires | ||
| if heartbeat: | ||
| next_req["idle_heartbeat"] = int( | ||
| next_req['idle_heartbeat'] = int( | ||
| heartbeat * 1_000_000_000 | ||
@@ -1413,3 +1388,3 @@ ) # to nanoseconds | ||
| js=self, | ||
| direct=bool(si.config.allow_direct), | ||
| direct=bool(si.config.allow_direct) | ||
| ) | ||
@@ -1416,0 +1391,0 @@ |
+4
-19
@@ -18,3 +18,3 @@ # Copyright 2016-2024 The NATS Authors | ||
| from dataclasses import dataclass | ||
| from typing import TYPE_CHECKING, Any, Dict, NoReturn, Optional | ||
| from typing import TYPE_CHECKING, Any, NoReturn, Optional, Dict | ||
@@ -37,3 +37,3 @@ import nats.errors | ||
| def __str__(self) -> str: | ||
| desc = "" | ||
| desc = '' | ||
| if self.description: | ||
@@ -49,3 +49,2 @@ desc = self.description | ||
| """ | ||
| code: Optional[int] | ||
@@ -84,3 +83,3 @@ err_code: Optional[int] | ||
| def from_error(cls, err: Dict[str, Any]): | ||
| code = err["code"] | ||
| code = err['code'] | ||
| if code == 503: | ||
@@ -108,3 +107,2 @@ raise ServiceUnavailableError(**err) | ||
| """ | ||
| pass | ||
@@ -117,3 +115,2 @@ | ||
| """ | ||
| pass | ||
@@ -126,3 +123,2 @@ | ||
| """ | ||
| pass | ||
@@ -135,3 +131,2 @@ | ||
| """ | ||
| pass | ||
@@ -177,3 +172,3 @@ | ||
| consumer_sequence=None, | ||
| last_consumer_sequence=None, | ||
| last_consumer_sequence=None | ||
| ) -> None: | ||
@@ -196,3 +191,2 @@ self.stream_resume_sequence = stream_resume_sequence | ||
| """ | ||
| pass | ||
@@ -209,3 +203,2 @@ | ||
| """ | ||
| pass | ||
@@ -272,3 +265,2 @@ | ||
| """ | ||
| pass | ||
@@ -281,3 +273,2 @@ | ||
| """ | ||
| pass | ||
@@ -290,3 +281,2 @@ | ||
| """ | ||
| pass | ||
@@ -299,3 +289,2 @@ | ||
| """ | ||
| pass | ||
@@ -308,3 +297,2 @@ | ||
| """ | ||
| pass | ||
@@ -317,3 +305,2 @@ | ||
| """ | ||
| pass | ||
@@ -326,3 +313,2 @@ | ||
| """ | ||
| pass | ||
@@ -335,3 +321,2 @@ | ||
| """ | ||
| pass |
+9
-35
@@ -19,3 +19,2 @@ # Copyright 2021-2022 The NATS Authors | ||
| import datetime | ||
| import logging | ||
| from dataclasses import dataclass | ||
@@ -36,5 +35,3 @@ from typing import TYPE_CHECKING, List, Optional | ||
| logger = logging.getLogger(__name__) | ||
| class KeyValue: | ||
@@ -44,2 +41,5 @@ """ | ||
| .. note:: | ||
| This functionality is EXPERIMENTAL and may be changed in later releases. | ||
| :: | ||
@@ -75,3 +75,2 @@ | ||
| """ | ||
| bucket: str | ||
@@ -90,3 +89,2 @@ key: str | ||
| """ | ||
| stream_info: api.StreamInfo | ||
@@ -289,5 +287,3 @@ bucket: str | ||
| subject = f"{self._pre}{entry.key}" | ||
| duration = datetime.datetime.now( | ||
| datetime.timezone.utc | ||
| ) - entry.created | ||
| duration = datetime.datetime.now() - entry.created | ||
| if olderthan > 0 and olderthan > duration.total_seconds(): | ||
@@ -311,4 +307,3 @@ keep = 1 | ||
| self._js = js | ||
| self._updates: asyncio.Queue[KeyValue.Entry | ||
| | None] = asyncio.Queue(maxsize=256) | ||
| self._updates: asyncio.Queue[KeyValue.Entry | None] = asyncio.Queue(maxsize=256) | ||
| self._sub = None | ||
@@ -352,6 +347,5 @@ self._pending: Optional[int] = None | ||
| async def keys(self, filters: List[str] = None, **kwargs) -> List[str]: | ||
| async def keys(self, **kwargs) -> List[str]: | ||
| """ | ||
| Returns a list of the keys from a KeyValue store. | ||
| Optionally filters the keys based on the provided filter list. | ||
| keys will return a list of the keys from a KeyValue store. | ||
| """ | ||
@@ -364,14 +358,2 @@ watcher = await self.watchall( | ||
| # Check consumer info and make sure filters are applied correctly | ||
| try: | ||
| consumer_info = await watcher._sub.consumer_info() | ||
| if consumer_info and filters: | ||
| # If NATS server < 2.10, filters might be ignored. | ||
| if consumer_info.config.filter_subject != ">": | ||
| logger.warning( | ||
| "Server may ignore filters if version is < 2.10." | ||
| ) | ||
| except Exception as e: | ||
| raise e | ||
| async for key in watcher: | ||
@@ -381,11 +363,3 @@ # None entry is used to signal that there is no more info. | ||
| break | ||
| # Apply filters if any were provided | ||
| if filters: | ||
| if any(f in key.key for f in filters): | ||
| keys.append(key.key) | ||
| else: | ||
| # No filters provided, append all keys | ||
| keys.append(key.key) | ||
| keys.append(key.key) | ||
| await watcher.stop() | ||
@@ -449,3 +423,3 @@ | ||
| if ignore_deletes: | ||
| if op == KV_PURGE or op == KV_DEL: | ||
| if (op == KV_PURGE or op == KV_DEL): | ||
| if meta.num_pending == 0 and not watcher._init_done: | ||
@@ -452,0 +426,0 @@ await watcher._updates.put(None) |
+48
-75
@@ -20,3 +20,3 @@ # Copyright 2021 The NATS Authors | ||
| from email.parser import BytesParser | ||
| from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional | ||
| from typing import TYPE_CHECKING, Any, List, Optional, Dict, Iterable | ||
@@ -30,5 +30,5 @@ from nats.errors import NoRespondersError | ||
| NATS_HDR_LINE = bytearray(b"NATS/1.0") | ||
| NATS_HDR_LINE = bytearray(b'NATS/1.0') | ||
| NATS_HDR_LINE_SIZE = len(NATS_HDR_LINE) | ||
| _CRLF_ = b"\r\n" | ||
| _CRLF_ = b'\r\n' | ||
| _CRLF_LEN_ = len(_CRLF_) | ||
@@ -55,3 +55,3 @@ | ||
| resp = await self._api_request( | ||
| f"{self._prefix}.INFO", b"", timeout=self._timeout | ||
| f"{self._prefix}.INFO", b'', timeout=self._timeout | ||
| ) | ||
@@ -70,21 +70,15 @@ return api.AccountInfo.from_response(resp) | ||
| ) | ||
| if not info["streams"]: | ||
| if not info['streams']: | ||
| raise NotFoundError | ||
| return info["streams"][0] | ||
| return info['streams'][0] | ||
| async def stream_info( | ||
| self, | ||
| name: str, | ||
| subjects_filter: Optional[str] = None | ||
| ) -> api.StreamInfo: | ||
| async def stream_info(self, name: str, subjects_filter: Optional[str] = None) -> api.StreamInfo: | ||
| """ | ||
| Get the latest StreamInfo by stream name. | ||
| """ | ||
| req_data = "" | ||
| req_data = '' | ||
| if subjects_filter: | ||
| req_data = json.dumps({"subjects_filter": subjects_filter}) | ||
| resp = await self._api_request( | ||
| f"{self._prefix}.STREAM.INFO.{name}", | ||
| req_data.encode(), | ||
| timeout=self._timeout, | ||
| f"{self._prefix}.STREAM.INFO.{name}", req_data.encode(), timeout=self._timeout | ||
| ) | ||
@@ -104,22 +98,8 @@ return api.StreamInfo.from_response(resp) | ||
| config = config.evolve(**params) | ||
| stream_name = config.name | ||
| if stream_name is None: | ||
| if config.name is None: | ||
| raise ValueError("nats: stream name is required") | ||
| # Validate stream name | ||
| invalid_chars = set(".*>/\\") | ||
| has_invalid_chars = any(char in stream_name for char in invalid_chars) | ||
| has_whitespace = any(char.isspace() for char in stream_name) | ||
| is_not_printable = not stream_name.isprintable() | ||
| if has_invalid_chars or has_whitespace or is_not_printable: | ||
| raise ValueError( | ||
| f"nats: stream name ({stream_name}) is invalid. Names cannot contain whitespace, '.', '*', '>', " | ||
| "path separators (forward or backward slash), or non-printable characters." | ||
| ) | ||
| data = json.dumps(config.as_dict()) | ||
| resp = await self._api_request( | ||
| f"{self._prefix}.STREAM.CREATE.{stream_name}", | ||
| f"{self._prefix}.STREAM.CREATE.{config.name}", | ||
| data.encode(), | ||
@@ -159,3 +139,3 @@ timeout=self._timeout, | ||
| ) | ||
| return resp["success"] | ||
| return resp['success'] | ||
@@ -167,3 +147,3 @@ async def purge_stream( | ||
| subject: Optional[str] = None, | ||
| keep: Optional[int] = None, | ||
| keep: Optional[int] = None | ||
| ) -> bool: | ||
@@ -175,7 +155,7 @@ """ | ||
| if seq: | ||
| stream_req["seq"] = seq | ||
| stream_req['seq'] = seq | ||
| if subject: | ||
| stream_req["filter"] = subject | ||
| stream_req['filter'] = subject | ||
| if keep: | ||
| stream_req["keep"] = keep | ||
| stream_req['keep'] = keep | ||
@@ -188,3 +168,3 @@ req = json.dumps(stream_req) | ||
| ) | ||
| return resp["success"] | ||
| return resp['success'] | ||
@@ -199,3 +179,3 @@ async def consumer_info( | ||
| f"{self._prefix}.CONSUMER.INFO.{stream}.{consumer}", | ||
| b"", | ||
| b'', | ||
| timeout=timeout | ||
@@ -211,9 +191,7 @@ ) | ||
| f"{self._prefix}.STREAM.LIST", | ||
| json.dumps({ | ||
| "offset": offset | ||
| }).encode(), | ||
| json.dumps({"offset": offset}).encode(), | ||
| timeout=self._timeout, | ||
| ) | ||
| streams = [] | ||
| for stream in resp["streams"]: | ||
| for stream in resp['streams']: | ||
| stream_info = api.StreamInfo.from_response(stream) | ||
@@ -223,4 +201,3 @@ streams.append(stream_info) | ||
| async def streams_info_iterator(self, | ||
| offset=0) -> Iterable[api.StreamInfo]: | ||
| async def streams_info_iterator(self, offset=0) -> Iterable[api.StreamInfo]: | ||
| """ | ||
@@ -231,11 +208,7 @@ streams_info retrieves a list of streams Iterator. | ||
| f"{self._prefix}.STREAM.LIST", | ||
| json.dumps({ | ||
| "offset": offset | ||
| }).encode(), | ||
| json.dumps({"offset": offset}).encode(), | ||
| timeout=self._timeout, | ||
| ) | ||
| return api.StreamsListIterator( | ||
| resp["offset"], resp["total"], resp["streams"] | ||
| ) | ||
| return api.StreamsListIterator(resp["offset"], resp["total"], resp["streams"]) | ||
@@ -259,3 +232,3 @@ async def add_consumer( | ||
| resp = None | ||
| subject = "" | ||
| subject = '' | ||
| version = self._nc.connected_server_version | ||
@@ -282,6 +255,6 @@ consumer_name_supported = version.major >= 2 and version.minor >= 9 | ||
| f"{self._prefix}.CONSUMER.DELETE.{stream}.{consumer}", | ||
| b"", | ||
| timeout=self._timeout, | ||
| b'', | ||
| timeout=self._timeout | ||
| ) | ||
| return resp["success"] | ||
| return resp['success'] | ||
@@ -301,3 +274,3 @@ async def consumers_info( | ||
| f"{self._prefix}.CONSUMER.LIST.{stream}", | ||
| b"" if offset is None else json.dumps({ | ||
| b'' if offset is None else json.dumps({ | ||
| "offset": offset | ||
@@ -308,3 +281,3 @@ }).encode(), | ||
| consumers = [] | ||
| for consumer in resp["consumers"]: | ||
| for consumer in resp['consumers']: | ||
| consumer_info = api.ConsumerInfo.from_response(consumer) | ||
@@ -328,12 +301,12 @@ consumers.append(consumer_info) | ||
| if seq: | ||
| req["seq"] = seq | ||
| req['seq'] = seq | ||
| if subject: | ||
| req["seq"] = None | ||
| req.pop("seq", None) | ||
| req["last_by_subj"] = subject | ||
| req['seq'] = None | ||
| req.pop('seq', None) | ||
| req['last_by_subj'] = subject | ||
| if next: | ||
| req["seq"] = seq | ||
| req["last_by_subj"] = None | ||
| req.pop("last_by_subj", None) | ||
| req["next_by_subj"] = subject | ||
| req['seq'] = seq | ||
| req['last_by_subj'] = None | ||
| req.pop('last_by_subj', None) | ||
| req['next_by_subj'] = subject | ||
| data = json.dumps(req) | ||
@@ -345,3 +318,3 @@ | ||
| # last_by_subject type request requires no payload. | ||
| data = "" | ||
| data = '' | ||
| req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}" | ||
@@ -363,3 +336,3 @@ else: | ||
| raw_msg = api.RawStreamMsg.from_response(resp_data["message"]) | ||
| raw_msg = api.RawStreamMsg.from_response(resp_data['message']) | ||
| if raw_msg.hdrs: | ||
@@ -387,5 +360,5 @@ hdrs = base64.b64decode(raw_msg.hdrs) | ||
| msg.data = None | ||
| status = msg.headers.get("Status") | ||
| status = msg.headers.get('Status') | ||
| if status: | ||
| if status == "404": | ||
| if status == '404': | ||
| raise NotFoundError | ||
@@ -396,6 +369,6 @@ else: | ||
| raw_msg = api.RawStreamMsg() | ||
| subject = msg.headers["Nats-Subject"] | ||
| subject = msg.headers['Nats-Subject'] | ||
| raw_msg.subject = subject | ||
| seq = msg.headers.get("Nats-Sequence") | ||
| seq = msg.headers.get('Nats-Sequence') | ||
| if seq: | ||
@@ -413,6 +386,6 @@ raw_msg.seq = int(seq) | ||
| req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}" | ||
| req = {"seq": seq} | ||
| req = {'seq': seq} | ||
| data = json.dumps(req) | ||
| resp = await self._api_request(req_subject, data.encode()) | ||
| return resp["success"] | ||
| return resp['success'] | ||
@@ -433,3 +406,3 @@ async def get_last_msg( | ||
| req_subject: str, | ||
| req: bytes = b"", | ||
| req: bytes = b'', | ||
| timeout: float = 5, | ||
@@ -444,5 +417,5 @@ ) -> Dict[str, Any]: | ||
| # Check for API errors. | ||
| if "error" in resp: | ||
| raise APIError.from_error(resp["error"]) | ||
| if 'error' in resp: | ||
| raise APIError.from_error(resp['error']) | ||
| return resp |
+23
-26
@@ -15,11 +15,11 @@ # Copyright 2021-2023 The NATS Authors | ||
| import asyncio | ||
| import base64 | ||
| import re | ||
| import io | ||
| import json | ||
| import re | ||
| from dataclasses import dataclass | ||
| from datetime import datetime, timezone | ||
| from hashlib import sha256 | ||
| from typing import TYPE_CHECKING, List, Optional, Union | ||
| from dataclasses import dataclass | ||
| import json | ||
| import asyncio | ||
| from typing import TYPE_CHECKING, Optional, Union, List | ||
@@ -29,9 +29,4 @@ import nats.errors | ||
| from nats.js.errors import ( | ||
| BadObjectMetaError, | ||
| DigestMismatchError, | ||
| LinkIsABucketError, | ||
| NotFoundError, | ||
| ObjectAlreadyExists, | ||
| ObjectDeletedError, | ||
| ObjectNotFoundError, | ||
| BadObjectMetaError, DigestMismatchError, ObjectAlreadyExists, | ||
| ObjectDeletedError, ObjectNotFoundError, NotFoundError, LinkIsABucketError | ||
| ) | ||
@@ -61,2 +56,5 @@ from nats.js.kv import MSG_ROLLUP_SUBJECT | ||
| .. note:: | ||
| This functionality is EXPERIMENTAL and may be changed in later releases. | ||
| :: | ||
@@ -79,3 +77,2 @@ """ | ||
| """ | ||
| stream_info: api.StreamInfo | ||
@@ -150,3 +147,3 @@ bucket: str | ||
| bucket=self._name, | ||
| obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(), | ||
| obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode() | ||
| ) | ||
@@ -220,3 +217,3 @@ stream = OBJ_STREAM_TEMPLATE.format(bucket=self._name) | ||
| executor = asyncio.get_running_loop().run_in_executor | ||
| if hasattr(writeinto, "buffer"): | ||
| if hasattr(writeinto, 'buffer'): | ||
| executor_fn = writeinto.buffer.write | ||
@@ -293,6 +290,6 @@ else: | ||
| data = io.BytesIO(data) | ||
| elif hasattr(data, "readinto") or isinstance(data, io.BufferedIOBase): | ||
| elif hasattr(data, 'readinto') or isinstance(data, io.BufferedIOBase): | ||
| # Need to delegate to a threaded executor to avoid blocking. | ||
| executor = asyncio.get_running_loop().run_in_executor | ||
| elif hasattr(data, "buffer") or isinstance(data, io.TextIOWrapper): | ||
| elif hasattr(data, 'buffer') or isinstance(data, io.TextIOWrapper): | ||
| data = data.buffer | ||
@@ -311,3 +308,3 @@ else: | ||
| chunks=0, | ||
| mtime=datetime.now(timezone.utc).isoformat(), | ||
| mtime=datetime.now(timezone.utc).isoformat() | ||
| ) | ||
@@ -348,3 +345,3 @@ h = sha256() | ||
| bucket=self._name, | ||
| obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(), | ||
| obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode() | ||
| ) | ||
@@ -356,3 +353,3 @@ # Publish the meta message. | ||
| json.dumps(info.as_dict()).encode(), | ||
| headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, | ||
| headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT} | ||
| ) | ||
@@ -419,3 +416,3 @@ except Exception as err: | ||
| bucket=self._name, | ||
| obj=base64.urlsafe_b64encode(bytes(name, "utf-8")).decode(), | ||
| obj=base64.urlsafe_b64encode(bytes(name, "utf-8")).decode() | ||
| ) | ||
@@ -427,3 +424,3 @@ # Publish the meta message. | ||
| json.dumps(info.as_dict()).encode(), | ||
| headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, | ||
| headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT} | ||
| ) | ||
@@ -541,4 +538,4 @@ except Exception as err: | ||
| info.chunks = 0 | ||
| info.digest = "" | ||
| info.mtime = "" | ||
| info.digest = '' | ||
| info.mtime = '' | ||
@@ -548,3 +545,3 @@ # Prepare the meta message. | ||
| bucket=self._name, | ||
| obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(), | ||
| obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode() | ||
| ) | ||
@@ -556,3 +553,3 @@ # Publish the meta message. | ||
| json.dumps(info.as_dict()).encode(), | ||
| headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, | ||
| headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT} | ||
| ) | ||
@@ -559,0 +556,0 @@ finally: |
@@ -16,8 +16,7 @@ # Copyright 2021-2022 The NATS Authors | ||
| from dataclasses import replace | ||
| from nats.aio.client import Client | ||
| from typing import Optional | ||
| from nats.aio.client import Client | ||
| from .request import Handler, Request | ||
| from .service import Service, ServiceConfig | ||
| from .request import Request, Handler | ||
@@ -24,0 +23,0 @@ |
+10
-11
@@ -18,3 +18,4 @@ # Copyright 2021-2024 The NATS Authors | ||
| from dataclasses import dataclass | ||
| from typing import Any, Awaitable, Callable, Dict, Optional | ||
| from enum import Enum | ||
| from typing import Any, Dict, Awaitable, Callable, Optional | ||
@@ -50,5 +51,3 @@ from nats.aio.msg import Msg | ||
| async def respond( | ||
| self, | ||
| data: bytes = b"", | ||
| headers: Optional[Dict[str, str]] = None | ||
| self, data: bytes = b"", headers: Optional[Dict[str, str]] = None | ||
| ) -> None: | ||
@@ -88,6 +87,8 @@ """Send a response to the request. | ||
| headers.update({ | ||
| ERROR_HEADER: description, | ||
| ERROR_CODE_HEADER: code, | ||
| }) | ||
| headers.update( | ||
| { | ||
| ERROR_HEADER: description, | ||
| ERROR_CODE_HEADER: code, | ||
| } | ||
| ) | ||
@@ -114,4 +115,2 @@ await self.respond(data, headers=headers) | ||
| def from_dict(cls, data: Dict[str, Any]) -> ServiceError: | ||
| return cls( | ||
| code=data.get("code", ""), description=data.get("description", "") | ||
| ) | ||
| return cls(code=data.get("code", ""), description=data.get("description", "")) |
+46
-85
| from __future__ import annotations | ||
| import json | ||
| import re | ||
| import time | ||
| from asyncio import Event | ||
| from dataclasses import dataclass, field, replace | ||
| from dataclasses import dataclass, replace, field | ||
| from datetime import datetime, timedelta | ||
| from enum import Enum | ||
| from nats.aio.client import Client | ||
| from nats.aio.msg import Msg | ||
| from nats.aio.subscription import Subscription | ||
| from typing import ( | ||
| Any, | ||
| AsyncContextManager, | ||
| Callable, | ||
| Optional, | ||
| Protocol, | ||
| Dict, | ||
| List, | ||
| Optional, | ||
| Protocol, | ||
| overload, | ||
| Callable, | ||
| ) | ||
| from nats.aio.client import Client | ||
| from nats.aio.msg import Msg | ||
| from nats.aio.subscription import Subscription | ||
| import re | ||
| import json | ||
| import time | ||
| from .request import Handler, Request, ServiceError | ||
| from .request import Request, Handler, ServiceError | ||
| DEFAULT_QUEUE_GROUP = "q" | ||
@@ -275,5 +277,3 @@ """Queue Group name used across all services.""" | ||
| self._processing_time += elapsed_time | ||
| self._average_processing_time = int( | ||
| self._processing_time / self._num_requests | ||
| ) | ||
| self._average_processing_time = int(self._processing_time / self._num_requests) | ||
@@ -298,4 +298,3 @@ | ||
| @overload | ||
| async def add_endpoint(self, config: EndpointConfig) -> None: | ||
| ... | ||
| async def add_endpoint(self, config: EndpointConfig) -> None: ... | ||
@@ -311,9 +310,7 @@ @overload | ||
| metadata: Optional[Dict[str, str]] = None, | ||
| ) -> None: | ||
| ... | ||
| ) -> None: ... | ||
| async def add_endpoint( | ||
| self, config: Optional[EndpointConfig] = None, **kwargs | ||
| ) -> None: | ||
| ... | ||
| ) -> None: ... | ||
@@ -327,19 +324,11 @@ | ||
| @overload | ||
| def add_group( | ||
| self, *, name: str, queue_group: Optional[str] = None | ||
| ) -> Group: | ||
| ... | ||
| def add_group(self, *, name: str, queue_group: Optional[str] = None) -> Group: ... | ||
| @overload | ||
| def add_group(self, config: GroupConfig) -> Group: | ||
| ... | ||
| def add_group(self, config: GroupConfig) -> Group: ... | ||
| def add_group( | ||
| self, config: Optional[GroupConfig] = None, **kwargs | ||
| ) -> Group: | ||
| ... | ||
| def add_group(self, config: Optional[GroupConfig] = None, **kwargs) -> Group: ... | ||
| class Group(GroupManager, EndpointManager): | ||
| def __init__(self, service: "Service", config: GroupConfig) -> None: | ||
@@ -351,4 +340,3 @@ self._service = service | ||
| @overload | ||
| async def add_endpoint(self, config: EndpointConfig) -> None: | ||
| ... | ||
| async def add_endpoint(self, config: EndpointConfig) -> None: ... | ||
@@ -364,4 +352,3 @@ @overload | ||
| metadata: Optional[Dict[str, str]] = None, | ||
| ) -> None: | ||
| ... | ||
| ) -> None: ... | ||
@@ -378,4 +365,5 @@ async def add_endpoint( | ||
| config, | ||
| subject=f"{self._prefix.strip('.')}.{config.subject or config.name}" | ||
| .strip("."), | ||
| subject=f"{self._prefix.strip('.')}.{config.subject or config.name}".strip( | ||
| "." | ||
| ), | ||
| queue_group=config.queue_group or self._queue_group, | ||
@@ -387,14 +375,8 @@ ) | ||
| @overload | ||
| def add_group( | ||
| self, *, name: str, queue_group: Optional[str] = None | ||
| ) -> Group: | ||
| ... | ||
| def add_group(self, *, name: str, queue_group: Optional[str] = None) -> Group: ... | ||
| @overload | ||
| def add_group(self, config: GroupConfig) -> Group: | ||
| ... | ||
| def add_group(self, config: GroupConfig) -> Group: ... | ||
| def add_group( | ||
| self, config: Optional[GroupConfig] = None, **kwargs | ||
| ) -> Group: | ||
| def add_group(self, config: Optional[GroupConfig] = None, **kwargs) -> Group: | ||
| if config: | ||
@@ -553,8 +535,5 @@ config = replace(config, **kwargs) | ||
| version=data["version"], | ||
| started=datetime.strptime( | ||
| data["started"], "%Y-%m-%dT%H:%M:%S.%fZ" | ||
| ), | ||
| started=datetime.strptime(data["started"], "%Y-%m-%dT%H:%M:%S.%fZ"), | ||
| endpoints=[ | ||
| EndpointStats.from_dict(endpoint) | ||
| for endpoint in data["endpoints"] | ||
| EndpointStats.from_dict(endpoint) for endpoint in data["endpoints"] | ||
| ], | ||
@@ -572,4 +551,3 @@ metadata=data["metadata"], | ||
| "version": self.version, | ||
| "started": self.started.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + | ||
| "Z", | ||
| "started": self.started.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", | ||
| "endpoints": [endpoint.to_dict() for endpoint in self.endpoints], | ||
@@ -634,4 +612,3 @@ "metadata": self.metadata, | ||
| endpoints=[ | ||
| EndpointInfo.from_dict(endpoint) | ||
| for endpoint in data["endpoints"] | ||
| EndpointInfo.from_dict(endpoint) for endpoint in data["endpoints"] | ||
| ], | ||
@@ -659,3 +636,2 @@ metadata=data["metadata"], | ||
| class Service(AsyncContextManager): | ||
| def __init__(self, client: Client, config: ServiceConfig) -> None: | ||
@@ -698,5 +674,3 @@ self._id = client._nuid.next().decode() | ||
| f"{verb}-all", | ||
| control_subject( | ||
| verb, name=None, id=None, prefix=self._prefix | ||
| ), | ||
| control_subject(verb, name=None, id=None, prefix=self._prefix), | ||
| ), | ||
@@ -712,6 +686,3 @@ ( | ||
| control_subject( | ||
| verb, | ||
| name=self._name, | ||
| id=self._id, | ||
| prefix=self._prefix | ||
| verb, name=self._name, id=self._id, prefix=self._prefix | ||
| ), | ||
@@ -726,8 +697,7 @@ ), | ||
| self._started = datetime.utcnow() | ||
| self._started = datetime.now() | ||
| await self._client.flush() | ||
| @overload | ||
| async def add_endpoint(self, config: EndpointConfig) -> None: | ||
| ... | ||
| async def add_endpoint(self, config: EndpointConfig) -> None: ... | ||
@@ -743,4 +713,3 @@ @overload | ||
| metadata: Optional[Dict[str, str]] = None, | ||
| ) -> None: | ||
| ... | ||
| ) -> None: ... | ||
@@ -755,5 +724,3 @@ async def add_endpoint( | ||
| config = replace( | ||
| config, queue_group=config.queue_group or self._queue_group | ||
| ) | ||
| config = replace(config, queue_group=config.queue_group or self._queue_group) | ||
@@ -765,14 +732,8 @@ endpoint = Endpoint(self, config) | ||
| @overload | ||
| def add_group( | ||
| self, *, name: str, queue_group: Optional[str] = None | ||
| ) -> Group: | ||
| ... | ||
| def add_group(self, *, name: str, queue_group: Optional[str] = None) -> Group: ... | ||
| @overload | ||
| def add_group(self, config: GroupConfig) -> Group: | ||
| ... | ||
| def add_group(self, config: GroupConfig) -> Group: ... | ||
| def add_group( | ||
| self, config: Optional[GroupConfig] = None, **kwargs | ||
| ) -> Group: | ||
| def add_group(self, config: Optional[GroupConfig] = None, **kwargs) -> Group: | ||
| if config: | ||
@@ -783,5 +744,3 @@ config = replace(config, **kwargs) | ||
| config = replace( | ||
| config, queue_group=config.queue_group or self._queue_group | ||
| ) | ||
| config = replace(config, queue_group=config.queue_group or self._queue_group) | ||
@@ -809,3 +768,4 @@ return Group(self, config) | ||
| average_processing_time=endpoint._average_processing_time, | ||
| ) for endpoint in (self._endpoints or []) | ||
| ) | ||
| for endpoint in (self._endpoints or []) | ||
| ], | ||
@@ -834,3 +794,4 @@ started=self._started, | ||
| metadata=endpoint._metadata, | ||
| ) for endpoint in self._endpoints | ||
| ) | ||
| for endpoint in self._endpoints | ||
| ], | ||
@@ -837,0 +798,0 @@ ) |
+1
-1
@@ -21,3 +21,3 @@ # Copyright 2016-2018 The NATS Authors | ||
| DIGITS = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" | ||
| DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' | ||
| BASE = 62 | ||
@@ -24,0 +24,0 @@ PREFIX_LENGTH = 12 |
+12
-16
@@ -5,7 +5,7 @@ from __future__ import annotations | ||
| PUB_OP = "PUB" | ||
| HPUB_OP = "HPUB" | ||
| SUB_OP = "SUB" | ||
| UNSUB_OP = "UNSUB" | ||
| _CRLF_ = "\r\n" | ||
| PUB_OP = 'PUB' | ||
| HPUB_OP = 'HPUB' | ||
| SUB_OP = 'SUB' | ||
| UNSUB_OP = 'UNSUB' | ||
| _CRLF_ = '\r\n' | ||
@@ -16,6 +16,4 @@ Command = Callable[..., bytes] | ||
| def pub_cmd(subject, reply, payload) -> bytes: | ||
| return ( | ||
| f"{PUB_OP} {subject} {reply} {len(payload)}{_CRLF_}".encode() + | ||
| payload + _CRLF_.encode() | ||
| ) | ||
| return f'{PUB_OP} {subject} {reply} {len(payload)}{_CRLF_}'.encode( | ||
| ) + payload + _CRLF_.encode() | ||
@@ -26,14 +24,12 @@ | ||
| total_size = len(payload) + hdr_len | ||
| return ( | ||
| f"{HPUB_OP} {subject} {reply} {hdr_len} {total_size}{_CRLF_}".encode() | ||
| + hdr + payload + _CRLF_.encode() | ||
| ) | ||
| return f'{HPUB_OP} {subject} {reply} {hdr_len} {total_size}{_CRLF_}'.encode( | ||
| ) + hdr + payload + _CRLF_.encode() | ||
| def sub_cmd(subject, queue, sid) -> bytes: | ||
| return f"{SUB_OP} {subject} {queue} {sid}{_CRLF_}".encode() | ||
| return f'{SUB_OP} {subject} {queue} {sid}{_CRLF_}'.encode() | ||
| def unsub_cmd(sid, limit) -> bytes: | ||
| limit_s = "" if limit == 0 else f"{limit}" | ||
| return f"{UNSUB_OP} {sid} {limit_s}{_CRLF_}".encode() | ||
| limit_s = '' if limit == 0 else f'{limit}' | ||
| return f'{UNSUB_OP} {sid} {limit_s}{_CRLF_}'.encode() |
+26
-27
@@ -27,27 +27,27 @@ # Copyright 2016-2023 The NATS Authors | ||
| MSG_RE = re.compile( | ||
| b"\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n" | ||
| b'\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n' | ||
| ) | ||
| HMSG_RE = re.compile( | ||
| b"\\AHMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?([\\d]+)\\s+(\\d+)\r\n" | ||
| b'\\AHMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?([\\d]+)\\s+(\\d+)\r\n' | ||
| ) | ||
| OK_RE = re.compile(b"\\A\\+OK\\s*\r\n") | ||
| ERR_RE = re.compile(b"\\A-ERR\\s+('.+')?\r\n") | ||
| PING_RE = re.compile(b"\\APING\\s*\r\n") | ||
| PONG_RE = re.compile(b"\\APONG\\s*\r\n") | ||
| INFO_RE = re.compile(b"\\AINFO\\s+([^\r\n]+)\r\n") | ||
| OK_RE = re.compile(b'\\A\\+OK\\s*\r\n') | ||
| ERR_RE = re.compile(b'\\A-ERR\\s+(\'.+\')?\r\n') | ||
| PING_RE = re.compile(b'\\APING\\s*\r\n') | ||
| PONG_RE = re.compile(b'\\APONG\\s*\r\n') | ||
| INFO_RE = re.compile(b'\\AINFO\\s+([^\r\n]+)\r\n') | ||
| INFO_OP = b"INFO" | ||
| CONNECT_OP = b"CONNECT" | ||
| PUB_OP = b"PUB" | ||
| MSG_OP = b"MSG" | ||
| HMSG_OP = b"HMSG" | ||
| SUB_OP = b"SUB" | ||
| UNSUB_OP = b"UNSUB" | ||
| PING_OP = b"PING" | ||
| PONG_OP = b"PONG" | ||
| OK_OP = b"+OK" | ||
| ERR_OP = b"-ERR" | ||
| MSG_END = b"\n" | ||
| _CRLF_ = b"\r\n" | ||
| _SPC_ = b" " | ||
| INFO_OP = b'INFO' | ||
| CONNECT_OP = b'CONNECT' | ||
| PUB_OP = b'PUB' | ||
| MSG_OP = b'MSG' | ||
| HMSG_OP = b'HMSG' | ||
| SUB_OP = b'SUB' | ||
| UNSUB_OP = b'UNSUB' | ||
| PING_OP = b'PING' | ||
| PONG_OP = b'PONG' | ||
| OK_OP = b'+OK' | ||
| ERR_OP = b'-ERR' | ||
| MSG_END = b'\n' | ||
| _CRLF_ = b'\r\n' | ||
| _SPC_ = b' ' | ||
@@ -91,3 +91,3 @@ OK = OK_OP + _CRLF_ | ||
| async def parse(self, data: bytes = b""): | ||
| async def parse(self, data: bytes = b''): | ||
| """ | ||
@@ -109,3 +109,3 @@ Parses the wire protocol from NATS for the client | ||
| else: | ||
| self.msg_arg["reply"] = b"" | ||
| self.msg_arg["reply"] = b'' | ||
| self.needed = int(needed_bytes) | ||
@@ -128,3 +128,3 @@ del self.buf[:msg.end()] | ||
| else: | ||
| self.msg_arg["reply"] = b"" | ||
| self.msg_arg["reply"] = b'' | ||
| self.needed = int(needed_bytes) | ||
@@ -168,8 +168,7 @@ self.header_needed = int(header_size) | ||
| srv_info = json.loads(info_line.decode()) | ||
| await self.nc._process_info(srv_info) | ||
| self.nc._process_info(srv_info) | ||
| del self.buf[:info.end()] | ||
| continue | ||
| if len(self.buf | ||
| ) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf: | ||
| if len(self.buf) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf: | ||
| # FIXME: By default server uses a max protocol | ||
@@ -176,0 +175,0 @@ # line of 4096 bytes but it can be tuned in latest |
+2
-24
| Metadata-Version: 2.4 | ||
| Name: nats-py | ||
| Version: 2.10.0 | ||
| Version: 2.9.1 | ||
| Summary: NATS client for Python | ||
@@ -37,2 +37,3 @@ Author-email: Waldemar Quevedo <wally@synadia.com> | ||
| [](https://pypi.org/project/nats-py) | ||
| [](http://travis-ci.com/nats-io/nats.py) | ||
| [](https://pypi.org/project/nats-py) | ||
@@ -258,25 +259,2 @@ [](https://www.apache.org/licenses/LICENSE-2.0) | ||
| ## Updating Docs | ||
| To update the docs, first checkout the `docs` branch under a local copy of the `nats.py` repo | ||
| as follows: | ||
| ```sh | ||
| git clone https://github.com/nats-io/nats.py | ||
| cd nats.py | ||
| git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs | ||
| cd docs | ||
| pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments | ||
| pipenv shell | ||
| make html | ||
| # preview the changes: | ||
| make serve | ||
| ``` | ||
| If you are happy with the changes, make a PR on the docs branch: | ||
| ``` | ||
| make publish | ||
| git add docs | ||
| ``` | ||
| ## License | ||
@@ -283,0 +261,0 @@ |
+6
-0
@@ -64,1 +64,7 @@ [build-system] | ||
| split_before_expression_after_opening_paren = true | ||
| [tool.isort] | ||
| combine_as_imports = true | ||
| multi_line_output = 3 | ||
| include_trailing_comma = true | ||
| src_paths = ["nats", "tests"] |
+1
-23
@@ -7,2 +7,3 @@ # NATS - Python3 Client for Asyncio | ||
| [](https://pypi.org/project/nats-py) | ||
| [](http://travis-ci.com/nats-io/nats.py) | ||
| [](https://pypi.org/project/nats-py) | ||
@@ -228,25 +229,2 @@ [](https://www.apache.org/licenses/LICENSE-2.0) | ||
| ## Updating Docs | ||
| To update the docs, first checkout the `docs` branch under a local copy of the `nats.py` repo | ||
| as follows: | ||
| ```sh | ||
| git clone https://github.com/nats-io/nats.py | ||
| cd nats.py | ||
| git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs | ||
| cd docs | ||
| pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments | ||
| pipenv shell | ||
| make html | ||
| # preview the changes: | ||
| make serve | ||
| ``` | ||
| If you are happy with the changes, make a PR on the docs branch: | ||
| ``` | ||
| make publish | ||
| git add docs | ||
| ``` | ||
| ## License | ||
@@ -253,0 +231,0 @@ |
+0
-1
| [flake8] | ||
| ignore = W503, W504 | ||
| max-line-length = 120 | ||
@@ -4,0 +3,0 @@ |
+7
-7
@@ -7,12 +7,12 @@ from setuptools import setup | ||
| name="nats-py", | ||
| version="2.10.0", | ||
| license="Apache 2 License", | ||
| version='2.9.1', | ||
| license='Apache 2 License', | ||
| extras_require={ | ||
| "nkeys": ["nkeys"], | ||
| "aiohttp": ["aiohttp"], | ||
| "fast_parse": ["fast-mail-parser"], | ||
| 'nkeys': ['nkeys'], | ||
| 'aiohttp': ['aiohttp'], | ||
| 'fast_parse': ['fast-mail-parser'] | ||
| }, | ||
| packages=["nats", "nats.aio", "nats.micro", "nats.protocol", "nats.js"], | ||
| packages=['nats', 'nats.aio', 'nats.micro', 'nats.protocol', 'nats.js'], | ||
| package_data={"nats": ["py.typed"]}, | ||
| zip_safe=True, | ||
| zip_safe=True | ||
| ) |
| import asyncio | ||
| import sys | ||
| import unittest | ||
| from nats.aio.client import Client as NATS | ||
| from nats.aio.errors import ErrTimeout | ||
| from nats.errors import SlowConsumerError, TimeoutError | ||
@@ -27,3 +30,3 @@ from tests.utils import SingleServerTestCase, async_test | ||
| for i in range(0, 5): | ||
| await nc.publish(f"tests.{i}", b"bar") | ||
| await nc.publish(f"tests.{i}", b'bar') | ||
@@ -61,30 +64,30 @@ # Wait a bit for messages to be received. | ||
| if msg.reply != "": | ||
| await nc.publish(msg.reply, b"") | ||
| await nc.publish(msg.reply, b'') | ||
| await nc.subscribe("bar", cb=handler_bar) | ||
| await nc.publish("foo", b"1") | ||
| await nc.publish("foo", b"2") | ||
| await nc.publish("foo", b"3") | ||
| await nc.publish("foo", b'1') | ||
| await nc.publish("foo", b'2') | ||
| await nc.publish("foo", b'3') | ||
| # Will be processed before the others since no head of line | ||
| # blocking among the subscriptions. | ||
| await nc.publish("bar", b"4") | ||
| await nc.publish("bar", b'4') | ||
| response = await nc.request("foo", b"hello1", timeout=1) | ||
| self.assertEqual(response.data, b"hello1hello1") | ||
| response = await nc.request("foo", b'hello1', timeout=1) | ||
| self.assertEqual(response.data, b'hello1hello1') | ||
| with self.assertRaises(TimeoutError): | ||
| await nc.request("foo", b"hello2", timeout=0.1) | ||
| await nc.request("foo", b'hello2', timeout=0.1) | ||
| await nc.publish("bar", b"5") | ||
| response = await nc.request("foo", b"hello2", timeout=1) | ||
| self.assertEqual(response.data, b"hello2hello2") | ||
| await nc.publish("bar", b'5') | ||
| response = await nc.request("foo", b'hello2', timeout=1) | ||
| self.assertEqual(response.data, b'hello2hello2') | ||
| self.assertEqual(msgs[0].data, b"1") | ||
| self.assertEqual(msgs[1].data, b"4") | ||
| self.assertEqual(msgs[2].data, b"2") | ||
| self.assertEqual(msgs[3].data, b"3") | ||
| self.assertEqual(msgs[4].data, b"hello1") | ||
| self.assertEqual(msgs[5].data, b"hello2") | ||
| self.assertEqual(msgs[0].data, b'1') | ||
| self.assertEqual(msgs[1].data, b'4') | ||
| self.assertEqual(msgs[2].data, b'2') | ||
| self.assertEqual(msgs[3].data, b'3') | ||
| self.assertEqual(msgs[4].data, b'hello1') | ||
| self.assertEqual(msgs[5].data, b'hello2') | ||
| self.assertEqual(len(errors), 0) | ||
@@ -122,13 +125,13 @@ await nc.close() | ||
| for i in range(10): | ||
| await nc.publish("foo", f"{i}".encode()) | ||
| await nc.publish("foo", f'{i}'.encode()) | ||
| # Will be processed before the others since no head of line | ||
| # blocking among the subscriptions. | ||
| await nc.publish("bar", b"14") | ||
| response = await nc.request("bar", b"hi1", 2) | ||
| self.assertEqual(response.data, b"hi1hi1hi1") | ||
| await nc.publish("bar", b'14') | ||
| response = await nc.request("bar", b'hi1', 2) | ||
| self.assertEqual(response.data, b'hi1hi1hi1') | ||
| self.assertEqual(len(msgs), 2) | ||
| self.assertEqual(msgs[0].data, b"14") | ||
| self.assertEqual(msgs[1].data, b"hi1") | ||
| self.assertEqual(msgs[0].data, b'14') | ||
| self.assertEqual(msgs[1].data, b'hi1') | ||
@@ -177,9 +180,9 @@ # Consumed messages but the rest were slow consumers. | ||
| # blocking among the subscriptions. | ||
| await nc.publish("bar", b"14") | ||
| await nc.publish("bar", b'14') | ||
| response = await nc.request("bar", b"hi1", 2) | ||
| self.assertEqual(response.data, b"hi1hi1hi1") | ||
| response = await nc.request("bar", b'hi1', 2) | ||
| self.assertEqual(response.data, b'hi1hi1hi1') | ||
| self.assertEqual(len(msgs), 2) | ||
| self.assertEqual(msgs[0].data, b"14") | ||
| self.assertEqual(msgs[1].data, b"hi1") | ||
| self.assertEqual(msgs[0].data, b'14') | ||
| self.assertEqual(msgs[1].data, b'hi1') | ||
@@ -195,4 +198,4 @@ # Consumed a few messages but the rest were slow consumers. | ||
| await asyncio.sleep(3) | ||
| response = await nc.request("foo", b"B", 1) | ||
| self.assertEqual(response.data, b"BB") | ||
| response = await nc.request("foo", b'B', 1) | ||
| self.assertEqual(response.data, b'BB') | ||
| await nc.close() |
@@ -9,3 +9,2 @@ import asyncio | ||
| import nkeys | ||
| nkeys_installed = True | ||
@@ -15,3 +14,4 @@ except ModuleNotFoundError: | ||
| from nats.aio.client import Client as NATS, RawCredentials | ||
| from nats.aio.client import Client as NATS | ||
| from nats.aio.client import RawCredentials | ||
| from nats.aio.errors import * | ||
@@ -35,3 +35,3 @@ from nats.errors import * | ||
| seed = None | ||
| with open(config_file, "rb") as f: | ||
| with open(config_file, 'rb') as f: | ||
| seed = bytearray(os.fstat(f.fileno()).st_size) | ||
@@ -41,6 +41,6 @@ f.readinto(seed) | ||
| { | ||
| "nkeys_seed": config_file | ||
| 'nkeys_seed': config_file | ||
| }, | ||
| { | ||
| "nkeys_seed_str": seed.decode() | ||
| 'nkeys_seed_str': seed.decode() | ||
| }, | ||
@@ -60,17 +60,15 @@ ] | ||
| await nc.connect( | ||
| ["tls://127.0.0.1:4222"], | ||
| error_cb=error_cb, | ||
| connect_timeout=10, | ||
| allow_reconnect=False, | ||
| **nkeys_args, | ||
| ) | ||
| await nc.connect(["tls://127.0.0.1:4222"], | ||
| error_cb=error_cb, | ||
| connect_timeout=10, | ||
| allow_reconnect=False, | ||
| **nkeys_args) | ||
| async def help_handler(msg): | ||
| await nc.publish(msg.reply, b"OK!") | ||
| await nc.publish(msg.reply, b'OK!') | ||
| await nc.subscribe("help", cb=help_handler) | ||
| await nc.flush() | ||
| msg = await nc.request("help", b"I need help") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| msg = await nc.request("help", b'I need help') | ||
| self.assertEqual(msg.data, b'OK!') | ||
@@ -82,4 +80,4 @@ await nc.subscribe("bar", cb=help_handler) | ||
| msg = await nc.request("help", b"I need help") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| msg = await nc.request("help", b'I need help') | ||
| self.assertEqual(msg.data, b'OK!') | ||
@@ -110,8 +108,8 @@ await nc.close() | ||
| async def help_handler(msg): | ||
| await nc.publish(msg.reply, b"OK!") | ||
| await nc.publish(msg.reply, b'OK!') | ||
| await nc.subscribe("help", cb=help_handler) | ||
| await nc.flush() | ||
| msg = await nc.request("help", b"I need help") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| msg = await nc.request("help", b'I need help') | ||
| self.assertEqual(msg.data, b'OK!') | ||
| await nc.close() | ||
@@ -135,3 +133,3 @@ | ||
| get_config_file("nkeys/foo-user.jwt"), | ||
| get_config_file("nkeys/foo-user.nk"), | ||
| get_config_file("nkeys/foo-user.nk") | ||
| ), | ||
@@ -142,8 +140,8 @@ allow_reconnect=False, | ||
| async def help_handler(msg): | ||
| await nc.publish(msg.reply, b"OK!") | ||
| await nc.publish(msg.reply, b'OK!') | ||
| await nc.subscribe("help", cb=help_handler) | ||
| await nc.flush() | ||
| msg = await nc.request("help", b"I need help") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| msg = await nc.request("help", b'I need help') | ||
| self.assertEqual(msg.data, b'OK!') | ||
| await nc.close() | ||
@@ -211,8 +209,8 @@ | ||
| async def help_handler(msg): | ||
| await nc.publish(msg.reply, b"OK!") | ||
| await nc.publish(msg.reply, b'OK!') | ||
| await nc.subscribe("help", cb=help_handler) | ||
| await nc.flush() | ||
| msg = await nc.request("help", b"I need help") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| msg = await nc.request("help", b'I need help') | ||
| self.assertEqual(msg.data, b'OK!') | ||
| await nc.close() |
+33
-25
@@ -0,4 +1,13 @@ | ||
| import asyncio | ||
| import http.client | ||
| import json | ||
| import shutil | ||
| import ssl | ||
| import tempfile | ||
| import time | ||
| import unittest | ||
| from unittest import mock | ||
| import nats | ||
| from nats.aio.client import Client as NATS, __version__ | ||
| from nats.aio.errors import * | ||
@@ -17,5 +26,5 @@ from tests.utils import * | ||
| await nc.publish( | ||
| "foo", b"hello world", headers={ | ||
| "foo": "bar", | ||
| "hello": "world-1" | ||
| "foo", b'hello world', headers={ | ||
| 'foo': 'bar', | ||
| 'hello': 'world-1' | ||
| } | ||
@@ -28,4 +37,4 @@ ) | ||
| self.assertEqual(msg.headers["foo"], "bar") | ||
| self.assertEqual(msg.headers["hello"], "world-1") | ||
| self.assertEqual(msg.headers['foo'], 'bar') | ||
| self.assertEqual(msg.headers['hello'], 'world-1') | ||
@@ -40,4 +49,4 @@ await nc.close() | ||
| # Add another header | ||
| msg.headers["quux"] = "quuz" | ||
| await msg.respond(b"OK!") | ||
| msg.headers['quux'] = 'quuz' | ||
| await msg.respond(b'OK!') | ||
@@ -47,5 +56,5 @@ await nc.subscribe("foo", cb=service) | ||
| msg = await nc.request( | ||
| "foo", b"hello world", headers={ | ||
| "foo": "bar", | ||
| "hello": "world" | ||
| "foo", b'hello world', headers={ | ||
| 'foo': 'bar', | ||
| 'hello': 'world' | ||
| } | ||
@@ -56,6 +65,6 @@ ) | ||
| self.assertEqual(len(msg.headers), 3) | ||
| self.assertEqual(msg.headers["foo"], "bar") | ||
| self.assertEqual(msg.headers["hello"], "world") | ||
| self.assertEqual(msg.headers["quux"], "quuz") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| self.assertEqual(msg.headers['foo'], 'bar') | ||
| self.assertEqual(msg.headers['hello'], 'world') | ||
| self.assertEqual(msg.headers['quux'], 'quuz') | ||
| self.assertEqual(msg.data, b'OK!') | ||
@@ -70,3 +79,3 @@ await nc.close() | ||
| await nc.flush() | ||
| await nc.publish("foo", b"hello world", headers={"": ""}) | ||
| await nc.publish("foo", b'hello world', headers={'': ''}) | ||
@@ -77,3 +86,3 @@ msg = await sub.next_msg() | ||
| # Empty long key | ||
| await nc.publish("foo", b"hello world", headers={" ": ""}) | ||
| await nc.publish("foo", b'hello world', headers={' ': ''}) | ||
| msg = await sub.next_msg() | ||
@@ -84,3 +93,3 @@ self.assertTrue(msg.headers == None) | ||
| await nc.publish( | ||
| "foo", b"hello world", headers={"": " "} | ||
| "foo", b'hello world', headers={'': ' '} | ||
| ) | ||
@@ -91,9 +100,9 @@ msg = await sub.next_msg() | ||
| hdrs = { | ||
| "timestamp": "2022-06-15T19:08:14.639020", | ||
| "type": "rpc", | ||
| "command": "publish_state", | ||
| "trace_id": "", | ||
| "span_id": "", | ||
| 'timestamp': '2022-06-15T19:08:14.639020', | ||
| 'type': 'rpc', | ||
| 'command': 'publish_state', | ||
| 'trace_id': '', | ||
| 'span_id': '' | ||
| } | ||
| await nc.publish("foo", b"Hello from Python!", headers=hdrs) | ||
| await nc.publish("foo", b'Hello from Python!', headers=hdrs) | ||
| msg = await sub.next_msg() | ||
@@ -105,6 +114,5 @@ self.assertEqual(msg.headers, hdrs) | ||
| if __name__ == "__main__": | ||
| if __name__ == '__main__': | ||
| import sys | ||
| runner = unittest.TextTestRunner(stream=sys.stdout) | ||
| unittest.main(verbosity=2, exit=False, testRunner=runner) |
| import asyncio | ||
| import http.client | ||
| import json | ||
| import shutil | ||
| import ssl | ||
| import tempfile | ||
| import time | ||
| import unittest | ||
| from unittest import mock | ||
| import pytest | ||
| import nats | ||
| import pytest | ||
| from nats.aio.client import Client as NATS, __version__ | ||
| from nats.aio.errors import * | ||
@@ -28,5 +36,5 @@ from tests.utils import * | ||
| await nc.publish( | ||
| "foo", b"hello world", headers={ | ||
| "foo": "bar", | ||
| "hello": "world-1" | ||
| "foo", b'hello world', headers={ | ||
| 'foo': 'bar', | ||
| 'hello': 'world-1' | ||
| } | ||
@@ -39,4 +47,4 @@ ) | ||
| self.assertEqual(msg.headers["foo"], "bar") | ||
| self.assertEqual(msg.headers["hello"], "world-1") | ||
| self.assertEqual(msg.headers['foo'], 'bar') | ||
| self.assertEqual(msg.headers['hello'], 'world-1') | ||
@@ -54,4 +62,4 @@ await nc.close() | ||
| # Add another header | ||
| msg.headers["quux"] = "quuz" | ||
| await msg.respond(b"OK!") | ||
| msg.headers['quux'] = 'quuz' | ||
| await msg.respond(b'OK!') | ||
@@ -61,5 +69,5 @@ await nc.subscribe("foo", cb=service) | ||
| msg = await nc.request( | ||
| "foo", b"hello world", headers={ | ||
| "foo": "bar", | ||
| "hello": "world" | ||
| "foo", b'hello world', headers={ | ||
| 'foo': 'bar', | ||
| 'hello': 'world' | ||
| } | ||
@@ -70,6 +78,6 @@ ) | ||
| self.assertEqual(len(msg.headers), 3) | ||
| self.assertEqual(msg.headers["foo"], "bar") | ||
| self.assertEqual(msg.headers["hello"], "world") | ||
| self.assertEqual(msg.headers["quux"], "quuz") | ||
| self.assertEqual(msg.data, b"OK!") | ||
| self.assertEqual(msg.headers['foo'], 'bar') | ||
| self.assertEqual(msg.headers['hello'], 'world') | ||
| self.assertEqual(msg.headers['quux'], 'quuz') | ||
| self.assertEqual(msg.data, b'OK!') | ||
@@ -87,3 +95,3 @@ await nc.close() | ||
| await nc.flush() | ||
| await nc.publish("foo", b"hello world", headers={"": ""}) | ||
| await nc.publish("foo", b'hello world', headers={'': ''}) | ||
@@ -94,3 +102,3 @@ msg = await sub.next_msg() | ||
| # Empty long key | ||
| await nc.publish("foo", b"hello world", headers={" ": ""}) | ||
| await nc.publish("foo", b'hello world', headers={' ': ''}) | ||
| msg = await sub.next_msg() | ||
@@ -101,3 +109,3 @@ self.assertTrue(msg.headers == None) | ||
| await nc.publish( | ||
| "foo", b"hello world", headers={"": " "} | ||
| "foo", b'hello world', headers={'': ' '} | ||
| ) | ||
@@ -108,9 +116,9 @@ msg = await sub.next_msg() | ||
| hdrs = { | ||
| "timestamp": "2022-06-15T19:08:14.639020", | ||
| "type": "rpc", | ||
| "command": "publish_state", | ||
| "trace_id": "", | ||
| "span_id": "", | ||
| 'timestamp': '2022-06-15T19:08:14.639020', | ||
| 'type': 'rpc', | ||
| 'command': 'publish_state', | ||
| 'trace_id': '', | ||
| 'span_id': '' | ||
| } | ||
| await nc.publish("foo", b"Hello from Python!", headers=hdrs) | ||
| await nc.publish("foo", b'Hello from Python!', headers=hdrs) | ||
| msg = await sub.next_msg() | ||
@@ -140,12 +148,12 @@ self.assertEqual(msg.headers, hdrs) | ||
| async def bar_cb(msg): | ||
| await msg.respond(b"OK!") | ||
| await msg.respond(b'OK!') | ||
| rsub = await nc.subscribe("bar", cb=bar_cb) | ||
| await nc.publish("foo", b"First") | ||
| await nc.publish("foo", b'First') | ||
| await nc.flush() | ||
| msg = await sub.next_msg() | ||
| self.assertEqual(msg.data, b"First") | ||
| self.assertEqual(msg.data, b'First') | ||
| rmsg = await nc.request("bar", b"hi") | ||
| self.assertEqual(rmsg.data, b"OK!") | ||
| rmsg = await nc.request("bar", b'hi') | ||
| self.assertEqual(rmsg.data, b'OK!') | ||
@@ -163,8 +171,8 @@ # Restart the server and wait for reconnect. | ||
| # Get another message. | ||
| await nc.publish("foo", b"Second") | ||
| await nc.publish("foo", b'Second') | ||
| await nc.flush() | ||
| msg = await sub.next_msg() | ||
| self.assertEqual(msg.data, b"Second") | ||
| rmsg = await nc.request("bar", b"hi") | ||
| self.assertEqual(rmsg.data, b"OK!") | ||
| self.assertEqual(msg.data, b'Second') | ||
| rmsg = await nc.request("bar", b'hi') | ||
| self.assertEqual(rmsg.data, b'OK!') | ||
@@ -193,11 +201,11 @@ await nc.close() | ||
| async def bar_cb(msg): | ||
| await msg.respond(b"OK!") | ||
| await msg.respond(b'OK!') | ||
| rsub = await nc.subscribe("bar", cb=bar_cb) | ||
| await nc.publish("foo", b"First") | ||
| await nc.publish("foo", b'First') | ||
| await nc.flush() | ||
| msg = await sub.next_msg() | ||
| self.assertEqual(msg.data, b"First") | ||
| rmsg = await nc.request("bar", b"hi") | ||
| self.assertEqual(rmsg.data, b"OK!") | ||
| self.assertEqual(msg.data, b'First') | ||
| rmsg = await nc.request("bar", b'hi') | ||
| self.assertEqual(rmsg.data, b'OK!') | ||
@@ -225,3 +233,3 @@ # Restart the server and wait for reconnect. | ||
| await nc.flush() | ||
| await nc.publish("foo", b"hello world", headers={"foo": "bar"}) | ||
| await nc.publish("foo", b'hello world', headers={'foo': 'bar'}) | ||
@@ -232,3 +240,3 @@ msg = await sub.next_msg() | ||
| self.assertEqual(msg.headers["foo"], "bar") | ||
| self.assertEqual(msg.headers['foo'], 'bar') | ||
@@ -257,12 +265,12 @@ await nc.close() | ||
| async def bar_cb(msg): | ||
| await msg.respond(b"OK!") | ||
| await msg.respond(b'OK!') | ||
| rsub = await nc.subscribe("bar", cb=bar_cb) | ||
| await nc.publish("foo", b"First") | ||
| await nc.publish("foo", b'First') | ||
| await nc.flush() | ||
| msg = await sub.next_msg() | ||
| self.assertEqual(msg.data, b"First") | ||
| self.assertEqual(msg.data, b'First') | ||
| rmsg = await nc.request("bar", b"hi") | ||
| self.assertEqual(rmsg.data, b"OK!") | ||
| rmsg = await nc.request("bar", b'hi') | ||
| self.assertEqual(rmsg.data, b'OK!') | ||
@@ -280,8 +288,8 @@ # Restart the server and wait for reconnect. | ||
| # Get another message. | ||
| await nc.publish("foo", b"Second") | ||
| await nc.publish("foo", b'Second') | ||
| await nc.flush() | ||
| msg = await sub.next_msg() | ||
| self.assertEqual(msg.data, b"Second") | ||
| rmsg = await nc.request("bar", b"hi") | ||
| self.assertEqual(rmsg.data, b"OK!") | ||
| self.assertEqual(msg.data, b'Second') | ||
| rmsg = await nc.request("bar", b'hi') | ||
| self.assertEqual(rmsg.data, b'OK!') | ||
@@ -311,11 +319,11 @@ await nc.close() | ||
| async def bar_cb(msg): | ||
| await msg.respond(b"OK!") | ||
| await msg.respond(b'OK!') | ||
| rsub = await nc.subscribe("bar", cb=bar_cb) | ||
| await nc.publish("foo", b"First") | ||
| await nc.publish("foo", b'First') | ||
| await nc.flush() | ||
| msg = await sub.next_msg() | ||
| self.assertEqual(msg.data, b"First") | ||
| rmsg = await nc.request("bar", b"hi") | ||
| self.assertEqual(rmsg.data, b"OK!") | ||
| self.assertEqual(msg.data, b'First') | ||
| rmsg = await nc.request("bar", b'hi') | ||
| self.assertEqual(rmsg.data, b'OK!') | ||
@@ -332,6 +340,5 @@ # Restart the server and wait for reconnect. | ||
| if __name__ == "__main__": | ||
| if __name__ == '__main__': | ||
| import sys | ||
| runner = unittest.TextTestRunner(stream=sys.stdout) | ||
| unittest.main(verbosity=2, exit=False, testRunner=runner) |
| from __future__ import annotations | ||
| import asyncio | ||
| import os | ||
| import json | ||
| import os | ||
| from dataclasses import dataclass, field | ||
| from typing import Any, Dict, List, Optional | ||
| import nats | ||
| from nats.aio.subscription import Subscription | ||
| from unittest import TestCase, skipIf | ||
| import nats | ||
| from nats.aio.subscription import Subscription | ||
| from nats.micro.request import ServiceError | ||
| from dataclasses import dataclass, field | ||
| from typing import Dict, List, Any, Optional, Generic | ||
| from nats.micro.service import ( | ||
| SUBJECT_REGEX, | ||
| EndpointStats, | ||
| GroupConfig, | ||
| ServiceConfig, | ||
| Service, | ||
| EndpointConfig, | ||
| EndpointStats, | ||
| Request, | ||
| Service, | ||
| ServiceConfig, | ||
| ) | ||
| from nats.micro.request import ServiceError | ||
@@ -33,3 +36,2 @@ from .utils import * | ||
| class CompatibilityTest(TestCase): | ||
| def setUp(self): | ||
@@ -44,5 +46,3 @@ self.loop = asyncio.new_event_loop() | ||
| msg = await sub.next_msg(timeout=5) | ||
| self.assertNotIn( | ||
| "fail", msg.subject, f"Test step failed: {msg.subject}" | ||
| ) | ||
| self.assertNotIn("fail", msg.subject, f"Test step failed: {msg.subject}") | ||
| except asyncio.TimeoutError: | ||
@@ -53,3 +53,2 @@ self.fail("Timeout waiting for test result") | ||
| async def test_service_compatibility(self): | ||
| @dataclass | ||
@@ -62,5 +61,3 @@ class TestGroupConfig: | ||
| def from_dict(cls, data: Dict[str, Any]) -> TestGroupConfig: | ||
| return cls( | ||
| name=data["name"], queue_group=data.get("queue_group") | ||
| ) | ||
| return cls(name=data["name"], queue_group=data.get("queue_group")) | ||
@@ -67,0 +64,0 @@ @dataclass |
+119
-184
@@ -0,14 +1,15 @@ | ||
| from tests.utils import SingleJetStreamServerTestCase, SingleServerTestCase, async_test | ||
| import nats | ||
| import random | ||
| import asyncio | ||
| import random | ||
| import nats | ||
| import nats.micro | ||
| from nats.micro import * | ||
| from nats.micro.service import * | ||
| from nats.micro.request import * | ||
| from nats.micro.service import * | ||
| from tests.utils import SingleServerTestCase, async_test | ||
| class MicroServiceTest(SingleServerTestCase): | ||
| def test_invalid_service_name(self): | ||
@@ -19,19 +20,14 @@ with self.assertRaises(ValueError) as context: | ||
| with self.assertRaises(ValueError) as context: | ||
| ServiceConfig(name="test.service@!", version="0.1.0") | ||
| self.assertEqual( | ||
| str(context.exception), | ||
| "Invalid name. It must contain only alphanumeric characters, dashes, and underscores.", | ||
| ) | ||
| self.assertEqual(str(context.exception), "Invalid name. It must contain only alphanumeric characters, dashes, and underscores.") | ||
| def test_invalid_service_version(self): | ||
| with self.assertRaises(ValueError) as context: | ||
| ServiceConfig(name="test_service", version="abc") | ||
| self.assertEqual( | ||
| str(context.exception), | ||
| "Invalid version. It must follow semantic versioning (e.g., 1.0.0, 2.1.3-alpha.1).", | ||
| ) | ||
| self.assertEqual(str(context.exception), "Invalid version. It must follow semantic versioning (e.g., 1.0.0, 2.1.3-alpha.1).") | ||
| def test_invalid_endpoint_subject(self): | ||
| async def noop_handler(request: Request) -> None: | ||
@@ -46,6 +42,3 @@ pass | ||
| ) | ||
| self.assertEqual( | ||
| str(context.exception), | ||
| "Invalid subject. Subject must not contain spaces, and can only have '>' at the end.", | ||
| ) | ||
| self.assertEqual(str(context.exception), "Invalid subject. Subject must not contain spaces, and can only have '>' at the end.") | ||
@@ -82,9 +75,3 @@ @async_test | ||
| for _ in range(50): | ||
| await nc.request( | ||
| "svc.add", | ||
| json.dumps({ | ||
| "x": 22, | ||
| "y": 11 | ||
| }).encode("utf-8") | ||
| ) | ||
| await nc.request("svc.add", json.dumps({"x": 22, "y": 11}).encode("utf-8")) | ||
@@ -111,5 +98,3 @@ for svc in svcs: | ||
| try: | ||
| ping_responses.append( | ||
| await ping_subscription.next_msg(timeout=0.25) | ||
| ) | ||
| ping_responses.append(await ping_subscription.next_msg(timeout=0.25)) | ||
| except: | ||
@@ -129,5 +114,3 @@ break | ||
| try: | ||
| stats_responses.append( | ||
| await stats_subscription.next_msg(timeout=0.25) | ||
| ) | ||
| stats_responses.append(await stats_subscription.next_msg(timeout=0.25)) | ||
| except: | ||
@@ -141,5 +124,3 @@ break | ||
| ] | ||
| total_requests = sum([ | ||
| stat.endpoints[0].num_requests for stat in stats | ||
| ]) | ||
| total_requests = sum([stat.endpoints[0].num_requests for stat in stats]) | ||
| assert total_requests == 50 | ||
@@ -149,3 +130,2 @@ | ||
| async def test_add_service(self): | ||
| async def noop_handler(request: Request): | ||
@@ -156,23 +136,20 @@ pass | ||
| "no_endpoint": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={"basic": "metadata"}, | ||
| ), | ||
| "expected_ping": | ||
| ServicePing( | ||
| id="*", | ||
| type="io.nats.micro.v1.ping_response", | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={"basic": "metadata"}, | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={"basic": "metadata"}, | ||
| ), | ||
| "expected_ping": ServicePing( | ||
| id="*", | ||
| type="io.nats.micro.v1.ping_response", | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={"basic": "metadata"}, | ||
| ), | ||
| }, | ||
| "with_single_endpoint": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -186,16 +163,14 @@ EndpointConfig( | ||
| ], | ||
| "expected_ping": | ||
| ServicePing( | ||
| id="*", | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={}, | ||
| ), | ||
| "expected_ping": ServicePing( | ||
| id="*", | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={}, | ||
| ), | ||
| }, | ||
| "with_multiple_endpoints": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -215,9 +190,8 @@ EndpointConfig( | ||
| ], | ||
| "expected_ping": | ||
| ServicePing( | ||
| id="*", | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={}, | ||
| ), | ||
| "expected_ping": ServicePing( | ||
| id="*", | ||
| name="test_service", | ||
| version="0.1.0", | ||
| metadata={}, | ||
| ), | ||
| }, | ||
@@ -256,2 +230,3 @@ } | ||
| @async_test | ||
@@ -263,6 +238,3 @@ async def test_groups(self): | ||
| "endpoint_name": "foo", | ||
| "expected_endpoint": { | ||
| "name": "foo", | ||
| "subject": "foo" | ||
| }, | ||
| "expected_endpoint": {"name": "foo", "subject": "foo"}, | ||
| }, | ||
@@ -273,6 +245,3 @@ "single_group": { | ||
| "group_names": ["g1"], | ||
| "expected_endpoint": { | ||
| "name": "foo", | ||
| "subject": "g1.foo" | ||
| }, | ||
| "expected_endpoint": {"name": "foo", "subject": "g1.foo"}, | ||
| }, | ||
@@ -283,6 +252,3 @@ "single_empty_group": { | ||
| "group_names": [""], | ||
| "expected_endpoint": { | ||
| "name": "foo", | ||
| "subject": "foo" | ||
| }, | ||
| "expected_endpoint": {"name": "foo", "subject": "foo"}, | ||
| }, | ||
@@ -293,6 +259,3 @@ "empty_groups": { | ||
| "group_names": ["", "g1", ""], | ||
| "expected_endpoint": { | ||
| "name": "foo", | ||
| "subject": "g1.foo" | ||
| }, | ||
| "expected_endpoint": {"name": "foo", "subject": "g1.foo"}, | ||
| }, | ||
@@ -302,6 +265,3 @@ "multiple_groups": { | ||
| "group_names": ["g1", "g2", "g3"], | ||
| "expected_endpoint": { | ||
| "name": "foo", | ||
| "subject": "g1.g2.g3.foo" | ||
| }, | ||
| "expected_endpoint": {"name": "foo", "subject": "g1.g2.g3.foo"}, | ||
| }, | ||
@@ -326,5 +286,3 @@ } | ||
| await group.add_endpoint( | ||
| name=data["endpoint_name"], handler=noop_handler | ||
| ) | ||
| await group.add_endpoint(name=data["endpoint_name"], handler=noop_handler) | ||
@@ -344,3 +302,2 @@ info = svc.info() | ||
| async def test_monitoring_handlers(self): | ||
| async def noop_handler(request: Request): | ||
@@ -406,10 +363,10 @@ pass | ||
| "id": svc.id, | ||
| "endpoints": [{ | ||
| "name": "default", | ||
| "subject": "test.func", | ||
| "queue_group": "q", | ||
| "metadata": { | ||
| "basic": "schema" | ||
| }, | ||
| }], | ||
| "endpoints": [ | ||
| { | ||
| "name": "default", | ||
| "subject": "test.func", | ||
| "queue_group": "q", | ||
| "metadata": {"basic": "schema"}, | ||
| } | ||
| ], | ||
| "metadata": {}, | ||
@@ -426,10 +383,10 @@ }, | ||
| "id": svc.id, | ||
| "endpoints": [{ | ||
| "name": "default", | ||
| "subject": "test.func", | ||
| "queue_group": "q", | ||
| "metadata": { | ||
| "basic": "schema" | ||
| }, | ||
| }], | ||
| "endpoints": [ | ||
| { | ||
| "name": "default", | ||
| "subject": "test.func", | ||
| "queue_group": "q", | ||
| "metadata": {"basic": "schema"}, | ||
| } | ||
| ], | ||
| "metadata": {}, | ||
@@ -446,10 +403,10 @@ }, | ||
| "id": svc.id, | ||
| "endpoints": [{ | ||
| "name": "default", | ||
| "subject": "test.func", | ||
| "queue_group": "q", | ||
| "metadata": { | ||
| "basic": "schema" | ||
| }, | ||
| }], | ||
| "endpoints": [ | ||
| { | ||
| "name": "default", | ||
| "subject": "test.func", | ||
| "queue_group": "q", | ||
| "metadata": {"basic": "schema"}, | ||
| } | ||
| ], | ||
| "metadata": {}, | ||
@@ -471,3 +428,2 @@ }, | ||
| async def test_service_stats(self): | ||
| async def handler(request: Request): | ||
@@ -478,7 +434,6 @@ await request.respond(b"ok") | ||
| "stats_handler": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -494,8 +449,7 @@ EndpointConfig( | ||
| "with_stats_handler": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| stats_handler=lambda endpoint: {"key": "val"}, | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| stats_handler=lambda endpoint: {"key": "val"}, | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -509,12 +463,9 @@ EndpointConfig( | ||
| ], | ||
| "expected_stats": { | ||
| "key": "val" | ||
| }, | ||
| "expected_stats": {"key": "val"}, | ||
| }, | ||
| "with_endpoint": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.1.0", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -549,8 +500,4 @@ EndpointConfig( | ||
| stats_subject = control_subject( | ||
| ServiceVerb.STATS, "test_service" | ||
| ) | ||
| stats_response = await nc.request( | ||
| stats_subject, b"", timeout=1 | ||
| ) | ||
| stats_subject = control_subject(ServiceVerb.STATS, "test_service") | ||
| stats_response = await nc.request(stats_subject, b"", timeout=1) | ||
| stats = ServiceStats.from_dict(json.loads(stats_response.data)) | ||
@@ -583,10 +530,6 @@ | ||
| "byte_response_with_headers": { | ||
| "respond_headers": { | ||
| "key": "value" | ||
| }, | ||
| "respond_headers": {"key": "value"}, | ||
| "respond_data": b"OK", | ||
| "expected_response": b"OK", | ||
| "expected_headers": { | ||
| "key": "value" | ||
| }, | ||
| "expected_headers": {"key": "value"}, | ||
| }, | ||
@@ -614,5 +557,3 @@ } | ||
| await svc.add_endpoint( | ||
| EndpointConfig( | ||
| name="default", subject="test.func", handler=handler | ||
| ) | ||
| EndpointConfig(name="default", subject="test.func", handler=handler) | ||
| ) | ||
@@ -661,3 +602,2 @@ | ||
| async def test_custom_queue_group(self): | ||
| async def noop_handler(request: Request): | ||
@@ -668,7 +608,6 @@ pass | ||
| "default_queue_group": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -685,8 +624,7 @@ EndpointConfig( | ||
| "custom_queue_group_on_service_config": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| queue_group="custom", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| queue_group="custom", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -704,8 +642,7 @@ EndpointConfig( | ||
| "endpoint_config_overriding_queue_groups": { | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| queue_group="q-config", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| queue_group="q-config", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -724,8 +661,7 @@ EndpointConfig( | ||
| "name": "empty queue group in option, inherit from parent", | ||
| "service_config": | ||
| ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| queue_group="q-service", | ||
| ), | ||
| "service_config": ServiceConfig( | ||
| name="test_service", | ||
| version="0.0.1", | ||
| queue_group="q-service", | ||
| ), | ||
| "endpoint_configs": [ | ||
@@ -761,8 +697,7 @@ EndpointConfig( | ||
| assert len(info.endpoints | ||
| ) == len(data["expected_queue_groups"]) | ||
| assert len(info.endpoints) == len(data["expected_queue_groups"]) | ||
| for endpoint in info.endpoints: | ||
| assert ( | ||
| endpoint.queue_group == data["expected_queue_groups"][ | ||
| endpoint.name] | ||
| endpoint.queue_group | ||
| == data["expected_queue_groups"][endpoint.name] | ||
| ) | ||
@@ -769,0 +704,0 @@ |
@@ -15,2 +15,3 @@ # Copyright 2018-2021 The NATS Authors | ||
| import sys | ||
| import unittest | ||
@@ -17,0 +18,0 @@ from collections import Counter |
+31
-30
| import asyncio | ||
| import sys | ||
| import unittest | ||
@@ -36,3 +37,3 @@ | ||
| async def _process_info(self, info): | ||
| def _process_info(self, info): | ||
| self._server_info = info | ||
@@ -49,3 +50,3 @@ | ||
| ps = Parser(MockNatsClient()) | ||
| data = b"PING\r\n" | ||
| data = b'PING\r\n' | ||
| await ps.parse(data) | ||
@@ -58,3 +59,3 @@ self.assertEqual(len(ps.buf), 0) | ||
| ps = Parser(MockNatsClient()) | ||
| data = b"PONG\r\n" | ||
| data = b'PONG\r\n' | ||
| await ps.parse(data) | ||
@@ -67,3 +68,3 @@ self.assertEqual(len(ps.buf), 0) | ||
| ps = Parser() | ||
| data = b"+OK\r\n" | ||
| data = b'+OK\r\n' | ||
| await ps.parse(data) | ||
@@ -76,3 +77,3 @@ self.assertEqual(len(ps.buf), 0) | ||
| nc = MockNatsClient() | ||
| expected = b"hello world!" | ||
| expected = b'hello world!' | ||
@@ -91,3 +92,3 @@ async def payload_test(sid, subject, reply, payload): | ||
| ps = Parser(nc) | ||
| data = b"MSG hello 1 world 12\r\n" | ||
| data = b'MSG hello 1 world 12\r\n' | ||
@@ -97,4 +98,4 @@ await ps.parse(data) | ||
| self.assertEqual(len(ps.msg_arg.keys()), 3) | ||
| self.assertEqual(ps.msg_arg["subject"], b"hello") | ||
| self.assertEqual(ps.msg_arg["reply"], b"world") | ||
| self.assertEqual(ps.msg_arg["subject"], b'hello') | ||
| self.assertEqual(ps.msg_arg["reply"], b'world') | ||
| self.assertEqual(ps.msg_arg["sid"], 1) | ||
@@ -104,7 +105,7 @@ self.assertEqual(ps.needed, 12) | ||
| await ps.parse(b"hello world!") | ||
| await ps.parse(b'hello world!') | ||
| self.assertEqual(len(ps.buf), 12) | ||
| self.assertEqual(ps.state, AWAITING_MSG_PAYLOAD) | ||
| data = b"\r\n" | ||
| data = b'\r\n' | ||
| await ps.parse(data) | ||
@@ -117,3 +118,3 @@ self.assertEqual(len(ps.buf), 0) | ||
| ps = Parser() | ||
| data = b"MSG hello" | ||
| data = b'MSG hello' | ||
| await ps.parse(data) | ||
@@ -126,3 +127,3 @@ self.assertEqual(len(ps.buf), 9) | ||
| ps = Parser() | ||
| data = b"MSG" | ||
| data = b'MSG' | ||
| await ps.parse(data) | ||
@@ -135,3 +136,3 @@ self.assertEqual(len(ps.buf), 3) | ||
| ps = Parser() | ||
| data = b"MSG " | ||
| data = b'MSG ' | ||
| await ps.parse(data) | ||
@@ -144,3 +145,3 @@ self.assertEqual(len(ps.buf), 4) | ||
| ps = Parser(MockNatsClient()) | ||
| data = b"MSG PONG\r\n" | ||
| data = b'MSG PONG\r\n' | ||
| with self.assertRaises(ProtocolError): | ||
@@ -174,3 +175,3 @@ await ps.parse(data) | ||
| ps = Parser(nc) | ||
| server_id = "A" * 2048 | ||
| server_id = 'A' * 2048 | ||
| data = f'INFO {{"server_id": "{server_id}", "max_payload": 100, "auth_required": false, "connect_urls":["127.0.0.0.1:4223"]}}\r\n' | ||
@@ -180,3 +181,3 @@ await ps.parse(data.encode()) | ||
| self.assertEqual(ps.state, AWAITING_CONTROL_LINE) | ||
| self.assertEqual(len(nc._server_info["server_id"]), 2048) | ||
| self.assertEqual(len(nc._server_info['server_id']), 2048) | ||
@@ -203,10 +204,10 @@ @async_test | ||
| ps = Parser(nc) | ||
| reply = "A" * 2043 | ||
| data = f"PING\r\nMSG hello 1 {reply}" | ||
| reply = 'A' * 2043 | ||
| data = f'PING\r\nMSG hello 1 {reply}' | ||
| await ps.parse(data.encode()) | ||
| await ps.parse(b"AAAAA 0\r\n\r\nMSG hello 1 world 0") | ||
| await ps.parse(b'AAAAA 0\r\n\r\nMSG hello 1 world 0') | ||
| self.assertEqual(msgs, 1) | ||
| self.assertEqual(len(ps.buf), 19) | ||
| self.assertEqual(ps.state, AWAITING_CONTROL_LINE) | ||
| await ps.parse(b"\r\n\r\n") | ||
| await ps.parse(b'\r\n\r\n') | ||
| self.assertEqual(msgs, 2) | ||
@@ -234,3 +235,3 @@ | ||
| ps = Parser(nc) | ||
| reply = "A" * 2043 * 4 | ||
| reply = 'A' * 2043 * 4 | ||
@@ -240,17 +241,17 @@ # FIXME: Malformed long protocol lines will not be detected | ||
| # from the client to give up instead. | ||
| data = f"PING\r\nWRONG hello 1 {reply}" | ||
| data = f'PING\r\nWRONG hello 1 {reply}' | ||
| await ps.parse(data.encode()) | ||
| await ps.parse(b"AAAAA 0") | ||
| await ps.parse(b'AAAAA 0') | ||
| self.assertEqual(ps.state, AWAITING_CONTROL_LINE) | ||
| await ps.parse(b"\r\n\r\n") | ||
| await ps.parse(b"\r\n\r\n") | ||
| await ps.parse(b'\r\n\r\n') | ||
| await ps.parse(b'\r\n\r\n') | ||
| ps = Parser(nc) | ||
| reply = "A" * 2043 | ||
| data = f"PING\r\nWRONG hello 1 {reply}" | ||
| reply = 'A' * 2043 | ||
| data = f'PING\r\nWRONG hello 1 {reply}' | ||
| with self.assertRaises(ProtocolError): | ||
| await ps.parse(data.encode()) | ||
| await ps.parse(b"AAAAA 0") | ||
| await ps.parse(b'AAAAA 0') | ||
| self.assertEqual(ps.state, AWAITING_CONTROL_LINE) | ||
| await ps.parse(b"\r\n\r\n") | ||
| await ps.parse(b"\r\n\r\n") | ||
| await ps.parse(b'\r\n\r\n') | ||
| await ps.parse(b'\r\n\r\n') |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
618769
-1.92%14931
-2.42%