Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

cent

Package Overview
Dependencies
Maintainers
1
Versions
34
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cent - npm Package Compare versions

Comparing version
4.1.0
to
5.0.0b1
+7
cent/client/__init__.py
from .sync_client import Client
from .async_client import AsyncClient
__all__ = (
"AsyncClient",
"Client",
)
from typing import Optional, Any, cast
from aiohttp import ClientSession
from cent.client.session import AiohttpSession
from cent.dto import (
CentRequest,
CentResultType,
PublishResult,
PublishRequest,
BroadcastRequest,
BroadcastResult,
BatchResult,
BatchRequest,
CancelPushResult,
CancelPushRequest,
UpdatePushStatusResult,
UpdatePushStatusRequest,
SendPushNotificationResult,
SendPushNotificationRequest,
UserTopicUpdateResult,
UserTopicUpdateRequest,
UserTopicListResult,
UserTopicListRequest,
DeviceTopicUpdateResult,
DeviceTopicUpdateRequest,
DeviceTopicListResult,
DeviceTopicListRequest,
DeviceListResult,
DeviceListRequest,
DeviceRemoveResult,
DeviceRemoveRequest,
DeviceUpdateResult,
DeviceUpdateRequest,
DeviceRegisterResult,
DeviceRegisterRequest,
InvalidateUserTokensResult,
InvalidateUserTokensRequest,
RevokeTokenResult,
RevokeTokenRequest,
UnblockUserResult,
UnblockUserRequest,
BlockUserResult,
BlockUserRequest,
DeleteUserStatusResult,
DeleteUserStatusRequest,
GetUserStatusResult,
GetUserStatusRequest,
UpdateUserStatusResult,
UpdateUserStatusRequest,
ConnectionsResult,
ConnectionsRequest,
ChannelsResult,
ChannelsRequest,
RefreshResult,
RefreshRequest,
InfoResult,
InfoRequest,
HistoryRemoveResult,
HistoryRemoveRequest,
HistoryResult,
HistoryRequest,
PresenceStatsResult,
PresenceStatsRequest,
PresenceResult,
PresenceRequest,
DisconnectResult,
DisconnectRequest,
UnsubscribeResult,
UnsubscribeRequest,
SubscribeResult,
SubscribeRequest,
)
class AsyncClient:
def __init__(
self,
api_url: str,
api_key: str,
timeout: Optional[float] = 10.0,
session: Optional[ClientSession] = None,
) -> None:
"""
:param api_url: Centrifugo API URL
:param api_key: Centrifugo API key
:param timeout: Base timeout for all requests
:param session: Custom `aiohttp` session
"""
self._api_key = api_key
self._session = AiohttpSession(
api_url,
timeout=timeout,
session=session,
)
async def _send(
self,
request: CentRequest[CentResultType],
timeout: Optional[float] = None,
) -> CentResultType:
method = request.api_method
payload = request.api_payload
content = await self._session.make_request(
self._api_key,
method,
payload,
timeout=timeout,
)
response = request.parse_response(content)
return cast(CentResultType, response.result)
async def publish(
self,
request: PublishRequest,
timeout: Optional[float] = None,
) -> PublishResult:
return await self._send(request, timeout=timeout)
async def broadcast(
self,
request: BroadcastRequest,
timeout: Optional[float] = None,
) -> BroadcastResult:
return await self._send(request, timeout=timeout)
async def subscribe(
self,
request: SubscribeRequest,
timeout: Optional[float] = None,
) -> SubscribeResult:
return await self._send(request, timeout=timeout)
async def unsubscribe(
self,
request: UnsubscribeRequest,
timeout: Optional[float] = None,
) -> UnsubscribeResult:
return await self._send(request, timeout=timeout)
async def disconnect(
self,
request: DisconnectRequest,
timeout: Optional[float] = None,
) -> DisconnectResult:
return await self._send(request, timeout=timeout)
async def presence(
self,
request: PresenceRequest,
timeout: Optional[float] = None,
) -> PresenceResult:
return await self._send(request, timeout=timeout)
async def presence_stats(
self,
request: PresenceStatsRequest,
timeout: Optional[float] = None,
) -> PresenceStatsResult:
return await self._send(request, timeout=timeout)
async def history(
self,
request: HistoryRequest,
timeout: Optional[float] = None,
) -> HistoryResult:
return await self._send(request, timeout=timeout)
async def history_remove(
self,
request: HistoryRemoveRequest,
timeout: Optional[float] = None,
) -> HistoryRemoveResult:
return await self._send(request, timeout=timeout)
async def info(
self,
request: InfoRequest,
timeout: Optional[float] = None,
) -> InfoResult:
return await self._send(request, timeout=timeout)
async def refresh(
self,
request: RefreshRequest,
timeout: Optional[float] = None,
) -> RefreshResult:
return await self._send(request, timeout=timeout)
async def channels(
self,
request: ChannelsRequest,
timeout: Optional[float] = None,
) -> ChannelsResult:
return await self._send(request, timeout=timeout)
async def connections(
self,
request: ConnectionsRequest,
timeout: Optional[float] = None,
) -> ConnectionsResult:
return await self._send(request, timeout=timeout)
async def update_user_status(
self,
request: UpdateUserStatusRequest,
timeout: Optional[float] = None,
) -> UpdateUserStatusResult:
return await self._send(request, timeout=timeout)
async def get_user_status(
self,
request: GetUserStatusRequest,
timeout: Optional[float] = None,
) -> GetUserStatusResult:
return await self._send(request, timeout=timeout)
async def delete_user_status(
self,
request: DeleteUserStatusRequest,
timeout: Optional[float] = None,
) -> DeleteUserStatusResult:
return await self._send(request, timeout=timeout)
async def block_user(
self,
request: BlockUserRequest,
timeout: Optional[float] = None,
) -> BlockUserResult:
return await self._send(request, timeout=timeout)
async def unblock_user(
self,
request: UnblockUserRequest,
timeout: Optional[float] = None,
) -> UnblockUserResult:
return await self._send(request, timeout=timeout)
async def revoke_token(
self,
request: RevokeTokenRequest,
timeout: Optional[float] = None,
) -> RevokeTokenResult:
return await self._send(request, timeout=timeout)
async def invalidate_user_tokens(
self,
request: InvalidateUserTokensRequest,
timeout: Optional[float] = None,
) -> InvalidateUserTokensResult:
return await self._send(request, timeout=timeout)
async def device_register(
self,
request: DeviceRegisterRequest,
timeout: Optional[float] = None,
) -> DeviceRegisterResult:
return await self._send(request, timeout=timeout)
async def device_update(
self,
request: DeviceUpdateRequest,
timeout: Optional[float] = None,
) -> DeviceUpdateResult:
return await self._send(request, timeout=timeout)
async def device_remove(
self,
request: DeviceRemoveRequest,
timeout: Optional[float] = None,
) -> DeviceRemoveResult:
return await self._send(request, timeout=timeout)
async def device_list(
self,
request: DeviceListRequest,
timeout: Optional[float] = None,
) -> DeviceListResult:
return await self._send(request, timeout=timeout)
async def device_topic_list(
self,
request: DeviceTopicListRequest,
timeout: Optional[float] = None,
) -> DeviceTopicListResult:
return await self._send(request, timeout=timeout)
async def device_topic_update(
self,
request: DeviceTopicUpdateRequest,
timeout: Optional[float] = None,
) -> DeviceTopicUpdateResult:
return await self._send(request, timeout=timeout)
async def user_topic_list(
self,
request: UserTopicListRequest,
timeout: Optional[float] = None,
) -> UserTopicListResult:
return await self._send(request, timeout=timeout)
async def user_topic_update(
self,
request: UserTopicUpdateRequest,
timeout: Optional[float] = None,
) -> UserTopicUpdateResult:
return await self._send(request, timeout=timeout)
async def send_push_notification(
self,
request: SendPushNotificationRequest,
timeout: Optional[float] = None,
) -> SendPushNotificationResult:
return await self._send(request, timeout=timeout)
async def update_push_status(
self,
request: UpdatePushStatusRequest,
timeout: Optional[float] = None,
) -> UpdatePushStatusResult:
return await self._send(request, timeout=timeout)
async def cancel_push(
self,
request: CancelPushRequest,
timeout: Optional[float] = None,
) -> CancelPushResult:
return await self._send(request, timeout=timeout)
async def batch(
self,
request: BatchRequest,
timeout: Optional[float] = None,
) -> BatchResult:
return await self._send(request, timeout=timeout)
async def close(self) -> None:
await self._session.close()
async def __aenter__(self) -> "AsyncClient":
return self
async def __aexit__(self, *kwargs: Any) -> None:
await self.close()
from .aiohttp import AiohttpSession
from .requests import RequestsSession
__all__ = (
"AiohttpSession",
"RequestsSession",
)
import asyncio
from typing import Optional, Dict, Any
from aiohttp import ClientSession, ClientError
from cent.client.session.base_http_async import BaseHttpAsyncSession
from cent.exceptions import CentNetworkError, CentTimeoutError
class AiohttpSession(BaseHttpAsyncSession):
def __init__(
self,
base_url: str,
timeout: Optional[float] = 10.0,
session: Optional[ClientSession] = None,
) -> None:
super().__init__()
self._base_url = base_url
self._timeout = timeout
self._session: ClientSession
if session:
self._session = session
else:
self._session = ClientSession()
async def close(self) -> None:
if self._session is not None and not self._session.closed:
await self._session.close()
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
await asyncio.sleep(0)
async def make_request(
self,
api_key: str,
method: str,
json_data: Dict[str, Any],
timeout: Optional[float] = None,
) -> str:
session = self._session
if api_key:
session.headers["X-API-Key"] = api_key
url = f"{self._base_url}/{method}"
try:
async with session.post(
url=url,
json=json_data,
timeout=timeout or self._timeout,
) as resp:
raw_result = await resp.text()
except asyncio.TimeoutError as error:
raise CentTimeoutError(
message="Request timeout",
) from error
except ClientError as error:
raise CentNetworkError(
message=f"{type(error).__name__}: {error}",
) from error
self.check_status_code(status_code=resp.status)
return raw_result
def __del__(self) -> None:
if self._session and not self._session.closed:
if self._session.connector is not None and self._session.connector_owner:
self._session.connector.close()
self._session._connector = None
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from cent.client.session.base_http import BaseHttpSession
class BaseHttpAsyncSession(BaseHttpSession, ABC):
@abstractmethod
async def close(self) -> None:
"""
Close client session
"""
@abstractmethod
async def make_request(
self,
api_key: str,
method: str,
json_data: Dict[str, Any],
timeout: Optional[float] = None,
) -> str:
"""
Make request to Centrifugo HTTP API.
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from cent.client.session.base_http import BaseHttpSession
class BaseHttpSyncSession(BaseHttpSession, ABC):
@abstractmethod
def close(self) -> None:
"""
Close client session
"""
@abstractmethod
def make_request(
self,
api_key: str,
method: str,
json_data: Dict[str, Any],
timeout: Optional[float] = None,
) -> str:
"""
Make request to Centrifugo HTTP API.
"""
from http import HTTPStatus
from cent.exceptions import (
CentUnauthorizedError,
CentTransportError,
)
class BaseHttpSession:
"""Base class for HTTP sessions."""
@staticmethod
def check_status_code(
status_code: int,
) -> None:
if status_code == HTTPStatus.UNAUTHORIZED:
raise CentUnauthorizedError
if status_code != HTTPStatus.OK:
raise CentTransportError(
status_code=status_code,
)
from typing import Optional, Dict, Any
import requests
from requests import Session
from cent.client.session.base_http_sync import BaseHttpSyncSession
from cent.exceptions import CentNetworkError, CentTimeoutError
class RequestsSession(BaseHttpSyncSession):
def __init__(
self,
base_url: str,
timeout: Optional[float] = 10.0,
session: Optional[Session] = None,
) -> None:
super().__init__()
self._base_url = base_url
self._timeout = timeout
self._session: Session
if session:
self._session = session
else:
self._session = Session()
def close(self) -> None:
if self._session is not None:
self._session.close()
def make_request(
self,
api_key: str,
method: str,
json_data: Dict[str, Any],
timeout: Optional[float] = None,
) -> str:
if api_key:
self._session.headers["X-API-Key"] = api_key
url = f"{self._base_url}/{method}"
try:
raw_result = self._session.post(
url=url,
json=json_data,
timeout=timeout or self._timeout,
)
except requests.exceptions.Timeout as error:
raise CentTimeoutError(
message="Request timeout",
) from error
except requests.exceptions.ConnectionError as error:
raise CentNetworkError(
message=f"{type(error).__name__}: {error}",
) from error
self.check_status_code(
status_code=raw_result.status_code,
)
return raw_result.text
def __del__(self) -> None:
self.close()
from typing import Optional, Any, cast
from requests import Session
from cent.client.session import RequestsSession
from cent.dto import (
CentRequest,
CentResultType,
BatchResult,
BatchRequest,
CancelPushResult,
CancelPushRequest,
UpdatePushStatusResult,
UpdatePushStatusRequest,
SendPushNotificationResult,
SendPushNotificationRequest,
UserTopicUpdateResult,
UserTopicUpdateRequest,
UserTopicListResult,
UserTopicListRequest,
DeviceTopicUpdateResult,
DeviceTopicUpdateRequest,
DeviceTopicListResult,
DeviceTopicListRequest,
DeviceListResult,
DeviceListRequest,
DeviceRemoveResult,
DeviceRemoveRequest,
DeviceUpdateResult,
DeviceUpdateRequest,
DeviceRegisterResult,
DeviceRegisterRequest,
InvalidateUserTokensResult,
InvalidateUserTokensRequest,
RevokeTokenResult,
RevokeTokenRequest,
UnblockUserResult,
UnblockUserRequest,
BlockUserResult,
BlockUserRequest,
DeleteUserStatusResult,
DeleteUserStatusRequest,
GetUserStatusResult,
GetUserStatusRequest,
UpdateUserStatusResult,
UpdateUserStatusRequest,
ConnectionsResult,
ConnectionsRequest,
ChannelsResult,
ChannelsRequest,
RefreshResult,
RefreshRequest,
InfoResult,
InfoRequest,
HistoryRemoveResult,
HistoryRemoveRequest,
HistoryResult,
HistoryRequest,
PresenceStatsResult,
PresenceStatsRequest,
PresenceResult,
PresenceRequest,
DisconnectResult,
DisconnectRequest,
UnsubscribeResult,
UnsubscribeRequest,
SubscribeResult,
SubscribeRequest,
BroadcastResult,
BroadcastRequest,
PublishResult,
PublishRequest,
)
class Client:
def __init__(
self,
api_url: str,
api_key: str,
timeout: Optional[float] = 10.0,
session: Optional[Session] = None,
) -> None:
"""
:param api_url: Centrifugo API URL
:param api_key: Centrifugo API key
:param timeout: Base timeout for all requests.
:param session: Custom `requests` session.
"""
self._api_url = api_url
self._api_key = api_key
self._session = RequestsSession(
api_url,
timeout=timeout,
session=session,
)
def _send(
self,
request: CentRequest[CentResultType],
timeout: Optional[float] = None,
) -> CentResultType:
content = self._session.make_request(
self._api_key,
request.api_method,
request.api_payload,
timeout=timeout,
)
response = request.parse_response(content)
return cast(CentResultType, response.result)
def publish(
self,
request: PublishRequest,
timeout: Optional[float] = None,
) -> PublishResult:
return self._send(request, timeout=timeout)
def broadcast(
self,
request: BroadcastRequest,
timeout: Optional[float] = None,
) -> BroadcastResult:
return self._send(request, timeout=timeout)
def subscribe(
self,
request: SubscribeRequest,
timeout: Optional[float] = None,
) -> SubscribeResult:
return self._send(request, timeout=timeout)
def unsubscribe(
self,
request: UnsubscribeRequest,
timeout: Optional[float] = None,
) -> UnsubscribeResult:
return self._send(request, timeout=timeout)
def disconnect(
self,
request: DisconnectRequest,
timeout: Optional[float] = None,
) -> DisconnectResult:
return self._send(request, timeout=timeout)
def presence(
self,
request: PresenceRequest,
timeout: Optional[float] = None,
) -> PresenceResult:
return self._send(request, timeout=timeout)
def presence_stats(
self,
request: PresenceStatsRequest,
timeout: Optional[float] = None,
) -> PresenceStatsResult:
return self._send(request, timeout=timeout)
def history(
self,
request: HistoryRequest,
timeout: Optional[float] = None,
) -> HistoryResult:
return self._send(request, timeout=timeout)
def history_remove(
self,
request: HistoryRemoveRequest,
timeout: Optional[float] = None,
) -> HistoryRemoveResult:
return self._send(request, timeout=timeout)
def info(
self,
request: InfoRequest,
timeout: Optional[float] = None,
) -> InfoResult:
return self._send(request, timeout=timeout)
def refresh(
self,
request: RefreshRequest,
timeout: Optional[float] = None,
) -> RefreshResult:
return self._send(request, timeout=timeout)
def channels(
self,
request: ChannelsRequest,
timeout: Optional[float] = None,
) -> ChannelsResult:
return self._send(request, timeout=timeout)
def connections(
self,
request: ConnectionsRequest,
timeout: Optional[float] = None,
) -> ConnectionsResult:
return self._send(request, timeout=timeout)
def update_user_status(
self,
request: UpdateUserStatusRequest,
timeout: Optional[float] = None,
) -> UpdateUserStatusResult:
return self._send(request, timeout=timeout)
def get_user_status(
self,
request: GetUserStatusRequest,
timeout: Optional[float] = None,
) -> GetUserStatusResult:
return self._send(request, timeout=timeout)
def delete_user_status(
self,
request: DeleteUserStatusRequest,
timeout: Optional[float] = None,
) -> DeleteUserStatusResult:
return self._send(request, timeout=timeout)
def block_user(
self,
request: BlockUserRequest,
timeout: Optional[float] = None,
) -> BlockUserResult:
return self._send(request, timeout=timeout)
def unblock_user(
self,
request: UnblockUserRequest,
timeout: Optional[float] = None,
) -> UnblockUserResult:
return self._send(request, timeout=timeout)
def revoke_token(
self,
request: RevokeTokenRequest,
timeout: Optional[float] = None,
) -> RevokeTokenResult:
return self._send(request, timeout=timeout)
def invalidate_user_tokens(
self,
request: InvalidateUserTokensRequest,
timeout: Optional[float] = None,
) -> InvalidateUserTokensResult:
return self._send(request, timeout=timeout)
def device_register(
self,
request: DeviceRegisterRequest,
timeout: Optional[float] = None,
) -> DeviceRegisterResult:
return self._send(request, timeout=timeout)
def device_update(
self,
request: DeviceUpdateRequest,
timeout: Optional[float] = None,
) -> DeviceUpdateResult:
return self._send(request, timeout=timeout)
def device_remove(
self,
request: DeviceRemoveRequest,
timeout: Optional[float] = None,
) -> DeviceRemoveResult:
return self._send(request, timeout=timeout)
def device_list(
self,
request: DeviceListRequest,
timeout: Optional[float] = None,
) -> DeviceListResult:
return self._send(request, timeout=timeout)
def device_topic_list(
self,
request: DeviceTopicListRequest,
timeout: Optional[float] = None,
) -> DeviceTopicListResult:
return self._send(request, timeout=timeout)
def device_topic_update(
self,
request: DeviceTopicUpdateRequest,
timeout: Optional[float] = None,
) -> DeviceTopicUpdateResult:
return self._send(request, timeout=timeout)
def user_topic_list(
self,
request: UserTopicListRequest,
timeout: Optional[float] = None,
) -> UserTopicListResult:
return self._send(request, timeout=timeout)
def user_topic_update(
self,
request: UserTopicUpdateRequest,
timeout: Optional[float] = None,
) -> UserTopicUpdateResult:
return self._send(request, timeout=timeout)
def send_push_notification(
self,
request: SendPushNotificationRequest,
timeout: Optional[float] = None,
) -> SendPushNotificationResult:
return self._send(request, timeout=timeout)
def update_push_status(
self,
request: UpdatePushStatusRequest,
timeout: Optional[float] = None,
) -> UpdatePushStatusResult:
return self._send(request, timeout=timeout)
def cancel_push(
self,
request: CancelPushRequest,
timeout: Optional[float] = None,
) -> CancelPushResult:
return self._send(request, timeout=timeout)
def batch(
self,
request: BatchRequest,
timeout: Optional[float] = None,
) -> BatchResult:
return self._send(request, timeout=timeout)
def close(self) -> None:
self._session.close()
def __enter__(self) -> "Client":
return self
def __exit__(self, *kwargs: Any) -> None:
self.close()
import json
from abc import ABC, abstractmethod
from typing import TypeVar, Any, Generic, TYPE_CHECKING, ClassVar, Optional, List, Dict
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError
from cent.exceptions import CentDecodeError, CentApiResponseError
class CentResult(BaseModel, ABC):
model_config = ConfigDict(
use_enum_values=True,
extra="allow",
validate_assignment=True,
frozen=True,
populate_by_name=True,
arbitrary_types_allowed=True,
defer_build=True,
)
CentResultType = TypeVar("CentResultType", bound=CentResult)
class Error(BaseModel):
code: int
message: str
class Response(BaseModel, Generic[CentResultType]):
error: Optional[Error] = None
result: Optional[CentResultType] = None
class CentRequest(BaseModel, Generic[CentResultType], ABC):
model_config = ConfigDict(
extra="allow",
populate_by_name=True,
arbitrary_types_allowed=True,
)
if TYPE_CHECKING:
__returning__: ClassVar[type]
__api_method__: ClassVar[str]
else:
@property
@abstractmethod
def __returning__(self) -> type:
pass
@property
@abstractmethod
def __api_method__(self) -> str:
pass
@property
def api_payload(self) -> Any:
return self.model_dump(exclude_none=True)
@property
def api_method(self) -> str:
return self.__api_method__
def parse_response(
self,
content: str,
) -> Response[CentResult]:
try:
json_data = json.loads(content)
except Exception as err:
raise CentDecodeError from err
if isinstance(self, BatchRequest):
json_data = _validate_batch(self, json_data["replies"])
try:
response_type = Response[self.__returning__] # type: ignore
response = TypeAdapter(response_type).validate_python(
json_data,
)
except ValidationError as err:
raise CentDecodeError from err
if response.error:
raise CentApiResponseError(
code=response.error.code,
message=response.error.message,
)
return response
class NestedModel(BaseModel, ABC):
model_config = ConfigDict(
extra="allow",
populate_by_name=True,
arbitrary_types_allowed=True,
)
class BatchResult(CentResult):
"""Batch response.
Attributes:
replies: List of results from batch request.
"""
replies: List[CentResult]
class BatchRequest(CentRequest[BatchResult]):
"""Batch request."""
__returning__ = BatchResult
__api_method__ = "batch"
requests: List[Any]
parallel: Optional[bool] = None
@property
def api_payload(self) -> Any:
commands = [
{request.__api_method__: request.model_dump(exclude_none=True)}
for request in self.requests
]
return {"commands": commands, "parallel": bool(self.parallel)}
def _validate_batch(
request: BatchRequest,
json_replies: List[Dict[str, Any]],
) -> Dict[str, Dict[str, List[Any]]]:
replies: List[CentRequest[Any]] = []
for command_method, json_data in zip(request.requests, json_replies):
validated_request: CentRequest[Any] = TypeAdapter(
command_method.__returning__
).validate_python(
json_data[command_method.__api_method__],
)
replies.append(validated_request)
return {"result": {"replies": replies}}
class Disconnect(NestedModel):
"""Disconnect data.
Attributes:
code (int): Disconnect code.
reason (str): Disconnect reason.
"""
code: int = 0
reason: str = ""
class BoolValue(NestedModel):
"""Bool value.
Attributes:
value (bool): Value.
"""
value: bool
class StreamPosition(NestedModel):
"""
Stream position representation.
Attributes:
offset (int): Offset of publication in history stream.
epoch (str): Epoch of current stream.
"""
offset: int
epoch: str
class ChannelOptionsOverride(NestedModel):
"""
Override object for channel options.
Attributes:
presence (Optional[BoolValue]): Override for presence.
join_leave (Optional[BoolValue]): Override for join_leave behavior.
force_push_join_leave (Optional[BoolValue]): Force push for join_leave events.
force_positioning (Optional[BoolValue]): Override for force positioning.
force_recovery (Optional[BoolValue]): Override for force recovery.
"""
presence: Optional[BoolValue] = None
join_leave: Optional[BoolValue] = None
force_push_join_leave: Optional[BoolValue] = None
force_positioning: Optional[BoolValue] = None
force_recovery: Optional[BoolValue] = None
class ProcessStats(CentResult):
"""
Represents statistics of a process.
Attributes:
cpu (float): Process CPU usage as a percentage. Defaults to 0.0.
rss (int): Process Resident Set Size (RSS) in bytes.
"""
cpu: float = 0.0
rss: int = 0
class ClientInfo(CentResult):
"""
Represents the result containing client information.
Attributes:
client (str): Client ID.
user (str): User ID.
conn_info (Optional[Any]): Optional connection info. This can include details
such as IP address, location, etc.
chan_info (Optional[Any]): Optional channel info. This might include specific
settings or preferences related to the channel.
"""
client: str = ""
user: str = ""
conn_info: Optional[Any] = None
chan_info: Optional[Any] = None
class Publication(CentResult):
"""Publication result.
Attributes:
offset (int): Offset of publication in history stream.
data (Any): Custom JSON inside publication.
tags (Optional[Dict[str, str]]): Tags are optional.
"""
data: Any
offset: int = 0
tags: Optional[Dict[str, str]] = None
class Metrics(CentResult):
"""Metrics result.
Attributes:
interval (float): Metrics aggregation interval.
items (Dict[str, float]): metric values.
"""
interval: float = 0.0
items: Dict[str, float]
class Node(CentResult):
"""Node result.
Attributes:
uid (str): Node unique identifier.
name (str): Node name.
version (str): Node version.
num_clients (int): Total number of connections.
num_subs (int): Total number of subscriptions.
num_users (int): Total number of users.
num_channels (int): Total number of channels.
uptime (int): Node uptime.
metrics (Optional[Metrics]): Node metrics.
process (Optional[ProcessStats]): Node process stats.
"""
uid: str
name: str
version: str = ""
num_clients: int = 0
num_subs: int = 0
num_users: int = 0
num_channels: int = 0
uptime: int = 0
metrics: Optional[Metrics] = None
process: Optional[ProcessStats] = None
class PublishResult(CentResult):
"""Publish result.
Attributes:
offset: Offset of publication in history stream.
epoch: Epoch of current stream.
"""
offset: int = 0
epoch: str = ""
class BroadcastResult(CentResult):
"""Broadcast result.
Attributes:
responses: List of responses for each individual publish
(with possible error and publish result)
"""
responses: List[Response[PublishResult]] = Field(default_factory=list)
class ChannelInfoResult(CentResult):
"""Channel info result.
Attributes:
num_clients: Total number of connections currently subscribed to a channel.
"""
num_clients: int = 0
class ChannelsResult(CentResult):
"""Channels result.
Attributes:
channels: Map where key is channel and value is ChannelInfoResult.
"""
channels: Dict[str, ChannelInfoResult]
class DisconnectResult(CentResult):
"""Disconnect result."""
class HistoryRemoveResult(CentResult):
"""History remove result."""
class HistoryResult(CentResult):
"""History result.
Attributes:
publications: List of publications in channel.
offset: Top offset in history stream.
epoch: Epoch of current stream.
"""
publications: List[Publication] = Field(default_factory=list)
offset: int = 0
epoch: str = ""
class InfoResult(CentResult):
"""Info result.
Attributes:
nodes: Information about all nodes in a cluster.
"""
nodes: List[Node]
class PresenceResult(CentResult):
"""Presence result.
Attributes:
presence: Map where key is client ID and value is ClientInfo.
"""
presence: Dict[str, ClientInfo]
class PresenceStatsResult(CentResult):
"""Presence stats result.
Attributes:
num_clients: Total number of clients in channel.
num_users: Total number of unique users in channel.
"""
num_clients: int = 0
num_users: int = 0
class RefreshResult(CentResult):
"""Refresh result."""
class SubscribeResult(CentResult):
"""Subscribe result."""
class UnsubscribeResult(CentResult):
"""Unsubscribe result."""
class BroadcastRequest(CentRequest[BroadcastResult]):
"""Broadcast request.
Attributes:
channels: List of channels to publish data to.
data: Custom data to publish into a channel.
skip_history: Skip adding publications to channels' history for this request.
tags: Publication tags - map with arbitrary string keys and values which is attached to
publication and will be delivered to clients.
b64data: Custom binary data to publish into a channel encoded to base64, so it's possible
to use HTTP API to send binary to clients. Centrifugo will decode it from base64 before
publishing. In case of GRPC you can publish binary using data field.
idempotency_key: Optional idempotency key to drop duplicate publications upon retries. It
acts per channel. Centrifugo currently keeps the cache of idempotent publish results
during 5 minutes window. Available since Centrifugo v5.2.0
"""
__returning__ = BroadcastResult
__api_method__ = "broadcast"
channels: List[str]
data: Any
skip_history: Optional[bool] = None
tags: Optional[Dict[str, str]] = None
b64data: Optional[str] = None
idempotency_key: Optional[str] = None
class ChannelsRequest(CentRequest[ChannelsResult]):
"""Channels request.
Attributes:
pattern: Pattern to filter channels, we are using https://github.com/gobwas/glob
library for matching.
"""
__returning__ = ChannelsResult
__api_method__ = "channels"
pattern: Optional[str] = None
class DisconnectRequest(CentRequest[DisconnectResult]):
"""Disconnect request.
Attributes:
user: User ID to disconnect.
client: Specific client ID to disconnect (user still required to be set).
session: Specific client session to disconnect (user still required to be set).
whitelist: Array of client IDs to keep.
disconnect: Provide custom disconnect object.
"""
__returning__ = DisconnectResult
__api_method__ = "disconnect"
user: str
client: Optional[str] = None
session: Optional[str] = None
whitelist: Optional[List[str]] = None
disconnect: Optional[Disconnect] = None
class HistoryRequest(CentRequest[HistoryResult]):
"""History request.
Attributes:
channel: Name of channel to call history from.
limit: Limit number of returned publications, if not set in request then only
current stream position information will present in result (without any publications).
since: Return publications after this position.
reverse: Iterate in reversed order (from latest to earliest).
"""
__returning__ = HistoryResult
__api_method__ = "history"
channel: str
limit: Optional[int] = None
since: Optional[StreamPosition] = None
reverse: Optional[bool] = None
class HistoryRemoveRequest(CentRequest[HistoryRemoveResult]):
"""History remove request.
Attributes:
channel: Name of channel to remove history.
"""
__returning__ = HistoryRemoveResult
__api_method__ = "history_remove"
channel: str
class InfoRequest(CentRequest[InfoResult]):
"""Info request."""
__returning__ = InfoResult
__api_method__ = "info"
class PresenceRequest(CentRequest[PresenceResult]):
"""Presence request.
Attributes:
channel: Name of channel to call presence from.
"""
__returning__ = PresenceResult
__api_method__ = "presence"
channel: str
class PresenceStatsRequest(CentRequest[PresenceStatsResult]):
"""Presence request.
Attributes:
channel: Name of channel to call presence from.
"""
__returning__ = PresenceStatsResult
__api_method__ = "presence_stats"
channel: str
class PublishRequest(CentRequest[PublishResult]):
"""Publish request.
Attributes:
channel: Name of channel to publish.
data: Custom data to publish into a channel.
skip_history: Skip adding publication to history for this request.
tags: Publication tags - map with arbitrary string keys and values which is attached to
publication and will be delivered to clients.
b64data: Custom binary data to publish into a channel encoded to base64, so it's possible
to use HTTP API to send binary to clients. Centrifugo will decode it from base64
before publishing. In case of GRPC you can publish binary using data field.
idempotency_key: Optional idempotency key to drop duplicate publications upon retries.
It acts per channel. Centrifugo currently keeps the cache of idempotent publish
results during 5 minutes window. Available since Centrifugo v5.2.0
"""
__returning__ = PublishResult
__api_method__ = "publish"
channel: str
data: Any
skip_history: Optional[bool] = None
tags: Optional[Dict[str, str]] = None
b64data: Optional[str] = None
idempotency_key: Optional[str] = None
class RefreshRequest(CentRequest[RefreshResult]):
"""Refresh request.
Attributes:
user: User ID to refresh.
client: Client ID to refresh (user still required to be set).
session: Specific client session to refresh (user still required to be set).
expired: Mark connection as expired and close with Disconnect Expired reason.
expire_at: Unix time (in seconds) in the future when the connection will expire.
"""
__returning__ = RefreshResult
__api_method__ = "refresh"
user: str
client: Optional[str] = None
session: Optional[str] = None
expired: Optional[bool] = None
expire_at: Optional[int] = None
class SubscribeRequest(CentRequest[SubscribeResult]):
"""Subscribe request.
Attributes:
user: User ID to subscribe.
channel: Name of channel to subscribe user to.
info: Attach custom data to subscription (will be used in presence and join/leave
messages).
b64info: info in base64 for binary mode (will be decoded by Centrifugo).
client: Specific client ID to subscribe (user still required to be set, will ignore other
user connections with different client IDs).
session: Specific client session to subscribe (user still required to be set).
data: Custom subscription data (will be sent to client in Subscribe push).
b64data: Same as data but in base64 format (will be decoded by Centrifugo).
recover_since: Stream position to recover from.
override: Allows dynamically override some channel options defined in Centrifugo
configuration (see below available fields).
"""
__returning__ = SubscribeResult
__api_method__ = "subscribe"
user: str
channel: str
info: Optional[Any] = None
b64info: Optional[str] = None
client: Optional[str] = None
session: Optional[str] = None
data: Optional[Any] = None
b64data: Optional[str] = None
recover_since: Optional[StreamPosition] = None
override: Optional[ChannelOptionsOverride] = None
class UnsubscribeRequest(CentRequest[UnsubscribeResult]):
"""Unsubscribe request.
Attributes:
user: User ID to unsubscribe.
channel: Name of channel to unsubscribe user to.
client: Specific client ID to unsubscribe (user still required to be set).
session: Specific client session to disconnect (user still required to be set).
"""
__returning__ = UnsubscribeResult
__api_method__ = "unsubscribe"
user: str
channel: str
client: Optional[str] = None
session: Optional[str] = None
class ConnectionTokenInfo(NestedModel):
"""Connection token info."""
uid: Optional[str] = None
issued_at: Optional[int] = None
class SubscriptionTokenInfo(NestedModel):
"""Subscription token info."""
uid: Optional[str] = None
issued_at: Optional[int] = None
class ChannelContext(NestedModel):
"""Channel context."""
source: Optional[int] = None
class ConnectionState(NestedModel):
"""Connection state."""
channels: Optional[Dict[str, ChannelContext]] = None
connection_token: Optional[ConnectionTokenInfo] = None
subscription_tokens: Optional[Dict[str, SubscriptionTokenInfo]] = None
meta: Optional[Any] = None
class ConnectionInfo(NestedModel):
"""Connection info."""
app_name: str = ""
app_version: str = ""
transport: str
protocol: str
user: str = ""
state: Optional[ConnectionState] = None
class ConnectionsResult(CentResult):
connections: Dict[str, ConnectionInfo]
class ConnectionsRequest(CentRequest[ConnectionsResult]):
"""Connections request."""
__api_method__ = "connections"
__returning__ = ConnectionsResult
user: str
expression: str
class UpdateUserStatusResult(CentResult):
"""
Update user status result.
"""
class UpdateUserStatusRequest(CentRequest[UpdateUserStatusResult]):
"""Update user status request."""
__api_method__ = "update_user_status"
__returning__ = UpdateUserStatusResult
users: List[str]
class UserStatus(NestedModel):
"""
User status.
"""
user: str
active: Optional[int] = None
online: Optional[int] = None
class GetUserStatusResult(CentResult):
"""
Get user status result.
"""
statuses: List[UserStatus]
class GetUserStatusRequest(CentRequest[GetUserStatusResult]):
"""
Get user status request.
"""
__api_method__ = "get_user_status"
__returning__ = GetUserStatusResult
users: List[str]
class DeleteUserStatusResult(CentResult):
"""
Delete user status result.
"""
class DeleteUserStatusRequest(CentRequest[DeleteUserStatusResult]):
"""
Delete user status request.
"""
__api_method__ = "delete_user_status"
__returning__ = DeleteUserStatusResult
users: List[str]
class BlockUserResult(CentResult):
"""
Block user result.
"""
class BlockUserRequest(CentRequest[BlockUserResult]):
"""
Block user request.
"""
__api_method__ = "block_user"
__returning__ = BlockUserResult
expire_at: Optional[int] = None
user: str
class UnblockUserResult(CentResult):
"""
Unblock user result.
"""
class UnblockUserRequest(CentRequest[UnblockUserResult]):
"""
Unblock user request.
"""
__api_method__ = "unblock_user"
__returning__ = UnblockUserResult
user: str
class RevokeTokenResult(CentResult):
"""
Revoke token result.
"""
class RevokeTokenRequest(CentRequest[RevokeTokenResult]):
"""
Revoke token request.
"""
__api_method__ = "revoke_token"
__returning__ = RevokeTokenResult
expire_at: Optional[int] = None
uid: str
class InvalidateUserTokensResult(CentResult):
"""
Invalidate user tokens result.
"""
class InvalidateUserTokensRequest(CentRequest[InvalidateUserTokensResult]):
"""
Invalidate user tokens request.
"""
__api_method__ = "invalidate_user_tokens"
__returning__ = InvalidateUserTokensResult
expire_at: Optional[int] = None
user: str
issued_before: Optional[int] = None
channel: Optional[str] = None
class DeviceRegisterResult(CentResult):
"""
Device register result.
"""
id: str
class DeviceRegisterRequest(CentRequest[DeviceRegisterResult]):
"""
Device register request.
"""
__api_method__ = "device_register"
__returning__ = DeviceRegisterResult
id: Optional[str] = None
provider: str
token: str
platform: str
user: Optional[str] = None
timezone: Optional[str] = None
language: Optional[str] = None
meta: Optional[Dict[str, str]] = None
topics: Optional[List[str]] = None
class DeviceUserUpdate(NestedModel):
"""
Device user update.
"""
user: str
class DeviceTimezoneUpdate(NestedModel):
"""
Device timezone update.
"""
timezone: str
class DeviceLanguageUpdate(NestedModel):
"""
Device language update.
"""
language: str
class DeviceMetaUpdate(NestedModel):
"""
Device meta update.
"""
meta: Dict[str, str]
class DeviceTopicsUpdate(NestedModel):
"""
Device topics update.
"""
op: str
topics: List[str]
class DeviceUpdateResult(CentResult):
"""
Device update result.
"""
class DeviceUpdateRequest(CentRequest[DeviceUpdateResult]):
"""
Device update request.
"""
__api_method__ = "device_update"
__returning__ = DeviceUpdateResult
ids: Optional[List[str]] = None
users: Optional[List[str]] = None
user_update: Optional[DeviceUserUpdate] = None
timezone_update: Optional[DeviceTimezoneUpdate] = None
language_update: Optional[DeviceLanguageUpdate] = None
meta_update: Optional[DeviceMetaUpdate] = None
topics_update: Optional[DeviceTopicsUpdate] = None
class DeviceRemoveResult(CentResult):
"""
Device remove result.
"""
class DeviceRemoveRequest(CentRequest[DeviceRemoveResult]):
"""
Device remove request.
"""
__api_method__ = "device_remove"
__returning__ = DeviceRemoveResult
ids: Optional[List[str]] = None
users: Optional[List[str]] = None
class DeviceFilter(NestedModel):
"""
Device filter.
"""
ids: Optional[List[str]] = None
users: Optional[List[str]] = None
topics: Optional[List[str]] = None
providers: Optional[List[str]] = None
platforms: Optional[List[str]] = None
class Device(NestedModel):
"""
Device.
"""
id: str
platform: str
provider: str
token: str
user: str = ""
created_at: int
updated_at: int
meta: Optional[Dict[str, str]] = None
topics: Optional[List[str]] = None
class DeviceListResult(CentResult):
"""
Device list result.
"""
items: List[Device]
next_cursor: Optional[str] = None
total_count: Optional[int] = None
class DeviceListRequest(CentRequest[DeviceListResult]):
"""
Device list request.
"""
__api_method__ = "device_list"
__returning__ = DeviceListResult
filter: Optional[DeviceFilter] = None
include_total_count: Optional[bool] = None
include_meta: Optional[bool] = None
include_topics: Optional[bool] = None
cursor: Optional[str] = None
limit: Optional[int] = None
class DeviceTopicFilter(NestedModel):
"""
Device topic filter.
"""
device_ids: Optional[List[str]] = None
device_providers: Optional[List[str]] = None
device_platforms: Optional[List[str]] = None
device_users: Optional[List[str]] = None
topics: Optional[List[str]] = None
topic_prefix: Optional[str] = None
class DeviceTopic(NestedModel):
"""
Device topic.
"""
id: str
topic: str
device: Device
class DeviceTopicListResult(CentResult):
"""
Device topic list result.
"""
items: List[DeviceTopic]
next_cursor: Optional[str] = None
total_count: Optional[int] = None
class DeviceTopicListRequest(CentRequest[DeviceTopicListResult]):
"""
Device topic list request.
"""
__api_method__ = "device_topic_list"
__returning__ = DeviceTopicListResult
filter: Optional[DeviceTopicFilter] = None
include_total_count: Optional[bool] = None
include_device: Optional[bool] = None
cursor: Optional[str] = None
limit: Optional[int] = None
class UserTopicFilter(NestedModel):
"""
User topic filter.
"""
users: Optional[List[str]] = None
topics: Optional[List[str]] = None
topic_prefix: Optional[str] = None
class UserTopic(NestedModel):
"""
User topic.
"""
id: str
user: str
topic: str
class UserTopicListResult(CentResult):
"""
User topic list result.
"""
items: List[UserTopic]
next_cursor: Optional[str] = None
total_count: Optional[int] = None
class UserTopicListRequest(CentRequest[UserTopicListResult]):
"""
User topic list request.
"""
__api_method__ = "user_topic_list"
__returning__ = UserTopicListResult
filter: Optional[UserTopicFilter] = None
include_total_count: Optional[bool] = None
cursor: Optional[str] = None
limit: Optional[int] = None
class DeviceTopicUpdateResult(CentResult):
"""
Device topic update result.
"""
class DeviceTopicUpdateRequest(CentRequest[DeviceTopicUpdateResult]):
"""
Device topic update request.
"""
__api_method__ = "device_topic_update"
__returning__ = DeviceTopicUpdateResult
device_id: str
op: str
topics: List[str]
class UserTopicUpdateResult(CentResult):
"""
User topic update result.
"""
class UserTopicUpdateRequest(CentRequest[UserTopicUpdateResult]):
"""
User topic update request.
"""
__api_method__ = "user_topic_update"
__returning__ = UserTopicUpdateResult
user: str
op: str
topics: List[str]
class PushRecipient(NestedModel):
"""
Push recipient.
"""
filter: Optional[DeviceFilter] = None
fcm_tokens: Optional[List[str]] = None
fcm_topic: Optional[str] = None
fcm_condition: Optional[str] = None
hms_tokens: Optional[List[str]] = None
hms_topic: Optional[str] = None
hms_condition: Optional[str] = None
apns_tokens: Optional[List[str]] = None
class FcmPushNotification(NestedModel):
"""
FCM push notification.
"""
message: Any
class HmsPushNotification(NestedModel):
"""
HMS push notification.
"""
message: Any
class ApnsPushNotification(NestedModel):
"""
APNS push notification.
"""
headers: Optional[Dict[str, str]] = None
payload: Any
class PushNotification(NestedModel):
"""
Push notification.
"""
fcm: Optional[FcmPushNotification] = None
hms: Optional[HmsPushNotification] = None
apns: Optional[ApnsPushNotification] = None
expire_at: Optional[int] = None
class SendPushNotificationResult(CentResult):
"""Send push notification result."""
uid: str
class PushLocalization(NestedModel):
translations: Dict[str, str]
class RateLimitPolicy(NestedModel):
rate: int
interval_ms: int
class PushRateLimitStrategy(NestedModel):
key: Optional[str] = None
policies: List[RateLimitPolicy]
drop_if_rate_limited: bool = False
class PushTimeLimitStrategy(NestedModel):
send_after_time: str # use "%H:%M:%S" format, ex. "09:00:00"
send_before_time: str # use "%H:%M:%S" format, ex. "18:00:00"
no_tz_send_now: bool = False
class PushLimitStrategy(NestedModel):
rate_limit: Optional[PushRateLimitStrategy] = None
time_limit: Optional[PushTimeLimitStrategy] = None
class SendPushNotificationRequest(CentRequest[SendPushNotificationResult]):
"""
Send push notification request.
"""
__api_method__ = "send_push_notification"
__returning__ = SendPushNotificationResult
recipient: PushRecipient
notification: PushNotification
uid: Optional[str] = None
send_at: Optional[int] = None
analytics_uid: Optional[str] = None
optimize_for_reliability: Optional[bool] = None
limit_strategy: Optional[PushLimitStrategy] = None
localizations: Optional[Dict[str, PushLocalization]] = None
use_templating: Optional[bool] = None
use_meta: Optional[bool] = None
class UpdatePushStatusResult(CentResult):
"""
Update push status result.
"""
class UpdatePushStatusRequest(CentRequest[UpdatePushStatusResult]):
"""
Update push status request.
"""
__api_method__ = "update_push_status"
__returning__ = UpdatePushStatusResult
analytics_uid: str
status: str
device_id: Optional[str] = None
msg_id: Optional[str] = None
class CancelPushResult(CentResult):
"""
Cancel push result.
"""
class CancelPushRequest(CentRequest[CancelPushResult]):
"""
Cancel push request.
"""
__returning__ = CancelPushResult
__api_method__ = "cancel_push"
uid: str
class CentError(Exception):
"""
Wrapper for all exceptions coming from this library.
"""
class CentNetworkError(CentError):
"""CentNetworkError raised when Centrifugo is unreachable or not available."""
def __init__(self, message: str) -> None:
self.message = message
def __str__(self) -> str:
return f"Network error - {self.message}"
def __repr__(self) -> str:
return f"{type(self).__name__}('{self}')"
class CentTransportError(CentError):
"""CentTransportError raised when HTTP request results into non-200 status code."""
def __init__(self, status_code: int):
self.status_code = status_code
def __str__(self) -> str:
return f"Transport error - {self.status_code}"
def __repr__(self) -> str:
return f"{type(self).__name__}('{self}')"
class CentTimeoutError(CentError):
"""CentTimeoutError raised when request is timed out"""
def __init__(self, message: str) -> None:
self.message = message
def __str__(self) -> str:
return f"Timeout error - {self.message}"
def __repr__(self) -> str:
return f"{type(self).__name__}('{self}')"
class CentUnauthorizedError(CentError):
"""
CentUnauthorizedError raised when Centrifugo returns 401 status code.
"""
class CentDecodeError(CentError):
"""
CentDecodeError raised when response from Centrifugo can't be decoded.
"""
class CentApiResponseError(CentError):
"""
CentApiResponseError raised when the response from Centrifugo server API contains
any error as a result of API command execution.
"""
def __init__(self, code: int, message: str) -> None:
self.code = code
self.message = message
def __str__(self) -> str:
return f"Server API response error #{self.code}: {self.message}"
def __repr__(self) -> str:
return f"{type(self).__name__}('{self}')"
The MIT License (MIT)
Copyright (c) 2024 Centrifugal Labs LTD
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
[tool.poetry]
name = "cent"
version = "5.0.0b1"
description = "Python library to communicate with Centrifugo v5 server HTTP API"
authors = ["Alexandr Emelin", "Katant Savelev"]
license = "MIT"
readme = 'README.md'
classifiers = [
"Development Status :: 5 - Production/Stable",
"Programming Language :: Python",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Environment :: Console",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: Apache Software License",
"Topic :: Internet :: WWW/HTTP",
"Topic :: Software Development",
"Topic :: System :: Networking",
"Topic :: Terminals",
"Topic :: Text Processing",
"Topic :: Utilities",
]
[tool.poetry.dependencies]
python = "^3.9"
aiohttp = "^3"
pydantic = "^2"
requests = "^2"
types-requests = "^2"
[tool.poetry.group.dev.dependencies]
pre-commit = "^3.6.0"
ruff = "^0.1.15"
mypy = "^1.8.0"
pytest = "^8"
pytest-benchmark = "^4.0.0"
pytest-asyncio = "^0.23.5"
[tool.ruff]
preview = true
line-length = 99
select = [
"PL", # pylint
"F", # pyflakes
"E", # pycodestyle errors
"W", # pycodestyle warnings
"C90", # mccabe
"N", # pep8-naming
"YTT", # flake8-2020
"S", # flake8-bandit
"B", # flake8-bugbear
"A", # flake8-builtins
"C40", # flake8-comprehensions
"T10", # flake8-debugger
"EXE", # flake8-executable
"ICN", # flake8-import-conventions
"G", # flake8-logging-format
"PIE", # flake8-pie
"T20", # flake8-print
"PT", # flake8-pytest-style
"SIM", # flake8-simplify
"TID", # flake8-tidy-imports
"TCH", # flake8-type-checking
"ARG", # flake8-unused-arguments
"PGH", # pygrep-hooks
"RSE", # flake8-raise
"RUF", # ruff
]
ignore = [
"PLR0913", # too-many-arguments
"PGH003", # use specific rule code when ignore
"T201",
"PLR0917",
"PLR0904", # Centrifugo has many API methods
]
[tool.ruff.per-file-ignores]
"tests/*" = ["S101", "PT012"]
[tool.mypy]
strict = true
python_version = "3.9"
show_error_codes = true
show_error_context = true
pretty = true
ignore_missing_imports = false
warn_unused_configs = true
disallow_subclassing_any = true
disallow_any_generics = true
disallow_untyped_calls = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_return_any = true
follow_imports_for_stubs = true
namespace_packages = true
show_absolute_path = true
plugins = ["pydantic.mypy"]
[tool.pydantic-mypy]
warn_required_dynamic_aliases = true
[[tool.mypy.overrides]]
module = [
"pytest_benchmark.*"
]
ignore_missing_imports = true
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
+201
-2

@@ -1,2 +0,201 @@

# coding: utf-8
from .core import Client, CentException, RequestException, ResponseError, ClientNotEmpty
from .client import (
Client,
AsyncClient,
)
from cent.dto import (
CentResult,
CentRequest,
Response,
BatchRequest,
BatchResult,
BroadcastRequest,
PublishRequest,
SubscribeRequest,
UnsubscribeRequest,
PresenceRequest,
PresenceStatsRequest,
HistoryRequest,
HistoryRemoveRequest,
RefreshRequest,
ChannelsRequest,
DisconnectRequest,
InfoRequest,
PublishResult,
BroadcastResult,
SubscribeResult,
UnsubscribeResult,
PresenceResult,
PresenceStatsResult,
HistoryResult,
HistoryRemoveResult,
RefreshResult,
ChannelsResult,
DisconnectResult,
InfoResult,
StreamPosition,
ChannelOptionsOverride,
Disconnect,
BoolValue,
ProcessStats,
Node,
Publication,
ClientInfo,
DeviceRegisterRequest,
DeviceRegisterResult,
DeviceUpdateRequest,
DeviceUpdateResult,
DeviceListRequest,
DeviceListResult,
DeviceRemoveRequest,
DeviceRemoveResult,
DeviceTopicListRequest,
DeviceTopicListResult,
UserTopicListRequest,
UserTopicListResult,
SendPushNotificationRequest,
SendPushNotificationResult,
PushNotification,
FcmPushNotification,
HmsPushNotification,
ApnsPushNotification,
Device,
DeviceFilter,
DeviceTopicFilter,
DeviceUserUpdate,
DeviceMetaUpdate,
DeviceTopicsUpdate,
DeviceTopicUpdateRequest,
DeviceTopicUpdateResult,
UpdatePushStatusRequest,
UpdatePushStatusResult,
CancelPushRequest,
CancelPushResult,
UpdateUserStatusRequest,
UpdateUserStatusResult,
GetUserStatusRequest,
GetUserStatusResult,
UserStatus,
DeleteUserStatusRequest,
DeleteUserStatusResult,
BlockUserRequest,
BlockUserResult,
UnblockUserRequest,
UnblockUserResult,
RevokeTokenRequest,
RevokeTokenResult,
InvalidateUserTokensRequest,
InvalidateUserTokensResult,
ConnectionsRequest,
ConnectionsResult,
ConnectionState,
ConnectionTokenInfo,
SubscriptionTokenInfo,
ChannelContext,
)
from cent.exceptions import (
CentError,
CentNetworkError,
CentTransportError,
CentUnauthorizedError,
CentDecodeError,
CentApiResponseError,
)
__all__ = (
"ApnsPushNotification",
"AsyncClient",
"BatchRequest",
"BatchResult",
"BlockUserRequest",
"BlockUserResult",
"BoolValue",
"BroadcastRequest",
"BroadcastResult",
"CancelPushRequest",
"CancelPushResult",
"CentApiResponseError",
"CentDecodeError",
"CentError",
"CentNetworkError",
"CentRequest",
"CentResult",
"CentTransportError",
"CentUnauthorizedError",
"ChannelContext",
"ChannelOptionsOverride",
"ChannelsRequest",
"ChannelsResult",
"Client",
"ClientInfo",
"ConnectionState",
"ConnectionTokenInfo",
"ConnectionsRequest",
"ConnectionsResult",
"DeleteUserStatusRequest",
"DeleteUserStatusResult",
"Device",
"DeviceFilter",
"DeviceListRequest",
"DeviceListResult",
"DeviceMetaUpdate",
"DeviceRegisterRequest",
"DeviceRegisterResult",
"DeviceRemoveRequest",
"DeviceRemoveResult",
"DeviceTopicFilter",
"DeviceTopicListRequest",
"DeviceTopicListResult",
"DeviceTopicUpdateRequest",
"DeviceTopicUpdateResult",
"DeviceTopicsUpdate",
"DeviceUpdateRequest",
"DeviceUpdateResult",
"DeviceUserUpdate",
"Disconnect",
"DisconnectRequest",
"DisconnectResult",
"FcmPushNotification",
"GetUserStatusRequest",
"GetUserStatusResult",
"HistoryRemoveRequest",
"HistoryRemoveResult",
"HistoryRequest",
"HistoryResult",
"HmsPushNotification",
"InfoRequest",
"InfoResult",
"InvalidateUserTokensRequest",
"InvalidateUserTokensResult",
"Node",
"PresenceRequest",
"PresenceResult",
"PresenceStatsRequest",
"PresenceStatsResult",
"ProcessStats",
"Publication",
"PublishRequest",
"PublishResult",
"PushNotification",
"RefreshRequest",
"RefreshResult",
"Response",
"RevokeTokenRequest",
"RevokeTokenResult",
"SendPushNotificationRequest",
"SendPushNotificationResult",
"StreamPosition",
"SubscribeRequest",
"SubscribeResult",
"SubscriptionTokenInfo",
"UnblockUserRequest",
"UnblockUserResult",
"UnsubscribeRequest",
"UnsubscribeResult",
"UpdatePushStatusRequest",
"UpdatePushStatusResult",
"UpdateUserStatusRequest",
"UpdateUserStatusResult",
"UserStatus",
"UserTopicListRequest",
"UserTopicListResult",
)
+252
-17

@@ -1,21 +0,9 @@

Metadata-Version: 1.1
Metadata-Version: 2.1
Name: cent
Version: 4.1.0
Summary: Python library to communicate with Centrifugo v3 HTTP API
Home-page: https://github.com/centrifugal/cent
Version: 5.0.0b1
Summary: Python library to communicate with Centrifugo v5 server HTTP API
License: MIT
Author: Alexandr Emelin
Author-email: frvzmb@gmail.com
License: MIT
Download-URL: https://github.com/centrifugal/cent
Description: Python library to communicate with Centrifugo v3 HTTP API
Platform: UNKNOWN
Requires-Python: >=3.9,<4.0
Classifier: Development Status :: 5 - Production/Stable
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Environment :: Console

@@ -25,2 +13,9 @@ Classifier: Intended Audience :: Developers

Classifier: License :: OSI Approved :: Apache Software License
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Internet :: WWW/HTTP

@@ -32,1 +27,241 @@ Classifier: Topic :: Software Development

Classifier: Topic :: Utilities
Requires-Dist: aiohttp (>=3,<4)
Requires-Dist: pydantic (>=2,<3)
Requires-Dist: requests (>=2,<3)
Requires-Dist: types-requests (>=2,<3)
Description-Content-Type: text/markdown
Python SDK to communicate with Centrifugo v5 HTTP API. Python >= 3.9 supported.
To install run:
```bash
pip install cent
```
## Centrifugo compatibility
* **Cent v5 and higher works only with Centrifugo v5**.
* If you need to work with Centrifugo v3, v4 => use Cent v4
* If you need to work with Centrifugo v2 => use Cent v3
## Usage
First of all, see the description of Centrifugo [server API](https://centrifugal.dev/docs/server/server_api) in the documentation. This library also supports API extensions provided by Centrifugo PRO. In general, refer to [api.proto](https://github.com/centrifugal/centrifugo/blob/master/internal/apiproto/api.proto) Protobuf schema file as a source of truth about all available Centrifugo server APIs. Don't forget that Centrifugo supports both HTTP and GRPC API – so you can switch to GRPC by using `api.proto` file to generate stubs for communication.
This library contains `Client` and `AsyncClient` to work with Centrifugo HTTP server API. Both clients have the same methods to work with Centrifugo API and raise the same top-level exceptions.
## Sync HTTP client
```python
from cent import Client
```
Required init arguments:
* `api_url` (`str`) - Centrifugo HTTP API URL address, for example, `http://localhost:8000/api`
* `api_key` (`str`) - Centrifugo HTTP API key for auth
Optional arguments:
* `timeout` (`float`) - base timeout for all requests in seconds, default is 10 seconds.
* `session` (`requests.Session`) - custom `requests` session to use.
Example:
```python
from cent import Client, PublishRequest
api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"
client = Client(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = client.publish(request)
print(result)
```
## Async HTTP client
```python
from cent import AsyncClient
```
Required init arguments:
* `api_url` (`str`) - Centrifugo HTTP API URL address, for example, `http://localhost:8000`
* `api_key` (`str`) - Centrifugo HTTP API key for auth
Optional arguments:
* `timeout` (`float`) - base timeout for all requests in seconds, default is 10 seconds.
* `session` (`aiohttp.ClientSession`) - custom `aiohttp` session to use.
Example:
```python
import asyncio
from cent import AsyncClient, PublishRequest
api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"
async def main():
client = AsyncClient(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = await client.publish(request)
print(result)
if __name__ == "__main__":
asyncio.run(main())
```
## Handling errors
This library raises exceptions if sth goes wrong. All exceptions are subclasses of `cent.CentError`.
* `CentError` - base class for all exceptions
* `CentNetworkError` - raised in case of network related errors (connection refused)
* `CentTransportError` - raised in case of transport related errors (HTTP status code is not 2xx)
* `CentTimeoutError` - raised in case of timeout
* `CentUnauthorizedError` - raised in case of unauthorized access (signal of invalid API key)
* `CentDecodeError` - raised in case of server response decoding error
* `CentApiResponseError` - raised in case of API response error (i.e. error returned by Centrifugo itself, you can inspect code and message returned by Centrifugo in this case)
Note, that `BroadcastRequest` and `BatchRequest` are quite special – since they contain multiple commands in one request, handling `CentApiResponseError` is still required, but not enough – you also need to manually iterate over the results to check for individual errors. For example, one publish command can fail while another one can succeed. For example:
```python
from cent import *
c = Client("http://localhost:8000/api", "api_key")
req = BroadcastRequest(channels=["1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='rqKx')),
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='nUrf'))
# ]
# )
req = BroadcastRequest(channels=["invalid:1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=Error(code=102, message='unknown channel'), result=None),
# Response[PublishResult](error=None, result=PublishResult(offset=8, epoch='nUrf'))
# ]
# )
```
I.e. `cent` library does not raise exceptions for individual errors in `BroadcastRequest` or `BatchRequest`, only for top-level response error, for example, sending empty list of channels in broadcast:
```
req = BroadcastRequest(channels=[], data={})
c.broadcast(req)
Traceback (most recent call last):
...
raise CentApiResponseError(
cent.exceptions.CentApiResponseError: Server API response error #107: bad request
```
So this all adds some complexity, but that's the trade-off for the performance and efficiency of these two methods. You can always write some convenient wrappers around `cent` library to handle errors in a way that suits your application.
## Using for async consumers
You can use this library to constructs events for Centrifugo [async consumers](https://centrifugal.dev/docs/server/consumers). For example, to get proper method and payload for async publish:
```python
from cent import PublishRequest
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
method = request.api_method
payload = request.api_payload
# use method and payload to construct async consumer event.
```
## Using Broadcast and Batch
To demonstrate the benefits of using `BroadcastRequest` and `BatchRequest` let's compare approaches. Let's say at some point in your app you need to publish the same message into 10k different channels. Let's compare sequential publish, batch publish and broadcast publish. Here is the code to do the comparison:
```python
from cent import *
from time import time
def main():
publish_requests = []
channels = []
for i in range(10000):
channel = f"test_{i}"
publish_requests.append(PublishRequest(channel=channel, data={"msg": "hello"}))
channels.append(channel)
batch_request = BatchRequest(requests=publish_requests)
broadcast_request = BroadcastRequest(channels=channels, data={"msg": "hello"})
client = Client("http://localhost:8000/api", "api_key")
start = time()
for request in publish_requests:
client.publish(request)
print("sequential", time() - start)
start = time()
client.batch(batch_request)
print("batch", time() - start)
start = time()
client.broadcast(broadcast_request)
print("broadcast", time() - start)
if __name__ == "__main__":
main()
```
On local machine, the output may look like this:
```
sequential 5.731332778930664
batch 0.12313580513000488
broadcast 0.06050515174865723
```
So `BatchRequest` is much faster than sequential requests in this case, and `BroadcastRequest` is the fastest - publication to 10k Centrifugo channels took only 60ms. Because all the work is done in one network round-trip. In reality the difference will be even more significant because of network latency.
## For contributors
### Tests and benchmarks
Prerequisites – start Centrifugo server locally:
```bash
CENTRIFUGO_API_KEY=api_key CENTRIFUGO_HISTORY_TTL=300s CENTRIFUGO_HISTORY_SIZE=100 \
CENTRIFUGO_PRESENCE=true CENTRIFUGO_GRPC_API=true ./centrifugo
```
And install dependencies:
```bash
make dev
```
Then to run tests, run:
```bash
make test
```
To run benchmarks, run:
```bash
make bench
```
## Migrate to Cent v5
Cent v5 contains the following notable changes compared to Cent v4:
* Client constructor slightly changed, refer to the examples above.
* To call desired API import and construct a request object (inherited from Pydantic `BaseModel`) and pass it to `send` method of the client.
* Base exception class is now `CentError` instead of `CentException`, exceptions SDK raises were refactored.
* To send multiple commands in one HTTP request SDK provides `batch` method.
+181
-75

@@ -1,6 +0,3 @@

CENT
====
Python SDK to communicate with Centrifugo v5 HTTP API. Python >= 3.9 supported.
Python tools to communicate with Centrifugo v3 HTTP API. Python >= 3.3 supported.
To install run:

@@ -12,117 +9,226 @@

### Centrifugo compatibility
## Centrifugo compatibility
**Cent v4.0.0 and higher works only with Centrifugo v3**.
* **Cent v5 and higher works only with Centrifugo v5**.
* If you need to work with Centrifugo v3, v4 => use Cent v4
* If you need to work with Centrifugo v2 => use Cent v3
If you need to work with Centrifugo v2 then use Cent v3
## Usage
### High-level library API
First of all, see the description of Centrifugo [server API](https://centrifugal.dev/docs/server/server_api) in the documentation. This library also supports API extensions provided by Centrifugo PRO. In general, refer to [api.proto](https://github.com/centrifugal/centrifugo/blob/master/internal/apiproto/api.proto) Protobuf schema file as a source of truth about all available Centrifugo server APIs. Don't forget that Centrifugo supports both HTTP and GRPC API – so you can switch to GRPC by using `api.proto` file to generate stubs for communication.
First see [available API methods in documentation](https://centrifugal.github.io/centrifugo/server/http_api/).
This library contains `Client` and `AsyncClient` to work with Centrifugo HTTP server API. Both clients have the same methods to work with Centrifugo API and raise the same top-level exceptions.
This library contains `Client` class to send messages to Centrifugo from your python-powered backend:
## Sync HTTP client
```python
from cent import Client
```
url = "http://localhost:8000/api"
api_key = "XXX"
Required init arguments:
# initialize client instance.
client = Client(url, api_key=api_key, timeout=1)
* `api_url` (`str`) - Centrifugo HTTP API URL address, for example, `http://localhost:8000/api`
* `api_key` (`str`) - Centrifugo HTTP API key for auth
# publish data into channel
channel = "public:chat"
data = {"input": "test"}
client.publish(channel, data)
Optional arguments:
# other available methods
client.unsubscribe("user_id", "channel")
client.disconnect("user_id")
history = client.history("public:chat")
presence = client.presence("public:chat")
channels = client.channels()
info = client.info()
client.history_remove("public:chat")
* `timeout` (`float`) - base timeout for all requests in seconds, default is 10 seconds.
* `session` (`requests.Session`) - custom `requests` session to use.
Example:
```python
from cent import Client, PublishRequest
api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"
client = Client(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = client.publish(request)
print(result)
```
`publish`, `disconnect`, `unsubscribe`, `history_remove` return `None` in case of success. Each of this commands can raise an instance of `CentException`.
## Async HTTP client
I.e.:
```python
from cent import AsyncClient
```
Required init arguments:
* `api_url` (`str`) - Centrifugo HTTP API URL address, for example, `http://localhost:8000`
* `api_key` (`str`) - Centrifugo HTTP API key for auth
Optional arguments:
* `timeout` (`float`) - base timeout for all requests in seconds, default is 10 seconds.
* `session` (`aiohttp.ClientSession`) - custom `aiohttp` session to use.
Example:
```python
from cent import Client, CentException
import asyncio
from cent import AsyncClient, PublishRequest
client = Client("http://localhost:8000/api", api_key="XXX", timeout=1)
try:
client.publish("public:chat", {"input": "test"})
except CentException:
# handle exception
api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"
async def main():
client = AsyncClient(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = await client.publish(request)
print(result)
if __name__ == "__main__":
asyncio.run(main())
```
Depending on problem occurred exceptions can be:
## Handling errors
* RequestException – HTTP request to Centrifugo failed
* ResponseError - Centrifugo returned some error on request
This library raises exceptions if sth goes wrong. All exceptions are subclasses of `cent.CentError`.
Both exceptions inherited from `CentException`.
* `CentError` - base class for all exceptions
* `CentNetworkError` - raised in case of network related errors (connection refused)
* `CentTransportError` - raised in case of transport related errors (HTTP status code is not 2xx)
* `CentTimeoutError` - raised in case of timeout
* `CentUnauthorizedError` - raised in case of unauthorized access (signal of invalid API key)
* `CentDecodeError` - raised in case of server response decoding error
* `CentApiResponseError` - raised in case of API response error (i.e. error returned by Centrifugo itself, you can inspect code and message returned by Centrifugo in this case)
### Low-level library API:
Note, that `BroadcastRequest` and `BatchRequest` are quite special – since they contain multiple commands in one request, handling `CentApiResponseError` is still required, but not enough – you also need to manually iterate over the results to check for individual errors. For example, one publish command can fail while another one can succeed. For example:
To send lots of commands in one request:
```python
from cent import *
c = Client("http://localhost:8000/api", "api_key")
req = BroadcastRequest(channels=["1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='rqKx')),
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='nUrf'))
# ]
# )
req = BroadcastRequest(channels=["invalid:1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=Error(code=102, message='unknown channel'), result=None),
# Response[PublishResult](error=None, result=PublishResult(offset=8, epoch='nUrf'))
# ]
# )
```
I.e. `cent` library does not raise exceptions for individual errors in `BroadcastRequest` or `BatchRequest`, only for top-level response error, for example, sending empty list of channels in broadcast:
```
req = BroadcastRequest(channels=[], data={})
c.broadcast(req)
Traceback (most recent call last):
...
raise CentApiResponseError(
cent.exceptions.CentApiResponseError: Server API response error #107: bad request
```
So this all adds some complexity, but that's the trade-off for the performance and efficiency of these two methods. You can always write some convenient wrappers around `cent` library to handle errors in a way that suits your application.
## Using for async consumers
You can use this library to constructs events for Centrifugo [async consumers](https://centrifugal.dev/docs/server/consumers). For example, to get proper method and payload for async publish:
```python
from cent import Client, CentException
from cent import PublishRequest
client = Client("http://localhost:8000/api", api_key="XXX", timeout=1)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
method = request.api_method
payload = request.api_payload
# use method and payload to construct async consumer event.
```
params = {
"channel": "python",
"data": "hello world"
}
## Using Broadcast and Batch
client.add("publish", params)
To demonstrate the benefits of using `BroadcastRequest` and `BatchRequest` let's compare approaches. Let's say at some point in your app you need to publish the same message into 10k different channels. Let's compare sequential publish, batch publish and broadcast publish. Here is the code to do the comparison:
try:
result = client.send()
except CentException:
# handle exception
else:
print(result)
```python
from cent import *
from time import time
def main():
publish_requests = []
channels = []
for i in range(10000):
channel = f"test_{i}"
publish_requests.append(PublishRequest(channel=channel, data={"msg": "hello"}))
channels.append(channel)
batch_request = BatchRequest(requests=publish_requests)
broadcast_request = BroadcastRequest(channels=channels, data={"msg": "hello"})
client = Client("http://localhost:8000/api", "api_key")
start = time()
for request in publish_requests:
client.publish(request)
print("sequential", time() - start)
start = time()
client.batch(batch_request)
print("batch", time() - start)
start = time()
client.broadcast(broadcast_request)
print("broadcast", time() - start)
if __name__ == "__main__":
main()
```
You can use `add` method to add several messages which will be sent.
On local machine, the output may look like this:
You'll get something like this in response:
```
sequential 5.731332778930664
batch 0.12313580513000488
broadcast 0.06050515174865723
```
So `BatchRequest` is much faster than sequential requests in this case, and `BroadcastRequest` is the fastest - publication to 10k Centrifugo channels took only 60ms. Because all the work is done in one network round-trip. In reality the difference will be even more significant because of network latency.
## For contributors
### Tests and benchmarks
Prerequisites – start Centrifugo server locally:
```bash
[{}]
CENTRIFUGO_API_KEY=api_key CENTRIFUGO_HISTORY_TTL=300s CENTRIFUGO_HISTORY_SIZE=100 \
CENTRIFUGO_PRESENCE=true CENTRIFUGO_GRPC_API=true ./centrifugo
```
I.e. list of single response to each command sent. So you need to inspect response on errors (if any) yourself.
And install dependencies:
### Client initialization arguments
```bash
make dev
```
Required:
Then to run tests, run:
* address - Centrifugo HTTP API endpoint address
```bash
make test
```
Optional:
To run benchmarks, run:
* `api_key` - HTTP API key of Centrifugo
* `timeout` (default: `1`) - timeout for HTTP requests to Centrifugo
* `json_encoder` (default: `None`) - set custom JSON encoder
* `send_func` (default: `None`) - set custom send function
* `verify` (default: `True`) - when set to `False` no certificate check will be done during requests.
```bash
make bench
```
## For maintainer
## Migrate to Cent v5
To release:
Cent v5 contains the following notable changes compared to Cent v4:
1. Bump version in `setup.py`
1. Changelog, push and create new tag
1. `pip install twine`
1. `pip install wheel`
1. `python setup.py sdist bdist_wheel`
1. `twine check dist/*`
1. `twine upload dist/*`
* Client constructor slightly changed, refer to the examples above.
* To call desired API import and construct a request object (inherited from Pydantic `BaseModel`) and pass it to `send` method of the client.
* Base exception class is now `CentError` instead of `CentException`, exceptions SDK raises were refactored.
* To send multiple commands in one HTTP request SDK provides `batch` method.
[console_scripts]
cent = cent.console:run
Metadata-Version: 1.1
Name: cent
Version: 4.1.0
Summary: Python library to communicate with Centrifugo v3 HTTP API
Home-page: https://github.com/centrifugal/cent
Author: Alexandr Emelin
Author-email: frvzmb@gmail.com
License: MIT
Download-URL: https://github.com/centrifugal/cent
Description: Python library to communicate with Centrifugo v3 HTTP API
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Environment :: Console
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: Software Development
Classifier: Topic :: System :: Networking
Classifier: Topic :: Terminals
Classifier: Topic :: Text Processing
Classifier: Topic :: Utilities
README.md
setup.py
cent/__init__.py
cent/core.py
cent.egg-info/PKG-INFO
cent.egg-info/SOURCES.txt
cent.egg-info/dependency_links.txt
cent.egg-info/entry_points.txt
cent.egg-info/requires.txt
cent.egg-info/top_level.txt
# coding: utf-8
import urllib.parse as urlparse
import sys
import json
import requests
def to_bytes(s):
return s.encode("latin-1")
class CentException(Exception):
"""
Wrapper for all exceptions coming from this library.
"""
pass
class RequestException(CentException):
"""
RequestException means that request to Centrifugo API failed in some way.
This is just a wrapper over RequestException from requests library.
"""
pass
class ClientNotEmpty(CentException):
"""
ClientNotEmpty raised when attempting to call single method but internal
client command buffer is not empty.
"""
pass
class ResponseError(CentException):
"""
Raised when response from Centrifugo contains any error as result of API
command execution.
"""
pass
class Client(object):
"""
Core class to communicate with Centrifugo.
"""
def __init__(self, address, api_key="", timeout=1,
json_encoder=None, verify=True,
session=None, **kwargs):
"""
:param address: Centrifugo address
:param api_key: Centrifugo API key
:param timeout: timeout for HTTP requests to Centrifugo
:param json_encoder: custom JSON encoder
:param verify: boolean flag, when set to False no certificate check will be done during requests.
:param session: custom requests.Session instance
"""
self.address = address
self.api_key = api_key
self.timeout = timeout
self.json_encoder = json_encoder
self.verify = verify
self.session = session or requests.Session()
self.kwargs = kwargs
self._messages = []
def add(self, method, params):
data = {
"method": method,
"params": params
}
self._messages.append(data)
def send(self, method=None, params=None):
if method and params is not None:
self.add(method, params)
messages = self._messages[:]
self._messages = []
data = to_bytes(
"\n".join([json.dumps(x, cls=self.json_encoder) for x in messages]))
response = self._send(self.address, data)
return [json.loads(x) for x in response.split("\n") if x]
def _send(self, url, data):
"""
Send a request to a remote web server using HTTP POST.
"""
headers = {
'Content-type': 'application/json'
}
if self.api_key:
headers['Authorization'] = 'apikey ' + self.api_key
try:
resp = self.session.post(
url, data=data, headers=headers, timeout=self.timeout, verify=self.verify)
except requests.RequestException as err:
raise RequestException(err)
if resp.status_code != 200:
raise RequestException("wrong status code: %d" % resp.status_code)
return resp.content.decode('utf-8')
def reset(self):
self._messages = []
@staticmethod
def get_publish_params(channel, data, skip_history=False):
params = {
"channel": channel,
"data": data,
"skip_history": skip_history,
}
return params
@staticmethod
def get_broadcast_params(channels, data, skip_history=False):
params = {
"channels": channels,
"data": data,
"skip_history": skip_history,
}
return params
@staticmethod
def get_subscribe_params(user, channel, client=None):
params = {
"user": user,
"channel": channel
}
if client:
params["client"] = client
return params
@staticmethod
def get_unsubscribe_params(user, channel, client=None):
params = {
"user": user,
"channel": channel
}
if client:
params["client"] = client
return params
@staticmethod
def get_disconnect_params(user, client=None):
params = {
"user": user
}
if client:
params["client"] = client
return params
@staticmethod
def get_presence_params(channel):
return {
"channel": channel
}
@staticmethod
def get_presence_stats_params(channel):
return {
"channel": channel
}
@staticmethod
def get_history_params(channel, limit=0, since=None, reverse=False):
params = {
"channel": channel,
"limit": limit,
"reverse": reverse,
}
if since:
params["since"] = {
"offset": since["offset"],
"epoch": since["epoch"]
}
return params
@staticmethod
def get_history_remove_params(channel):
return {
"channel": channel
}
@staticmethod
def get_channels_params(pattern=""):
return {
"pattern": pattern
}
@staticmethod
def get_info_params():
return {}
def _check_empty(self):
if self._messages:
raise ClientNotEmpty(
"client command buffer not empty, send commands or reset client")
def _send_one(self):
res = self.send()
data = res[0]
if "error" in data and data["error"]:
raise ResponseError(data["error"])
return data.get("result")
def publish(self, channel, data, skip_history=False):
self._check_empty()
self.add("publish", self.get_publish_params(
channel, data, skip_history=skip_history))
result = self._send_one()
return result
def broadcast(self, channels, data, skip_history=False):
self._check_empty()
self.add("broadcast", self.get_broadcast_params(
channels, data, skip_history=skip_history))
result = self._send_one()
return result
def subscribe(self, user, channel, client=None):
self._check_empty()
self.add("subscribe", self.get_subscribe_params(
user, channel, client=client))
self._send_one()
return
def unsubscribe(self, user, channel, client=None):
self._check_empty()
self.add("unsubscribe", self.get_unsubscribe_params(
user, channel, client=client))
self._send_one()
return
def disconnect(self, user, client=None):
self._check_empty()
self.add("disconnect", self.get_disconnect_params(user, client=client))
self._send_one()
return
def presence(self, channel):
self._check_empty()
self.add("presence", self.get_presence_params(channel))
result = self._send_one()
return result["presence"]
def presence_stats(self, channel):
self._check_empty()
self.add("presence_stats", self.get_presence_stats_params(channel))
result = self._send_one()
return {
"num_clients": result["num_clients"],
"num_users": result["num_users"],
}
def history(self, channel, limit=0, since=None, reverse=False):
self._check_empty()
self.add("history", self.get_history_params(
channel, limit=limit, since=since, reverse=reverse))
result = self._send_one()
return {
"publications": result.get("publications", []),
"offset": result.get("offset", 0),
"epoch": result.get("epoch", ""),
}
def history_remove(self, channel):
self._check_empty()
self.add("history_remove", self.get_history_remove_params(channel))
self._send_one()
return
def channels(self, pattern=""):
self._check_empty()
self.add("channels", params=self.get_channels_params(pattern=pattern))
result = self._send_one()
return result["channels"]
def info(self):
self._check_empty()
self.add("info", self.get_info_params())
result = self._send_one()
return result
[egg_info]
tag_build =
tag_date = 0
import os
import sys
from setuptools import setup
if sys.argv[-1] == 'test':
status = os.system('python tests/tests.py')
sys.exit(1 if status > 127 else status)
requirements = ['requests']
def long_description():
return "Python library to communicate with Centrifugo v3 HTTP API"
setup(
name='cent',
version='4.1.0',
description="Python library to communicate with Centrifugo v3 HTTP API",
long_description=long_description(),
url='https://github.com/centrifugal/cent',
download_url='https://github.com/centrifugal/cent',
author="Alexandr Emelin",
author_email='frvzmb@gmail.com',
license='MIT',
packages=['cent'],
entry_points={
'console_scripts': [
'cent = cent.console:run',
],
},
install_requires=requirements,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Environment :: Console',
'Intended Audience :: Developers',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: Apache Software License',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Software Development',
'Topic :: System :: Networking',
'Topic :: Terminals',
'Topic :: Text Processing',
'Topic :: Utilities'
]
)