nats-py
Advanced tools
| import asyncio | ||
| import queue | ||
| import socket | ||
| import threading | ||
| import unittest | ||
| import nats | ||
| import pytest | ||
| from tests.utils import async_test | ||
| try: | ||
| import aiohttp # required by nats ws transport | ||
| aiohttp_installed = True | ||
| except ModuleNotFoundError: | ||
| aiohttp_installed = False | ||
| def start_header_catcher(): | ||
| """ | ||
| Minimal TCP listener that captures the incoming HTTP request lines | ||
| (WebSocket handshake) and returns them via a Queue. | ||
| """ | ||
| q = queue.Queue(maxsize=1) | ||
| ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
| ln.bind(("127.0.0.1", 0)) | ||
| ln.listen(1) | ||
| host, port = ln.getsockname() | ||
| def _accept_once(): | ||
| try: | ||
| conn, _ = ln.accept() | ||
| with conn: | ||
| conn.settimeout(2.0) | ||
| buf = b"" | ||
| while b"\r\n\r\n" not in buf: | ||
| chunk = conn.recv(4096) | ||
| if not chunk: | ||
| break | ||
| buf += chunk | ||
| header_block = buf.split(b"\r\n\r\n", 1)[0] | ||
| lines = header_block.decode( | ||
| "latin1", errors="replace" | ||
| ).split("\r\n") | ||
| q.put(lines) | ||
| except Exception: | ||
| q.put([]) | ||
| finally: | ||
| try: | ||
| ln.close() | ||
| except OSError: | ||
| pass | ||
| threading.Thread(target=_accept_once, daemon=True).start() | ||
| return f"{host}:{port}", q, (lambda: ln.close()) | ||
| def has_header_value(headers, name, want): | ||
| prefix = name.lower() + ":" | ||
| for h in headers: | ||
| if ":" not in h: | ||
| continue | ||
| if not h.lower().startswith(prefix): | ||
| continue | ||
| val = h.split(":", 1)[1].strip() | ||
| for part in val.split(","): | ||
| if part.strip().lower() == want.lower(): | ||
| return True | ||
| return False | ||
| class TestHeaderCatcher(unittest.TestCase): | ||
| def setUp(self): | ||
| self.loop = asyncio.new_event_loop() | ||
| @async_test | ||
| async def test_ws_headers_static_applied_on_handshake(self): | ||
| if not aiohttp_installed: | ||
| pytest.skip("aiohttp not installed") | ||
| addr, got, close_ln = start_header_catcher() | ||
| custom_headers = { | ||
| "Authorization": ["Bearer Random Token"], | ||
| "X-Multi": ["v1", "v2"], # repeated header -> comma-joined | ||
| "Accept": ["application/json", "text/plain; q=0.8"], | ||
| "X-Feature-Flags": ["feature-a", "feature-b", "feature-c"], | ||
| "Single-Header-Key": "Single-Header-Value" | ||
| } | ||
| try: | ||
| # Connect to our catcher; it won't complete the upgrade. | ||
| with self.assertRaises(Exception): | ||
| await asyncio.wait_for( | ||
| nats.connect( | ||
| f"ws://{addr}", | ||
| ws_connection_headers=custom_headers, | ||
| allow_reconnect=False, | ||
| ), | ||
| timeout=1.0, | ||
| ) | ||
| finally: | ||
| headers = got.get(timeout=2.0) | ||
| close_ln() | ||
| self.assertTrue( | ||
| has_header_value(headers, "Authorization", "Bearer Random Token") | ||
| ) | ||
| self.assertTrue(has_header_value(headers, "X-Multi", "v1")) | ||
| self.assertTrue(has_header_value(headers, "X-Multi", "v2")) | ||
| self.assertTrue( | ||
| has_header_value(headers, "Accept", "application/json") | ||
| ) | ||
| self.assertTrue( | ||
| has_header_value(headers, "Accept", "text/plain; q=0.8") | ||
| ) | ||
| self.assertTrue( | ||
| has_header_value(headers, "X-Feature-Flags", "feature-a") | ||
| ) | ||
| self.assertTrue( | ||
| has_header_value(headers, "X-Feature-Flags", "feature-b") | ||
| ) | ||
| self.assertTrue( | ||
| has_header_value(headers, "X-Feature-Flags", "feature-c") | ||
| ) | ||
| self.assertTrue( | ||
| has_header_value( | ||
| headers, "Single-Header-Key", "Single-Header-Value" | ||
| ) | ||
| ) |
| Metadata-Version: 2.4 | ||
| Name: nats-py | ||
| Version: 2.11.0 | ||
| Version: 2.12.0 | ||
| Summary: NATS client for Python | ||
@@ -5,0 +5,0 @@ Author-email: Waldemar Quevedo <wally@synadia.com> |
@@ -41,2 +41,3 @@ LICENSE | ||
| tests/test_compatibility.py | ||
| tests/test_custom_headers_websocket.py | ||
| tests/test_js.py | ||
@@ -43,0 +44,0 @@ tests/test_micro_service.py |
@@ -6,3 +6,3 @@ from __future__ import annotations | ||
| import ssl | ||
| from typing import List, Optional, Union | ||
| from typing import Dict, List, Optional, Union | ||
| from urllib.parse import ParseResult | ||
@@ -12,4 +12,6 @@ | ||
| import aiohttp | ||
| import multidict | ||
| except ImportError: | ||
| aiohttp = None # type: ignore[assignment] | ||
| multidict = None # type: ignore[assignment] | ||
@@ -197,3 +199,3 @@ from nats.errors import ProtocolError | ||
| def __init__(self): | ||
| def __init__(self, ws_headers: Optional[Dict[str, List[str]]] = None): | ||
| if not aiohttp: | ||
@@ -208,2 +210,3 @@ raise ImportError( | ||
| self._using_tls: Optional[bool] = None | ||
| self._ws_headers = ws_headers | ||
@@ -213,5 +216,6 @@ async def connect( | ||
| ): | ||
| headers = self._get_custom_headers() | ||
| # for websocket library, the uri must contain the scheme already | ||
| self._ws = await self._client.ws_connect( | ||
| uri.geturl(), timeout=connect_timeout | ||
| uri.geturl(), timeout=connect_timeout, headers=headers | ||
| ) | ||
@@ -232,2 +236,3 @@ self._using_tls = False | ||
| headers = self._get_custom_headers() | ||
| self._ws = await self._client.ws_connect( | ||
@@ -237,2 +242,3 @@ uri if isinstance(uri, str) else uri.geturl(), | ||
| timeout=connect_timeout, | ||
| headers=headers, | ||
| ) | ||
@@ -266,3 +272,4 @@ self._using_tls = True | ||
| await self._close_task | ||
| await self._client.close() | ||
| if self._client: | ||
| await self._client.close() | ||
| self._ws = self._client = None | ||
@@ -280,1 +287,13 @@ | ||
| return bool(self._client) | ||
| def _get_custom_headers(self): | ||
| if self._ws_headers is None: | ||
| return None | ||
| md: multidict.CIMultiDict[str] = multidict.CIMultiDict() | ||
| for name, values in self._ws_headers.items(): | ||
| if isinstance(values, list): | ||
| for v in values: | ||
| md.add(name, v) | ||
| elif isinstance(values, str): | ||
| md.add(name, values) | ||
| return md |
+59
-1
@@ -17,2 +17,3 @@ # Copyright 2021 The NATS Authors | ||
| import datetime | ||
| from dataclasses import dataclass, fields, replace | ||
@@ -35,2 +36,3 @@ from enum import Enum | ||
| MSG_ID = "Nats-Msg-Id" | ||
| MSG_TTL = "Nats-TTL" | ||
| ROLLUP = "Nats-Rollup" | ||
@@ -310,2 +312,17 @@ STATUS = "Status" | ||
| # Allow per-message TTL via Nats-TTL header. Introduced in nats-server 2.11.0. | ||
| allow_msg_ttl: Optional[bool] = None | ||
| # Allow scheduled/delayed messages. Introduced in nats-server 2.12.0. | ||
| allow_msg_schedules: Optional[bool] = None | ||
| # Allow atomic batch publishing. Introduced in nats-server 2.12.0. | ||
| allow_atomic: Optional[bool] = None | ||
| # Allow batched publishing. Introduced in nats-server 2.12.0. | ||
| allow_batched: Optional[bool] = None | ||
| # Allow scheduled/delayed messages. Introduced in nats-server 2.12.0. | ||
| allow_msg_schedules: Optional[bool] = None | ||
| # Metadata are user defined string key/value pairs. | ||
@@ -506,2 +523,7 @@ metadata: Optional[Dict[str, str]] = None | ||
| # Consumer pause until timestamp. | ||
| # Temporarily suspend message delivery until the specified time (RFC 3339 format). | ||
| # Introduced in nats-server 2.11.0. | ||
| pause_until: Optional[str] = None | ||
| @classmethod | ||
@@ -555,2 +577,8 @@ def from_response(cls, resp: Dict[str, Any]): | ||
| push_bound: Optional[bool] = None | ||
| # Indicates if the consumer is currently paused. | ||
| # Introduced in nats-server 2.11.0. | ||
| paused: Optional[bool] = None | ||
| # RFC 3339 timestamp until which the consumer is paused. | ||
| # Introduced in nats-server 2.11.0. | ||
| pause_remaining: Optional[str] = None | ||
@@ -567,2 +595,14 @@ @classmethod | ||
| @dataclass | ||
| class ConsumerPause(Base): | ||
| """ | ||
| ConsumerPause represents the pause state after a pause or resume operation. | ||
| Introduced in nats-server 2.11.0. | ||
| """ | ||
| paused: bool | ||
| pause_until: Optional[str] = None | ||
| pause_remaining: Optional[str] = None | ||
| @dataclass | ||
| class AccountLimits(Base): | ||
@@ -648,3 +688,3 @@ """Account limits | ||
| stream: Optional[str] = None | ||
| # TODO: Add 'time' | ||
| time: Optional[datetime.datetime] = None | ||
@@ -662,3 +702,21 @@ @property | ||
| @classmethod | ||
| def _python38_iso_parsing(cls, time_string: str): | ||
| # Replace Z with UTC offset | ||
| s = time_string.replace("Z", "+00:00") | ||
| # Trim fractional seconds to 6 digits | ||
| date_part, frac_tz = s.split(".", 1) | ||
| frac, tz = frac_tz.split("+") | ||
| frac = frac[:6] # keep only microseconds | ||
| s = f"{date_part}.{frac}+{tz}" | ||
| return s | ||
| @classmethod | ||
| def from_response(cls, resp: Dict[str, Any]): | ||
| resp["time"] = datetime.datetime.fromisoformat( | ||
| cls._python38_iso_parsing(resp["time"]) | ||
| ).astimezone(datetime.timezone.utc) | ||
| return super().from_response(resp) | ||
| @dataclass | ||
@@ -665,0 +723,0 @@ class KeyValueConfig(Base): |
+31
-5
@@ -189,5 +189,13 @@ # Copyright 2021-2022 The NATS Authors | ||
| headers: Optional[Dict[str, Any]] = None, | ||
| msg_ttl: Optional[float] = None, | ||
| ) -> api.PubAck: | ||
| """ | ||
| publish emits a new message to JetStream and waits for acknowledgement. | ||
| :param subject: Subject to publish to. | ||
| :param payload: Message payload. | ||
| :param timeout: Request timeout in seconds. | ||
| :param stream: Expected stream name. | ||
| :param headers: Message headers. | ||
| :param msg_ttl: Per-message TTL in seconds (requires NATS Server 2.11+). | ||
| """ | ||
@@ -200,2 +208,6 @@ hdr = headers | ||
| hdr[api.Header.EXPECTED_STREAM] = stream | ||
| if msg_ttl is not None: | ||
| hdr = hdr or {} | ||
| # TTL header accepts seconds as integer or duration string | ||
| hdr[api.Header.MSG_TTL] = str(int(msg_ttl)) | ||
@@ -224,5 +236,13 @@ try: | ||
| headers: Optional[Dict] = None, | ||
| msg_ttl: Optional[float] = None, | ||
| ) -> asyncio.Future[api.PubAck]: | ||
| """ | ||
| emits a new message to JetStream and returns a future that can be awaited for acknowledgement. | ||
| :param subject: Subject to publish to. | ||
| :param payload: Message payload. | ||
| :param wait_stall: Maximum time to wait for semaphore in seconds. | ||
| :param stream: Expected stream name. | ||
| :param headers: Message headers. | ||
| :param msg_ttl: Per-message TTL in seconds (requires NATS Server 2.11+). | ||
| """ | ||
@@ -238,2 +258,6 @@ | ||
| hdr[api.Header.EXPECTED_STREAM] = stream | ||
| if msg_ttl is not None: | ||
| hdr = hdr or {} | ||
| # TTL header accepts seconds as integer or duration string | ||
| hdr[api.Header.MSG_TTL] = str(int(msg_ttl)) | ||
@@ -434,4 +458,5 @@ try: | ||
| # Auto created consumers use the filter subject. | ||
| config.filter_subject = subject | ||
| # Auto created consumers use the filter subject, unless filter_subjects is set. | ||
| if not config.filter_subjects: | ||
| config.filter_subject = subject | ||
@@ -595,5 +620,6 @@ # Heartbeats / FlowControl | ||
| # Auto created consumers use the filter subject. | ||
| # config.name = durable | ||
| config.filter_subject = subject | ||
| # Auto created consumers use the filter subject, unless filter_subjects is set. | ||
| if not config.filter_subjects: | ||
| config.filter_subject = subject | ||
| if durable: | ||
@@ -600,0 +626,0 @@ config.name = durable |
+63
-0
@@ -273,2 +273,65 @@ # Copyright 2021 The NATS Authors | ||
| async def pause_consumer( | ||
| self, | ||
| stream: str, | ||
| consumer: str, | ||
| pause_until: str, | ||
| timeout: Optional[float] = None, | ||
| ) -> api.ConsumerPause: | ||
| """ | ||
| Pause a consumer until the specified time. | ||
| Args: | ||
| stream: The stream name | ||
| consumer: The consumer name | ||
| pause_until: RFC 3339 timestamp string (e.g., "2025-10-22T12:00:00Z") | ||
| until which the consumer should be paused | ||
| timeout: Request timeout in seconds | ||
| Returns: | ||
| ConsumerPause with paused status | ||
| Note: | ||
| Requires nats-server 2.11.0 or later | ||
| """ | ||
| if timeout is None: | ||
| timeout = self._timeout | ||
| req = {"pause_until": pause_until} | ||
| req_data = json.dumps(req).encode() | ||
| resp = await self._api_request( | ||
| f"{self._prefix}.CONSUMER.PAUSE.{stream}.{consumer}", | ||
| req_data, | ||
| timeout=timeout, | ||
| ) | ||
| return api.ConsumerPause.from_response(resp) | ||
| async def resume_consumer( | ||
| self, | ||
| stream: str, | ||
| consumer: str, | ||
| timeout: Optional[float] = None, | ||
| ) -> api.ConsumerPause: | ||
| """ | ||
| Resume a paused consumer immediately. | ||
| This is equivalent to calling pause_consumer with a timestamp in the past. | ||
| Args: | ||
| stream: The stream name | ||
| consumer: The consumer name | ||
| timeout: Request timeout in seconds | ||
| Returns: | ||
| ConsumerPause with paused=False | ||
| Note: | ||
| Requires nats-server 2.11.0 or later | ||
| """ | ||
| # Resume by pausing until a time in the past (epoch) | ||
| return await self.pause_consumer( | ||
| stream, consumer, "1970-01-01T00:00:00Z", timeout | ||
| ) | ||
| async def consumers_info( | ||
@@ -275,0 +338,0 @@ self, |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: nats-py | ||
| Version: 2.11.0 | ||
| Version: 2.12.0 | ||
| Summary: NATS client for Python | ||
@@ -5,0 +5,0 @@ Author-email: Waldemar Quevedo <wally@synadia.com> |
+1
-1
@@ -7,3 +7,3 @@ from setuptools import setup | ||
| name="nats-py", | ||
| version="2.11.0", | ||
| version="2.12.0", | ||
| license="Apache 2 License", | ||
@@ -10,0 +10,0 @@ extras_require={ |
@@ -201,3 +201,113 @@ import asyncio | ||
| @async_test | ||
| async def test_with_static_headers(self): | ||
| if not aiohttp_installed: | ||
| pytest.skip("aiohttp not installed") | ||
| custom_headers = { | ||
| "Authorization": ["Bearer RandomToken"], | ||
| "X-Client-ID": ["test-client-123"], | ||
| "X-Custom-Header": ["custom-value"], | ||
| "Accept": [ | ||
| "application/json", "text/plain", "application/msgpack" | ||
| ], | ||
| "X-Feature-Flags": ["feature-a", "feature-b", "feature-c"], | ||
| "X-Capabilities": ["streaming", "compression", "batching"] | ||
| } | ||
| nc = await nats.connect( | ||
| "ws://localhost:8080", ws_connection_headers=custom_headers | ||
| ) | ||
| # Test basic pub/sub functionality to ensure connection works | ||
| sub = await nc.subscribe("foo") | ||
| await nc.flush() | ||
| # Create test messages | ||
| msgs = [] | ||
| for i in range(10): | ||
| msg = b'A' * 100 # 100 bytes of 'A' | ||
| msgs.append(msg) | ||
| # Publish messages | ||
| for i, msg in enumerate(msgs): | ||
| await nc.publish("foo", msg) | ||
| # Ensure message content is not modified | ||
| assert msg == msgs[i], f"User content was changed during publish" | ||
| # Receive and verify messages | ||
| for i in range(len(msgs)): | ||
| msg = await sub.next_msg(timeout=1.0) | ||
| assert msg.data == msgs[ | ||
| i], f"Expected message {i}: {msgs[i]}, got {msg.data}" | ||
| await nc.close() | ||
| @async_test | ||
| async def test_ws_headers_with_reconnect(self): | ||
| """Test that headers persist across reconnections""" | ||
| if not aiohttp_installed: | ||
| pytest.skip("aiohttp not installed") | ||
| reconnect_count = 0 | ||
| reconnected = asyncio.Future() | ||
| async def reconnected_cb(): | ||
| nonlocal reconnect_count | ||
| reconnect_count += 1 | ||
| if not reconnected.done(): | ||
| reconnected.set_result(True) | ||
| # Connect with custom headers | ||
| custom_headers = { | ||
| "X-Persistent-Session": ["session-12345"], | ||
| "Authorization": ["Bearer ReconnectToken"] | ||
| } | ||
| nc = await nats.connect( | ||
| "ws://localhost:8080", | ||
| ws_connection_headers=custom_headers, | ||
| reconnected_cb=reconnected_cb, | ||
| max_reconnect_attempts=5 | ||
| ) | ||
| # Create subscription | ||
| messages_received = [] | ||
| async def message_handler(msg): | ||
| messages_received.append(msg.data) | ||
| await nc.subscribe("reconnect.test", cb=message_handler) | ||
| # Publish before reconnect | ||
| await nc.publish("reconnect.test", b"Before reconnect") | ||
| await nc.flush() | ||
| # Simulate server restart | ||
| await asyncio.get_running_loop().run_in_executor( | ||
| None, self.server_pool[0].stop | ||
| ) | ||
| await asyncio.sleep(1) | ||
| await asyncio.get_running_loop().run_in_executor( | ||
| None, self.server_pool[0].start | ||
| ) | ||
| # Wait for reconnection | ||
| await asyncio.wait_for(reconnected, timeout=5.0) | ||
| # Publish after reconnect | ||
| await nc.publish("reconnect.test", b"After reconnect") | ||
| await nc.flush() | ||
| # Wait a bit for message delivery | ||
| await asyncio.sleep(0.5) | ||
| # Verify we got messages | ||
| assert b"Before reconnect" in messages_received | ||
| assert b"After reconnect" in messages_received | ||
| assert reconnect_count > 0 | ||
| await nc.close() | ||
| class WebSocketTLSTest(SingleWebSocketTLSServerTestCase): | ||
@@ -315,3 +425,31 @@ | ||
| @async_test | ||
| async def test_ws_headers_with_tls(self): | ||
| """Test custom headers with TLS WebSocket connection""" | ||
| if not aiohttp_installed: | ||
| pytest.skip("aiohttp not installed") | ||
| # Note: This would require a TLS-enabled test server | ||
| # Keeping structure similar to the non-TLS test | ||
| custom_headers = { | ||
| "Authorization": ["Bearer SecureToken"], | ||
| "X-TLS-Client": ["secure-client-v1"] | ||
| } | ||
| nc = await nats.connect( | ||
| "wss://localhost:8081", | ||
| ws_connection_headers=custom_headers, | ||
| tls=self.ssl_ctx | ||
| ) | ||
| # Basic functionality test | ||
| sub = await nc.subscribe("tls.test") | ||
| await nc.publish("tls.test", b"TLS test message") | ||
| msg = await sub.next_msg(timeout=1.0) | ||
| assert msg.data == b"TLS test message" | ||
| await nc.close() | ||
| if __name__ == "__main__": | ||
@@ -318,0 +456,0 @@ import sys |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
666016
4.72%46
2.22%16183
4.77%