You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a Demo · Sign in
Socket

nats-py

Package Overview
Dependencies
Maintainers
4
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats-py - pypi Package Compare versions

Comparing version
2.10.0
to
2.9.1
+2
-24
nats_py.egg-info/PKG-INFO
Metadata-Version: 2.4
Name: nats-py
Version: 2.10.0
Version: 2.9.1
Summary: NATS client for Python

@@ -37,2 +37,3 @@ Author-email: Waldemar Quevedo <wally@synadia.com>

[![pypi](https://img.shields.io/pypi/v/nats-py.svg)](https://pypi.org/project/nats-py)
[![Build Status](https://travis-ci.com/nats-io/nats.py.svg?branch=main)](http://travis-ci.com/nats-io/nats.py)
[![Versions](https://img.shields.io/pypi/pyversions/nats-py.svg)](https://pypi.org/project/nats-py)

@@ -258,25 +259,2 @@ [![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)

## Updating Docs
To update the docs, first checkout the `docs` branch under a local copy of the `nats.py` repo
as follows:
```sh
git clone https://github.com/nats-io/nats.py
cd nats.py
git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs
cd docs
pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments
pipenv shell
make html
# preview the changes:
make serve
```
If you are happy with the changes, make a PR on the docs branch:
```
make publish
git add docs
```
## License

@@ -283,0 +261,0 @@

@@ -17,3 +17,2 @@ # Copyright 2016-2021 The NATS Authors

from typing import List, Union
from .aio.client import Client as NATS

@@ -20,0 +19,0 @@

@@ -27,3 +27,2 @@ # Copyright 2016-2021 The NATS Authors

"""
pass

@@ -39,3 +38,2 @@

"""
pass

@@ -51,3 +49,2 @@

"""
pass

@@ -63,3 +60,2 @@

"""
pass

@@ -75,3 +71,2 @@

"""
pass

@@ -87,3 +82,2 @@

"""
pass

@@ -99,3 +93,2 @@

"""
pass

@@ -111,3 +104,2 @@

"""
pass

@@ -123,3 +115,2 @@

"""
pass

@@ -135,3 +126,2 @@

"""
pass

@@ -147,3 +137,2 @@

"""
pass

@@ -159,3 +148,2 @@

"""
pass

@@ -171,3 +159,2 @@

"""
pass

@@ -183,3 +170,2 @@

"""
pass

@@ -195,3 +181,2 @@

"""
pass

@@ -207,3 +192,2 @@

"""
pass

@@ -219,3 +203,2 @@

"""
pass

@@ -231,3 +214,2 @@

"""
pass

@@ -243,3 +225,2 @@

"""
pass

@@ -255,3 +236,2 @@

"""
pass
+22
-32

@@ -19,3 +19,3 @@ # Copyright 2016-2021 The NATS Authors

from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from typing import TYPE_CHECKING, List, Optional, Dict, Union

@@ -44,7 +44,6 @@ from nats.errors import Error, MsgAlreadyAckdError, NotJSMessageError

"""
_client: NATS
subject: str = ""
reply: str = ""
data: bytes = b""
subject: str = ''
reply: str = ''
data: bytes = b''
headers: Optional[Dict[str, str]] = None

@@ -63,4 +62,4 @@

# Reply metadata...
Prefix0 = "$JS"
Prefix1 = "ACK"
Prefix0 = '$JS'
Prefix1 = 'ACK'
Domain = 2

@@ -84,9 +83,2 @@ AccHash = 3

@property
def is_acked(self) -> bool:
"""
Have we sent a terminal ack message (not in-progress) in response to this original message?
"""
return self._ackd
@property
def sid(self) -> int:

@@ -97,3 +89,3 @@ """

if self._sid is None:
raise Error("sid not set")
raise Error('sid not set')
return self._sid

@@ -106,5 +98,5 @@

if not self.reply:
raise Error("no reply subject available")
raise Error('no reply subject available')
if not self._client:
raise Error("client not set")
raise Error('client not set')

@@ -139,5 +131,5 @@ await self._client.publish(self.reply, data, headers=self.headers)

if delay:
json_args["delay"] = int(delay * 10**9) # from seconds to ns
json_args['delay'] = int(delay * 10**9) # from seconds to ns
if json_args:
payload += b" " + json.dumps(json_args).encode()
payload += (b' ' + json.dumps(json_args).encode())
await self._client.publish(self.reply, payload)

@@ -151,3 +143,3 @@ self._ackd = True

"""
if self.reply is None or self.reply == "":
if self.reply is None or self.reply == '':
raise NotJSMessageError

@@ -184,3 +176,3 @@ await self._client.publish(self.reply, Msg.Ack.Progress)

def _check_reply(self) -> None:
if self.reply is None or self.reply == "":
if self.reply is None or self.reply == '':
raise NotJSMessageError

@@ -204,3 +196,2 @@ if self._ackd:

"""
sequence: SequencePair

@@ -219,3 +210,2 @@ num_pending: int

"""
consumer: int

@@ -228,7 +218,7 @@ stream: int

raise NotJSMessageError
tokens = reply.split(".")
if ((len(tokens) == _V1_TOKEN_COUNT
or len(tokens) >= _V2_TOKEN_COUNT - 1)
and tokens[0] == Msg.Ack.Prefix0
and tokens[1] == Msg.Ack.Prefix1):
tokens = reply.split('.')
if (len(tokens) == _V1_TOKEN_COUNT or
len(tokens) >= _V2_TOKEN_COUNT-1) and \
tokens[0] == Msg.Ack.Prefix0 and \
tokens[1] == Msg.Ack.Prefix1:
return tokens

@@ -239,7 +229,8 @@ raise NotJSMessageError

def _from_reply(cls, reply: str) -> Msg.Metadata:
"""Construct the metadata from the reply string"""
"""Construct the metadata from the reply string
"""
tokens = cls._get_metadata_fields(reply)
if len(tokens) == _V1_TOKEN_COUNT:
t = datetime.datetime.fromtimestamp(
int(tokens[7]) / 1_000_000_000.0, datetime.timezone.utc
int(tokens[7]) / 1_000_000_000.0
)

@@ -259,4 +250,3 @@ return cls(

t = datetime.datetime.fromtimestamp(
int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0,
datetime.timezone.utc
int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0
)

@@ -263,0 +253,0 @@

@@ -66,4 +66,4 @@ # Copyright 2016-2021 The NATS Authors

id: int = 0,
subject: str = "",
queue: str = "",
subject: str = '',
queue: str = '',
cb: Optional[Callable[[Msg], Awaitable[None]]] = None,

@@ -127,3 +127,3 @@ future: Optional[asyncio.Future] = None,

::
nc = await nats.connect()

@@ -182,3 +182,3 @@ sub = await nc.subscribe('foo')

raise errors.Error(
"nats: next_msg cannot be used in async subscriptions"
'nats: next_msg cannot be used in async subscriptions'
)

@@ -216,5 +216,4 @@

if self._cb:
if not asyncio.iscoroutinefunction(self._cb) and not (
hasattr(self._cb, "func")
and asyncio.iscoroutinefunction(self._cb.func)):
if not asyncio.iscoroutinefunction(self._cb) and \
not (hasattr(self._cb, "func") and asyncio.iscoroutinefunction(self._cb.func)):
raise errors.Error(

@@ -336,4 +335,3 @@ "nats: must use coroutine for subscriptions"

# Apply auto unsubscribe checks after having processed last msg.
if (self._max_msgs > 0 and self._received >= self._max_msgs
and self._pending_queue.empty):
if self._max_msgs > 0 and self._received >= self._max_msgs and self._pending_queue.empty:
self._stop_processing()

@@ -340,0 +338,0 @@ except asyncio.CancelledError:

@@ -6,4 +6,4 @@ from __future__ import annotations

import ssl
from typing import List, Optional, Union
from urllib.parse import ParseResult
from typing import List, Union, Optional

@@ -127,4 +127,3 @@ try:

limit=buffer_size,
),
connect_timeout,
), connect_timeout
)

@@ -148,3 +147,3 @@ # We keep a reference to the initial transport we used when

) -> None:
assert self._io_writer, f"{type(self).__name__}.connect must be called first"
assert self._io_writer, f'{type(self).__name__}.connect must be called first'

@@ -159,3 +158,3 @@ # manually recreate the stream reader/writer with a tls upgraded transport

# hostname here will be passed directly as string
server_hostname=uri if isinstance(uri, str) else uri.hostname,
server_hostname=uri if isinstance(uri, str) else uri.hostname
)

@@ -175,3 +174,3 @@ transport = await asyncio.wait_for(transport_future, connect_timeout)

async def read(self, buffer_size: int):
assert self._io_reader, f"{type(self).__name__}.connect must be called first"
assert self._io_reader, f'{type(self).__name__}.connect must be called first'
return await self._io_reader.read(buffer_size)

@@ -235,3 +234,3 @@

ssl=ssl_context,
timeout=connect_timeout,
timeout=connect_timeout
)

@@ -254,3 +253,3 @@ self._using_tls = True

# if the connection terminated abruptly, return empty binary data to raise unexpected EOF
return b""
return b''
return data.data

@@ -257,0 +256,0 @@

@@ -110,6 +110,4 @@ # Copyright 2021 The NATS Authors

def __str__(self) -> str:
return (
"nats: slow consumer, messages dropped subject: "
f"{self.subject}, sid: {self.sid}, sub: {self.sub}"
)
return "nats: slow consumer, messages dropped subject: " \
f"{self.subject}, sid: {self.sid}, sub: {self.sub}"

@@ -116,0 +114,0 @@

@@ -19,3 +19,3 @@ # Copyright 2021 The NATS Authors

from enum import Enum
from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar
from typing import Any, Dict, Optional, TypeVar, List, Iterable, Iterator

@@ -40,3 +40,3 @@ _NANOSECOND = 10**9

DEFAULT_PREFIX = "$JS.API"
INBOX_PREFIX = b"_INBOX."
INBOX_PREFIX = b'_INBOX.'

@@ -63,3 +63,4 @@

def _convert(resp: Dict[str, Any], field: str, type: type[Base]) -> None:
"""Convert the field into the given type in place."""
"""Convert the field into the given type in place.
"""
data = resp.get(field, None)

@@ -75,3 +76,4 @@ if data is None:

def _convert_nanoseconds(resp: Dict[str, Any], field: str) -> None:
"""Convert the given field from nanoseconds to seconds in place."""
"""Convert the given field from nanoseconds to seconds in place.
"""
val = resp.get(field, None)

@@ -84,3 +86,4 @@ if val is not None:

def _to_nanoseconds(val: Optional[float]) -> Optional[int]:
"""Convert the value from seconds to nanoseconds."""
"""Convert the value from seconds to nanoseconds.
"""
if val is None:

@@ -104,7 +107,9 @@ # We use 0 to avoid sending null to Go servers.

def evolve(self: _B, **params) -> _B:
"""Return a copy of the instance with the passed values replaced."""
"""Return a copy of the instance with the passed values replaced.
"""
return replace(self, **params)
def as_dict(self) -> Dict[str, object]:
"""Return the object converted into an API-friendly dict."""
"""Return the object converted into an API-friendly dict.
"""
result = {}

@@ -129,3 +134,2 @@ for field in fields(self):

"""
stream: str

@@ -167,4 +171,4 @@ seq: int

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "external", ExternalStream)
cls._convert(resp, "subject_transforms", SubjectTransform)
cls._convert(resp, 'external', ExternalStream)
cls._convert(resp, 'subject_transforms', SubjectTransform)
return super().from_response(resp)

@@ -175,3 +179,3 @@

if self.subject_transforms:
result["subject_transform"] = [
result['subject_transform'] = [
tr.as_dict() for tr in self.subject_transforms

@@ -210,3 +214,3 @@ ]

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "lost", LostStreamData)
cls._convert(resp, 'lost', LostStreamData)
return super().from_response(resp)

@@ -256,3 +260,2 @@

"""
src: Optional[str] = None

@@ -266,3 +269,2 @@ dest: Optional[str] = None

"""Subject transform to apply to matching messages."""
src: str

@@ -281,3 +283,2 @@ dest: str

"""
name: Optional[str] = None

@@ -291,3 +292,2 @@ description: Optional[str] = None

discard: Optional[DiscardPolicy] = DiscardPolicy.OLD
discard_new_per_subject: bool = False
max_age: Optional[float] = None # in seconds

@@ -327,9 +327,9 @@ max_msgs_per_subject: int = -1

def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "max_age")
cls._convert_nanoseconds(resp, "duplicate_window")
cls._convert(resp, "placement", Placement)
cls._convert(resp, "mirror", StreamSource)
cls._convert(resp, "sources", StreamSource)
cls._convert(resp, "republish", RePublish)
cls._convert(resp, "subject_transform", SubjectTransform)
cls._convert_nanoseconds(resp, 'max_age')
cls._convert_nanoseconds(resp, 'duplicate_window')
cls._convert(resp, 'placement', Placement)
cls._convert(resp, 'mirror', StreamSource)
cls._convert(resp, 'sources', StreamSource)
cls._convert(resp, 'republish', RePublish)
cls._convert(resp, 'subject_transform', SubjectTransform)
return super().from_response(resp)

@@ -339,10 +339,9 @@

result = super().as_dict()
result["duplicate_window"] = self._to_nanoseconds(
result['duplicate_window'] = self._to_nanoseconds(
self.duplicate_window
)
result["max_age"] = self._to_nanoseconds(self.max_age)
result['max_age'] = self._to_nanoseconds(self.max_age)
if self.sources:
result["sources"] = [src.as_dict() for src in self.sources]
if self.compression and (self.compression != StoreCompression.NONE
and self.compression != StoreCompression.S2):
result['sources'] = [src.as_dict() for src in self.sources]
if self.compression and (self.compression != StoreCompression.NONE and self.compression != StoreCompression.S2):
raise ValueError(

@@ -373,3 +372,3 @@ "nats: invalid store compression type: %s" % self.compression

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "replicas", PeerInfo)
cls._convert(resp, 'replicas', PeerInfo)
return super().from_response(resp)

@@ -383,3 +382,2 @@

"""
config: StreamConfig

@@ -394,7 +392,7 @@ state: StreamState

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "config", StreamConfig)
cls._convert(resp, "state", StreamState)
cls._convert(resp, "mirror", StreamSourceInfo)
cls._convert(resp, "sources", StreamSourceInfo)
cls._convert(resp, "cluster", ClusterInfo)
cls._convert(resp, 'config', StreamConfig)
cls._convert(resp, 'state', StreamState)
cls._convert(resp, 'mirror', StreamSourceInfo)
cls._convert(resp, 'sources', StreamSourceInfo)
cls._convert(resp, 'cluster', ClusterInfo)
return super().from_response(resp)

@@ -408,6 +406,3 @@

"""
def __init__(
self, offset: int, total: int, streams: List[Dict[str, any]]
) -> None:
def __init__(self, offset: int, total: int, streams: List[Dict[str, any]]) -> None:
self.offset = offset

@@ -483,3 +478,2 @@ self.total = total

"""
name: Optional[str] = None

@@ -526,7 +520,7 @@ durable_name: Optional[str] = None

def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "ack_wait")
cls._convert_nanoseconds(resp, "idle_heartbeat")
cls._convert_nanoseconds(resp, "inactive_threshold")
if "backoff" in resp:
resp["backoff"] = [val / _NANOSECOND for val in resp["backoff"]]
cls._convert_nanoseconds(resp, 'ack_wait')
cls._convert_nanoseconds(resp, 'idle_heartbeat')
cls._convert_nanoseconds(resp, 'inactive_threshold')
if 'backoff' in resp:
resp['backoff'] = [val / _NANOSECOND for val in resp['backoff']]
return super().from_response(resp)

@@ -536,9 +530,9 @@

result = super().as_dict()
result["ack_wait"] = self._to_nanoseconds(self.ack_wait)
result["idle_heartbeat"] = self._to_nanoseconds(self.idle_heartbeat)
result["inactive_threshold"] = self._to_nanoseconds(
result['ack_wait'] = self._to_nanoseconds(self.ack_wait)
result['idle_heartbeat'] = self._to_nanoseconds(self.idle_heartbeat)
result['inactive_threshold'] = self._to_nanoseconds(
self.inactive_threshold
)
if self.backoff:
result["backoff"] = [self._to_nanoseconds(i) for i in self.backoff]
result['backoff'] = [self._to_nanoseconds(i) for i in self.backoff]
return result

@@ -560,3 +554,2 @@

"""
name: str

@@ -578,6 +571,6 @@ stream_name: str

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "delivered", SequenceInfo)
cls._convert(resp, "ack_floor", SequenceInfo)
cls._convert(resp, "config", ConsumerConfig)
cls._convert(resp, "cluster", ClusterInfo)
cls._convert(resp, 'delivered', SequenceInfo)
cls._convert(resp, 'ack_floor', SequenceInfo)
cls._convert(resp, 'config', ConsumerConfig)
cls._convert(resp, 'cluster', ClusterInfo)
return super().from_response(resp)

@@ -614,3 +607,3 @@

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "limits", AccountLimits)
cls._convert(resp, 'limits', AccountLimits)
return super().from_response(resp)

@@ -634,3 +627,2 @@

"""
# NOTE: These fields are shared with Tier type as well.

@@ -649,6 +641,6 @@ memory: int

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "limits", AccountLimits)
cls._convert(resp, "api", APIStats)
cls._convert(resp, 'limits', AccountLimits)
cls._convert(resp, 'api', APIStats)
info = super().from_response(resp)
tiers = resp.get("tiers", None)
tiers = resp.get('tiers', None)
if tiers:

@@ -689,3 +681,2 @@ result = {}

"""
bucket: str

@@ -705,3 +696,3 @@ description: Optional[str] = None

result = super().as_dict()
result["ttl"] = self._to_nanoseconds(self.ttl)
result['ttl'] = self._to_nanoseconds(self.ttl)
return result

@@ -715,3 +706,2 @@

"""
# Purge up to but not including sequence.

@@ -730,3 +720,2 @@ seq: Optional[int] = None

"""
bucket: Optional[str] = None

@@ -742,3 +731,3 @@ description: Optional[str] = None

result = super().as_dict()
result["ttl"] = self._to_nanoseconds(self.ttl)
result['ttl'] = self._to_nanoseconds(self.ttl)
return result

@@ -752,3 +741,2 @@

"""
# Bucket is the name of the other object store.

@@ -772,3 +760,3 @@ bucket: str

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "link", ObjectLink)
cls._convert(resp, 'link', ObjectLink)
return super().from_response(resp)

@@ -782,3 +770,2 @@

"""
name: Optional[str] = None

@@ -792,3 +779,3 @@ description: Optional[str] = None

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "options", ObjectMetaOptions)
cls._convert(resp, 'options', ObjectMetaOptions)
return super().from_response(resp)

@@ -802,3 +789,2 @@

"""
name: str

@@ -833,3 +819,3 @@ bucket: str

def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "options", ObjectMetaOptions)
cls._convert(resp, 'options', ObjectMetaOptions)
return super().from_response(resp)

@@ -22,11 +22,3 @@ # Copyright 2021-2022 The NATS Authors

from secrets import token_hex
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
)
from typing import TYPE_CHECKING, Awaitable, Callable, Optional, List, Dict

@@ -38,17 +30,8 @@ import nats.errors

from nats.js import api
from nats.js.errors import (
BadBucketError,
BucketNotFoundError,
FetchTimeoutError,
InvalidBucketNameError,
NotFoundError,
)
from nats.js.errors import BadBucketError, BucketNotFoundError, InvalidBucketNameError, NotFoundError, FetchTimeoutError
from nats.js.kv import KeyValue
from nats.js.manager import JetStreamManager
from nats.js.object_store import (
OBJ_ALL_CHUNKS_PRE_TEMPLATE,
OBJ_ALL_META_PRE_TEMPLATE,
OBJ_STREAM_TEMPLATE,
VALID_BUCKET_RE,
ObjectStore,
VALID_BUCKET_RE, OBJ_ALL_CHUNKS_PRE_TEMPLATE, OBJ_ALL_META_PRE_TEMPLATE,
OBJ_STREAM_TEMPLATE, ObjectStore
)

@@ -61,9 +44,9 @@

NATS_HDR_LINE = bytearray(b"NATS/1.0")
NATS_HDR_LINE = bytearray(b'NATS/1.0')
NATS_HDR_LINE_SIZE = len(NATS_HDR_LINE)
_CRLF_ = b"\r\n"
_CRLF_ = b'\r\n'
_CRLF_LEN_ = len(_CRLF_)
KV_STREAM_TEMPLATE = "KV_{bucket}"
KV_PRE_TEMPLATE = "$KV.{bucket}."
Callback = Callable[["Msg"], Awaitable[None]]
Callback = Callable[['Msg'], Awaitable[None]]

@@ -129,5 +112,3 @@ # For JetStream the default pending limits are larger.

self._publish_async_pending_semaphore = asyncio.Semaphore(
publish_async_max_pending
)
self._publish_async_pending_semaphore = asyncio.Semaphore(publish_async_max_pending)

@@ -146,8 +127,8 @@ @property

self._async_reply_prefix = self._nc._inbox_prefix[:]
self._async_reply_prefix.extend(b".")
self._async_reply_prefix.extend(b'.')
self._async_reply_prefix.extend(self._nc._nuid.next())
self._async_reply_prefix.extend(b".")
self._async_reply_prefix.extend(b'.')
async_reply_subject = self._async_reply_prefix[:]
async_reply_subject.extend(b"*")
async_reply_subject.extend(b'*')

@@ -169,4 +150,3 @@ await self._nc.subscribe(

# Handle no responders
if msg.headers and msg.headers.get(api.Header.STATUS
) == NO_RESPONDERS_STATUS:
if msg.headers and msg.headers.get(api.Header.STATUS) == NO_RESPONDERS_STATUS:
future.set_exception(nats.js.errors.NoStreamResponseError)

@@ -178,4 +158,4 @@ return

resp = json.loads(msg.data)
if "error" in resp:
err = nats.js.errors.APIError.from_error(resp["error"])
if 'error' in resp:
err = nats.js.errors.APIError.from_error(resp['error'])
future.set_exception(err)

@@ -192,6 +172,6 @@ return

subject: str,
payload: bytes = b"",
payload: bytes = b'',
timeout: Optional[float] = None,
stream: Optional[str] = None,
headers: Optional[Dict[str, Any]] = None,
headers: Optional[Dict] = None
) -> api.PubAck:

@@ -219,4 +199,4 @@ """

resp = json.loads(msg.data)
if "error" in resp:
raise nats.js.errors.APIError.from_error(resp["error"])
if 'error' in resp:
raise nats.js.errors.APIError.from_error(resp['error'])
return api.PubAck.from_response(resp)

@@ -227,3 +207,3 @@

subject: str,
payload: bytes = b"",
payload: bytes = b'',
wait_stall: Optional[float] = None,

@@ -247,6 +227,3 @@ stream: Optional[str] = None,

try:
await asyncio.wait_for(
self._publish_async_pending_semaphore.acquire(),
timeout=wait_stall
)
await asyncio.wait_for(self._publish_async_pending_semaphore.acquire(), timeout=wait_stall)
except (asyncio.TimeoutError, asyncio.CancelledError):

@@ -436,6 +413,5 @@ raise nats.js.errors.TooManyStalledMsgsError

# Create inbox for push consumer, if deliver_subject is not assigned already.
if config.deliver_subject is None:
deliver = self._nc.new_inbox()
config.deliver_subject = deliver
# Create inbox for push consumer.
deliver = self._nc.new_inbox()
config.deliver_subject = deliver

@@ -492,3 +468,4 @@ # Auto created consumers use the filter subject.

) -> PushSubscription:
"""Push-subscribe to an existing consumer."""
"""Push-subscribe to an existing consumer.
"""
# By default, async subscribers wrap the original callback and

@@ -666,3 +643,3 @@ # auto ack the messages as they are delivered.

pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit,
pending_bytes_limit=pending_bytes_limit
)

@@ -704,5 +681,4 @@ consumer_name = None

def _is_temporary_error(cls, status: Optional[str]) -> bool:
if (status == api.StatusCode.NO_MESSAGES
or status == api.StatusCode.CONFLICT
or status == api.StatusCode.REQUEST_TIMEOUT):
if status == api.StatusCode.NO_MESSAGES or status == api.StatusCode.CONFLICT \
or status == api.StatusCode.REQUEST_TIMEOUT:
return True

@@ -726,3 +702,3 @@ else:

class _JSI:
class _JSI():

@@ -803,4 +779,3 @@ def __init__(

if (self._fciseq -
self._psub._pending_queue.qsize()) >= self._fcd:
if (self._fciseq - self._psub._pending_queue.qsize()) >= self._fcd:
fc_reply = self._fcr

@@ -1023,3 +998,3 @@ try:

prefix = self._js._prefix
self._nms = f"{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}"
self._nms = f'{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}'
self._deliver = deliver.decode()

@@ -1073,3 +1048,3 @@

timeout: Optional[float] = 5,
heartbeat: Optional[float] = None,
heartbeat: Optional[float] = None
) -> List[Msg]:

@@ -1126,3 +1101,3 @@ """

timeout: Optional[float],
heartbeat: Optional[float] = None,
heartbeat: Optional[float] = None
) -> Msg:

@@ -1148,7 +1123,7 @@ queue = self._sub._pending_queue

next_req = {}
next_req["batch"] = 1
next_req['batch'] = 1
if expires:
next_req["expires"] = int(expires)
next_req['expires'] = int(expires)
if heartbeat:
next_req["idle_heartbeat"] = int(
next_req['idle_heartbeat'] = int(
heartbeat * 1_000_000_000

@@ -1206,3 +1181,3 @@ ) # to nanoseconds

timeout: Optional[float],
heartbeat: Optional[float] = None,
heartbeat: Optional[float] = None
) -> List[Msg]:

@@ -1235,10 +1210,10 @@ msgs = []

next_req = {}
next_req["batch"] = needed
next_req['batch'] = needed
if expires:
next_req["expires"] = expires
next_req['expires'] = expires
if heartbeat:
next_req["idle_heartbeat"] = int(
next_req['idle_heartbeat'] = int(
heartbeat * 1_000_000_000
) # to nanoseconds
next_req["no_wait"] = True
next_req['no_wait'] = True
await self._nc.publish(

@@ -1279,3 +1254,3 @@ self._nms,

status = JetStreamContext.is_status_msg(msg)
if status == api.StatusCode.NO_MESSAGES or status == api.StatusCode.REQUEST_TIMEOUT:
if status == api.StatusCode.NO_MESSAGES:
# No more messages after this so fallthrough

@@ -1303,7 +1278,7 @@ # after receiving the rest.

next_req = {}
next_req["batch"] = needed
next_req['batch'] = needed
if expires:
next_req["expires"] = expires
next_req['expires'] = expires
if heartbeat:
next_req["idle_heartbeat"] = int(
next_req['idle_heartbeat'] = int(
heartbeat * 1_000_000_000

@@ -1413,3 +1388,3 @@ ) # to nanoseconds

js=self,
direct=bool(si.config.allow_direct),
direct=bool(si.config.allow_direct)
)

@@ -1416,0 +1391,0 @@

@@ -18,3 +18,3 @@ # Copyright 2016-2024 The NATS Authors

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, NoReturn, Optional
from typing import TYPE_CHECKING, Any, NoReturn, Optional, Dict

@@ -37,3 +37,3 @@ import nats.errors

def __str__(self) -> str:
desc = ""
desc = ''
if self.description:

@@ -49,3 +49,2 @@ desc = self.description

"""
code: Optional[int]

@@ -84,3 +83,3 @@ err_code: Optional[int]

def from_error(cls, err: Dict[str, Any]):
code = err["code"]
code = err['code']
if code == 503:

@@ -108,3 +107,2 @@ raise ServiceUnavailableError(**err)

"""
pass

@@ -117,3 +115,2 @@

"""
pass

@@ -126,3 +123,2 @@

"""
pass

@@ -135,3 +131,2 @@

"""
pass

@@ -177,3 +172,3 @@

consumer_sequence=None,
last_consumer_sequence=None,
last_consumer_sequence=None
) -> None:

@@ -196,3 +191,2 @@ self.stream_resume_sequence = stream_resume_sequence

"""
pass

@@ -209,3 +203,2 @@

"""
pass

@@ -272,3 +265,2 @@

"""
pass

@@ -281,3 +273,2 @@

"""
pass

@@ -290,3 +281,2 @@

"""
pass

@@ -299,3 +289,2 @@

"""
pass

@@ -308,3 +297,2 @@

"""
pass

@@ -317,3 +305,2 @@

"""
pass

@@ -326,3 +313,2 @@

"""
pass

@@ -335,3 +321,2 @@

"""
pass

@@ -19,3 +19,2 @@ # Copyright 2021-2022 The NATS Authors

import datetime
import logging
from dataclasses import dataclass

@@ -36,5 +35,3 @@ from typing import TYPE_CHECKING, List, Optional

logger = logging.getLogger(__name__)
class KeyValue:

@@ -44,2 +41,5 @@ """

.. note::
This functionality is EXPERIMENTAL and may be changed in later releases.
::

@@ -75,3 +75,2 @@

"""
bucket: str

@@ -90,3 +89,2 @@ key: str

"""
stream_info: api.StreamInfo

@@ -289,5 +287,3 @@ bucket: str

subject = f"{self._pre}{entry.key}"
duration = datetime.datetime.now(
datetime.timezone.utc
) - entry.created
duration = datetime.datetime.now() - entry.created
if olderthan > 0 and olderthan > duration.total_seconds():

@@ -311,4 +307,3 @@ keep = 1

self._js = js
self._updates: asyncio.Queue[KeyValue.Entry
| None] = asyncio.Queue(maxsize=256)
self._updates: asyncio.Queue[KeyValue.Entry | None] = asyncio.Queue(maxsize=256)
self._sub = None

@@ -352,6 +347,5 @@ self._pending: Optional[int] = None

async def keys(self, filters: List[str] = None, **kwargs) -> List[str]:
async def keys(self, **kwargs) -> List[str]:
"""
Returns a list of the keys from a KeyValue store.
Optionally filters the keys based on the provided filter list.
keys will return a list of the keys from a KeyValue store.
"""

@@ -364,14 +358,2 @@ watcher = await self.watchall(

# Check consumer info and make sure filters are applied correctly
try:
consumer_info = await watcher._sub.consumer_info()
if consumer_info and filters:
# If NATS server < 2.10, filters might be ignored.
if consumer_info.config.filter_subject != ">":
logger.warning(
"Server may ignore filters if version is < 2.10."
)
except Exception as e:
raise e
async for key in watcher:

@@ -381,11 +363,3 @@ # None entry is used to signal that there is no more info.

break
# Apply filters if any were provided
if filters:
if any(f in key.key for f in filters):
keys.append(key.key)
else:
# No filters provided, append all keys
keys.append(key.key)
keys.append(key.key)
await watcher.stop()

@@ -449,3 +423,3 @@

if ignore_deletes:
if op == KV_PURGE or op == KV_DEL:
if (op == KV_PURGE or op == KV_DEL):
if meta.num_pending == 0 and not watcher._init_done:

@@ -452,0 +426,0 @@ await watcher._updates.put(None)

@@ -20,3 +20,3 @@ # Copyright 2021 The NATS Authors

from email.parser import BytesParser
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional
from typing import TYPE_CHECKING, Any, List, Optional, Dict, Iterable

@@ -30,5 +30,5 @@ from nats.errors import NoRespondersError

NATS_HDR_LINE = bytearray(b"NATS/1.0")
NATS_HDR_LINE = bytearray(b'NATS/1.0')
NATS_HDR_LINE_SIZE = len(NATS_HDR_LINE)
_CRLF_ = b"\r\n"
_CRLF_ = b'\r\n'
_CRLF_LEN_ = len(_CRLF_)

@@ -55,3 +55,3 @@

resp = await self._api_request(
f"{self._prefix}.INFO", b"", timeout=self._timeout
f"{self._prefix}.INFO", b'', timeout=self._timeout
)

@@ -70,21 +70,15 @@ return api.AccountInfo.from_response(resp)

)
if not info["streams"]:
if not info['streams']:
raise NotFoundError
return info["streams"][0]
return info['streams'][0]
async def stream_info(
self,
name: str,
subjects_filter: Optional[str] = None
) -> api.StreamInfo:
async def stream_info(self, name: str, subjects_filter: Optional[str] = None) -> api.StreamInfo:
"""
Get the latest StreamInfo by stream name.
"""
req_data = ""
req_data = ''
if subjects_filter:
req_data = json.dumps({"subjects_filter": subjects_filter})
resp = await self._api_request(
f"{self._prefix}.STREAM.INFO.{name}",
req_data.encode(),
timeout=self._timeout,
f"{self._prefix}.STREAM.INFO.{name}", req_data.encode(), timeout=self._timeout
)

@@ -104,22 +98,8 @@ return api.StreamInfo.from_response(resp)

config = config.evolve(**params)
stream_name = config.name
if stream_name is None:
if config.name is None:
raise ValueError("nats: stream name is required")
# Validate stream name
invalid_chars = set(".*>/\\")
has_invalid_chars = any(char in stream_name for char in invalid_chars)
has_whitespace = any(char.isspace() for char in stream_name)
is_not_printable = not stream_name.isprintable()
if has_invalid_chars or has_whitespace or is_not_printable:
raise ValueError(
f"nats: stream name ({stream_name}) is invalid. Names cannot contain whitespace, '.', '*', '>', "
"path separators (forward or backward slash), or non-printable characters."
)
data = json.dumps(config.as_dict())
resp = await self._api_request(
f"{self._prefix}.STREAM.CREATE.{stream_name}",
f"{self._prefix}.STREAM.CREATE.{config.name}",
data.encode(),

@@ -159,3 +139,3 @@ timeout=self._timeout,

)
return resp["success"]
return resp['success']

@@ -167,3 +147,3 @@ async def purge_stream(

subject: Optional[str] = None,
keep: Optional[int] = None,
keep: Optional[int] = None
) -> bool:

@@ -175,7 +155,7 @@ """

if seq:
stream_req["seq"] = seq
stream_req['seq'] = seq
if subject:
stream_req["filter"] = subject
stream_req['filter'] = subject
if keep:
stream_req["keep"] = keep
stream_req['keep'] = keep

@@ -188,3 +168,3 @@ req = json.dumps(stream_req)

)
return resp["success"]
return resp['success']

@@ -199,3 +179,3 @@ async def consumer_info(

f"{self._prefix}.CONSUMER.INFO.{stream}.{consumer}",
b"",
b'',
timeout=timeout

@@ -211,9 +191,7 @@ )

f"{self._prefix}.STREAM.LIST",
json.dumps({
"offset": offset
}).encode(),
json.dumps({"offset": offset}).encode(),
timeout=self._timeout,
)
streams = []
for stream in resp["streams"]:
for stream in resp['streams']:
stream_info = api.StreamInfo.from_response(stream)

@@ -223,4 +201,3 @@ streams.append(stream_info)

async def streams_info_iterator(self,
offset=0) -> Iterable[api.StreamInfo]:
async def streams_info_iterator(self, offset=0) -> Iterable[api.StreamInfo]:
"""

@@ -231,11 +208,7 @@ streams_info retrieves a list of streams Iterator.

f"{self._prefix}.STREAM.LIST",
json.dumps({
"offset": offset
}).encode(),
json.dumps({"offset": offset}).encode(),
timeout=self._timeout,
)
return api.StreamsListIterator(
resp["offset"], resp["total"], resp["streams"]
)
return api.StreamsListIterator(resp["offset"], resp["total"], resp["streams"])

@@ -259,3 +232,3 @@ async def add_consumer(

resp = None
subject = ""
subject = ''
version = self._nc.connected_server_version

@@ -282,6 +255,6 @@ consumer_name_supported = version.major >= 2 and version.minor >= 9

f"{self._prefix}.CONSUMER.DELETE.{stream}.{consumer}",
b"",
timeout=self._timeout,
b'',
timeout=self._timeout
)
return resp["success"]
return resp['success']

@@ -301,3 +274,3 @@ async def consumers_info(

f"{self._prefix}.CONSUMER.LIST.{stream}",
b"" if offset is None else json.dumps({
b'' if offset is None else json.dumps({
"offset": offset

@@ -308,3 +281,3 @@ }).encode(),

consumers = []
for consumer in resp["consumers"]:
for consumer in resp['consumers']:
consumer_info = api.ConsumerInfo.from_response(consumer)

@@ -328,12 +301,12 @@ consumers.append(consumer_info)

if seq:
req["seq"] = seq
req['seq'] = seq
if subject:
req["seq"] = None
req.pop("seq", None)
req["last_by_subj"] = subject
req['seq'] = None
req.pop('seq', None)
req['last_by_subj'] = subject
if next:
req["seq"] = seq
req["last_by_subj"] = None
req.pop("last_by_subj", None)
req["next_by_subj"] = subject
req['seq'] = seq
req['last_by_subj'] = None
req.pop('last_by_subj', None)
req['next_by_subj'] = subject
data = json.dumps(req)

@@ -345,3 +318,3 @@

# last_by_subject type request requires no payload.
data = ""
data = ''
req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}"

@@ -363,3 +336,3 @@ else:

raw_msg = api.RawStreamMsg.from_response(resp_data["message"])
raw_msg = api.RawStreamMsg.from_response(resp_data['message'])
if raw_msg.hdrs:

@@ -387,5 +360,5 @@ hdrs = base64.b64decode(raw_msg.hdrs)

msg.data = None
status = msg.headers.get("Status")
status = msg.headers.get('Status')
if status:
if status == "404":
if status == '404':
raise NotFoundError

@@ -396,6 +369,6 @@ else:

raw_msg = api.RawStreamMsg()
subject = msg.headers["Nats-Subject"]
subject = msg.headers['Nats-Subject']
raw_msg.subject = subject
seq = msg.headers.get("Nats-Sequence")
seq = msg.headers.get('Nats-Sequence')
if seq:

@@ -413,6 +386,6 @@ raw_msg.seq = int(seq)

req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}"
req = {"seq": seq}
req = {'seq': seq}
data = json.dumps(req)
resp = await self._api_request(req_subject, data.encode())
return resp["success"]
return resp['success']

@@ -433,3 +406,3 @@ async def get_last_msg(

req_subject: str,
req: bytes = b"",
req: bytes = b'',
timeout: float = 5,

@@ -444,5 +417,5 @@ ) -> Dict[str, Any]:

# Check for API errors.
if "error" in resp:
raise APIError.from_error(resp["error"])
if 'error' in resp:
raise APIError.from_error(resp['error'])
return resp

@@ -15,11 +15,11 @@ # Copyright 2021-2023 The NATS Authors

import asyncio
import base64
import re
import io
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from hashlib import sha256
from typing import TYPE_CHECKING, List, Optional, Union
from dataclasses import dataclass
import json
import asyncio
from typing import TYPE_CHECKING, Optional, Union, List

@@ -29,9 +29,4 @@ import nats.errors

from nats.js.errors import (
BadObjectMetaError,
DigestMismatchError,
LinkIsABucketError,
NotFoundError,
ObjectAlreadyExists,
ObjectDeletedError,
ObjectNotFoundError,
BadObjectMetaError, DigestMismatchError, ObjectAlreadyExists,
ObjectDeletedError, ObjectNotFoundError, NotFoundError, LinkIsABucketError
)

@@ -61,2 +56,5 @@ from nats.js.kv import MSG_ROLLUP_SUBJECT

.. note::
This functionality is EXPERIMENTAL and may be changed in later releases.
::

@@ -79,3 +77,2 @@ """

