
Security News
GitHub Actions Checkout Now Blocks Risky pull_request_target Checkouts
GitHub Actions checkout now blocks risky pull_request_target checkouts by default to help prevent pwn request supply chain attacks.
websocket-pro-client
Advanced tools
High-performance WebSocket client with auto-reconnect, heartbeat and priority messaging
高性能 WebSocket 客户端,专为现代 Web 应用设计,内置自动重连、心跳、消息优先级调度、连接池管理,以及可配置的消息 ACK 与序列号机制。
order.*)+ 自动重订阅)npm install websocket-pro-client
# 或
yarn add websocket-pro-client
import {
createWebSocketManager,
HeartbeatTimerMode,
} from "websocket-pro-client"
// 1. 创建全局 WebSocket 管理器(可配置重连、心跳、ACK 等)
const manager = createWebSocketManager({
maxReconnectAttempts: 5,
})
// 2. 建立连接(同一 url + protocols 只会创建一个底层连接)
const client = manager.connect("wss://api.example.com")
// 3. 监听消息
client.on("message", (data) => {
console.log("Received:", data)
})
// 4. 发送消息(不关心 ACK)
client.send({ type: "ping" })
createWebSocketManager(config?: Partial<WebSocketConfig>): IWebSocketManager
DEFAULT_CONFIG 深度合并。JsonSerializer / MsgPackSerializer
JsonSerializer: 默认使用 JSON.stringify/JSON.parse 的序列化器。MsgPackSerializer: MessagePack 示例(需要自行安装 @msgpack/msgpack 后替换实现)。IWebSocketManager 接口:
export interface IWebSocketManager {
connect(url: string, protocols?: string[]): IWebSocketClient
closeAll(code?: number, reason?: string): void
on(event: WebSocketEvent, listener: (data: any) => void): void
}
connect(url, protocols?)
IWebSocketClient 实例。url + protocols 会复用同一个底层连接。closeAll(code?, reason?)
on(event, listener)
open/message/close/error/reconnect/heartbeat/latency/overMaxReconnectAttempts),回调中会携带 { url, protocols, data }。示例:
import { WebSocketEvent } from "websocket-pro-client"
manager.on(WebSocketEvent.Error, ({ url, data }) => {
console.error("ws error:", url, data)
})
IWebSocketClient 接口:
export interface IWebSocketClient {
send(data: any, priority?: number): Promise<void>
sendWithAck(data: any, priority?: number): Promise<void>
getLastInboundSeq(): string | number | undefined
updateLastInboundSeq(seq: string | number): void
subscribe(topic: string, listener: (data: any) => void): () => void
unsubscribe(topic: string, listener?: (data: any) => void): void
getState(): WebSocketClientState
getStats(): WebSocketClientStats
resetStats(options?: ResetStatsOptions): void
close(code?: number, reason?: string): void
reconnect(): void
on(event: WebSocketEvent, listener: (data: any) => void): void
off(event: WebSocketEvent, listener: (data: any) => void): void
}
send(data, priority?)
sendWithAck(data, priority?)
ack 配置等待服务端 ACK。{ id, payload: { seq, payload: 原始data } }ackId 字段的消息(例如 { ackId: id }),表示已确认。ack.timeout 和 ack.maxRetries 控制)。getLastInboundSeq()
seq(需要开启 sequence 并提供 extractInboundSeq)。sinceSeq/afterSeq 参数传给你的 HTTP 拉取接口。updateLastInboundSeq(seq)
seq。seq 后,同步回 client,避免后续补拉仍使用旧值。getState()
WebSocketClientState 枚举):
WebSocketClientState.ConnectingWebSocketClientState.OpenWebSocketClientState.ReconnectingWebSocketClientState.ClosedWebSocketClientState.OverMaxReconnectAttemptsgetStats()
sentCount:发送消息总数receivedCount:接收消息总数errorCount:错误总数reconnectScheduledCount:触发重连调度总数ackTimeoutCount:ACK 最终超时次数reconnectAttempts:重连尝试次数pendingAcksCount:等待 ACK 的数量messageQueueLength:离线待发送队列长度subscribedTopicCount:订阅主题数量subscriptionListenerCount:订阅监听器总数lastInboundSeq:最近一次入站 seqsocketReadyState:底层 WebSocket readyStatelastHeartbeatLatency:最近一次心跳延迟(ms)lastErrorAt:最近一次错误时间戳(ms)lastCloseCode/lastCloseReason/lastCloseAt:最近一次关闭信息resetStats()
sentCount/errorCount/lastErrorAt 等)。resetCounters:是否重置计数指标(默认 true)resetLastEvents:是否重置最近事件字段(默认 true)type ResetStatsOptions = {
resetCounters?: boolean
resetLastEvents?: boolean
}
close(code?, reason?)
reconnect()
close() 属于主动关闭,不会自动重连;非主动断开(如服务端关闭/网络中断)会按重连策略自动重连。subscribe(topic, listener)
*:
order.* 匹配 order.created / order.updated 等任意后缀subscription.extractTopic 从入站消息提取 topic(默认读取 message.topic)并分发到对应 listener。subscription.buildSubscribeMessage,会在首次订阅 topic 时发送订阅报文。unsubscribe(topic, listener?)
listener 时仅移除该监听器;不传则移除该 topic 下所有监听器。subscription.buildUnsubscribeMessage,在该 topic 没有监听器后会发送取消订阅报文。subscribeOnce(topic, listener)
事件监听
import { WebSocketEvent } from "websocket-pro-client"
client.on(WebSocketEvent.Open, () => {
console.log("ws open")
})
client.on(WebSocketEvent.Message, (data) => {
console.log("message:", data)
})
client.on(WebSocketEvent.Close, (event) => {
console.log("closed:", event)
})
client.on(WebSocketEvent.Error, (err) => {
console.error("ws error:", err)
})
export interface WebSocketConfig {
// 重连相关
maxReconnectAttempts?: number
reconnectDelay?: number
reconnectExponent?: number
maxReconnectDelay?: number
// 任务调度 & 连接池
connectionPoolSize?: number
maxConcurrent?: number
defaultPriority?: number
// 序列化
enableCompression?: boolean
serializer?: Serializer
// 心跳
isNeedHeartbeat?: boolean
heartbeat?: HeartbeatConfig
// 消息 ACK
ack?: AckStrategy
// 消息序列号
sequence?: SequenceStrategy
// 主题订阅
subscription?: SubscriptionStrategy
}
createWebSocketManager会将配置与默认值做深度合并。例如仅传ack.timeout,ack.enabled/generateId/wrapOutbound/extractAckId等默认项仍会保留。
export type HeartbeatConfig = {
interval?: number // 心跳间隔(ms),默认 25000
timeout?: number // 心跳超时(ms),默认 45000
pingMessage?: any // 用于发送 PING 的消息(默认 "PING")
getPing?: () => any // 自定义生成 PING(优先级高于 pingMessage)
pongMessage?: any // 用于识别服务端 PONG 的消息(默认 "PONG")
isPong?: (raw: any, parsed: any) => boolean // 自定义识别 PONG(优先级高于 pongMessage)
timerMode?: "auto" | "main" | "worker" // 心跳计时器模式,默认 auto
onTimeout?: () => void // 心跳超时时的自定义回调
}
默认情况下,客户端会周期性发送心跳消息,并在超时时自动触发重连。
当页面被最小化/切到后台时,浏览器可能会对 setTimeout/setInterval 降频或合并触发,导致定时任务不再“准点”。本库的心跳实现会用:
setTimeout + 漂移修正:避免 setInterval 在后台堆积触发带来的状态错乱Date.now()):即使回调延迟,也不会把“应该超时”的连接误当成健康连接为了提升后台计时稳定性,你可以让心跳的计时器运行在 Web Worker 中(部分浏览器/环境可能不支持,库会自动回退到主线程计时器)。
auto(默认):优先使用 Worker,不可用则回退主线程main:强制主线程worker:强制 Worker(不可用仍会回退主线程,并输出 warn)默认情况下库会把 "PONG" 识别为心跳响应并调用 recordPong()。如果你的服务端返回的不是字符串 "PONG"(例如返回 JSON),可以通过下面两种方式配置:
pongMessage:简单场景,配置一个值即可(会同时与 raw/parsed 做严格相等判断)isPong(raw, parsed):复杂场景,自行判断是否为 PONG(优先级更高)示例(服务端返回 { type: 'pong' }):
const manager = createWebSocketManager({
heartbeat: {
getPing: () => ({ type: "ping" }),
isPong: (_raw, parsed) => parsed && parsed.type === "pong",
},
})
export type AckStrategy = {
enabled?: boolean
timeout?: number
maxRetries?: number
generateId?: () => string | number
wrapOutbound?: (id: string | number, data: any) => any
extractAckId?: (message: any) => string | number | null
}
默认实现(不配置时):
enabled: truetimeout: 5000maxRetries: 2generateId: 使用浏览器 window 上的自增计数。wrapOutbound: ({ id, payload })extractAckId: 从 message.ackId 中提取 ACK 对应的 ID。如需和现有服务端协议对齐,只需要重写
wrapOutbound和extractAckId即可。
后端规定:
{ msgId, body }{ type: 'ACK', msgId }对应配置:
const manager = createWebSocketManager({
ack: {
enabled: true,
wrapOutbound: (id, data) => ({ msgId: id, body: data }),
extractAckId: (msg) =>
msg && msg.type === "ACK" && msg.msgId != null ? msg.msgId : null,
},
})
export type SequenceStrategy = {
enabled?: boolean
generateSeq?: () => string | number
wrapOutbound?: (seq: string | number, data: any) => any
extractInboundSeq?: (message: any) => string | number | null
}
默认实现:
enabled: truegenerateSeq: 使用浏览器 window 上的自增计数。wrapOutbound: ({ seq, payload })extractInboundSeq: 从 message.seq 中提取序列号。库内部只负责“生成/包装/解析”序列号,不做强制的乱序丢弃;你可以在
message监听回调中结合seq做业务上的顺序控制。
export type SubscriptionStrategy = {
extractTopic?: (message: any) => string | null
buildSubscribeMessage?: (topic: string) => any
buildUnsubscribeMessage?: (topic: string) => any
autoResubscribe?: boolean
}
默认实现:
extractTopic: 读取 message.topic(字符串)autoResubscribe: truebuildSubscribeMessage/buildUnsubscribeMessage)示例:
const manager = createWebSocketManager({
subscription: {
buildSubscribeMessage: (topic) => ({ type: "SUBSCRIBE", topic }),
buildUnsubscribeMessage: (topic) => ({ type: "UNSUBSCRIBE", topic }),
autoResubscribe: true,
},
})
const client = manager.connect("wss://api.example.com")
const dispose = client.subscribe("order.updated", (msg) => {
console.log("order event:", msg)
})
// 也可以主动取消
dispose()
import { createWebSocketManager, WebSocketEvent } from "websocket-pro-client"
const manager = createWebSocketManager({
ack: {
enabled: true,
timeout: 3000,
maxRetries: 1,
},
})
const client = manager.connect("wss://api.example.com")
client.on(WebSocketEvent.Open, async () => {
// 发送一条不需要 ACK 的消息
await client.send({ type: "ping" })
// 发送一条需要 ACK 的消息
try {
await client.sendWithAck({ type: "update", payload: { id: 1 } })
console.log("update confirmed by server")
} catch (e) {
console.error("update failed (no ACK):", e)
}
})
client.on(WebSocketEvent.Message, (msg) => {
// msg 是反序列化后的对象,例如:
// { seq, payload: { ... } } 或根据你自定义的包装形态
console.log("inbound message:", msg)
})
浏览器在切到后台、网络波动等场景可能出现 WebSocket 断开/重连。推荐结合服务端的消息存储能力,提供一个补拉接口(例如 GET /messages?sinceSeq=xxx)。
import { WebSocketEvent, createWebSocketManager } from "websocket-pro-client"
const manager = createWebSocketManager({
heartbeat: {
timerMode: "auto",
},
})
const client = manager.connect("wss://api.example.com")
async function fetchMissedMessages(sinceSeq?: string | number) {
// 伪代码:替换为你的请求库
const res = await fetch(`/messages?sinceSeq=${sinceSeq ?? ""}`)
return res.json()
}
client.on(WebSocketEvent.Open, async () => {
const sinceSeq = client.getLastInboundSeq()
const missed = await fetchMissedMessages(sinceSeq)
console.log("missed messages:", missed)
})
# 启动测试
npm test
# 构建库
npm run build
# 运行 demo
npm run demo
git checkout -b dev/feature/fix-xxx)git commit -am 'feat/fix xxx')git push origin dev/feature/fix-xxx)MIT © 2023 BetaCatPro
FAQs
High-performance WebSocket client with auto-reconnect, heartbeat and priority messaging
The npm package websocket-pro-client receives a total of 17 weekly downloads. As such, websocket-pro-client popularity was classified as not popular.
We found that websocket-pro-client demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
GitHub Actions checkout now blocks risky pull_request_target checkouts by default to help prevent pwn request supply chain attacks.

Product
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.

Product
Socket MCP now lets AI assistants review org alerts, investigate threats using the Socket threat feed, and inspect package files in addition to dependency scoring.