@cap-js-community/websocket
Advanced tools
Comparing version 0.1.2 to 0.2.0
@@ -8,6 +8,13 @@ # Changelog | ||
## Version 0.1.2 - 08.01.24 | ||
## Version 0.2.0 - 2024-01-11 | ||
### Fixed | ||
- Broadcast service events without connected sockets via Redis | ||
- Document base websocket server class | ||
## Version 0.1.2 - 2024-01-08 | ||
### Fixed | ||
- Set websocket default kind to `ws` | ||
@@ -19,3 +26,3 @@ - Mock request response more complete | ||
## Version 0.1.1 - 22.12.23 | ||
## Version 0.1.1 - 2023-12-22 | ||
@@ -27,3 +34,3 @@ ### Added | ||
## Version 0.1.0 - 21.12.23 | ||
## Version 0.1.0 - 2023-12-21 | ||
@@ -30,0 +37,0 @@ ### Added |
{ | ||
"name": "@cap-js-community/websocket", | ||
"version": "0.1.2", | ||
"version": "0.2.0", | ||
"description": "WebSocket adapter for CDS", | ||
@@ -55,3 +55,3 @@ "homepage": "https://cap.cloud.sap/", | ||
"@sap/cds": "^7.5.2", | ||
"@sap/cds-dk": "^7.5.0", | ||
"@sap/cds-dk": "^7.5.1", | ||
"@sap/xssec": "3.6.1", | ||
@@ -63,3 +63,3 @@ "@socket.io/redis-adapter": "^8.2.1", | ||
"eslint-config-prettier": "^9.0.0", | ||
"eslint-plugin-jest": "^27.6.1", | ||
"eslint-plugin-jest": "^27.6.2", | ||
"jest": "^29.7.0", | ||
@@ -66,0 +66,0 @@ "passport": "0.7.0", |
@@ -9,5 +9,5 @@ "use strict"; | ||
class RedisAdapter { | ||
constructor(server, channel, options) { | ||
constructor(server, prefix, options) { | ||
this.server = server; | ||
this.channel = channel; | ||
this.prefix = prefix; | ||
this.options = options; | ||
@@ -17,6 +17,6 @@ } | ||
async setup() { | ||
this.client = await redis.createMainClientAndConnect(); | ||
this.client = await redis.createPrimaryClientAndConnect(); | ||
} | ||
async on() { | ||
async on(service) { | ||
if (!this.client) { | ||
@@ -26,6 +26,10 @@ return; | ||
try { | ||
await this.client.subscribe(this.channel); | ||
this.client.on("message", (channel, message) => { | ||
if (channel === this.channel) { | ||
this.server.wss.broadcastAll(message); | ||
const channel = this.prefix + service; | ||
await this.client.subscribe(channel, async (message, messageChannel) => { | ||
try { | ||
if (messageChannel === channel) { | ||
await this.server.broadcast(service, message); | ||
} | ||
} catch (err) { | ||
LOG?.error(err); | ||
} | ||
@@ -38,3 +42,3 @@ }); | ||
async emit(message) { | ||
async emit(service, message) { | ||
if (!this.client) { | ||
@@ -44,3 +48,4 @@ return; | ||
try { | ||
await this.client.publish(this.channel, message); | ||
const channel = this.prefix + service; | ||
await this.client.publish(channel, message); | ||
} catch (err) { | ||
@@ -47,0 +52,0 @@ LOG?.error(err); |
@@ -111,2 +111,3 @@ "use strict"; | ||
try { | ||
bindServiceEvents(socketServer, servicePath, service); | ||
socketServer.service(servicePath, (socket) => { | ||
@@ -118,3 +119,2 @@ try { | ||
bindServiceEntities(socket, service); | ||
bindServiceEvents(socket, service); | ||
emitConnect(socket, service); | ||
@@ -133,5 +133,12 @@ } catch (err) { | ||
async function emitConnect(socket, service) { | ||
if (service.operations(WebSocketAction.Connect).length) { | ||
await processEvent(socket, service, undefined, WebSocketAction.Connect); | ||
function bindServiceEvents(socketServer, servicePath, service) { | ||
for (const event of service.events()) { | ||
service.on(event, async (req) => { | ||
const localEventName = serviceLocalName(service, event.name); | ||
try { | ||
await socketServer.broadcast(servicePath, localEventName, req.data, null, true); | ||
} catch (err) { | ||
LOG?.error(err); | ||
} | ||
}); | ||
} | ||
@@ -196,19 +203,8 @@ } | ||
function bindServiceEvents(socket, service) { | ||
for (const event of service.events()) { | ||
service.on(event, async (req) => { | ||
const localEventName = serviceLocalName(service, event.name); | ||
await processEmit(socket, service, localEventName, req.data); | ||
}); | ||
async function emitConnect(socket, service) { | ||
if (service.operations(WebSocketAction.Connect).length) { | ||
await processEvent(socket, service, undefined, WebSocketAction.Connect); | ||
} | ||
} | ||
async function processEmit(socket, service, event, data) { | ||
try { | ||
socket.emit(event, data); | ||
} catch (err) { | ||
LOG?.error(err); | ||
} | ||
} | ||
async function processEvent(socket, service, entity, event, data, callback) { | ||
@@ -215,0 +211,0 @@ try { |
@@ -14,8 +14,8 @@ "use strict"; | ||
let mainClientPromise; | ||
let secondClientPromise; | ||
let primaryClientPromise; | ||
let secondaryClientPromise; | ||
const createMainClientAndConnect = () => { | ||
if (mainClientPromise) { | ||
return mainClientPromise; | ||
const createPrimaryClientAndConnect = () => { | ||
if (primaryClientPromise) { | ||
return primaryClientPromise; | ||
} | ||
@@ -25,12 +25,12 @@ | ||
LOG?.error("Error from redis client for pub/sub failed", err); | ||
mainClientPromise = null; | ||
setTimeout(createMainClientAndConnect, TIMEOUT); | ||
primaryClientPromise = null; | ||
setTimeout(createPrimaryClientAndConnect, TIMEOUT); | ||
}; | ||
mainClientPromise = _createClientAndConnect(errorHandlerCreateClient); | ||
return mainClientPromise; | ||
primaryClientPromise = _createClientAndConnect(errorHandlerCreateClient); | ||
return primaryClientPromise; | ||
}; | ||
const createSecondClientAndConnect = () => { | ||
if (secondClientPromise) { | ||
return secondClientPromise; | ||
const createSecondaryClientAndConnect = () => { | ||
if (secondaryClientPromise) { | ||
return secondaryClientPromise; | ||
} | ||
@@ -40,7 +40,7 @@ | ||
LOG?.error("Error from redis client for pub/sub failed", err); | ||
secondClientPromise = null; | ||
setTimeout(createSecondClientAndConnect, TIMEOUT); | ||
secondaryClientPromise = null; | ||
setTimeout(createSecondaryClientAndConnect, TIMEOUT); | ||
}; | ||
secondClientPromise = _createClientAndConnect(errorHandlerCreateClient); | ||
return secondClientPromise; | ||
secondaryClientPromise = _createClientAndConnect(errorHandlerCreateClient); | ||
return secondaryClientPromise; | ||
}; | ||
@@ -100,5 +100,11 @@ const _createClientBase = () => { | ||
const clearClients = () => { | ||
primaryClientPromise = null; | ||
secondaryClientPromise = null; | ||
}; | ||
module.exports = { | ||
createMainClientAndConnect, | ||
createSecondClientAndConnect, | ||
createPrimaryClientAndConnect, | ||
createSecondaryClientAndConnect, | ||
clearClients, | ||
}; |
@@ -7,3 +7,11 @@ "use strict"; | ||
/** | ||
* Base class for a websocket server | ||
*/ | ||
class SocketServer { | ||
/** | ||
* Constructor for websocket server | ||
* @param server HTTP server from express app | ||
* @param path Protocol path, e.g. '/ws' | ||
*/ | ||
constructor(server, path) { | ||
@@ -16,7 +24,17 @@ this.id = crypto.randomUUID(); | ||
/** | ||
* Setup websocket server with async operations | ||
* @returns {Promise<void>} Promise when setup is completed | ||
*/ | ||
async setup() {} | ||
/** | ||
* Connect a service to websocket | ||
* @param service service path, e.g. "/chat" | ||
* @param connected Callback function to be called on every websocket connection passing socket functions (i.e. ws.on("connection", connected)) | ||
*/ | ||
service(service, connected) { | ||
connected && | ||
connected({ | ||
service, | ||
socket: null, | ||
@@ -32,2 +50,17 @@ setup: () => {}, | ||
/** | ||
* Broadcast to all websocket clients | ||
* @param service service path, e.g. "/chat" | ||
* @param event Event name | ||
* @param data Data object | ||
* @param socket Broadcast client to be excluded | ||
* @param multiple Broadcast across multiple websocket servers | ||
* @returns {Promise<void>} Promise when broadcasting completed | ||
*/ | ||
async broadcast(service, event, data, socket, multiple) {} | ||
/** | ||
* Mock the HTTP response object and make available at request.res | ||
* @param request HTTP request | ||
*/ | ||
static mockResponse(request) { | ||
@@ -73,2 +106,6 @@ // Mock response (not available in websocket, CDS middlewares need it) | ||
/** | ||
* Apply the authorization cookie to authorization header for local authorization testing in mocked auth scenario | ||
* @param request HTTP request | ||
*/ | ||
static applyAuthCookie(request) { | ||
@@ -75,0 +112,0 @@ // Apply cookie to authorization header |
@@ -39,2 +39,3 @@ "use strict"; | ||
const serviceSocket = { | ||
service, | ||
socket, | ||
@@ -73,2 +74,11 @@ setup: () => { | ||
async broadcast(service, event, data, socket, multiple) { | ||
if (socket) { | ||
socket.broadcast.emit(event, data); | ||
} else { | ||
const io = this.io.of(service); | ||
io.emit(event, data); | ||
} | ||
} | ||
async _applyAdapter() { | ||
@@ -88,4 +98,4 @@ try { | ||
case "@socket.io/redis-adapter": | ||
client = await redis.createMainClientAndConnect(); | ||
subClient = await redis.createSecondClientAndConnect(); | ||
client = await redis.createPrimaryClientAndConnect(); | ||
subClient = await redis.createSecondaryClientAndConnect(); | ||
if (client && subClient) { | ||
@@ -96,3 +106,3 @@ adapter = adapterFactory.createAdapter(client, subClient, options); | ||
case "@socket.io/redis-streams-adapter": | ||
client = await redis.createMainClientAndConnect(); | ||
client = await redis.createPrimaryClientAndConnect(); | ||
if (client) { | ||
@@ -99,0 +109,0 @@ adapter = adapterFactory.createAdapter(client, options); |
@@ -8,2 +8,3 @@ "use strict"; | ||
const LOG = cds.log("websocket/ws"); | ||
const DEBUG = cds.debug("websocket"); | ||
@@ -14,19 +15,5 @@ class SocketWSServer extends SocketServer { | ||
this.wss = new WebSocket.Server({ server }); | ||
this.wss.broadcast = (message, socket) => { | ||
this.wss.clients.forEach((client) => { | ||
if (client !== socket && client.readyState === WebSocket.OPEN) { | ||
client.send(message); | ||
} | ||
}); | ||
}; | ||
this.wss.broadcastAll = (message) => { | ||
this.wss.clients.forEach((client) => { | ||
if (client.readyState === WebSocket.OPEN) { | ||
client.send(message); | ||
} | ||
}); | ||
}; | ||
this.adapter = null; | ||
cds.ws = this.wss; | ||
cds.wss = this.wss; | ||
this.adapter = null; | ||
} | ||
@@ -39,2 +26,3 @@ | ||
service(service, connected) { | ||
this.adapter?.on(service); | ||
this.wss.on("connection", async (ws, request) => { | ||
@@ -45,2 +33,6 @@ ws.request = request; | ||
} | ||
DEBUG?.("Connected"); | ||
ws.on("close", () => { | ||
DEBUG?.("Disconnected"); | ||
}); | ||
ws.on("error", (error) => { | ||
@@ -53,2 +45,3 @@ LOG?.error(error); | ||
connected({ | ||
service, | ||
socket: ws, | ||
@@ -69,3 +62,3 @@ setup: () => { | ||
ws.on("message", async (message) => { | ||
let payload; | ||
let payload = {}; | ||
try { | ||
@@ -78,6 +71,4 @@ payload = JSON.parse(message); | ||
if (payload?.event === event) { | ||
await this.adapter?.emit(service, message); | ||
await callback(payload.data); | ||
if (this.adapter) { | ||
await this.adapter.emit(message); | ||
} | ||
} | ||
@@ -98,9 +89,3 @@ } catch (err) { | ||
broadcast: (event, data) => { | ||
this.wss.broadcast( | ||
JSON.stringify({ | ||
event, | ||
data, | ||
}), | ||
ws, | ||
); | ||
this.broadcast(service, event, data, ws, true); | ||
}, | ||
@@ -118,2 +103,24 @@ disconnect() { | ||
async broadcast(service, event, data, socket, multiple) { | ||
const clients = []; | ||
this.wss.clients.forEach((client) => { | ||
if ( | ||
client.readyState === WebSocket.OPEN && | ||
client !== socket && | ||
client.request?.url === `${this.path}${service}` | ||
) { | ||
clients.push(client); | ||
} | ||
}); | ||
if (clients.length > 0 || multiple) { | ||
const message = !data ? event : JSON.stringify({ event, data }); | ||
clients.forEach((client) => { | ||
client.send(message); | ||
}); | ||
if (multiple) { | ||
await this.adapter?.emit(service, message); | ||
} | ||
} | ||
} | ||
async _applyAdapter() { | ||
@@ -127,7 +134,6 @@ try { | ||
} | ||
const channel = options?.key ?? "websocket"; | ||
const prefix = options?.key ?? "websocket"; | ||
this.adapterFactory = require(`../adapter/${adapterImpl}`); | ||
this.adapter = new this.adapterFactory(this, channel, options); | ||
await this.adapter.setup(); | ||
await this.adapter.on(); | ||
this.adapter = new this.adapterFactory(this, prefix, options); | ||
await this.adapter?.setup(); | ||
} | ||
@@ -134,0 +140,0 @@ } catch (err) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
56640
849