"""
stream_info: api.StreamInfo

@@ -150,3 +147,3 @@ bucket: str

bucket=self._name,
obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(),
obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode()
)

@@ -220,3 +217,3 @@ stream = OBJ_STREAM_TEMPLATE.format(bucket=self._name)

executor = asyncio.get_running_loop().run_in_executor
if hasattr(writeinto, "buffer"):
if hasattr(writeinto, 'buffer'):
executor_fn = writeinto.buffer.write

@@ -293,6 +290,6 @@ else:

data = io.BytesIO(data)
elif hasattr(data, "readinto") or isinstance(data, io.BufferedIOBase):
elif hasattr(data, 'readinto') or isinstance(data, io.BufferedIOBase):
# Need to delegate to a threaded executor to avoid blocking.
executor = asyncio.get_running_loop().run_in_executor
elif hasattr(data, "buffer") or isinstance(data, io.TextIOWrapper):
elif hasattr(data, 'buffer') or isinstance(data, io.TextIOWrapper):
data = data.buffer

@@ -311,3 +308,3 @@ else:

chunks=0,
mtime=datetime.now(timezone.utc).isoformat(),
mtime=datetime.now(timezone.utc).isoformat()
)

@@ -348,3 +345,3 @@ h = sha256()

bucket=self._name,
obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(),
obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode()
)

@@ -356,3 +353,3 @@ # Publish the meta message.

json.dumps(info.as_dict()).encode(),
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}
)

@@ -419,3 +416,3 @@ except Exception as err:

bucket=self._name,
obj=base64.urlsafe_b64encode(bytes(name, "utf-8")).decode(),
obj=base64.urlsafe_b64encode(bytes(name, "utf-8")).decode()
)

@@ -427,3 +424,3 @@ # Publish the meta message.

json.dumps(info.as_dict()).encode(),
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}
)

@@ -541,4 +538,4 @@ except Exception as err:

info.chunks = 0
info.digest = ""
info.mtime = ""
info.digest = ''
info.mtime = ''

@@ -548,3 +545,3 @@ # Prepare the meta message.

bucket=self._name,
obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(),
obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode()
)

@@ -556,3 +553,3 @@ # Publish the meta message.

json.dumps(info.as_dict()).encode(),
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}
)

@@ -559,0 +556,0 @@ finally:

@@ -16,8 +16,7 @@ # Copyright 2021-2022 The NATS Authors

from dataclasses import replace
from nats.aio.client import Client
from typing import Optional
from nats.aio.client import Client
from .request import Handler, Request
from .service import Service, ServiceConfig
from .request import Request, Handler

@@ -24,0 +23,0 @@

@@ -18,3 +18,4 @@ # Copyright 2021-2024 The NATS Authors

from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional
from enum import Enum
from typing import Any, Dict, Awaitable, Callable, Optional

@@ -50,5 +51,3 @@ from nats.aio.msg import Msg

async def respond(
self,
data: bytes = b"",
headers: Optional[Dict[str, str]] = None
self, data: bytes = b"", headers: Optional[Dict[str, str]] = None
) -> None:

@@ -88,6 +87,8 @@ """Send a response to the request.

headers.update({
ERROR_HEADER: description,
ERROR_CODE_HEADER: code,
})
headers.update(
{
ERROR_HEADER: description,
ERROR_CODE_HEADER: code,
}
)

@@ -114,4 +115,2 @@ await self.respond(data, headers=headers)

def from_dict(cls, data: Dict[str, Any]) -> ServiceError:
return cls(
code=data.get("code", ""), description=data.get("description", "")
)
return cls(code=data.get("code", ""), description=data.get("description", ""))
from __future__ import annotations
import json
import re
import time
from asyncio import Event
from dataclasses import dataclass, field, replace
from dataclasses import dataclass, replace, field
from datetime import datetime, timedelta
from enum import Enum
from nats.aio.client import Client
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
from typing import (
Any,
AsyncContextManager,
Callable,
Optional,
Protocol,
Dict,
List,
Optional,
Protocol,
overload,
Callable,
)
from nats.aio.client import Client
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
import re
import json
import time
from .request import Handler, Request, ServiceError
from .request import Request, Handler, ServiceError
DEFAULT_QUEUE_GROUP = "q"

@@ -275,5 +277,3 @@ """Queue Group name used across all services."""

self._processing_time += elapsed_time
self._average_processing_time = int(
self._processing_time / self._num_requests
)
self._average_processing_time = int(self._processing_time / self._num_requests)

@@ -298,4 +298,3 @@

@overload
async def add_endpoint(self, config: EndpointConfig) -> None:
...
async def add_endpoint(self, config: EndpointConfig) -> None: ...

@@ -311,9 +310,7 @@ @overload

metadata: Optional[Dict[str, str]] = None,
) -> None:
...
) -> None: ...
async def add_endpoint(
self, config: Optional[EndpointConfig] = None, **kwargs
) -> None:
...
) -> None: ...

@@ -327,19 +324,11 @@

@overload
def add_group(
self, *, name: str, queue_group: Optional[str] = None
) -> Group:
...
def add_group(self, *, name: str, queue_group: Optional[str] = None) -> Group: ...
@overload
def add_group(self, config: GroupConfig) -> Group:
...
def add_group(self, config: GroupConfig) -> Group: ...
def add_group(
self, config: Optional[GroupConfig] = None, **kwargs
) -> Group:
...
def add_group(self, config: Optional[GroupConfig] = None, **kwargs) -> Group: ...
class Group(GroupManager, EndpointManager):
def __init__(self, service: "Service", config: GroupConfig) -> None:

@@ -351,4 +340,3 @@ self._service = service

@overload
async def add_endpoint(self, config: EndpointConfig) -> None:
...
async def add_endpoint(self, config: EndpointConfig) -> None: ...

@@ -364,4 +352,3 @@ @overload

metadata: Optional[Dict[str, str]] = None,
) -> None:
...
) -> None: ...

@@ -378,4 +365,5 @@ async def add_endpoint(

config,
subject=f"{self._prefix.strip('.')}.{config.subject or config.name}"
.strip("."),
subject=f"{self._prefix.strip('.')}.{config.subject or config.name}".strip(
"."
),
queue_group=config.queue_group or self._queue_group,

@@ -387,14 +375,8 @@ )

@overload
def add_group(
self, *, name: str, queue_group: Optional[str] = None
) -> Group:
...
def add_group(self, *, name: str, queue_group: Optional[str] = None) -> Group: ...
@overload
def add_group(self, config: GroupConfig) -> Group:
...
def add_group(self, config: GroupConfig) -> Group: ...
def add_group(
self, config: Optional[GroupConfig] = None, **kwargs
) -> Group:
def add_group(self, config: Optional[GroupConfig] = None, **kwargs) -> Group:
if config:

@@ -553,8 +535,5 @@ config = replace(config, **kwargs)

version=data["version"],
started=datetime.strptime(
data["started"], "%Y-%m-%dT%H:%M:%S.%fZ"
),
started=datetime.strptime(data["started"], "%Y-%m-%dT%H:%M:%S.%fZ"),
endpoints=[
EndpointStats.from_dict(endpoint)
for endpoint in data["endpoints"]
EndpointStats.from_dict(endpoint) for endpoint in data["endpoints"]
],

@@ -572,4 +551,3 @@ metadata=data["metadata"],

"version": self.version,
"started": self.started.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] +
"Z",
"started": self.started.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
"endpoints": [endpoint.to_dict() for endpoint in self.endpoints],

@@ -634,4 +612,3 @@ "metadata": self.metadata,

endpoints=[
EndpointInfo.from_dict(endpoint)
for endpoint in data["endpoints"]
EndpointInfo.from_dict(endpoint) for endpoint in data["endpoints"]
],

@@ -659,3 +636,2 @@ metadata=data["metadata"],

class Service(AsyncContextManager):
def __init__(self, client: Client, config: ServiceConfig) -> None:

@@ -698,5 +674,3 @@ self._id = client._nuid.next().decode()

f"{verb}-all",
control_subject(
verb, name=None, id=None, prefix=self._prefix
),
control_subject(verb, name=None, id=None, prefix=self._prefix),
),

@@ -712,6 +686,3 @@ (

control_subject(
verb,
name=self._name,
id=self._id,
prefix=self._prefix
verb, name=self._name, id=self._id, prefix=self._prefix
),

@@ -726,8 +697,7 @@ ),

self._started = datetime.utcnow()
self._started = datetime.now()
await self._client.flush()
@overload
async def add_endpoint(self, config: EndpointConfig) -> None:
...
async def add_endpoint(self, config: EndpointConfig) -> None: ...

@@ -743,4 +713,3 @@ @overload

metadata: Optional[Dict[str, str]] = None,
) -> None:
...
) -> None: ...

@@ -755,5 +724,3 @@ async def add_endpoint(

config = replace(
config, queue_group=config.queue_group or self._queue_group
)
config = replace(config, queue_group=config.queue_group or self._queue_group)

@@ -765,14 +732,8 @@ endpoint = Endpoint(self, config)

@overload
def add_group(
self, *, name: str, queue_group: Optional[str] = None
) -> Group:
...
def add_group(self, *, name: str, queue_group: Optional[str] = None) -> Group: ...
@overload
def add_group(self, config: GroupConfig) -> Group:
...
def add_group(self, config: GroupConfig) -> Group: ...
def add_group(
self, config: Optional[GroupConfig] = None, **kwargs
) -> Group:
def add_group(self, config: Optional[GroupConfig] = None, **kwargs) -> Group:
if config:

@@ -783,5 +744,3 @@ config = replace(config, **kwargs)

config = replace(
config, queue_group=config.queue_group or self._queue_group
)
config = replace(config, queue_group=config.queue_group or self._queue_group)

@@ -809,3 +768,4 @@ return Group(self, config)

average_processing_time=endpoint._average_processing_time,
) for endpoint in (self._endpoints or [])
)
for endpoint in (self._endpoints or [])
],

@@ -834,3 +794,4 @@ started=self._started,

metadata=endpoint._metadata,
) for endpoint in self._endpoints
)
for endpoint in self._endpoints
],

@@ -837,0 +798,0 @@ )

@@ -21,3 +21,3 @@ # Copyright 2016-2018 The NATS Authors

DIGITS = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
BASE = 62

@@ -24,0 +24,0 @@ PREFIX_LENGTH = 12

@@ -5,7 +5,7 @@ from __future__ import annotations

PUB_OP = "PUB"
HPUB_OP = "HPUB"
SUB_OP = "SUB"
UNSUB_OP = "UNSUB"
_CRLF_ = "\r\n"
PUB_OP = 'PUB'
HPUB_OP = 'HPUB'
SUB_OP = 'SUB'
UNSUB_OP = 'UNSUB'
_CRLF_ = '\r\n'

@@ -16,6 +16,4 @@ Command = Callable[..., bytes]

def pub_cmd(subject, reply, payload) -> bytes:
return (
f"{PUB_OP} {subject} {reply} {len(payload)}{_CRLF_}".encode() +
payload + _CRLF_.encode()
)
return f'{PUB_OP} {subject} {reply} {len(payload)}{_CRLF_}'.encode(
) + payload + _CRLF_.encode()

@@ -26,14 +24,12 @@

total_size = len(payload) + hdr_len
return (
f"{HPUB_OP} {subject} {reply} {hdr_len} {total_size}{_CRLF_}".encode()
+ hdr + payload + _CRLF_.encode()
)
return f'{HPUB_OP} {subject} {reply} {hdr_len} {total_size}{_CRLF_}'.encode(
) + hdr + payload + _CRLF_.encode()
def sub_cmd(subject, queue, sid) -> bytes:
return f"{SUB_OP} {subject} {queue} {sid}{_CRLF_}".encode()
return f'{SUB_OP} {subject} {queue} {sid}{_CRLF_}'.encode()
def unsub_cmd(sid, limit) -> bytes:
limit_s = "" if limit == 0 else f"{limit}"
return f"{UNSUB_OP} {sid} {limit_s}{_CRLF_}".encode()
limit_s = '' if limit == 0 else f'{limit}'
return f'{UNSUB_OP} {sid} {limit_s}{_CRLF_}'.encode()

@@ -27,27 +27,27 @@ # Copyright 2016-2023 The NATS Authors

MSG_RE = re.compile(
b"\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n"
b'\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n'
)
HMSG_RE = re.compile(
b"\\AHMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?([\\d]+)\\s+(\\d+)\r\n"
b'\\AHMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?([\\d]+)\\s+(\\d+)\r\n'
)
OK_RE = re.compile(b"\\A\\+OK\\s*\r\n")
ERR_RE = re.compile(b"\\A-ERR\\s+('.+')?\r\n")
PING_RE = re.compile(b"\\APING\\s*\r\n")
PONG_RE = re.compile(b"\\APONG\\s*\r\n")
INFO_RE = re.compile(b"\\AINFO\\s+([^\r\n]+)\r\n")
OK_RE = re.compile(b'\\A\\+OK\\s*\r\n')
ERR_RE = re.compile(b'\\A-ERR\\s+(\'.+\')?\r\n')
PING_RE = re.compile(b'\\APING\\s*\r\n')
PONG_RE = re.compile(b'\\APONG\\s*\r\n')
INFO_RE = re.compile(b'\\AINFO\\s+([^\r\n]+)\r\n')
INFO_OP = b"INFO"
CONNECT_OP = b"CONNECT"
PUB_OP = b"PUB"
MSG_OP = b"MSG"
HMSG_OP = b"HMSG"
SUB_OP = b"SUB"
UNSUB_OP = b"UNSUB"
PING_OP = b"PING"
PONG_OP = b"PONG"
OK_OP = b"+OK"
ERR_OP = b"-ERR"
MSG_END = b"\n"
_CRLF_ = b"\r\n"
_SPC_ = b" "
INFO_OP = b'INFO'
CONNECT_OP = b'CONNECT'
PUB_OP = b'PUB'
MSG_OP = b'MSG'
HMSG_OP = b'HMSG'
SUB_OP = b'SUB'
UNSUB_OP = b'UNSUB'
PING_OP = b'PING'
PONG_OP = b'PONG'
OK_OP = b'+OK'
ERR_OP = b'-ERR'
MSG_END = b'\n'
_CRLF_ = b'\r\n'
_SPC_ = b' '

@@ -91,3 +91,3 @@ OK = OK_OP + _CRLF_

