Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/websocket

Package Overview
Dependencies
Maintainers
0
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/websocket - npm Package Compare versions

Comparing version 1.3.0 to 1.4.0

15

CHANGELOG.md

@@ -8,2 +8,17 @@ # Changelog

## Version 1.4.0 - 2024-11-04
### Changed
- Socket.IO implementation does not use server `path` option anymore, in alignment with kind `ws`
- Use `io("/ws/chat")` instead of `io("/chat", { path: "/ws" })`
### Fixed
- Support for http conform headers (`x-ws` and `x-websocket`)
- Revise error handling for websocket events
- Fix for operations without parameters
- Fix support for absolute service paths
- Update documentation
## Version 1.3.0 - 2024-10-07

@@ -10,0 +25,0 @@

26

package.json
{
"name": "@cap-js-community/websocket",
"version": "1.3.0",
"version": "1.4.0",
"description": "WebSocket adapter for CDS",

@@ -44,7 +44,7 @@ "homepage": "https://cap.cloud.sap/",

"dependencies": {
"@sap/xsenv": "^5.3.0",
"cookie": "^0.7.2",
"express": "^4.21.0",
"@sap/xsenv": "^5.4.0",
"cookie": "^1.0.1",
"express": "^4.21.1",
"redis": "^4.7.0",
"socket.io": "^4.8.0",
"socket.io": "^4.8.1",
"ws": "^8.18.0"

@@ -54,17 +54,17 @@ },

"@cap-js-community/websocket": "./",
"@cap-js/sqlite": "^1.7.3",
"@eslint/js": "^9.12.0",
"@sap/cds": "^8.3.0",
"@sap/cds-dk": "^8.3.0",
"@cap-js/sqlite": "^1.7.5",
"@eslint/js": "^9.14.0",
"@sap/cds": "^8.4.0",
"@sap/cds-dk": "^8.4.0",
"@socket.io/redis-adapter": "^8.3.0",
"@socket.io/redis-streams-adapter": "^0.2.2",
"eslint": "^9.12.0",
"eslint": "^9.14.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-jest": "^28.8.3",
"eslint-plugin-n": "^17.10.3",
"globals": "^15.10.0",
"eslint-plugin-n": "^17.12.0",
"globals": "^15.11.0",
"jest": "^29.7.0",
"passport": "^0.7.0",
"prettier": "^3.3.3",
"socket.io-client": "^4.8.0"
"socket.io-client": "^4.8.1"
},

@@ -71,0 +71,0 @@ "license": "Apache-2.0",

@@ -8,3 +8,3 @@ # @cap-js-community/websocket

### [WebSocket adapter for CDS](https://www.npmjs.com/package/@cap-js-community/websocket)
### [WebSocket Adapter for CDS](https://www.npmjs.com/package/@cap-js-community/websocket)

@@ -15,16 +15,74 @@ > Exposes a WebSocket protocol via WebSocket standard or Socket.IO for CDS services.

## Table of Contents
- [Getting Started](#getting-started)
- [Usage](#usage)
- [Server](#server)
- [Client](#client)
- [Documentation](#documentation)
- [Architecture Overview](#architecture-overview)
- [Protocol Annotations](#protocol-annotations)
- [WebSocket Server](#websocket-server)
- [WebSocket Implementation](#websocket-implementation)
- [WebSocket Service](#websocket-service)
- [WebSocket Event](#websocket-event)
- [Server Socket](#server-socket)
- [Service Facade](#service-facade)
- [Middlewares](#middlewares)
- [Tenant Isolation](#tenant-isolation)
- [Authentication & Authorization](#authentication--authorization)
- [Invocation Context](#invocation-context)
- [Transactional Safety](#transactional-safety)
- [CDS Persistent Outbox](#cds-persistent-outbox)
- [Client Determination](#client-determination)
- [Event Users](#event-users)
- [Event Contexts](#event-contexts)
- [Event Client Identifiers](#event-client-identifiers)
- [Event Emit Headers](#event-emit-headers)
- [Value Aggregation](#value-aggregation)
- [Format Headers](#format-headers)
- [Ignore Definitions](#ignore-definitions)
- [WebSocket Format](#websocket-format)
- [SAP Push Channel Protocol (PCP)](#sap-push-channel-protocol-pcp)
- [Cloud Events](#cloud-events)
- [Custom Format](#custom-format)
- [Generic Format](#generic-format)
- [Connect & Disconnect](#connect--disconnect)
- [Approuter](#approuter)
- [Operations](#operations)
- [Operation Results](#operation-results)
- [Unbound Operations](#unbound-operations)
- [Special Operations](#special-operations)
- [Bound Operations](#bound-operations)
- [CRUD Operations](#crud-operations)
- [Examples](#examples)
- [Todo (UI5)](#todo-ui5)
- [Chat (HTML)](#chat-html)
- [Unit-Tests](#unit-tests)
- [Adapters](#adapters)
- [WS Standard Adapters](#ws-standard-adapters)
- [Socket.IO Adapters](#socketio-adapters)
- [Deployment](#deployment)
- [Support, Feedback, Contributing](#support-feedback-contributing)
- [Code of Conduct](#code-of-conduct)
- [Licensing](#licensing)
## Getting Started
- Run `npm add @cap-js-community/websocket` in `@sap/cds` project
- Annotate a service, that shall be exposed via WebSocket using one of the following annotations:
- Add a WebSocket enabled CDS service:
```cds
@ws
@websocket
@protocol: 'ws'
@protocol: 'websocket'
@protocol: [{ kind: 'websocket', path: 'chat' }]
@protocol: [{ kind: 'ws', path: 'chat' }]
service ChatService {
event received {
text: String;
}
}
```
- Execute `cds-serve` to start server
- Access the service endpoint via WebSocket
- Emit event from business logic:
```js
await srv.emit("received", { text: "Hello World!" });
```
- Start server via `cds-serve`
- Access the service endpoint via WebSocket client

@@ -92,3 +150,3 @@ ## Usage

```js
const socket = io("/chat", { path: "/ws" });
const socket = io("/ws/chat");
```

@@ -120,3 +178,25 @@ - Emit event

- Emit/Broadcast CDS events to a subset of websocket clients leveraging users, event contexts or client identifiers
- Websocket events support different formats (JSON, PCP, CloudEvents or custom format)
### Protocol Annotations
The CDS WebSocket module supports the following protocols definitions options in CDS:
- `@ws`
- `@websocket`
- `@protocol: 'ws'`
- `@protocol: 'websocket'`
- `@protocol: [{ kind: 'ws', path: 'chat' }]`
- `@protocol: [{ kind: 'websocket', path: 'chat' }]`
If protocol path is not specified (e.g. via `@path`), it is determined from service name.
If the specified path is relative (i.e. does not start with a slash `/`), it is appended to the default protocol path e.g. `/ws`.
If the path is absolute (i.e. starts with a slash `/`), it is used as is.
Examples:
- `@path: 'chat`: Service is exposed at `/ws/chat`
- `@path: '/chat`: Service is exposed at `/chat`
### WebSocket Server

@@ -153,3 +233,3 @@

- **WS**: `const socket = new WebSocket("ws://localhost:4004/ws/chat");`
- **Socket.IO**: `const socket = io("/chat", { path: "/ws" })`
- **Socket.IO**: `const socket = io("ws/chat")`

@@ -187,4 +267,3 @@ #### WebSocket Event

Although the service is exposed as an OData protocol at `/odata/v4/chat`, the service events annotated with `@websocket`
or
`@ws` are exposed as websocket events under the websocket protocol path as follows: `/ws/chat`. Entities and operations
or `@ws` are exposed as websocket events under the websocket protocol path as follows: `/ws/chat`. Entities and operations
are not exposed, as the service itself is not marked as websocket protocol.

@@ -218,6 +297,6 @@

#### Service Facade
### Service Facade
The service facade provides native access to websocket implementation independent of CDS context
and is accessible on socket via `socket.facade` or in CDS context via `req.context.ws.service`.:
and is accessible on socket via `socket.facade` or in CDS context via `req.context.ws.service`.
It abstracts from the concrete websocket implementation by exposing the following public interface:

@@ -346,5 +425,5 @@

### Event Users
#### Event Users
#### Current User
##### Current User

@@ -411,3 +490,3 @@ Events are broadcast to all websocket clients, including clients established in context of current context user.

Event is published only to websocket clients established in context to the current context user, if the event data of
Event is published only to websocket clients not established in context to the current context user, if the event data of
`flag` is falsy.

@@ -471,3 +550,3 @@

### Event Contexts
#### Event Contexts

@@ -597,6 +676,5 @@ It is possible to broadcast events to a subset of clients. By entering or exiting contexts, the server can be instructed

For Socket.IO (`kind: socket.io`) contexts are implemented
leveraging [Socket.IO rooms](https://socket.io/docs/v4/rooms/).
For Socket.IO (`kind: socket.io`) contexts are implemented leveraging [Socket.IO rooms](https://socket.io/docs/v4/rooms/).
### Event Client Identifiers
#### Event Client Identifiers

@@ -680,3 +758,3 @@ Events are broadcast to all websocket clients, including clients that performed certain action. When events are send

#### Client Setup
##### Client Setup

@@ -691,6 +769,6 @@ The unique identifier can be provided for a websocket client as follows:

```js
const socket = io("/chat?id=1234", { path: "/ws" });
const socket = io("/ws/chat?id=1234");
```
### Event Emit Headers
#### Event Emit Headers

@@ -774,4 +852,17 @@ The websocket implementation allows to provide event emit headers to dynamically control websocket processing.

### Value Aggregation
##### Event HTTP Headers
In addition to the above event emit headers, HTTP conform headers can be specified starting with `x-websocket-` or `x-ws-`
prefix. The lower case header names are converted to camel-cased header names removing prefix, e.g. `x-ws-current-user` becomes `currentUser`.
Header string values are parsed according to their value to types `Boolean`, `Number` or `String`.
Format specific HTTP conform headers can be defined in formatter named subsection, `x-websocket-<format>-` or `x-ws-<format>-`.
**Examples** (for format `cloudevent`):
- `x-ws-cloudevent-subject: 'xyz'` becomes nested JSON header object `{ "cloudevent": { subject: "xyz" } }` in `cloudvent` formatter.
- `x-ws-cloudevent-value: '1'` becomes nested JSON header object `{ "cloudevent": { value: 1 } }` in `cloudvent` formatter.
#### Value Aggregation
The respective event annotations (described in sections above) are respected in addition to event emit header

@@ -805,3 +896,3 @@ specification. All event annotation values (static or dynamic) and header values are aggregated during event

### Ignore Definitions
#### Ignore Definitions

@@ -1134,2 +1225,6 @@ To ignore elements and parameters during event processing, the annotation `@websocket.ignore` or `@ws.ignore` is available

The generic formatter can also directly be used via annotations `@websocket.format: 'generic'` or `@ws.format: 'generic'`.
Values are derived from data via CDS annotations based on wildcard annotations `@websocket.generic.<annotation>` or `@ws.generic.<annotation>`
and headers from subsections `websocket.generic.<header>` or `ws.generic.<header>`.
### Connect & Disconnect

@@ -1143,3 +1238,3 @@

#### Approuter
### Approuter

@@ -1176,6 +1271,4 @@ Authorization in provided in production by Approuter component (e.g. via XSUAA auth).

#### Local
For local testing without approuter a mocked basic authorization is hardcoded in `flp.html/index.html`.
For local testing a mocked basic authorization is hardcoded in `flp.html/index.html`.
### Operations

@@ -1187,3 +1280,3 @@

### Operation Results
#### Operation Results

@@ -1194,3 +1287,3 @@ Operation results will be provided via optional websocket acknowledgement callback.

#### Unbound
#### Unbound Operations

@@ -1201,3 +1294,3 @@ Each unbound function and action is exposed as websocket event.

##### Special operations
#### Special operations

@@ -1210,3 +1303,3 @@ The websocket adapter tries to call the following special operations on the CDS service, if available:

#### Bound
#### Bound Operations

@@ -1221,3 +1314,3 @@ Each service entity is exposed as CRUD interface via as special events as

#### CRUD
#### CRUD Operations

@@ -1236,3 +1329,3 @@ Create, Read, Update and Delete (CRUD) actions are mapped to websocket events as follows:

#### CRUD Broadcast Events
##### CRUD Broadcast Events

@@ -1339,4 +1432,4 @@ CRUD events that modify entities automatically emit another event after successful processing:

afterAll(() => {
socket.close();
cds.ws.close();
socket.close();
});

@@ -1351,2 +1444,3 @@

);
done();
});

@@ -1366,6 +1460,3 @@ });

cds.env.websocket = {
kind: "socket.io",
impl: null,
};
cds.env.websocket.kind = "socket.io";

@@ -1379,4 +1470,3 @@ const authorization = `Basic ${Buffer.from("alice:alice").toString("base64")}`;

const port = cds.app.server.address().port;
socket = ioc(`http://localhost:${port}/chat`, {
path: "/ws",
socket = ioc(`http://localhost:${port}/ws/chat`, {
extraHeaders: {

@@ -1390,4 +1480,4 @@ authorization,

afterAll(() => {
socket.disconnect();
cds.ws.close();
socket.disconnect();
});

@@ -1404,8 +1494,12 @@

### Adapters (Socket.IO)
### Adapters
An Adapter is a server-side component which is responsible for broadcasting events to all or a subset of clients.
#### Redis
#### WS Standard Adapters
The following adapters for WS Standard are supported out-of-the-box.
##### Redis
Every event that is sent to multiple clients is sent to all matching clients connected to the current server

@@ -1415,8 +1509,2 @@ and published in a Redis channel, and received by the other websocket servers of the cluster.

##### WS Standard
The following adapters for WS Standard are supported out-of-the-box.
###### Redis
To use the Redis Adapter (basic publish/subscribe), the following steps have to be performed:

@@ -1434,6 +1522,6 @@

- Redis Adapter options can be specified via `cds.websocket.adapter.options`
- Redis channel key can be specified via `cds.websocket.adapter.options.key`. Default value is `websocket`.
- Redis channel key can be specified via `cds.websocket.adapter.options.key`. Default value is `websocket`
- Redis client connection configuration can be passed via `cds.websocket.adapter.config`
###### Custom Adapter
##### Custom Adapter

@@ -1453,7 +1541,7 @@ A custom websocket adapter implementation can be provided via a path relative to the project root

##### Socket.IO
#### Socket.IO Adapters
The following adapters for Socket.IO are supported out-of-the-box.
###### Redis Adapter
##### Redis Adapter

@@ -1473,3 +1561,3 @@ To use the Redis Adapter, the following steps have to be performed:

###### Redis Streams Adapter
##### Redis Streams Adapter

@@ -1489,3 +1577,3 @@ To use the Redis Stream Adapter, the following steps have to be performed:

###### Custom Adapter
##### Custom Adapter

@@ -1534,4 +1622,4 @@ A custom websocket adapter implementation can be provided via a path relative to the project root

Copyright 2023 SAP SE or an SAP affiliate company and websocket contributors. Please see our [LICENSE](LICENSE) for
Copyright 2024 SAP SE or an SAP affiliate company and websocket contributors. Please see our [LICENSE](LICENSE) for
copyright and license information. Detailed information including third-party components and their licensing/copyright
information is available [via the REUSE tool](https://api.reuse.software/info/github.com/cap-js-community/websocket).

@@ -26,3 +26,3 @@ "use strict";

try {
const channel = this.prefix + path;
const channel = this.getChannel(path);
await this.client.subscribe(channel, async (message, messageChannel) => {

@@ -47,3 +47,3 @@ try {

try {
const channel = this.prefix + path;
const channel = this.getChannel(path);
await this.client.publish(channel, message);

@@ -54,4 +54,8 @@ } catch (err) {

}
getChannel(path) {
return `${this.prefix}/${path}`;
}
}
module.exports = RedisAdapter;

@@ -45,3 +45,3 @@ "use strict";

}
const result = super.compose(event, data, headers);
const result = super.compose(event, data, headers, false);
cloudEvent = {

@@ -48,0 +48,0 @@ ...cloudEvent,

@@ -10,3 +10,3 @@ "use strict";

super(service, origin);
this.name = name;
this.name = name || "generic";
this.identifier = identifier || "name";

@@ -30,3 +30,3 @@ this.LOG = cds.log(`/websocket/${this.name}`);

const result = {};
for (const param of operation.params) {
for (const param of operation.params || []) {
if (mappedData[param.name] !== undefined) {

@@ -95,3 +95,7 @@ result[param.name] = mappedData[param.name];

}
return result;
const serialize = arguments[3];
if (serialize === false) {
return result;
}
return this.serialize(result);
}

@@ -98,0 +102,0 @@

@@ -37,3 +37,3 @@ "use strict";

if (operation) {
for (const param of operation.params) {
for (const param of operation.params || []) {
if (param["@websocket.ignore"] || param["@ws.ignore"]) {

@@ -40,0 +40,0 @@ continue;

@@ -46,45 +46,42 @@ "use strict";

// Websockets events
if (cds.env.protocols.websocket || cds.env.protocols.ws) {
const eventServices = {};
for (const name in cds.model.definitions) {
const definition = cds.model.definitions[name];
if (definition.kind === "event" && (definition["@websocket"] || definition["@ws"])) {
const service = cds.services[definition._service?.name];
if (service && !isServedViaWebsocket(service)) {
eventServices[service.name] ??= eventServices[service.name] || {
name: service.name,
definition: service.definition,
endpoints: service.endpoints.map((endpoint) => {
const protocol =
cds.env.protocols[endpoint.kind] ||
(endpoint.kind === "odata" ? cds.env.protocols["odata-v4"] : null);
return {
kind: "websocket",
path:
(cds.env.protocols.websocket?.path || cds.env.protocols.ws?.path) +
normalizeServicePath(service.path, protocol.path),
};
}),
operations: () => {
return interableObject();
},
entities: () => {
return interableObject();
},
_events: interableObject(),
events: function () {
return this._events;
},
on: service.on.bind(service),
tx: service.tx.bind(service),
};
eventServices[service.name]._events[serviceLocalName(service, definition.name)] = definition;
}
const eventServices = {};
for (const name in cds.model.definitions) {
const definition = cds.model.definitions[name];
if (definition.kind === "event" && (definition["@websocket"] || definition["@ws"])) {
const service = cds.services[definition._service?.name];
if (service && !isServedViaWebsocket(service)) {
eventServices[service.name] ??= eventServices[service.name] || {
name: service.name,
definition: service.definition,
endpoints: service.endpoints.map((endpoint) => {
const protocol =
cds.env.protocols[endpoint.kind] ||
(endpoint.kind === "odata" ? cds.env.protocols["odata-v4"] : null);
let path = normalizeServicePath(service.path, protocol.path);
if (!path.startsWith("/")) {
path = (cds.env.protocols?.websocket?.path || cds.env.protocols?.ws?.path || "/ws") + "/" + path;
}
return { kind: "websocket", path };
}),
operations: () => {
return interableObject();
},
entities: () => {
return interableObject();
},
_events: interableObject(),
events: function () {
return this._events;
},
on: service.on.bind(service),
tx: service.tx.bind(service),
};
eventServices[service.name]._events[serviceLocalName(service, definition.name)] = definition;
}
}
for (const name in eventServices) {
const eventService = eventServices[name];
if (Object.keys(eventService.events()).length > 0) {
serveWebSocketService(socketServer, eventService, options);
}
}
for (const name in eventServices) {
const eventService = eventServices[name];
if (Object.keys(eventService.events()).length > 0) {
serveWebSocketService(socketServer, eventService, options);
}

@@ -116,4 +113,4 @@ }

function normalizeServicePath(servicePath, protocolPath) {
if (servicePath.startsWith(protocolPath)) {
return servicePath.substring(protocolPath.length);
if (servicePath.startsWith(`${protocolPath}/`)) {
return servicePath.substring(`${protocolPath}/`.length);
}

@@ -152,11 +149,12 @@ return servicePath;

const localEventName = serviceLocalName(service, event.name);
const user = deriveUser(event, req.data, req.headers, req);
const context = deriveContext(event, req.data, req.headers);
const identifier = deriveIdentifier(event, req.data, req.headers);
const headers =
req.headers?.websocket || req.headers?.ws ? { ...req.headers?.websocket, ...req.headers?.ws } : undefined;
path = normalizeEventPath(event["@websocket.path"] || event["@ws.path"] || path);
const format = deriveFormat(service, event);
const headers = deriveHeaders(req.headers, format);
const user = deriveUser(event, req.data, headers, req);
const context = deriveContext(event, req.data, headers);
const identifier = deriveIdentifier(event, req.data, headers);
const eventHeaders = deriveEventHeaders(headers);
const eventPath = derivePath(event, path);
await socketServer.broadcast({
service,
path,
path: eventPath,
event: localEventName,

@@ -168,3 +166,3 @@ data: req.data,

identifier,
headers,
headers: eventHeaders,
socket: null,

@@ -374,2 +372,12 @@ });

function deriveFormat(service, event) {
return (
event["@websocket.format"] ||
event["@ws.format"] ||
service.definition["@websocket.format"] ||
service.definition["@ws.format"] ||
"json"
);
}
function deriveKey(entity, data) {

@@ -655,2 +663,54 @@ return Object.keys(entity.keys).reduce((result, key) => {

function parseStringValue(value) {
if (value === undefined || value === null || typeof value !== "string") {
return value;
}
if (["false", "true"].includes(value)) {
return value === "true";
}
if (!isNaN(value)) {
return parseFloat(value);
}
return value;
}
function deriveHeaders(headers, format) {
for (const header in headers ?? {}) {
let xHeader = header.toLocaleLowerCase();
if (!xHeader.startsWith("x-websocket-") && !xHeader.startsWith("x-ws-")) {
continue;
}
if (header.toLocaleLowerCase().startsWith("x-websocket-")) {
xHeader = xHeader.substring("x-websocket-".length);
} else if (xHeader.startsWith("x-ws-")) {
xHeader = xHeader.substring("x-ws-".length);
}
let formatSpecific = false;
if (xHeader.startsWith(`${format}-`)) {
xHeader = xHeader.substring(`${format}-`.length);
formatSpecific = true;
}
const value = parseStringValue(headers[header]);
delete headers[header];
if (formatSpecific) {
headers.ws ??= {};
headers.ws[format] ??= {};
headers.ws[format][xHeader] = value;
headers.ws[format][toCamelCase(xHeader)] = value;
} else {
headers[xHeader] = value;
headers[toCamelCase(xHeader)] = value;
}
}
return headers;
}
function deriveEventHeaders(headers) {
return headers?.websocket || headers?.ws ? { ...headers?.websocket, ...headers?.ws } : undefined;
}
function derivePath(event, path) {
return event["@websocket.path"] || event["@ws.path"] || path;
}
function getDeepEntityColumns(entity) {

@@ -673,9 +733,2 @@ const columns = [];

function normalizeEventPath(path) {
if (!path) {
return path;
}
return path.startsWith("/") ? path : `/${path}`;
}
function serviceLocalName(service, name) {

@@ -689,2 +742,6 @@ const servicePrefix = `${service.name}.`;

function toCamelCase(string) {
return string.replace(/([-_][a-z])/g, (group) => group.toUpperCase().replace("-", "").replace("_", ""));
}
function interableObject(object) {

@@ -691,0 +748,0 @@ return {

@@ -374,2 +374,11 @@ /* eslint-disable no-unused-vars */

/**
* Return service path including protocol prefix or absolute service path (if already absolute)
* @param {String} path path
* @returns {String} Service path
*/
servicePath(path) {
return path.startsWith("/") ? path : `${this.path}/${path}`;
}
/**
* Return format instance for service

@@ -376,0 +385,0 @@ * @param {Object} service Service definition

@@ -15,6 +15,3 @@ "use strict";

super(server, path, config);
this.io = new Server(server, {
path,
...config?.options,
});
this.io = new Server(server, config?.options);
this.io.engine.on("connection_error", (err) => {

@@ -33,3 +30,3 @@ delete err.req;

service(service, path, connected) {
const io = this.applyMiddlewares(this.io.of(path));
const io = this.applyMiddlewares(this.io.of(this.servicePath(path)));
const format = this.format(service, undefined, "json");

@@ -69,7 +66,17 @@ io.on("connection", async (socket) => {

socket.on(event, async (data, fn) => {
await callback(format.parse(data).data, fn);
try {
await callback(format.parse(data).data, fn);
} catch (err) {
LOG?.error(err);
throw err;
}
});
},
emit: async (event, data) => {
await socket.emit(event, format.compose(event, data));
try {
await socket.emit(event, format.compose(event, data));
} catch (err) {
LOG?.error(err);
throw err;
}
},

@@ -170,63 +177,68 @@ broadcast: async (event, data, user, context, identifier, headers) => {

async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket }) {
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
let to = socket?.broadcast || this.io.of(path);
if (context?.include?.length && identifier?.include?.length) {
for (const contextInclude of context.include) {
try {
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
let to = socket?.broadcast || this.io.of(this.servicePath(path));
if (context?.include?.length && identifier?.include?.length) {
for (const contextInclude of context.include) {
for (const identifierInclude of identifier.include) {
if (user?.include?.length) {
for (const userInclude of user.include) {
to = to.to(room({ tenant, user: userInclude, context: contextInclude, identifier: identifierInclude }));
}
} else {
to = to.to(room({ tenant, context: contextInclude, identifier: identifierInclude }));
}
}
}
} else if (context?.include?.length) {
for (const contextInclude of context.include) {
if (user?.include?.length) {
for (const userInclude of user.include) {
to = to.to(room({ tenant, user: userInclude, context: contextInclude }));
}
} else {
to = to.to(room({ tenant, context: contextInclude }));
}
}
} else if (identifier?.include?.length) {
for (const identifierInclude of identifier.include) {
if (user?.include?.length) {
for (const userInclude of user.include) {
to = to.to(room({ tenant, user: userInclude, context: contextInclude, identifier: identifierInclude }));
to = to.to(room({ tenant, user: userInclude, identifier: identifierInclude }));
}
} else {
to = to.to(room({ tenant, context: contextInclude, identifier: identifierInclude }));
to = to.to(room({ tenant, identifier: identifierInclude }));
}
}
}
} else if (context?.include?.length) {
for (const contextInclude of context.include) {
} else {
if (user?.include?.length) {
for (const userInclude of user.include) {
to = to.to(room({ tenant, user: userInclude, context: contextInclude }));
to = to.to(room({ tenant, user: userInclude }));
}
} else {
to = to.to(room({ tenant, context: contextInclude }));
to = to.to(room({ tenant }));
}
}
} else if (identifier?.include?.length) {
for (const identifierInclude of identifier.include) {
if (user?.include?.length) {
for (const userInclude of user.include) {
to = to.to(room({ tenant, user: userInclude, identifier: identifierInclude }));
}
} else {
to = to.to(room({ tenant, identifier: identifierInclude }));
if (user?.exclude?.length) {
for (const userExclude of user.exclude) {
to = to.except(room({ tenant, user: userExclude }));
}
}
} else {
if (user?.include?.length) {
for (const userInclude of user.include) {
to = to.to(room({ tenant, user: userInclude }));
if (context?.exclude?.length) {
for (const contextExclude of context.exclude) {
to = to.except(room({ tenant, context: contextExclude }));
}
} else {
to = to.to(room({ tenant }));
}
}
if (user?.exclude?.length) {
for (const userExclude of user.exclude) {
to = to.except(room({ tenant, user: userExclude }));
if (identifier?.exclude?.length) {
for (const identifierExclude of identifier.exclude) {
to = to.except(room({ tenant, identifier: identifierExclude }));
}
}
const format = this.format(service, event, "json");
to.emit(event, format.compose(event, data, headers));
} catch (err) {
LOG?.error(err);
throw err;
}
if (context?.exclude?.length) {
for (const contextExclude of context.exclude) {
to = to.except(room({ tenant, context: contextExclude }));
}
}
if (identifier?.exclude?.length) {
for (const identifierExclude of identifier.exclude) {
to = to.except(room({ tenant, identifier: identifierExclude }));
}
}
const format = this.format(service, event, "json");
to.emit(event, format.compose(event, data, headers));
}

@@ -233,0 +245,0 @@

@@ -30,3 +30,7 @@ "use strict";

request.url = urlObj.pathname;
this.services[request.url]?.(ws, request);
if (this.services[request.url]) {
this.services[request.url](ws, request);
} else {
DEBUG?.("No websocket service for url", request.url);
}
}

@@ -38,5 +42,4 @@ });

this.adapter?.on(service, path);
const servicePath = `${this.path}${path}`;
const format = this.format(service);
this.services[servicePath] = (ws, request) => {
this.services[this.servicePath(path)] = (ws, request) => {
this.onInit(ws, request);

@@ -52,4 +55,4 @@ DEBUG?.("Initialized");

ws.on("message", async (message) => {
const payload = format.parse(message);
try {
const payload = format.parse(message);
for (const callback of this.getFromMap(ws.events, payload?.event, new Set())) {

@@ -60,2 +63,3 @@ await callback(payload.data);

LOG?.error(err);
throw err;
}

@@ -78,3 +82,8 @@ });

emit: async (event, data) => {
await ws.send(format.compose(event, data));
try {
await ws.send(format.compose(event, data));
} catch (err) {
LOG?.error(err);
throw err;
}
},

@@ -138,66 +147,70 @@ broadcast: async (event, data, user, context, identifier, headers) => {

async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket, local }) {
const eventMessage = event;
const isEventMessage = !data;
if (isEventMessage) {
const message = JSON.parse(eventMessage);
event = message.event;
data = message.data;
tenant = message.tenant;
user = message.user;
context = message.context;
identifier = message.identifier;
}
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
const servicePath = `${this.path}${path}`;
const serviceClients = this.fetchClients(tenant, servicePath);
const clients = new Set(serviceClients.all);
if (user?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.users, user?.include));
}
if (context?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.contexts, context?.include));
}
if (identifier?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.identifiers, identifier?.include));
}
if (user?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !user?.exclude.includes(client.context.user?.id);
}),
);
}
if (context?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !context?.exclude.find((context) => client.contexts.has(context));
}),
);
}
if (identifier?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !identifier?.exclude.includes(client.request?.queryOptions?.id);
}),
);
}
if (clients.size > 0) {
const format = this.format(service, event);
const clientMessage = format.compose(event, data, headers);
for (const client of clients) {
if (client !== socket && client.readyState === WebSocket.OPEN) {
await client.send(clientMessage);
try {
const eventMessage = event;
const isEventMessage = !data;
if (isEventMessage) {
const message = JSON.parse(eventMessage);
event = message.event;
data = message.data;
tenant = message.tenant;
user = message.user;
context = message.context;
identifier = message.identifier;
}
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
const serviceClients = this.fetchClients(tenant, this.servicePath(path));
const clients = new Set(serviceClients.all);
if (user?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.users, user?.include));
}
if (context?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.contexts, context?.include));
}
if (identifier?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.identifiers, identifier?.include));
}
if (user?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !user?.exclude.includes(client.context.user?.id);
}),
);
}
if (context?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !context?.exclude.find((context) => client.contexts.has(context));
}),
);
}
if (identifier?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !identifier?.exclude.includes(client.request?.queryOptions?.id);
}),
);
}
if (clients.size > 0) {
const format = this.format(service, event);
const clientMessage = format.compose(event, data, headers);
for (const client of clients) {
if (client !== socket && client.readyState === WebSocket.OPEN) {
await client.send(clientMessage);
}
}
}
if (!local) {
const adapterMessage = isEventMessage
? eventMessage
: JSON.stringify({ event, data, tenant, user, context, identifier, headers });
await this.adapter?.emit(service, path, adapterMessage);
}
} catch (err) {
LOG?.error(err);
throw err;
}
if (!local) {
const adapterMessage = isEventMessage
? eventMessage
: JSON.stringify({ event, data, tenant, user, context, identifier, headers });
await this.adapter?.emit(service, path, adapterMessage);
}
}

@@ -204,0 +217,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc