Socket
Socket
Sign inDemoInstall

metacom

Package Overview
Dependencies
Maintainers
1
Versions
70
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

metacom - npm Package Compare versions

Comparing version 3.0.0-alpha.7 to 3.0.0-alpha.8

lib/events.js

10

CHANGELOG.md

@@ -5,2 +5,9 @@ # Changelog

## [3.0.0-alpha.8][] - 2023-02-13
- Fix server-side client
- Unify server-side and browser `streams` implementation
- Update metautil to 3.7.1 for fixed `fetch`
- Unify `EventEmitter` implementation
## [3.0.0-alpha.7][] - 2023-02-09

@@ -245,3 +252,4 @@

[unreleased]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.7...HEAD
[unreleased]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.8...HEAD
[3.0.0-alpha.8]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.7...v3.0.0-alpha.8
[3.0.0-alpha.7]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.6...v3.0.0-alpha.7

@@ -248,0 +256,0 @@ [3.0.0-alpha.6]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.5...v3.0.0-alpha.6

45

dist/events.js

@@ -1,10 +0,2 @@

const warnAboutMemoryLeak = (eventName, count) =>
console.warn(
`Possible EventEmitter memory leak detected.
${count} listeners added.
You have to decrease the number of listeners for '${eventName}' event.
Hint: avoid adding listeners in loops.`,
);
export default class EventEmitter {
class EventEmitter {
constructor() {

@@ -14,2 +6,3 @@ this.events = new Map();

}
getMaxListeners() {

@@ -30,3 +23,9 @@ return this.maxListenersCount;

const tooManyListeners = event.size > this.maxListenersCount;
if (tooManyListeners) warnAboutMemoryLeak(name, event.size);
if (tooManyListeners) {
const name = 'MaxListenersExceededWarning';
const warn = 'Possible EventEmitter memory leak detected';
const max = `Current maxListenersCount is ${this.maxListenersCount}`;
const hint = 'Hint: avoid adding listeners in loops';
console.warn(`${name}: ${warn}. ${max}. ${hint}`);
}
} else {

@@ -46,16 +45,7 @@ this.events.set(name, new Set([fn]));

emit(name, ...args) {
if (name === '*') {
throw new Error('Cannot emit reserved "*" global listeners.');
}
const event = this.events.get(name);
if (event) {
for (const fn of event.values()) {
fn(...args);
}
if (!event) return;
for (const fn of event.values()) {
fn(...args);
}
const globalListeners = this.events.get('*');
if (!globalListeners) return;
for (const fn of globalListeners.values()) {
fn(name, ...args);
}
}

@@ -66,17 +56,14 @@

if (!event) return;
if (event.has(fn)) {
event.delete(fn);
}
if (event.has(fn)) event.delete(fn);
}
clear(name) {
const globalListeners = this.events.get('*');
if (!name) {
this.events.clear();
globalListeners.clear();
return;
}
if (name === '*') globalListeners.clear();
else this.events.delete(name);
this.events.delete(name);
}
}
export default EventEmitter;

@@ -24,8 +24,9 @@ import EventEmitter from './events.js';

class MetacomInterface extends EventEmitter {
constructor() {
super();
emit(...args) {
super.emit('*', ...args);
super.emit(...args);
}
}
export class Metacom extends EventEmitter {
class Metacom extends EventEmitter {
constructor(url, options = {}) {

@@ -285,1 +286,3 @@ super();

};
export { Metacom, MetacomInterface };

@@ -100,9 +100,7 @@ import EventEmitter from './events.js';

async stop() {
if (this.bytesRead === this.size) {
this.streaming = false;
this.emit(PUSH_EVENT, null);
} else {
while (this.bytesRead !== this.size) {
await this.waitEvent(PULL_EVENT);
await this.stop();
}
this.streaming = false;
this.emit(PUSH_EVENT, null);
}

@@ -170,2 +168,3 @@

this.transport.send(chunk);
return true;
}

@@ -172,0 +171,0 @@

@@ -5,5 +5,11 @@ 'use strict';

const WebSocket = require('ws');
const metautil = require('metautil');
const { fetch } = require('metautil');
const { MetaWritable, MetaReadable, Chunk } = require('./streams.js');
const CALL_TIMEOUT = 7 * 1000;
const PING_INTERVAL = 60 * 1000;
const RECONNECT_TIMEOUT = 2 * 1000;
const connections = new Set();
class MetacomError extends Error {

@@ -16,30 +22,66 @@ constructor({ message, code }) {

class MetacomInterface extends EventEmitter {}
class MetacomInterface extends EventEmitter {
emit(...args) {
super.emit('*', ...args);
super.emit(...args);
}
}
class Metacom extends EventEmitter {
constructor(url) {
constructor(url, options = {}) {
super();
this.url = url;
this.socket = new WebSocket(url);
this.socket = null;
this.api = {};
this.callId = 0;
this.calls = new Map();
this.streams = new Map();
this.streamId = 0;
this.streams = new Map();
this.socket.addEventListener('message', ({ data }) => {
if (typeof data === 'string') void this.message(data);
else void this.binary(data);
});
this.socket.addEventListener('close', () => {
this.connected = false;
setTimeout(() => {
if (this.active) this.open();
}, this.reconnectTimeout);
});
this.socket.addEventListener('error', (err) => {
this.emit('error', err);
this.socket.close();
});
this.eventId = 0;
this.active = false;
this.connected = false;
this.opening = null;
this.lastActivity = new Date().getTime();
this.callTimeout = options.callTimeout || CALL_TIMEOUT;
this.pingInterval = options.pingInterval || PING_INTERVAL;
this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT;
this.open();
}
static create(url, options) {
const { transport } = Metacom;
const Transport = url.startsWith('ws') ? transport.ws : transport.http;
return new Transport(url, options);
}
getStream(streamId) {
const stream = this.streams.get(streamId);
if (stream) return stream;
throw new Error(`Stream ${streamId} is not initialized`);
}
createStream(name, size) {
const streamId = ++this.streamId;
const initData = { streamId, name, size };
const transport = this;
return new MetaWritable(transport, initData);
}
createBlobUploader(blob) {
const name = blob.name || 'blob';
const size = blob.size;
const consumer = this.createStream(name, size);
return {
streamId: consumer.streamId,
upload: async () => {
const reader = blob.stream().getReader();
let chunk;
while (!(chunk = await reader.read()).done) {
consumer.write(chunk.value);
}
consumer.end();
},
};
}
async message(data) {

@@ -57,3 +99,3 @@ if (data === '{}') return;

const args = packet[target];
if (callId && args) {
if (callId) {
if (callType === 'callback') {

@@ -69,10 +111,7 @@ const promised = this.calls.get(callId);

resolve(args);
return;
}
if (callType === 'event') {
} else if (callType === 'event') {
const [interfaceName, eventName] = target.split('/');
const metacomInterface = this.api[interfaceName];
metacomInterface.emit(eventName, args);
}
if (callType === 'stream') {
} else if (callType === 'stream') {
const { stream: streamId, name, size, status } = packet;

@@ -103,3 +142,4 @@ const stream = this.streams.get(streamId);

async binary(buffer) {
async binary(blob) {
const buffer = await blob.arrayBuffer();
const byteView = new Uint8Array(buffer);

@@ -112,15 +152,4 @@ const { streamId, payload } = Chunk.decode(byteView);

static create(url) {
return new Metacom(url);
}
ready() {
return new Promise((resolve) => {
if (this.socket.readyState === WebSocket.OPEN) resolve();
else this.socket.addEventListener('open', resolve);
});
}
async load(...interfaces) {
const introspect = this.httpCall('system')('introspect');
const introspect = this.scaffold('system')('introspect');
const introspection = await introspect(interfaces);

@@ -132,3 +161,3 @@ const available = Object.keys(introspection);

const iface = introspection[interfaceName];
const request = this.socketCall(interfaceName);
const request = this.scaffold(interfaceName);
const methodNames = Object.keys(iface);

@@ -138,2 +167,7 @@ for (const methodName of methodNames) {

}
methods.on('*', (eventName, data) => {
const target = `${interfaceName}/${eventName}`;
const packet = { event: ++this.eventId, [target]: data };
this.send(JSON.stringify(packet));
});
this.api[interfaceName] = methods;

@@ -143,35 +177,3 @@ }

getStream(streamId) {
const stream = this.streams.get(streamId);
if (stream) return stream;
throw new Error(`Stream ${streamId} is not initialized`);
}
createStream(name, size) {
if (!name) throw new Error('Stream name is not provided');
if (!size) throw new Error('Stream size is not provided');
const streamId = ++this.streamId;
const initData = { streamId, name, size };
const transport = this;
return new MetaWritable(transport, initData);
}
createBlobUploader(blob) {
const name = blob.name || 'blob';
const size = blob.size;
const consumer = this.createStream(name, size);
return {
streamId: consumer.streamId,
upload: async () => {
const reader = blob.stream().getReader();
let chunk;
while (!(chunk = await reader.read()).done) {
consumer.write(chunk.value);
}
consumer.end();
},
};
}
httpCall(iname, ver) {
scaffold(iname, ver) {
return (methodName) =>

@@ -182,28 +184,14 @@ async (args = {}) => {

const target = interfaceName + '/' + methodName;
const packet = { call: callId, [target]: args };
const dest = new URL(this.url);
const protocol = dest.protocol === 'ws:' ? 'http' : 'https';
const url = `${protocol}://${dest.host}/api`;
const res = await metautil.fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(packet),
});
const data = await res.json();
if (data.error) throw new MetacomError(data.error);
return data.result;
};
}
socketCall(iname, ver) {
return (methodName) =>
async (args = {}) => {
const callId = ++this.callId;
const interfaceName = ver ? `${iname}.${ver}` : iname;
const target = interfaceName + '/' + methodName;
await this.ready();
if (this.opening) await this.opening;
if (!this.connected) await this.open();
return new Promise((resolve, reject) => {
setTimeout(() => {
if (this.calls.has(callId)) {
this.calls.delete(callId);
reject(new Error('Request timeout'));
}
}, this.callTimeout);
this.calls.set(callId, [resolve, reject]);
const packet = { call: callId, [target]: args };
this.socket.send(JSON.stringify(packet));
this.send(JSON.stringify(packet));
});

@@ -214,2 +202,94 @@ };

module.exports = { Metacom };
class WebsocketTransport extends Metacom {
async open() {
if (this.opening) return this.opening;
if (this.connected) return Promise.resolve();
const socket = new WebSocket(this.url);
this.active = true;
this.socket = socket;
connections.add(this);
socket.addEventListener('message', ({ data }) => {
if (typeof data === 'string') this.message(data);
else this.binary(data);
});
socket.addEventListener('close', () => {
this.opening = null;
this.connected = false;
this.emit('close');
setTimeout(() => {
if (this.active) this.open();
}, this.reconnectTimeout);
});
socket.addEventListener('error', (err) => {
this.emit('error', err);
socket.close();
});
setInterval(() => {
if (this.active) {
const interval = new Date().getTime() - this.lastActivity;
if (interval > this.pingInterval) this.send('{}');
}
}, this.pingInterval);
this.opening = new Promise((resolve) => {
socket.addEventListener('open', () => {
this.opening = null;
this.connected = true;
this.emit('open');
resolve();
});
});
return this.opening;
}
close() {
this.active = false;
connections.delete(this);
if (!this.socket) return;
this.socket.close();
this.socket = null;
}
send(data) {
if (!this.connected) return;
this.lastActivity = new Date().getTime();
this.socket.send(data);
}
}
class HttpTransport extends Metacom {
async open() {
this.active = true;
this.connected = true;
this.emit('open');
}
close() {
this.active = false;
this.connected = false;
}
send(data) {
this.lastActivity = new Date().getTime();
fetch(this.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: data,
}).then((res) =>
res.text().then((packet) => {
this.message(packet);
}),
);
}
}
Metacom.transport = {
ws: WebsocketTransport,
http: HttpTransport,
};
module.exports = { Metacom, MetacomInterface };

@@ -79,3 +79,2 @@ 'use strict';

// implements nodejs readable pipe method
pipe(writable) {

@@ -169,3 +168,2 @@ void this.finalize(writable);

// implements nodejs writable write method
write(data) {

@@ -177,3 +175,2 @@ const chunk = Chunk.encode(this.streamId, data);

// implements nodejs writable end method
end() {

@@ -180,0 +177,0 @@ const packet = { stream: this.streamId, status: 'end' };

{
"name": "metacom",
"version": "3.0.0-alpha.7",
"version": "3.0.0-alpha.8",
"author": "Timur Shemsedinov <timur.shemsedinov@gmail.com>",

@@ -52,11 +52,11 @@ "description": "Communication protocol for Metarhia stack with rpc, events, binary streams, memory and db access",

"dependencies": {
"metautil": "^3.7.0",
"ws": "^8.12.1"
"metautil": "^3.7.1",
"ws": "^8.13.0"
},
"devDependencies": {
"@types/node": "^18.14.6",
"@types/node": "^18.15.2",
"@types/ws": "^8.5.4",
"eslint": "^8.35.0",
"eslint": "^8.36.0",
"eslint-config-metarhia": "^8.1.0",
"eslint-config-prettier": "^8.6.0",
"eslint-config-prettier": "^8.7.0",
"eslint-plugin-import": "^2.27.5",

@@ -63,0 +63,0 @@ "eslint-plugin-prettier": "^4.0.0",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc