You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

nats-py

Package Overview
Dependencies
Maintainers
4
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats-py - pypi Package Compare versions

Comparing version
2.11.0
to
2.12.0
+130
tests/test_custom_headers_websocket.py
import asyncio
import queue
import socket
import threading
import unittest
import nats
import pytest
from tests.utils import async_test
try:
import aiohttp # required by nats ws transport
aiohttp_installed = True
except ModuleNotFoundError:
aiohttp_installed = False
def start_header_catcher():
"""
Minimal TCP listener that captures the incoming HTTP request lines
(WebSocket handshake) and returns them via a Queue.
"""
q = queue.Queue(maxsize=1)
ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ln.bind(("127.0.0.1", 0))
ln.listen(1)
host, port = ln.getsockname()
def _accept_once():
try:
conn, _ = ln.accept()
with conn:
conn.settimeout(2.0)
buf = b""
while b"\r\n\r\n" not in buf:
chunk = conn.recv(4096)
if not chunk:
break
buf += chunk
header_block = buf.split(b"\r\n\r\n", 1)[0]
lines = header_block.decode(
"latin1", errors="replace"
).split("\r\n")
q.put(lines)
except Exception:
q.put([])
finally:
try:
ln.close()
except OSError:
pass
threading.Thread(target=_accept_once, daemon=True).start()
return f"{host}:{port}", q, (lambda: ln.close())
def has_header_value(headers, name, want):
prefix = name.lower() + ":"
for h in headers:
if ":" not in h:
continue
if not h.lower().startswith(prefix):
continue
val = h.split(":", 1)[1].strip()
for part in val.split(","):
if part.strip().lower() == want.lower():
return True
return False
class TestHeaderCatcher(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
@async_test
async def test_ws_headers_static_applied_on_handshake(self):
if not aiohttp_installed:
pytest.skip("aiohttp not installed")
addr, got, close_ln = start_header_catcher()
custom_headers = {
"Authorization": ["Bearer Random Token"],
"X-Multi": ["v1", "v2"], # repeated header -> comma-joined
"Accept": ["application/json", "text/plain; q=0.8"],
"X-Feature-Flags": ["feature-a", "feature-b", "feature-c"],
"Single-Header-Key": "Single-Header-Value"
}
try:
# Connect to our catcher; it won't complete the upgrade.
with self.assertRaises(Exception):
await asyncio.wait_for(
nats.connect(
f"ws://{addr}",
ws_connection_headers=custom_headers,
allow_reconnect=False,
),
timeout=1.0,
)
finally:
headers = got.get(timeout=2.0)
close_ln()
self.assertTrue(
has_header_value(headers, "Authorization", "Bearer Random Token")
)
self.assertTrue(has_header_value(headers, "X-Multi", "v1"))
self.assertTrue(has_header_value(headers, "X-Multi", "v2"))
self.assertTrue(
has_header_value(headers, "Accept", "application/json")
)
self.assertTrue(
has_header_value(headers, "Accept", "text/plain; q=0.8")
)
self.assertTrue(
has_header_value(headers, "X-Feature-Flags", "feature-a")
)
self.assertTrue(
has_header_value(headers, "X-Feature-Flags", "feature-b")
)
self.assertTrue(
has_header_value(headers, "X-Feature-Flags", "feature-c")
)
self.assertTrue(
has_header_value(
headers, "Single-Header-Key", "Single-Header-Value"
)
)
+1
-1
Metadata-Version: 2.4
Name: nats-py
Version: 2.11.0
Version: 2.12.0
Summary: NATS client for Python

@@ -5,0 +5,0 @@ Author-email: Waldemar Quevedo <wally@synadia.com>

@@ -41,2 +41,3 @@ LICENSE

tests/test_compatibility.py
tests/test_custom_headers_websocket.py
tests/test_js.py

@@ -43,0 +44,0 @@ tests/test_micro_service.py

@@ -6,3 +6,3 @@ from __future__ import annotations

import ssl
from typing import List, Optional, Union
from typing import Dict, List, Optional, Union
from urllib.parse import ParseResult

@@ -12,4 +12,6 @@

import aiohttp
import multidict
except ImportError:
aiohttp = None # type: ignore[assignment]
multidict = None # type: ignore[assignment]

@@ -197,3 +199,3 @@ from nats.errors import ProtocolError

def __init__(self):
def __init__(self, ws_headers: Optional[Dict[str, List[str]]] = None):
if not aiohttp:

@@ -208,2 +210,3 @@ raise ImportError(

self._using_tls: Optional[bool] = None
self._ws_headers = ws_headers

@@ -213,5 +216,6 @@ async def connect(

):
headers = self._get_custom_headers()
# for websocket library, the uri must contain the scheme already
self._ws = await self._client.ws_connect(
uri.geturl(), timeout=connect_timeout
uri.geturl(), timeout=connect_timeout, headers=headers
)

@@ -232,2 +236,3 @@ self._using_tls = False

headers = self._get_custom_headers()
self._ws = await self._client.ws_connect(

@@ -237,2 +242,3 @@ uri if isinstance(uri, str) else uri.geturl(),

timeout=connect_timeout,
headers=headers,
)

@@ -266,3 +272,4 @@ self._using_tls = True

await self._close_task
await self._client.close()
if self._client:
await self._client.close()
self._ws = self._client = None

@@ -280,1 +287,13 @@

return bool(self._client)
def _get_custom_headers(self):
if self._ws_headers is None:
return None
md: multidict.CIMultiDict[str] = multidict.CIMultiDict()
for name, values in self._ws_headers.items():
if isinstance(values, list):
for v in values:
md.add(name, v)
elif isinstance(values, str):
md.add(name, values)
return md

@@ -17,2 +17,3 @@ # Copyright 2021 The NATS Authors

import datetime
from dataclasses import dataclass, fields, replace

@@ -35,2 +36,3 @@ from enum import Enum

MSG_ID = "Nats-Msg-Id"
MSG_TTL = "Nats-TTL"
ROLLUP = "Nats-Rollup"

@@ -310,2 +312,17 @@ STATUS = "Status"

# Allow per-message TTL via Nats-TTL header. Introduced in nats-server 2.11.0.
allow_msg_ttl: Optional[bool] = None
# Allow scheduled/delayed messages. Introduced in nats-server 2.12.0.
allow_msg_schedules: Optional[bool] = None
# Allow atomic batch publishing. Introduced in nats-server 2.12.0.
allow_atomic: Optional[bool] = None
# Allow batched publishing. Introduced in nats-server 2.12.0.
allow_batched: Optional[bool] = None
# Allow scheduled/delayed messages. Introduced in nats-server 2.12.0.
allow_msg_schedules: Optional[bool] = None
# Metadata are user defined string key/value pairs.

@@ -506,2 +523,7 @@ metadata: Optional[Dict[str, str]] = None

# Consumer pause until timestamp.
# Temporarily suspend message delivery until the specified time (RFC 3339 format).
# Introduced in nats-server 2.11.0.
pause_until: Optional[str] = None
@classmethod

@@ -555,2 +577,8 @@ def from_response(cls, resp: Dict[str, Any]):

push_bound: Optional[bool] = None
# Indicates if the consumer is currently paused.
# Introduced in nats-server 2.11.0.
paused: Optional[bool] = None
# RFC 3339 timestamp until which the consumer is paused.
# Introduced in nats-server 2.11.0.
pause_remaining: Optional[str] = None

@@ -567,2 +595,14 @@ @classmethod

@dataclass
class ConsumerPause(Base):
"""
ConsumerPause represents the pause state after a pause or resume operation.
Introduced in nats-server 2.11.0.
"""
paused: bool
pause_until: Optional[str] = None
pause_remaining: Optional[str] = None
@dataclass
class AccountLimits(Base):

@@ -648,3 +688,3 @@ """Account limits

stream: Optional[str] = None
# TODO: Add 'time'
time: Optional[datetime.datetime] = None

@@ -662,3 +702,21 @@ @property

@classmethod
def _python38_iso_parsing(cls, time_string: str):
# Replace Z with UTC offset
s = time_string.replace("Z", "+00:00")
# Trim fractional seconds to 6 digits
date_part, frac_tz = s.split(".", 1)
frac, tz = frac_tz.split("+")
frac = frac[:6] # keep only microseconds
s = f"{date_part}.{frac}+{tz}"
return s
@classmethod
def from_response(cls, resp: Dict[str, Any]):
resp["time"] = datetime.datetime.fromisoformat(
cls._python38_iso_parsing(resp["time"])
).astimezone(datetime.timezone.utc)
return super().from_response(resp)
@dataclass

@@ -665,0 +723,0 @@ class KeyValueConfig(Base):

@@ -189,5 +189,13 @@ # Copyright 2021-2022 The NATS Authors

headers: Optional[Dict[str, Any]] = None,
msg_ttl: Optional[float] = None,
) -> api.PubAck:
"""
publish emits a new message to JetStream and waits for acknowledgement.
:param subject: Subject to publish to.
:param payload: Message payload.
:param timeout: Request timeout in seconds.
:param stream: Expected stream name.
:param headers: Message headers.
:param msg_ttl: Per-message TTL in seconds (requires NATS Server 2.11+).
"""

@@ -200,2 +208,6 @@ hdr = headers

hdr[api.Header.EXPECTED_STREAM] = stream
if msg_ttl is not None:
hdr = hdr or {}
# TTL header accepts seconds as integer or duration string
hdr[api.Header.MSG_TTL] = str(int(msg_ttl))

@@ -224,5 +236,13 @@ try:

headers: Optional[Dict] = None,
msg_ttl: Optional[float] = None,
) -> asyncio.Future[api.PubAck]:
"""
emits a new message to JetStream and returns a future that can be awaited for acknowledgement.
:param subject: Subject to publish to.
:param payload: Message payload.
:param wait_stall: Maximum time to wait for semaphore in seconds.
:param stream: Expected stream name.
:param headers: Message headers.
:param msg_ttl: Per-message TTL in seconds (requires NATS Server 2.11+).
"""

@@ -238,2 +258,6 @@

hdr[api.Header.EXPECTED_STREAM] = stream
if msg_ttl is not None:
hdr = hdr or {}
# TTL header accepts seconds as integer or duration string
hdr[api.Header.MSG_TTL] = str(int(msg_ttl))

@@ -434,4 +458,5 @@ try:

# Auto created consumers use the filter subject.
config.filter_subject = subject
# Auto created consumers use the filter subject, unless filter_subjects is set.
if not config.filter_subjects:
config.filter_subject = subject

@@ -595,5 +620,6 @@ # Heartbeats / FlowControl

# Auto created consumers use the filter subject.
# config.name = durable
config.filter_subject = subject
# Auto created consumers use the filter subject, unless filter_subjects is set.
if not config.filter_subjects:
config.filter_subject = subject
if durable:

@@ -600,0 +626,0 @@ config.name = durable

@@ -273,2 +273,65 @@ # Copyright 2021 The NATS Authors

async def pause_consumer(
self,
stream: str,
consumer: str,
pause_until: str,
timeout: Optional[float] = None,
) -> api.ConsumerPause:
"""
Pause a consumer until the specified time.
Args:
stream: The stream name
consumer: The consumer name
pause_until: RFC 3339 timestamp string (e.g., "2025-10-22T12:00:00Z")
until which the consumer should be paused
timeout: Request timeout in seconds
Returns:
ConsumerPause with paused status
Note:
Requires nats-server 2.11.0 or later
"""
if timeout is None:
timeout = self._timeout
req = {"pause_until": pause_until}
req_data = json.dumps(req).encode()
resp = await self._api_request(
f"{self._prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
req_data,
timeout=timeout,
)
return api.ConsumerPause.from_response(resp)
async def resume_consumer(
self,
stream: str,
consumer: str,
timeout: Optional[float] = None,
) -> api.ConsumerPause:
"""
Resume a paused consumer immediately.
This is equivalent to calling pause_consumer with a timestamp in the past.
Args:
stream: The stream name
consumer: The consumer name
timeout: Request timeout in seconds
Returns:
ConsumerPause with paused=False
Note:
Requires nats-server 2.11.0 or later
"""
# Resume by pausing until a time in the past (epoch)
return await self.pause_consumer(
stream, consumer, "1970-01-01T00:00:00Z", timeout
)
async def consumers_info(

@@ -275,0 +338,0 @@ self,

Metadata-Version: 2.4
Name: nats-py
Version: 2.11.0
Version: 2.12.0
Summary: NATS client for Python

@@ -5,0 +5,0 @@ Author-email: Waldemar Quevedo <wally@synadia.com>

@@ -7,3 +7,3 @@ from setuptools import setup

name="nats-py",
version="2.11.0",
version="2.12.0",
license="Apache 2 License",

@@ -10,0 +10,0 @@ extras_require={

@@ -201,3 +201,113 @@ import asyncio

@async_test
async def test_with_static_headers(self):
if not aiohttp_installed:
pytest.skip("aiohttp not installed")
custom_headers = {
"Authorization": ["Bearer RandomToken"],
"X-Client-ID": ["test-client-123"],
"X-Custom-Header": ["custom-value"],
"Accept": [
"application/json", "text/plain", "application/msgpack"
],
"X-Feature-Flags": ["feature-a", "feature-b", "feature-c"],
"X-Capabilities": ["streaming", "compression", "batching"]
}
nc = await nats.connect(
"ws://localhost:8080", ws_connection_headers=custom_headers
)
# Test basic pub/sub functionality to ensure connection works
sub = await nc.subscribe("foo")
await nc.flush()
# Create test messages
msgs = []
for i in range(10):
msg = b'A' * 100 # 100 bytes of 'A'
msgs.append(msg)
# Publish messages
for i, msg in enumerate(msgs):
await nc.publish("foo", msg)
# Ensure message content is not modified
assert msg == msgs[i], f"User content was changed during publish"
# Receive and verify messages
for i in range(len(msgs)):
msg = await sub.next_msg(timeout=1.0)
assert msg.data == msgs[
i], f"Expected message {i}: {msgs[i]}, got {msg.data}"
await nc.close()
@async_test
async def test_ws_headers_with_reconnect(self):
"""Test that headers persist across reconnections"""
if not aiohttp_installed:
pytest.skip("aiohttp not installed")
reconnect_count = 0
reconnected = asyncio.Future()
async def reconnected_cb():
nonlocal reconnect_count
reconnect_count += 1
if not reconnected.done():
reconnected.set_result(True)
# Connect with custom headers
custom_headers = {
"X-Persistent-Session": ["session-12345"],
"Authorization": ["Bearer ReconnectToken"]
}
nc = await nats.connect(
"ws://localhost:8080",
ws_connection_headers=custom_headers,
reconnected_cb=reconnected_cb,
max_reconnect_attempts=5
)
# Create subscription
messages_received = []
async def message_handler(msg):
messages_received.append(msg.data)
await nc.subscribe("reconnect.test", cb=message_handler)
# Publish before reconnect
await nc.publish("reconnect.test", b"Before reconnect")
await nc.flush()
# Simulate server restart
await asyncio.get_running_loop().run_in_executor(
None, self.server_pool[0].stop
)
await asyncio.sleep(1)
await asyncio.get_running_loop().run_in_executor(
None, self.server_pool[0].start
)
# Wait for reconnection
await asyncio.wait_for(reconnected, timeout=5.0)
# Publish after reconnect
await nc.publish("reconnect.test", b"After reconnect")
await nc.flush()
# Wait a bit for message delivery
await asyncio.sleep(0.5)
# Verify we got messages
assert b"Before reconnect" in messages_received
assert b"After reconnect" in messages_received
assert reconnect_count > 0
await nc.close()
class WebSocketTLSTest(SingleWebSocketTLSServerTestCase):

@@ -315,3 +425,31 @@

@async_test
async def test_ws_headers_with_tls(self):
"""Test custom headers with TLS WebSocket connection"""
if not aiohttp_installed:
pytest.skip("aiohttp not installed")
# Note: This would require a TLS-enabled test server
# Keeping structure similar to the non-TLS test
custom_headers = {
"Authorization": ["Bearer SecureToken"],
"X-TLS-Client": ["secure-client-v1"]
}
nc = await nats.connect(
"wss://localhost:8081",
ws_connection_headers=custom_headers,
tls=self.ssl_ctx
)
# Basic functionality test
sub = await nc.subscribe("tls.test")
await nc.publish("tls.test", b"TLS test message")
msg = await sub.next_msg(timeout=1.0)
assert msg.data == b"TLS test message"
await nc.close()
if __name__ == "__main__":

@@ -318,0 +456,0 @@ import sys

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display