async def parse(self, data: bytes = b""):
async def parse(self, data: bytes = b''):
"""

@@ -109,3 +109,3 @@ Parses the wire protocol from NATS for the client

else:
self.msg_arg["reply"] = b""
self.msg_arg["reply"] = b''
self.needed = int(needed_bytes)

@@ -128,3 +128,3 @@ del self.buf[:msg.end()]

else:
self.msg_arg["reply"] = b""
self.msg_arg["reply"] = b''
self.needed = int(needed_bytes)

@@ -168,8 +168,7 @@ self.header_needed = int(header_size)

srv_info = json.loads(info_line.decode())
await self.nc._process_info(srv_info)
self.nc._process_info(srv_info)
del self.buf[:info.end()]
continue
if len(self.buf
) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf:
if len(self.buf) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf:
# FIXME: By default server uses a max protocol

@@ -176,0 +175,0 @@ # line of 4096 bytes but it can be tuned in latest

Metadata-Version: 2.4
Name: nats-py
Version: 2.10.0
Version: 2.9.1
Summary: NATS client for Python

@@ -37,2 +37,3 @@ Author-email: Waldemar Quevedo <wally@synadia.com>

[![pypi](https://img.shields.io/pypi/v/nats-py.svg)](https://pypi.org/project/nats-py)
[![Build Status](https://travis-ci.com/nats-io/nats.py.svg?branch=main)](http://travis-ci.com/nats-io/nats.py)
[![Versions](https://img.shields.io/pypi/pyversions/nats-py.svg)](https://pypi.org/project/nats-py)

@@ -258,25 +259,2 @@ [![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)

## Updating Docs
To update the docs, first checkout the `docs` branch under a local copy of the `nats.py` repo
as follows:
```sh
git clone https://github.com/nats-io/nats.py
cd nats.py
git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs
cd docs
pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments
pipenv shell
make html
# preview the changes:
make serve
```
If you are happy with the changes, make a PR on the docs branch:
```
make publish
git add docs
```
## License

@@ -283,0 +261,0 @@

@@ -64,1 +64,7 @@ [build-system]

split_before_expression_after_opening_paren = true
[tool.isort]
combine_as_imports = true
multi_line_output = 3
include_trailing_comma = true
src_paths = ["nats", "tests"]

@@ -7,2 +7,3 @@ # NATS - Python3 Client for Asyncio

[![pypi](https://img.shields.io/pypi/v/nats-py.svg)](https://pypi.org/project/nats-py)
[![Build Status](https://travis-ci.com/nats-io/nats.py.svg?branch=main)](http://travis-ci.com/nats-io/nats.py)
[![Versions](https://img.shields.io/pypi/pyversions/nats-py.svg)](https://pypi.org/project/nats-py)

@@ -228,25 +229,2 @@ [![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)

## Updating Docs
To update the docs, first checkout the `docs` branch under a local copy of the `nats.py` repo
as follows:
```sh
git clone https://github.com/nats-io/nats.py
cd nats.py
git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs
cd docs
pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments
pipenv shell
make html
# preview the changes:
make serve
```
If you are happy with the changes, make a PR on the docs branch:
```
make publish
git add docs
```
## License

@@ -253,0 +231,0 @@

[flake8]
ignore = W503, W504
max-line-length = 120

@@ -4,0 +3,0 @@

@@ -7,12 +7,12 @@ from setuptools import setup

name="nats-py",
version="2.10.0",
license="Apache 2 License",
version='2.9.1',
license='Apache 2 License',
extras_require={
"nkeys": ["nkeys"],
"aiohttp": ["aiohttp"],
"fast_parse": ["fast-mail-parser"],
'nkeys': ['nkeys'],
'aiohttp': ['aiohttp'],
'fast_parse': ['fast-mail-parser']
},
packages=["nats", "nats.aio", "nats.micro", "nats.protocol", "nats.js"],
packages=['nats', 'nats.aio', 'nats.micro', 'nats.protocol', 'nats.js'],
package_data={"nats": ["py.typed"]},
zip_safe=True,
zip_safe=True
)
import asyncio
import sys
import unittest
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout
from nats.errors import SlowConsumerError, TimeoutError

@@ -27,3 +30,3 @@ from tests.utils import SingleServerTestCase, async_test

for i in range(0, 5):
await nc.publish(f"tests.{i}", b"bar")
await nc.publish(f"tests.{i}", b'bar')

@@ -61,30 +64,30 @@ # Wait a bit for messages to be received.

if msg.reply != "":
await nc.publish(msg.reply, b"")
await nc.publish(msg.reply, b'')
await nc.subscribe("bar", cb=handler_bar)
await nc.publish("foo", b"1")
await nc.publish("foo", b"2")
await nc.publish("foo", b"3")
await nc.publish("foo", b'1')
await nc.publish("foo", b'2')
await nc.publish("foo", b'3')
# Will be processed before the others since no head of line
# blocking among the subscriptions.
await nc.publish("bar", b"4")
await nc.publish("bar", b'4')
response = await nc.request("foo", b"hello1", timeout=1)
self.assertEqual(response.data, b"hello1hello1")
response = await nc.request("foo", b'hello1', timeout=1)
self.assertEqual(response.data, b'hello1hello1')
with self.assertRaises(TimeoutError):
await nc.request("foo", b"hello2", timeout=0.1)
await nc.request("foo", b'hello2', timeout=0.1)
await nc.publish("bar", b"5")
response = await nc.request("foo", b"hello2", timeout=1)
self.assertEqual(response.data, b"hello2hello2")
await nc.publish("bar", b'5')
response = await nc.request("foo", b'hello2', timeout=1)
self.assertEqual(response.data, b'hello2hello2')
self.assertEqual(msgs[0].data, b"1")
self.assertEqual(msgs[1].data, b"4")
self.assertEqual(msgs[2].data, b"2")
self.assertEqual(msgs[3].data, b"3")
self.assertEqual(msgs[4].data, b"hello1")
self.assertEqual(msgs[5].data, b"hello2")
self.assertEqual(msgs[0].data, b'1')
self.assertEqual(msgs[1].data, b'4')
self.assertEqual(msgs[2].data, b'2')
self.assertEqual(msgs[3].data, b'3')
self.assertEqual(msgs[4].data, b'hello1')
self.assertEqual(msgs[5].data, b'hello2')
self.assertEqual(len(errors), 0)

@@ -122,13 +125,13 @@ await nc.close()

for i in range(10):
await nc.publish("foo", f"{i}".encode())
await nc.publish("foo", f'{i}'.encode())
# Will be processed before the others since no head of line
# blocking among the subscriptions.
await nc.publish("bar", b"14")
response = await nc.request("bar", b"hi1", 2)
self.assertEqual(response.data, b"hi1hi1hi1")
await nc.publish("bar", b'14')
response = await nc.request("bar", b'hi1', 2)
self.assertEqual(response.data, b'hi1hi1hi1')
self.assertEqual(len(msgs), 2)
self.assertEqual(msgs[0].data, b"14")
self.assertEqual(msgs[1].data, b"hi1")
self.assertEqual(msgs[0].data, b'14')
self.assertEqual(msgs[1].data, b'hi1')

@@ -177,9 +180,9 @@ # Consumed messages but the rest were slow consumers.

# blocking among the subscriptions.
await nc.publish("bar", b"14")
await nc.publish("bar", b'14')
response = await nc.request("bar", b"hi1", 2)
self.assertEqual(response.data, b"hi1hi1hi1")
response = await nc.request("bar", b'hi1', 2)
self.assertEqual(response.data, b'hi1hi1hi1')
self.assertEqual(len(msgs), 2)
self.assertEqual(msgs[0].data, b"14")
self.assertEqual(msgs[1].data, b"hi1")
self.assertEqual(msgs[0].data, b'14')
self.assertEqual(msgs[1].data, b'hi1')

@@ -195,4 +198,4 @@ # Consumed a few messages but the rest were slow consumers.

await asyncio.sleep(3)
response = await nc.request("foo", b"B", 1)
self.assertEqual(response.data, b"BB")
response = await nc.request("foo", b'B', 1)
self.assertEqual(response.data, b'BB')
await nc.close()

@@ -9,3 +9,2 @@ import asyncio

import nkeys
nkeys_installed = True

@@ -15,3 +14,4 @@ except ModuleNotFoundError:

from nats.aio.client import Client as NATS, RawCredentials
from nats.aio.client import Client as NATS
from nats.aio.client import RawCredentials
from nats.aio.errors import *

@@ -35,3 +35,3 @@ from nats.errors import *

seed = None
with open(config_file, "rb") as f:
with open(config_file, 'rb') as f:
seed = bytearray(os.fstat(f.fileno()).st_size)

@@ -41,6 +41,6 @@ f.readinto(seed)

{
"nkeys_seed": config_file
'nkeys_seed': config_file
},
{
"nkeys_seed_str": seed.decode()
'nkeys_seed_str': seed.decode()
},

@@ -60,17 +60,15 @@ ]

await nc.connect(
["tls://127.0.0.1:4222"],
error_cb=error_cb,
connect_timeout=10,
allow_reconnect=False,
**nkeys_args,
)
await nc.connect(["tls://127.0.0.1:4222"],
error_cb=error_cb,
connect_timeout=10,
allow_reconnect=False,
**nkeys_args)
async def help_handler(msg):
await nc.publish(msg.reply, b"OK!")
await nc.publish(msg.reply, b'OK!')
await nc.subscribe("help", cb=help_handler)
await nc.flush()
msg = await nc.request("help", b"I need help")
self.assertEqual(msg.data, b"OK!")
msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')

@@ -82,4 +80,4 @@ await nc.subscribe("bar", cb=help_handler)

msg = await nc.request("help", b"I need help")
self.assertEqual(msg.data, b"OK!")
msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')

@@ -110,8 +108,8 @@ await nc.close()

async def help_handler(msg):
await nc.publish(msg.reply, b"OK!")
await nc.publish(msg.reply, b'OK!')
await nc.subscribe("help", cb=help_handler)
await nc.flush()
msg = await nc.request("help", b"I need help")
self.assertEqual(msg.data, b"OK!")
msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')
await nc.close()

@@ -135,3 +133,3 @@

get_config_file("nkeys/foo-user.jwt"),
get_config_file("nkeys/foo-user.nk"),
get_config_file("nkeys/foo-user.nk")
),

@@ -142,8 +140,8 @@ allow_reconnect=False,

async def help_handler(msg):
await nc.publish(msg.reply, b"OK!")
await nc.publish(msg.reply, b'OK!')
await nc.subscribe("help", cb=help_handler)
await nc.flush()
msg = await nc.request("help", b"I need help")
self.assertEqual(msg.data, b"OK!")
msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')
await nc.close()

@@ -211,8 +209,8 @@

async def help_handler(msg):
await nc.publish(msg.reply, b"OK!")
await nc.publish(msg.reply, b'OK!')
await nc.subscribe("help", cb=help_handler)
await nc.flush()
msg = await nc.request("help", b"I need help")
self.assertEqual(msg.data, b"OK!")
msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')
await nc.close()

@@ -0,4 +1,13 @@

import asyncio
import http.client
import json
import shutil
import ssl
import tempfile
import time
import unittest
from unittest import mock
import nats
from nats.aio.client import Client as NATS, __version__
from nats.aio.errors import *

@@ -17,5 +26,5 @@ from tests.utils import *

await nc.publish(
"foo", b"hello world", headers={
"foo": "bar",
"hello": "world-1"
"foo", b'hello world', headers={
'foo': 'bar',
'hello': 'world-1'
}

@@ -28,4 +37,4 @@ )

self.assertEqual(msg.headers["foo"], "bar")
self.assertEqual(msg.headers["hello"], "world-1")
self.assertEqual(msg.headers['foo'], 'bar')
self.assertEqual(msg.headers['hello'], 'world-1')

@@ -40,4 +49,4 @@ await nc.close()

# Add another header
msg.headers["quux"] = "quuz"
await msg.respond(b"OK!")
msg.headers['quux'] = 'quuz'
await msg.respond(b'OK!')

@@ -47,5 +56,5 @@ await nc.subscribe("foo", cb=service)

msg = await nc.request(
"foo", b"hello world", headers={
"foo": "bar",
"hello": "world"
"foo", b'hello world', headers={
'foo': 'bar',
'hello': 'world'
}

@@ -56,6 +65,6 @@ )

self.assertEqual(len(msg.headers), 3)
self.assertEqual(msg.headers["foo"], "bar")
self.assertEqual(msg.headers["hello"], "world")
self.assertEqual(msg.headers["quux"], "quuz")
self.assertEqual(msg.data, b"OK!")
self.assertEqual(msg.headers['foo'], 'bar')
self.assertEqual(msg.headers['hello'], 'world')
self.assertEqual(msg.headers['quux'], 'quuz')
self.assertEqual(msg.data, b'OK!')

@@ -70,3 +79,3 @@ await nc.close()

await nc.flush()
await nc.publish("foo", b"hello world", headers={"": ""})
await nc.publish("foo", b'hello world', headers={'': ''})

@@ -77,3 +86,3 @@ msg = await sub.next_msg()

# Empty long key
await nc.publish("foo", b"hello world", headers={" ": ""})
await nc.publish("foo", b'hello world', headers={' ': ''})
msg = await sub.next_msg()

@@ -84,3 +93,3 @@ self.assertTrue(msg.headers == None)

await nc.publish(
"foo", b"hello world", headers={"": " "}
"foo", b'hello world', headers={'': ' '}
)

@@ -91,9 +100,9 @@ msg = await sub.next_msg()

hdrs = {
"timestamp": "2022-06-15T19:08:14.639020",
"type": "rpc",
"command": "publish_state",
"trace_id": "",
"span_id": "",
'timestamp': '2022-06-15T19:08:14.639020',
'type': 'rpc',
'command': 'publish_state',
'trace_id': '',
'span_id': ''
}
await nc.publish("foo", b"Hello from Python!", headers=hdrs)
await nc.publish("foo", b'Hello from Python!', headers=hdrs)
msg = await sub.next_msg()

@@ -105,6 +114,5 @@ self.assertEqual(msg.headers, hdrs)

if __name__ == "__main__":
if __name__ == '__main__':
import sys
runner = unittest.TextTestRunner(stream=sys.stdout)
unittest.main(verbosity=2, exit=False, testRunner=runner)
import asyncio
import http.client
import json
import shutil
import ssl
import tempfile
import time
import unittest
from unittest import mock
import pytest
import nats
import pytest
from nats.aio.client import Client as NATS, __version__
from nats.aio.errors import *

@@ -28,5 +36,5 @@ from tests.utils import *

await nc.publish(
"foo", b"hello world", headers={
"foo": "bar",
"hello": "world-1"
"foo", b'hello world', headers={
'foo': 'bar',
'hello': 'world-1'
}

@@ -39,4 +47,4 @@ )

self.assertEqual(msg.headers["foo"], "bar")
self.assertEqual(msg.headers["hello"], "world-1")
self.assertEqual(msg.headers['foo'], 'bar')
self.assertEqual(msg.headers['hello'], 'world-1')

@@ -54,4 +62,4 @@ await nc.close()

# Add another header
msg.headers["quux"] = "quuz"
await msg.respond(b"OK!")
msg.headers['quux'] = 'quuz'
await msg.respond(b'OK!')

@@ -61,5 +69,5 @@ await nc.subscribe("foo", cb=service)

msg = await nc.request(
"foo", b"hello world", headers={
"foo": "bar",
"hello": "world"
"foo", b'hello world', headers={
'foo': 'bar',
'hello': 'world'
}

@@ -70,6 +78,6 @@ )

self.assertEqual(len(msg.headers), 3)
self.assertEqual(msg.headers["foo"], "bar")
self.assertEqual(msg.headers["hello"], "world")
self.assertEqual(msg.headers["quux"], "quuz")
self.assertEqual(msg.data, b"OK!")
self.assertEqual(msg.headers['foo'], 'bar')
self.assertEqual(msg.headers['hello'], 'world')
self.assertEqual(msg.headers['quux'], 'quuz')
self.assertEqual(msg.data, b'OK!')

@@ -87,3 +95,3 @@ await nc.close()

await nc.flush()
await nc.publish("foo", b"hello world", headers={"": ""})
await nc.publish("foo", b'hello world', headers={'': ''})

@@ -94,3 +102,3 @@ msg = await sub.next_msg()

# Empty long key
await nc.publish("foo", b"hello world", headers={" ": ""})
await nc.publish("foo", b'hello world', headers={' ': ''})
msg = await sub.next_msg()

@@ -101,3 +109,3 @@ self.assertTrue(msg.headers == None)

await nc.publish(
"foo", b"hello world", headers={"": " "}
"foo", b'hello world', headers={'': ' '}
)

@@ -108,9 +116,9 @@ msg = await sub.next_msg()

hdrs = {
"timestamp": "2022-06-15T19:08:14.639020",
"type": "rpc",
"command": "publish_state",
"trace_id": "",
"span_id": "",
'timestamp': '2022-06-15T19:08:14.639020',
'type': 'rpc',
'command': 'publish_state',
'trace_id': '',
'span_id': ''
}
await nc.publish("foo", b"Hello from Python!", headers=hdrs)
await nc.publish("foo", b'Hello from Python!', headers=hdrs)
msg = await sub.next_msg()

@@ -140,12 +148,12 @@ self.assertEqual(msg.headers, hdrs)

async def bar_cb(msg):
await msg.respond(b"OK!")
await msg.respond(b'OK!')
rsub = await nc.subscribe("bar", cb=bar_cb)
await nc.publish("foo", b"First")
await nc.publish("foo", b'First')
await nc.flush()
msg = await sub.next_msg()
self.assertEqual(msg.data, b"First")
self.assertEqual(msg.data, b'First')
rmsg = await nc.request("bar", b"hi")
self.assertEqual(rmsg.data, b"OK!")
rmsg = await nc.request("bar", b'hi')
self.assertEqual(rmsg.data, b'OK!')

@@ -163,8 +171,8 @@ # Restart the server and wait for reconnect.

# Get another message.
await nc.publish("foo", b"Second")
await nc.publish("foo", b'Second')
await nc.flush()
msg = await sub.next_msg()
self.assertEqual(msg.data, b"Second")
rmsg = await nc.request("bar", b"hi")
self.assertEqual(rmsg.data, b"OK!")
self.assertEqual(msg.data, b'Second')
rmsg = await nc.request("bar", b'hi')
self.assertEqual(rmsg.data, b'OK!')

@@ -193,11 +201,11 @@ await nc.close()

async def bar_cb(msg):
await msg.respond(b"OK!")
await msg.respond(b'OK!')
rsub = await nc.subscribe("bar", cb=bar_cb)
await nc.publish("foo", b"First")
await nc.publish("foo", b'First')
await nc.flush()
msg = await sub.next_msg()
self.assertEqual(msg.data, b"First")
rmsg = await nc.request("bar", b"hi")
self.assertEqual(rmsg.data, b"OK!")
self.assertEqual(msg.data, b'First')
rmsg = await nc.request("bar", b'hi')
self.assertEqual(rmsg.data, b'OK!')

@@ -225,3 +233,3 @@ # Restart the server and wait for reconnect.

await nc.flush()
await nc.publish("foo", b"hello world", headers={"foo": "bar"})
await nc.publish("foo", b'hello world', headers={'foo': 'bar'})

@@ -232,3 +240,3 @@ msg = await sub.next_msg()

self.assertEqual(msg.headers["foo"], "bar")
self.assertEqual(msg.headers['foo'], 'bar')

@@ -257,12 +265,12 @@ await nc.close()

async def bar_cb(msg):
await msg.respond(b"OK!")
await msg.respond(b'OK!')
rsub = await nc.subscribe("bar", cb=bar_cb)
await nc.publish("foo", b"First")
await nc.publish("foo", b'First')
await nc.flush()
msg = await sub.next_msg()
self.assertEqual(msg.data, b"First")
self.assertEqual(msg.data, b'First')
rmsg = await nc.request("bar", b"hi")
self.assertEqual(rmsg.data, b"OK!")
rmsg = await nc.request("bar", b'hi')
self.assertEqual(rmsg.data, b'OK!')

@@ -280,8 +288,8 @@ # Restart the server and wait for reconnect.

# Get another message.
await nc.publish("foo", b"Second")
await nc.publish("foo", b'Second')
await nc.flush()
msg = await sub.next_msg()
self.assertEqual(msg.data, b"Second")
rmsg = await nc.request("bar", b"hi")
self.assertEqual(rmsg.data, b"OK!")
self.assertEqual(msg.data, b'Second')
rmsg = await nc.request("bar", b'hi')
self.assertEqual(rmsg.data, b'OK!')

@@ -311,11 +319,11 @@ await nc.close()

async def bar_cb(msg):
await msg.respond(b"OK!")
await msg.respond(b'OK!')
rsub = await nc.subscribe("bar", cb=bar_cb)
await nc.publish("foo", b"First")
await nc.publish("foo", b'First')
await nc.flush()
msg = await sub.next_msg()
self.assertEqual(msg.data, b"First")
rmsg = await nc.request("bar", b"hi")
self.assertEqual(rmsg.data, b"OK!")
self.assertEqual(msg.data, b'First')
rmsg = await nc.request("bar", b'hi')
self.assertEqual(rmsg.data, b'OK!')

@@ -332,6 +340,5 @@ # Restart the server and wait for reconnect.

if __name__ == "__main__":
if __name__ == '__main__':
import sys
runner = unittest.TextTestRunner(stream=sys.stdout)
unittest.main(verbosity=2, exit=False, testRunner=runner)
from __future__ import annotations
import asyncio
import os
import json
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import nats
from nats.aio.subscription import Subscription
from unittest import TestCase, skipIf
import nats
from nats.aio.subscription import Subscription
from nats.micro.request import ServiceError
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Generic
from nats.micro.service import (
SUBJECT_REGEX,
EndpointStats,
GroupConfig,
ServiceConfig,
Service,
EndpointConfig,
EndpointStats,
Request,
Service,
ServiceConfig,
)
from nats.micro.request import ServiceError

@@ -33,3 +36,2 @@ from .utils import *

class CompatibilityTest(TestCase):
def setUp(self):

@@ -44,5 +46,3 @@ self.loop = asyncio.new_event_loop()

msg = await sub.next_msg(timeout=5)
self.assertNotIn(
"fail", msg.subject, f"Test step failed: {msg.subject}"
)
self.assertNotIn("fail", msg.subject, f"Test step failed: {msg.subject}")
except asyncio.TimeoutError:

@@ -53,3 +53,2 @@ self.fail("Timeout waiting for test result")

async def test_service_compatibility(self):
@dataclass

@@ -62,5 +61,3 @@ class TestGroupConfig:

def from_dict(cls, data: Dict[str, Any]) -> TestGroupConfig:
return cls(
name=data["name"], queue_group=data.get("queue_group")
)
return cls(name=data["name"], queue_group=data.get("queue_group"))

@@ -67,0 +64,0 @@ @dataclass

@@ -0,14 +1,15 @@

from tests.utils import SingleJetStreamServerTestCase, SingleServerTestCase, async_test
import nats
import random
import asyncio
import random
import nats
import nats.micro
from nats.micro import *
from nats.micro.service import *
from nats.micro.request import *
from nats.micro.service import *
from tests.utils import SingleServerTestCase, async_test
class MicroServiceTest(SingleServerTestCase):
def test_invalid_service_name(self):

@@ -19,19 +20,14 @@ with self.assertRaises(ValueError) as context:

with self.assertRaises(ValueError) as context:
ServiceConfig(name="test.service@!", version="0.1.0")
self.assertEqual(
str(context.exception),
"Invalid name. It must contain only alphanumeric characters, dashes, and underscores.",
)
self.assertEqual(str(context.exception), "Invalid name. It must contain only alphanumeric characters, dashes, and underscores.")
def test_invalid_service_version(self):
with self.assertRaises(ValueError) as context:
ServiceConfig(name="test_service", version="abc")
self.assertEqual(
str(context.exception),
"Invalid version. It must follow semantic versioning (e.g., 1.0.0, 2.1.3-alpha.1).",
)
self.assertEqual(str(context.exception), "Invalid version. It must follow semantic versioning (e.g., 1.0.0, 2.1.3-alpha.1).")
def test_invalid_endpoint_subject(self):
async def noop_handler(request: Request) -> None:

@@ -46,6 +42,3 @@ pass

)
self.assertEqual(
str(context.exception),
"Invalid subject. Subject must not contain spaces, and can only have '>' at the end.",
)
self.assertEqual(str(context.exception), "Invalid subject. Subject must not contain spaces, and can only have '>' at the end.")

@@ -82,9 +75,3 @@ @async_test

for _ in range(50):
await nc.request(
"svc.add",
json.dumps({
"x": 22,
"y": 11
}).encode("utf-8")
)
await nc.request("svc.add", json.dumps({"x": 22, "y": 11}).encode("utf-8"))

@@ -111,5 +98,3 @@ for svc in svcs:

try:
ping_responses.append(
await ping_subscription.next_msg(timeout=0.25)
)
ping_responses.append(await ping_subscription.next_msg(timeout=0.25))
except:

@@ -129,5 +114,3 @@ break

try:
stats_responses.append(
await stats_subscription.next_msg(timeout=0.25)
)
stats_responses.append(await stats_subscription.next_msg(timeout=0.25))
except:

@@ -141,5 +124,3 @@ break

]
total_requests = sum([
stat.endpoints[0].num_requests for stat in stats
])
total_requests = sum([stat.endpoints[0].num_requests for stat in stats])
assert total_requests == 50

@@ -149,3 +130,2 @@

async def test_add_service(self):
async def noop_handler(request: Request):

@@ -156,23 +136,20 @@ pass

"no_endpoint": {
"service_config":
ServiceConfig(
name="test_service",
version="0.1.0",
metadata={"basic": "metadata"},
),
"expected_ping":
ServicePing(
id="*",
type="io.nats.micro.v1.ping_response",
name="test_service",
version="0.1.0",
metadata={"basic": "metadata"},
),
"service_config": ServiceConfig(
name="test_service",
version="0.1.0",
metadata={"basic": "metadata"},
),
"expected_ping": ServicePing(
id="*",
type="io.nats.micro.v1.ping_response",
name="test_service",
version="0.1.0",
metadata={"basic": "metadata"},
),
},
"with_single_endpoint": {
"service_config":
ServiceConfig(
name="test_service",
version="0.1.0",
),
"service_config": ServiceConfig(
name="test_service",
version="0.1.0",
),
"endpoint_configs": [

@@ -186,16 +163,14 @@ EndpointConfig(

],
"expected_ping":
ServicePing(
id="*",
name="test_service",
version="0.1.0",
metadata={},
),
"expected_ping": ServicePing(
id="*",
name="test_service",
version="0.1.0",
metadata={},
),
},
"with_multiple_endpoints": {
"service_config":
ServiceConfig(
name="test_service",
version="0.1.0",
),
"service_config": ServiceConfig(
name="test_service",
version="0.1.0",
),
"endpoint_configs": [

@@ -215,9 +190,8 @@ EndpointConfig(

],
"expected_ping":
ServicePing(
id="*",
name="test_service",
version="0.1.0",
metadata={},
),
"expected_ping": ServicePing(
id="*",
name="test_service",
version="0.1.0",
metadata={},
),
},

@@ -256,2 +230,3 @@ }

@async_test

@@ -263,6 +238,3 @@ async def test_groups(self):

"endpoint_name": "foo",
"expected_endpoint": {
"name": "foo",
"subject": "foo"
},
"expected_endpoint": {"name": "foo", "subject": "foo"},
},

@@ -273,6 +245,3 @@ "single_group": {

"group_names": ["g1"],
"expected_endpoint": {
"name": "foo",
"subject": "g1.foo"
},
"expected_endpoint": {"name": "foo", "subject": "g1.foo"},
},

@@ -283,6 +252,3 @@ "single_empty_group": {

"group_names": [""],
"expected_endpoint": {
"name": "foo",
"subject": "foo"
},
"expected_endpoint": {"name": "foo", "subject": "foo"},
},

@@ -293,6 +259,3 @@ "empty_groups": {

"group_names": ["", "g1", ""],
"expected_endpoint": {
"name": "foo",
"subject": "g1.foo"
},
"expected_endpoint": {"name": "foo", "subject": "g1.foo"},
},

@@ -302,6 +265,3 @@ "multiple_groups": {

"group_names": ["g1", "g2", "g3"],
"expected_endpoint": {
"name": "foo",
"subject": "g1.g2.g3.foo"
},
"expected_endpoint": {"name": "foo", "subject": "g1.g2.g3.foo"},
},

@@ -326,5 +286,3 @@ }

await group.add_endpoint(
name=data["endpoint_name"], handler=noop_handler
)
await group.add_endpoint(name=data["endpoint_name"], handler=noop_handler)

@@ -344,3 +302,2 @@ info = svc.info()

async def test_monitoring_handlers(self):
async def noop_handler(request: Request):

@@ -406,10 +363,10 @@ pass

"id": svc.id,
"endpoints": [{
"name": "default",
"subject": "test.func",
"queue_group": "q",
"metadata": {
"basic": "schema"
},
}],
"endpoints": [
{
"name": "default",
"subject": "test.func",
"queue_group": "q",
"metadata": {"basic": "schema"},
}
],
"metadata": {},

@@ -426,10 +383,10 @@ },

"id": svc.id,
"endpoints": [{
"name": "default",
"subject": "test.func",
"queue_group": "q",
"metadata": {
"basic": "schema"
},
}],
"endpoints": [
{
"name": "default",
"subject": "test.func",
"queue_group": "q",
"metadata": {"basic": "schema"},
}
],
"metadata": {},

@@ -446,10 +403,10 @@ },

"id": svc.id,
"endpoints": [{
"name": "default",
"subject": "test.func",
"queue_group": "q",
"metadata": {
"basic": "schema"
},
}],
"endpoints": [
{
"name": "default",
"subject": "test.func",
"queue_group": "q",
"metadata": {"basic": "schema"},
}
],
"metadata": {},

@@ -471,3 +428,2 @@ },

async def test_service_stats(self):
async def handler(request: Request):

@@ -478,7 +434,6 @@ await request.respond(b"ok")

"stats_handler": {
"service_config":
ServiceConfig(
name="test_service",
version="0.1.0",
),
"service_config": ServiceConfig(
name="test_service",
version="0.1.0",
),
"endpoint_configs": [

@@ -494,8 +449,7 @@ EndpointConfig(

"with_stats_handler": {
"service_config":
ServiceConfig(
name="test_service",
version="0.1.0",
stats_handler=lambda endpoint: {"key": "val"},
),
"service_config": ServiceConfig(
name="test_service",
version="0.1.0",
stats_handler=lambda endpoint: {"key": "val"},
),
"endpoint_configs": [

@@ -509,12 +463,9 @@ EndpointConfig(

],
"expected_stats": {
"key": "val"
},
"expected_stats": {"key": "val"},
},
"with_endpoint": {
"service_config":
ServiceConfig(
name="test_service",
version="0.1.0",
),
"service_config": ServiceConfig(
name="test_service",
version="0.1.0",
),
"endpoint_configs": [

@@ -549,8 +500,4 @@ EndpointConfig(

stats_subject = control_subject(
ServiceVerb.STATS, "test_service"
)
stats_response = await nc.request(
stats_subject, b"", timeout=1
)
stats_subject = control_subject(ServiceVerb.STATS, "test_service")
stats_response = await nc.request(stats_subject, b"", timeout=1)
stats = ServiceStats.from_dict(json.loads(stats_response.data))

@@ -583,10 +530,6 @@

"byte_response_with_headers": {
"respond_headers": {
"key": "value"
},
"respond_headers": {"key": "value"},
"respond_data": b"OK",
"expected_response": b"OK",
"expected_headers": {
"key": "value"
},
"expected_headers": {"key": "value"},
},

@@ -614,5 +557,3 @@ }

await svc.add_endpoint(
EndpointConfig(
name="default", subject="test.func", handler=handler
)
EndpointConfig(name="default", subject="test.func", handler=handler)
)

@@ -661,3 +602,2 @@

async def test_custom_queue_group(self):
async def noop_handler(request: Request):

@@ -668,7 +608,6 @@ pass

"default_queue_group": {
"service_config":
ServiceConfig(
name="test_service",
version="0.0.1",
),
"service_config": ServiceConfig(
name="test_service",
version="0.0.1",
),
"endpoint_configs": [

@@ -685,8 +624,7 @@ EndpointConfig(

"custom_queue_group_on_service_config": {
"service_config":
ServiceConfig(
name="test_service",
version="0.0.1",
queue_group="custom",
),
"service_config": ServiceConfig(
name="test_service",
version="0.0.1",
queue_group="custom",
),
"endpoint_configs": [

@@ -704,8 +642,7 @@ EndpointConfig(

"endpoint_config_overriding_queue_groups": {
"service_config":
ServiceConfig(
name="test_service",
version="0.0.1",
queue_group="q-config",
),
"service_config": ServiceConfig(
name="test_service",
version="0.0.1",
queue_group="q-config",
),
"endpoint_configs": [

@@ -724,8 +661,7 @@ EndpointConfig(

"name": "empty queue group in option, inherit from parent",
"service_config":
ServiceConfig(
name="test_service",
version="0.0.1",
queue_group="q-service",
),
"service_config": ServiceConfig(
name="test_service",
version="0.0.1",
queue_group="q-service",
),
"endpoint_configs": [

@@ -761,8 +697,7 @@ EndpointConfig(

assert len(info.endpoints
) == len(data["expected_queue_groups"])
assert len(info.endpoints) == len(data["expected_queue_groups"])
for endpoint in info.endpoints:
assert (
endpoint.queue_group == data["expected_queue_groups"][
endpoint.name]
endpoint.queue_group
== data["expected_queue_groups"][endpoint.name]
)

@@ -769,0 +704,0 @@

@@ -15,2 +15,3 @@ # Copyright 2018-2021 The NATS Authors

import sys
import unittest

@@ -17,0 +18,0 @@ from collections import Counter

import asyncio
import sys
import unittest

@@ -36,3 +37,3 @@

async def _process_info(self, info):
def _process_info(self, info):
self._server_info = info

@@ -49,3 +50,3 @@

ps = Parser(MockNatsClient())
data = b"PING\r\n"
data = b'PING\r\n'
await ps.parse(data)

@@ -58,3 +59,3 @@ self.assertEqual(len(ps.buf), 0)

ps = Parser(MockNatsClient())
data = b"PONG\r\n"
data = b'PONG\r\n'
await ps.parse(data)

@@ -67,3 +68,3 @@ self.assertEqual(len(ps.buf), 0)

ps = Parser()
data = b"+OK\r\n"
data = b'+OK\r\n'
await ps.parse(data)

@@ -76,3 +77,3 @@ self.assertEqual(len(ps.buf), 0)

nc = MockNatsClient()
expected = b"hello world!"
expected = b'hello world!'

@@ -91,3 +92,3 @@ async def payload_test(sid, subject, reply, payload):

ps = Parser(nc)
data = b"MSG hello 1 world 12\r\n"
data = b'MSG hello 1 world 12\r\n'

@@ -97,4 +98,4 @@ await ps.parse(data)

self.assertEqual(len(ps.msg_arg.keys()), 3)
self.assertEqual(ps.msg_arg["subject"], b"hello")
self.assertEqual(ps.msg_arg["reply"], b"world")
self.assertEqual(ps.msg_arg["subject"], b'hello')
self.assertEqual(ps.msg_arg["reply"], b'world')
self.assertEqual(ps.msg_arg["sid"], 1)

@@ -104,7 +105,7 @@ self.assertEqual(ps.needed, 12)

await ps.parse(b"hello world!")
await ps.parse(b'hello world!')
self.assertEqual(len(ps.buf), 12)
self.assertEqual(ps.state, AWAITING_MSG_PAYLOAD)
data = b"\r\n"
data = b'\r\n'
await ps.parse(data)

@@ -117,3 +118,3 @@ self.assertEqual(len(ps.buf), 0)

ps = Parser()
data = b"MSG hello"
data = b'MSG hello'
await ps.parse(data)

@@ -126,3 +127,3 @@ self.assertEqual(len(ps.buf), 9)

ps = Parser()
data = b"MSG"
data = b'MSG'
await ps.parse(data)

@@ -135,3 +136,3 @@ self.assertEqual(len(ps.buf), 3)

ps = Parser()
data = b"MSG "
data = b'MSG '
await ps.parse(data)

@@ -144,3 +145,3 @@ self.assertEqual(len(ps.buf), 4)

ps = Parser(MockNatsClient())
data = b"MSG PONG\r\n"
data = b'MSG PONG\r\n'
with self.assertRaises(ProtocolError):

@@ -174,3 +175,3 @@ await ps.parse(data)

ps = Parser(nc)
server_id = "A" * 2048
server_id = 'A' * 2048
data = f'INFO {{"server_id": "{server_id}", "max_payload": 100, "auth_required": false, "connect_urls":["127.0.0.0.1:4223"]}}\r\n'

@@ -180,3 +181,3 @@ await ps.parse(data.encode())

self.assertEqual(ps.state, AWAITING_CONTROL_LINE)
self.assertEqual(len(nc._server_info["server_id"]), 2048)
self.assertEqual(len(nc._server_info['server_id']), 2048)

@@ -203,10 +204,10 @@ @async_test

ps = Parser(nc)
reply = "A" * 2043
data = f"PING\r\nMSG hello 1 {reply}"
reply = 'A' * 2043
data = f'PING\r\nMSG hello 1 {reply}'
await ps.parse(data.encode())
await ps.parse(b"AAAAA 0\r\n\r\nMSG hello 1 world 0")
await ps.parse(b'AAAAA 0\r\n\r\nMSG hello 1 world 0')
self.assertEqual(msgs, 1)
self.assertEqual(len(ps.buf), 19)
self.assertEqual(ps.state, AWAITING_CONTROL_LINE)
await ps.parse(b"\r\n\r\n")
await ps.parse(b'\r\n\r\n')
self.assertEqual(msgs, 2)

@@ -234,3 +235,3 @@

ps = Parser(nc)
reply = "A" * 2043 * 4
reply = 'A' * 2043 * 4

@@ -240,17 +241,17 @@ # FIXME: Malformed long protocol lines will not be detected

# from the client to give up instead.
data = f"PING\r\nWRONG hello 1 {reply}"
data = f'PING\r\nWRONG hello 1 {reply}'
await ps.parse(data.encode())
await ps.parse(b"AAAAA 0")
await ps.parse(b'AAAAA 0')
self.assertEqual(ps.state, AWAITING_CONTROL_LINE)
await ps.parse(b"\r\n\r\n")
await ps.parse(b"\r\n\r\n")
await ps.parse(b'\r\n\r\n')
await ps.parse(b'\r\n\r\n')
ps = Parser(nc)
reply = "A" * 2043
data = f"PING\r\nWRONG hello 1 {reply}"
reply = 'A' * 2043
data = f'PING\r\nWRONG hello 1 {reply}'
with self.assertRaises(ProtocolError):
await ps.parse(data.encode())
await ps.parse(b"AAAAA 0")
await ps.parse(b'AAAAA 0')
self.assertEqual(ps.state, AWAITING_CONTROL_LINE)
await ps.parse(b"\r\n\r\n")
await ps.parse(b"\r\n\r\n")
await ps.parse(b'\r\n\r\n')
await ps.parse(b'\r\n\r\n')

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display