Socket
Socket
Sign inDemoInstall

metacom

Package Overview
Dependencies
Maintainers
1
Versions
70
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

metacom - npm Package Compare versions

Comparing version 3.0.0-alpha.8 to 3.0.0-alpha.9

lib/transport.js

10

CHANGELOG.md

@@ -5,2 +5,9 @@ # Changelog

## [3.0.0-alpha.9][] - 2023-04-22
- Implement metacom3 specs: https://github.com/metarhia/Contracts/blob/master/doc/Metacom.md
- Refactor streams
- Optimize chunk encoding
- Fix setTimeout and setInterval leaks
## [3.0.0-alpha.8][] - 2023-02-13

@@ -252,3 +259,4 @@

[unreleased]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.8...HEAD
[unreleased]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.9...HEAD
[3.0.0-alpha.9]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.8...v3.0.0-alpha.9
[3.0.0-alpha.8]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.7...v3.0.0-alpha.8

@@ -255,0 +263,0 @@ [3.0.0-alpha.7]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.6...v3.0.0-alpha.7

@@ -63,4 +63,8 @@ class EventEmitter {

}
static once(emitter, name) {
return new Promise((resolve) => emitter.once(name, resolve));
}
}
export default EventEmitter;

112

dist/metacom.js

@@ -23,3 +23,3 @@ import EventEmitter from './events.js';

class MetacomInterface extends EventEmitter {
class MetacomUnit extends EventEmitter {
emit(...args) {

@@ -41,3 +41,2 @@ super.emit('*', ...args);

this.streamId = 0;
this.eventId = 0;
this.active = false;

@@ -50,2 +49,3 @@ this.connected = false;

this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT;
this.ping = null;
this.open();

@@ -60,11 +60,11 @@ }

getStream(streamId) {
const stream = this.streams.get(streamId);
getStream(id) {
const stream = this.streams.get(id);
if (stream) return stream;
throw new Error(`Stream ${streamId} is not initialized`);
throw new Error(`Stream ${id} is not initialized`);
}
createStream(name, size) {
const streamId = ++this.streamId;
const initData = { streamId, name, size };
const id = ++this.streamId;
const initData = { type: 'stream', id, name, size };
const transport = this;

@@ -79,3 +79,3 @@ return new MetaWritable(transport, initData);

return {
streamId: consumer.streamId,
id: consumer.id,
upload: async () => {

@@ -101,11 +101,10 @@ const reader = blob.stream().getReader();

}
const [callType, target] = Object.keys(packet);
const callId = packet[callType];
const args = packet[target];
if (callId) {
if (callType === 'callback') {
const promised = this.calls.get(callId);
const { type, id, method } = packet;
if (id) {
if (type === 'callback') {
const promised = this.calls.get(id);
if (!promised) return;
const [resolve, reject] = promised;
this.calls.delete(callId);
const [resolve, reject, timeout] = promised;
this.calls.delete(id);
clearTimeout(timeout);
if (packet.error) {

@@ -115,10 +114,10 @@ reject(new MetacomError(packet.error));

}
resolve(args);
} else if (callType === 'event') {
const [interfaceName, eventName] = target.split('/');
const metacomInterface = this.api[interfaceName];
metacomInterface.emit(eventName, args);
} else if (callType === 'stream') {
const { stream: streamId, name, size, status } = packet;
const stream = this.streams.get(streamId);
resolve(packet.result);
} else if (type === 'event') {
const [unit, name] = method.split('/');
const metacomUnit = this.api[unit];
if (metacomUnit) metacomUnit.emit(name, packet.data);
} else if (type === 'stream') {
const { name, size, status } = packet;
const stream = this.streams.get(id);
if (name && typeof name === 'string' && Number.isSafeInteger(size)) {

@@ -128,14 +127,14 @@ if (stream) {

} else {
const streamData = { streamId, name, size };
const streamData = { id, name, size };
const stream = new MetaReadable(streamData);
this.streams.set(streamId, stream);
this.streams.set(id, stream);
}
} else if (!stream) {
console.error(new Error(`Stream ${streamId} is not initialized`));
console.error(new Error(`Stream ${id} is not initialized`));
} else if (status === 'end') {
await stream.close();
this.streams.delete(streamId);
this.streams.delete(id);
} else if (status === 'terminate') {
await stream.terminate();
this.streams.delete(streamId);
this.streams.delete(id);
} else {

@@ -151,47 +150,47 @@ console.error(new Error('Stream packet structure error'));

const byteView = new Uint8Array(buffer);
const { streamId, payload } = Chunk.decode(byteView);
const stream = this.streams.get(streamId);
const { id, payload } = Chunk.decode(byteView);
const stream = this.streams.get(id);
if (stream) await stream.push(payload);
else console.warn(`Stream ${streamId} is not initialized`);
else console.warn(`Stream ${id} is not initialized`);
}
async load(...interfaces) {
async load(...units) {
const introspect = this.scaffold('system')('introspect');
const introspection = await introspect(interfaces);
const introspection = await introspect(units);
const available = Object.keys(introspection);
for (const interfaceName of interfaces) {
if (!available.includes(interfaceName)) continue;
const methods = new MetacomInterface();
const iface = introspection[interfaceName];
const request = this.scaffold(interfaceName);
const methodNames = Object.keys(iface);
for (const unit of units) {
if (!available.includes(unit)) continue;
const methods = new MetacomUnit();
const instance = introspection[unit];
const request = this.scaffold(unit);
const methodNames = Object.keys(instance);
for (const methodName of methodNames) {
methods[methodName] = request(methodName);
}
methods.on('*', (eventName, data) => {
const target = `${interfaceName}/${eventName}`;
const packet = { event: ++this.eventId, [target]: data };
methods.on('*', (event, data) => {
const name = unit + '/' + event;
const packet = { type: 'event', name, data };
this.send(JSON.stringify(packet));
});
this.api[interfaceName] = methods;
this.api[unit] = methods;
}
}
scaffold(iname, ver) {
return (methodName) =>
scaffold(unit, ver) {
return (method) =>
async (args = {}) => {
const callId = ++this.callId;
const interfaceName = ver ? `${iname}.${ver}` : iname;
const target = interfaceName + '/' + methodName;
const id = ++this.callId;
const unitName = unit + (ver ? '.' + ver : '');
const target = unitName + '/' + method;
if (this.opening) await this.opening;
if (!this.connected) await this.open();
return new Promise((resolve, reject) => {
setTimeout(() => {
if (this.calls.has(callId)) {
this.calls.delete(callId);
const timeout = setTimeout(() => {
if (this.calls.has(id)) {
this.calls.delete(id);
reject(new Error('Request timeout'));
}
}, this.callTimeout);
this.calls.set(callId, [resolve, reject]);
const packet = { call: callId, [target]: args };
this.calls.set(id, [resolve, reject, timeout]);
const packet = { type: 'call', id, method: target, args };
this.send(JSON.stringify(packet));

@@ -231,3 +230,3 @@ });

setInterval(() => {
this.ping = setInterval(() => {
if (this.active) {

@@ -253,2 +252,3 @@ const interval = new Date().getTime() - this.lastActivity;

connections.delete(this);
clearInterval(this.ping);
if (!this.socket) return;

@@ -297,2 +297,2 @@ this.socket.close();

export { Metacom, MetacomInterface };
export { Metacom, MetacomUnit };
import EventEmitter from './events.js';
const STREAM_ID_LENGTH = 4;
const ID_LENGTH = 4;
const createStreamIdBuffer = (num) => {
const buffer = new ArrayBuffer(STREAM_ID_LENGTH);
const view = new DataView(buffer);
view.setInt32(0, num);
return buffer;
const chunkEncode = (id, payload) => {
const chunk = new Uint8Array(ID_LENGTH + payload.length);
const view = new DataView(chunk.buffer);
view.setInt32(0, id);
chunk.set(payload, ID_LENGTH);
return chunk;
};
const getStreamId = (buffer) => {
const view = new DataView(buffer);
return view.getInt32(0);
const chunkDecode = (chunk) => {
const view = new DataView(chunk.buffer);
const id = view.getInt32(0);
const payload = chunk.subarray(ID_LENGTH);
return { id, payload };
};
class Chunk {
static encode(streamId, payload) {
const streamIdView = new Uint8Array(createStreamIdBuffer(streamId));
const chunkView = new Uint8Array(STREAM_ID_LENGTH + payload.length);
chunkView.set(streamIdView);
chunkView.set(payload, STREAM_ID_LENGTH);
return chunkView;
}
static decode(chunkView) {
const streamId = getStreamId(chunkView.buffer);
const payload = chunkView.subarray(STREAM_ID_LENGTH);
return { streamId, payload };
}
}
const PUSH_EVENT = Symbol();

@@ -39,11 +26,11 @@ const PULL_EVENT = Symbol();

class MetaReadable extends EventEmitter {
constructor(initData, options = {}) {
constructor(id, name, size, options = {}) {
super();
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.id = id;
this.name = name;
this.size = size;
this.highWaterMark = options.highWaterMark || DEFAULT_HIGH_WATER_MARK;
this.queue = [];
this.streaming = true;
this.status = null;
this.status = 'active';
this.bytesRead = 0;

@@ -65,4 +52,5 @@ this.maxListenersCount = this.getMaxListeners() - 1;

async finalize(writable) {
const waitWritableEvent = this.waitEvent.bind(writable);
writable.once('error', () => this.terminate());
const waitWritableEvent = EventEmitter.once.bind(writable);
const onError = () => this.terminate();
writable.once('error', onError);
for await (const chunk of this) {

@@ -76,6 +64,7 @@ const needDrain = !writable.write(chunk);

await this.close();
writable.removeListener('error', onError);
}
pipe(writable) {
void this.finalize(writable);
this.finalize(writable);
return writable;

@@ -142,4 +131,4 @@ }

const chunk = await this.read();
if (chunk) yield chunk;
else return;
if (!chunk) return;
yield chunk;
}

@@ -150,8 +139,8 @@ }

class MetaWritable extends EventEmitter {
constructor(transport, initData) {
constructor(id, name, size, transport) {
super();
this.id = id;
this.name = name;
this.size = size;
this.transport = transport;
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.init();

@@ -161,7 +150,4 @@ }

init() {
const packet = {
stream: this.streamId,
name: this.name,
size: this.size,
};
const { id, name, size } = this;
const packet = { type: 'stream', id, name, size };
this.transport.send(JSON.stringify(packet));

@@ -171,3 +157,3 @@ }

write(data) {
const chunk = Chunk.encode(this.streamId, data);
const chunk = chunkEncode(this.id, data);
this.transport.send(chunk);

@@ -178,3 +164,3 @@ return true;

end() {
const packet = { stream: this.streamId, status: 'end' };
const packet = { type: 'stream', id: this.id, status: 'end' };
this.transport.send(JSON.stringify(packet));

@@ -184,3 +170,3 @@ }

terminate() {
const packet = { stream: this.streamId, status: 'terminate' };
const packet = { type: 'stream', id: this.id, status: 'terminate' };
this.transport.send(JSON.stringify(packet));

@@ -190,2 +176,2 @@ }

export { Chunk, MetaReadable, MetaWritable };
export { chunkEncode, chunkDecode, MetaReadable, MetaWritable };

@@ -21,3 +21,3 @@ 'use strict';

class MetacomInterface extends EventEmitter {
class MetacomUnit extends EventEmitter {
emit(...args) {

@@ -39,3 +39,2 @@ super.emit('*', ...args);

this.streamId = 0;
this.eventId = 0;
this.active = false;

@@ -48,2 +47,3 @@ this.connected = false;

this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT;
this.ping = null;
this.open();

@@ -58,13 +58,13 @@ }

getStream(streamId) {
const stream = this.streams.get(streamId);
getStream(id) {
const stream = this.streams.get(id);
if (stream) return stream;
throw new Error(`Stream ${streamId} is not initialized`);
throw new Error(`Stream ${id} is not initialized`);
}
createStream(name, size) {
const streamId = ++this.streamId;
const initData = { streamId, name, size };
const id = ++this.streamId;
const packet = { type: 'stream', id, name, size };
const transport = this;
return new MetaWritable(transport, initData);
return new MetaWritable(transport, packet);
}

@@ -77,3 +77,3 @@

return {
streamId: consumer.streamId,
id: consumer.id,
upload: async () => {

@@ -99,11 +99,10 @@ const reader = blob.stream().getReader();

}
const [callType, target] = Object.keys(packet);
const callId = packet[callType];
const args = packet[target];
if (callId) {
if (callType === 'callback') {
const promised = this.calls.get(callId);
const { type, id, method } = packet;
if (id) {
if (type === 'callback') {
const promised = this.calls.get(id);
if (!promised) return;
const [resolve, reject] = promised;
this.calls.delete(callId);
const [resolve, reject, timeout] = promised;
this.calls.delete(id);
clearTimeout(timeout);
if (packet.error) {

@@ -113,10 +112,10 @@ reject(new MetacomError(packet.error));

}
resolve(args);
} else if (callType === 'event') {
const [interfaceName, eventName] = target.split('/');
const metacomInterface = this.api[interfaceName];
metacomInterface.emit(eventName, args);
} else if (callType === 'stream') {
const { stream: streamId, name, size, status } = packet;
const stream = this.streams.get(streamId);
resolve(packet.result);
} else if (type === 'event') {
const [unit, name] = method.split('/');
const metacomUnit = this.api[unit];
if (metacomUnit) metacomUnit.emit(name, packet.data);
} else if (type === 'stream') {
const { name, size, status } = packet;
const stream = this.streams.get(id);
if (name && typeof name === 'string' && Number.isSafeInteger(size)) {

@@ -126,14 +125,13 @@ if (stream) {

} else {
const streamData = { streamId, name, size };
const stream = new MetaReadable(streamData);
this.streams.set(streamId, stream);
const stream = new MetaReadable({ id, name, size });
this.streams.set(id, stream);
}
} else if (!stream) {
console.error(new Error(`Stream ${streamId} is not initialized`));
console.error(new Error(`Stream ${id} is not initialized`));
} else if (status === 'end') {
await stream.close();
this.streams.delete(streamId);
this.streams.delete(id);
} else if (status === 'terminate') {
await stream.terminate();
this.streams.delete(streamId);
this.streams.delete(id);
} else {

@@ -149,47 +147,47 @@ console.error(new Error('Stream packet structure error'));

const byteView = new Uint8Array(buffer);
const { streamId, payload } = Chunk.decode(byteView);
const stream = this.streams.get(streamId);
const { id, payload } = Chunk.decode(byteView);
const stream = this.streams.get(id);
if (stream) await stream.push(payload);
else console.warn(`Stream ${streamId} is not initialized`);
else console.warn(`Stream ${id} is not initialized`);
}
async load(...interfaces) {
async load(...units) {
const introspect = this.scaffold('system')('introspect');
const introspection = await introspect(interfaces);
const introspection = await introspect(units);
const available = Object.keys(introspection);
for (const interfaceName of interfaces) {
if (!available.includes(interfaceName)) continue;
const methods = new MetacomInterface();
const iface = introspection[interfaceName];
const request = this.scaffold(interfaceName);
const methodNames = Object.keys(iface);
for (const unit of units) {
if (!available.includes(unit)) continue;
const methods = new MetacomUnit();
const instance = introspection[unit];
const request = this.scaffold(unit);
const methodNames = Object.keys(instance);
for (const methodName of methodNames) {
methods[methodName] = request(methodName);
}
methods.on('*', (eventName, data) => {
const target = `${interfaceName}/${eventName}`;
const packet = { event: ++this.eventId, [target]: data };
methods.on('*', (event, data) => {
const name = unit + '/' + event;
const packet = { type: 'event', name, data };
this.send(JSON.stringify(packet));
});
this.api[interfaceName] = methods;
this.api[unit] = methods;
}
}
scaffold(iname, ver) {
return (methodName) =>
scaffold(unit, ver) {
return (method) =>
async (args = {}) => {
const callId = ++this.callId;
const interfaceName = ver ? `${iname}.${ver}` : iname;
const target = interfaceName + '/' + methodName;
const id = ++this.callId;
const unitName = unit + (ver ? '.' + ver : '');
const target = unitName + '/' + method;
if (this.opening) await this.opening;
if (!this.connected) await this.open();
return new Promise((resolve, reject) => {
setTimeout(() => {
if (this.calls.has(callId)) {
this.calls.delete(callId);
const timeout = setTimeout(() => {
if (this.calls.has(id)) {
this.calls.delete(id);
reject(new Error('Request timeout'));
}
}, this.callTimeout);
this.calls.set(callId, [resolve, reject]);
const packet = { call: callId, [target]: args };
this.calls.set(id, [resolve, reject, timeout]);
const packet = { type: 'call', id, method: target, args };
this.send(JSON.stringify(packet));

@@ -229,3 +227,3 @@ });

setInterval(() => {
this.ping = setInterval(() => {
if (this.active) {

@@ -251,2 +249,3 @@ const interval = new Date().getTime() - this.lastActivity;

connections.delete(this);
clearInterval(this.ping);
if (!this.socket) return;

@@ -295,2 +294,2 @@ this.socket.close();

module.exports = { Metacom, MetacomInterface };
module.exports = { Metacom, MetacomUnit };

@@ -65,4 +65,8 @@ 'use strict';

}
static once(emitter, name) {
return new Promise((resolve) => emitter.once(name, resolve));
}
}
module.exports = EventEmitter;

@@ -5,24 +5,167 @@ 'use strict';

const https = require('node:https');
const { EventEmitter } = require('node:events');
const metautil = require('metautil');
const { Semaphore } = metautil;
const ws = require('ws');
const transport = {
http: require('./http.js'),
ws: require('./ws.js'),
};
const { HttpTransport, WsTransport, HEADERS } = require('./transport.js');
const { MetaReadable, MetaWritable, Chunk } = require('./streams.js');
const SHORT_TIMEOUT = 500;
const EMPTY_PACKET = Buffer.from('{}');
const createProxy = (data, save) =>
new Proxy(data, {
get: (data, key) => {
return Reflect.get(data, key);
},
set: (data, key, value) => {
const res = Reflect.set(data, key, value);
if (save) save(data);
return res;
},
});
class Session {
constructor(token, data, server) {
this.token = token;
const { application, console } = server;
this.state = createProxy(data, (data) => {
application.auth.saveSession(token, data).catch((err) => {
console.error(err);
});
});
}
}
const sessions = new Map(); // token: Session
class Context {
constructor(client) {
this.client = client;
this.uuid = metautil.generateUUID();
this.state = {};
this.session = client?.session || null;
}
}
class Client extends EventEmitter {
#transport;
#streamId;
constructor(transport) {
super();
this.#transport = transport;
this.#streamId = 0;
this.ip = transport.ip;
this.session = null;
transport.server.clients.add(this);
transport.on('close', () => {
this.destroy();
transport.server.clients.delete(this);
});
}
error(code, options) {
this.#transport.error(code, options);
}
send(obj, code) {
this.#transport.send(obj, code);
}
createContext() {
return new Context(this);
}
emit(name, data) {
if (name === 'close') {
super.emit(name, data);
return;
}
this.#transport.sendEvent(name, data);
}
sendEvent(name, data) {
const packet = { type: 'event', name, data };
if (!this.connection) {
throw new Error(`Can't send metacom event to http transport`);
}
this.send(packet);
}
getStream(id) {
if (!this.#transport.connection) {
throw new Error(`Can't receive stream from http transport`);
}
const stream = this.#transport.streams.get(id);
if (stream) return stream;
throw new Error(`Stream ${id} is not initialized`);
}
createStream(name, size) {
if (!this.#transport.connection) {
throw new Error(`Can't send metacom streams to http transport`);
}
if (!name) throw new Error('Stream name is not provided');
if (!size) throw new Error('Stream size is not provided');
const id = --this.#streamId;
const packet = { id, name, size };
return new MetaWritable(this.#transport.connection, packet);
}
initializeSession(token, data = {}) {
this.finalizeSession();
const session = new Session(token, data, this.#transport.server);
sessions.set(token, session);
return true;
}
finalizeSession() {
if (!this.session) return false;
sessions.delete(this.session.token);
this.session = null;
return true;
}
startSession(token, data = {}) {
this.initializeSession(token, data);
if (!this.#transport.connection) this.#transport.sendSessionCookie(token);
return true;
}
restoreSession(token) {
const session = sessions.get(token);
if (!session) return false;
this.session = session;
return true;
}
close() {
this.#transport.close();
}
destroy() {
this.emit('close');
if (!this.session) return;
sessions.delete(this.session.token);
}
}
const addHeaders = ({ origin }) => {
if (origin) HEADERS['Access-Control-Allow-Origin'] = origin;
};
class Server {
constructor(options, application) {
const { cors, queue } = options;
constructor(application, options) {
this.application = application;
this.options = options;
this.application = application;
this.balancer = options.kind === 'balancer';
this.console = application.console;
if (cors) transport.http.addHeaders(cors);
if (options.cors) addHeaders(options.cors);
const { queue } = options;
const concurrency = queue.concurrency || options.concurrency;
this.semaphore = new Semaphore(concurrency, queue.size, queue.timeout);
this.server = null;
this.ws = null;
this.channels = new Set();
this.httpServer = null;
this.wsServer = null;
this.clients = new Set();
this.bind();

@@ -32,21 +175,42 @@ }

bind() {
const { options, application, console } = this;
const { host, port, kind, protocol, timeouts, nagle = true } = options;
const proto = protocol === 'http' || kind === 'balancer' ? http : https;
const listener = this.listener.bind(this);
this.server = proto.createServer({ ...application.cert }, listener);
if (!nagle) {
this.server.on('connection', (socket) => {
socket.setNoDelay(true);
});
}
this.server.on('listening', () => {
const { options, application, console, balancer } = this;
const { host, port, protocol, timeouts, nagle = true } = options;
const proto = protocol === 'http' || balancer ? http : https;
const { cert } = application;
this.httpServer = proto.createServer({ ...cert, noDelay: !nagle });
this.httpServer.on('listening', () => {
console.info(`Listen port ${port}`);
});
this.ws = new ws.Server({ server: this.server });
this.ws.on('connection', (connection, req) => {
const channel = transport.ws.createChannel(this, req, connection);
this.channels.add(channel);
this.httpServer.on('request', async (req, res) => {
const transport = new HttpTransport(this, req, res);
if (!req.url.startsWith('/api')) {
application.serveStatic(req.url, transport);
return;
}
if (balancer) this.balancing(transport);
if (req.method !== 'POST') this.error(403);
if (res.writableEnded) return;
const client = new Client(transport);
const data = await metautil.receiveBody(req).catch(() => null);
if (req.url === '/api') this.message(client, data);
else this.request(client, transport, data);
});
this.ws.on('error', (err) => {
this.wsServer = new ws.Server({ server: this.httpServer });
this.wsServer.on('connection', (connection, req) => {
const transport = new WsTransport(this, req, connection);
const client = new Client(transport);
connection.on('message', (data, isBinary) => {
if (isBinary) this.binary(client, data);
else this.message(client, data);
});
});
this.wsServer.on('error', (err) => {
if (err.code !== 'EADDRINUSE') return;

@@ -58,82 +222,160 @@ console.warn(`Address in use: ${host}:${port}, retry...`);

});
this.server.listen(port, host);
this.httpServer.listen(port, host);
}
listener(req, res) {
const { url } = req;
const channel = transport.http.createChannel(this, req, res);
this.channels.add(channel);
if (this.options.kind === 'balancer') {
const host = metautil.parseHost(req.headers.host);
const port = metautil.sample(this.options.ports);
const { protocol } = this.options;
channel.redirect(`${protocol}://${host}:${port}/`);
message(client, data) {
if (Buffer.compare(EMPTY_PACKET, data) === 0) {
client.send('{}');
return;
}
if (url.startsWith('/api')) this.request(channel);
else this.application.serveStatic(channel);
const packet = metautil.jsonParse(data) || {};
const { id, method, type } = packet;
if (id && type === 'call' && method) {
this.rpc(client, packet);
} else if (id && type === 'stream') {
this.stream(client, packet);
} else {
const error = new Error('Packet structure error');
client.error(500, { error, pass: true });
}
}
request(channel) {
const { req } = channel;
if (req.method === 'OPTIONS') {
channel.options();
async rpc(client, packet) {
const { id, method, args } = packet;
const [unitName, methodName] = method.split('/');
const [unit, ver = '*'] = unitName.split('.');
const proc = this.application.getMethod(unit, ver, methodName);
if (!proc) {
client.error(404, { id });
return;
}
if (req.url === '/api' && req.method !== 'POST') {
channel.error(403);
const context = client.createContext();
if (!client.session && proc.access !== 'public') {
client.error(403, { id });
return;
}
const body = metautil.receiveBody(req);
if (req.url === '/api') {
body.then((data) => {
channel.message(data);
});
} else {
body.then((data) => {
let args = null;
if (data.length > 0) {
args = metautil.jsonParse(data);
if (!args) {
const error = new Error('JSON parsing error');
channel.error(500, { error, pass: true });
return;
}
}
const pathname = req.url.slice('/api/'.length);
const [path, params] = metautil.split(pathname, '?');
if (!args) args = metautil.parseParams(params);
const [interfaceName, methodName] = metautil.split(path, '/');
const { headers } = req;
const hook = this.application.getHook(interfaceName);
if (hook) channel.hook(hook, interfaceName, methodName, args, headers);
else channel.rpc(-1, interfaceName, methodName, args, headers);
});
try {
await proc.enter();
} catch {
client.error(503, { id });
return;
}
body.catch((error) => {
channel.error(500, { error });
});
let result = null;
try {
result = await proc.invoke(context, args);
} catch (error) {
if (error.message === 'Timeout reached') {
error.code = error.httpCode = 408;
}
client.error(error.code, { id, error });
return;
} finally {
proc.leave();
}
if (result?.constructor?.name === 'Error') {
const { code, httpCode = 200 } = result;
client.error(code, { id, error: result, httpCode });
return;
}
client.send({ type: 'callback', id, result });
this.console.log(`${client.ip}\t${method}`);
}
closeChannels() {
for (const channel of this.channels.values()) {
if (channel.connection) {
channel.connection.terminate();
async stream(client, packet) {
const { id, name, size, status } = packet;
const tag = id + '/' + name;
try {
const stream = client.streams.get(id);
if (status) {
if (!stream) throw new Error(`Stream ${tag} is not initialized`);
if (status === 'end') await stream.close();
if (status === 'terminate') await stream.terminate();
client.streams.delete(id);
return;
}
const valid = typeof name === 'string' && Number.isSafeInteger(size);
if (!valid) throw new Error('Stream packet structure error');
if (stream) throw new Error(`Stream ${tag} is already initialized`);
{
const stream = new MetaReadable({ id, name, size });
client.streams.set(id, stream);
this.console.log(`${client.ip}\tstream ${tag} init`);
}
} catch (error) {
this.console.error(`${client.ip}\tstream ${tag} error`);
client.error(400, { id, error, pass: true });
}
}
binary(client, data) {
const { id, payload } = Chunk.decode(data);
try {
const upstream = client.streams.get(id);
if (upstream) {
upstream.push(payload);
} else {
channel.error(503);
channel.req.connection.destroy();
const error = new Error(`Stream ${id} is not initialized`);
client.error(400, { id, error, pass: true });
}
} catch (error) {
this.console.error(`${client.ip}\tstream ${id} error`);
client.error(400, { id: 0, error });
}
}
request(client, transport, data) {
const { application } = this;
const { headers, url, method: verb } = transport.req;
const pathname = url.slice('/api/'.length);
const [path, params] = metautil.split(pathname, '?');
const parameters = metautil.parseParams(params);
const [unit, method] = metautil.split(path, '/');
const body = metautil.jsonParse(data) || {};
const args = { ...parameters, ...body };
const packet = { id: 0, method: unit + '/' + method, args };
const hook = application.getHook(unit);
if (hook) this.hook(client, hook, packet, verb, headers);
else this.rpc(client, packet);
}
async hook(client, proc, packet, verb, headers) {
const { id, method, args } = packet;
if (!proc) {
client.error(404, { id });
return;
}
const context = client.createContext();
try {
const par = { verb, method, args, headers };
const result = await proc.invoke(context, par);
client.send(result);
} catch (error) {
client.error(500, { id, error });
}
this.console.log(`${client.ip}\t${method}`);
}
balancing(transport) {
const host = metautil.parseHost(transport.req.headers.host);
const { protocol, port } = this.options;
const targetPort = metautil.sample(port);
transport.redirect(`${protocol}://${host}:${targetPort}/`);
}
closeClients() {
for (const client of this.clients) {
client.close();
}
}
async close() {
this.server.close((err) => {
this.httpServer.close((err) => {
if (err) this.console.error(err);
});
if (this.channels.size === 0) {
if (this.clients.size === 0) return;
this.closeClients();
while (this.clients.size > 0) {
await metautil.delay(SHORT_TIMEOUT);
return;
}
await metautil.delay(this.options.timeouts.stop);
this.closeChannels();
}

@@ -140,0 +382,0 @@ }

@@ -6,32 +6,19 @@ 'use strict';

const STREAM_ID_LENGTH = 4;
const ID_LENGTH = 4;
const createStreamIdBuffer = (num) => {
const buffer = new ArrayBuffer(STREAM_ID_LENGTH);
const view = new DataView(buffer);
view.setInt32(0, num);
return buffer;
const chunkEncode = (id, payload) => {
const chunk = new Uint8Array(ID_LENGTH + payload.length);
const view = new DataView(chunk.buffer);
view.setInt32(0, id);
chunk.set(payload, ID_LENGTH);
return chunk;
};
const getStreamId = (buffer) => {
const view = new DataView(buffer);
return view.getInt32(0);
const chunkDecode = (chunk) => {
const view = new DataView(chunk.buffer);
const id = view.getInt32(0);
const payload = chunk.subarray(ID_LENGTH);
return { id, payload };
};
class Chunk {
static encode(streamId, payload) {
const streamIdView = new Uint8Array(createStreamIdBuffer(streamId));
const chunkView = new Uint8Array(STREAM_ID_LENGTH + payload.length);
chunkView.set(streamIdView);
chunkView.set(payload, STREAM_ID_LENGTH);
return chunkView;
}
static decode(chunkView) {
const streamId = getStreamId(chunkView.buffer);
const payload = chunkView.subarray(STREAM_ID_LENGTH);
return { streamId, payload };
}
}
const PUSH_EVENT = Symbol();

@@ -43,11 +30,11 @@ const PULL_EVENT = Symbol();

class MetaReadable extends EventEmitter {
constructor(initData, options = {}) {
constructor(id, name, size, options = {}) {
super();
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.id = id;
this.name = name;
this.size = size;
this.highWaterMark = options.highWaterMark || DEFAULT_HIGH_WATER_MARK;
this.queue = [];
this.streaming = true;
this.status = null;
this.status = 'active';
this.bytesRead = 0;

@@ -60,3 +47,3 @@ this.maxListenersCount = this.getMaxListeners() - 1;

this.checkStreamLimits();
await this.waitEvent(PULL_EVENT);
await EventEmitter.once(this, PULL_EVENT);
return this.push(data);

@@ -70,4 +57,5 @@ }

async finalize(writable) {
const waitWritableEvent = this.waitEvent.bind(writable);
writable.once('error', () => this.terminate());
const waitWritableEvent = EventEmitter.once.bind(writable);
const onError = () => this.terminate();
writable.once('error', onError);
for await (const chunk of this) {

@@ -81,6 +69,7 @@ const needDrain = !writable.write(chunk);

await this.close();
writable.removeListener('error', onError);
}
pipe(writable) {
void this.finalize(writable);
this.finalize(writable);
return writable;

@@ -109,3 +98,3 @@ }

while (this.bytesRead !== this.size) {
await this.waitEvent(PULL_EVENT);
await EventEmitter.once(this, PULL_EVENT);
}

@@ -118,3 +107,3 @@ this.streaming = false;

if (this.queue.length > 0) return this.pull();
const finisher = await this.waitEvent(PUSH_EVENT);
const finisher = await EventEmitter.once(this, PUSH_EVENT);
if (finisher === null) return null;

@@ -142,11 +131,7 @@ return this.pull();

waitEvent(event) {
return new Promise((resolve) => this.once(event, resolve));
}
async *[Symbol.asyncIterator]() {
while (this.streaming) {
const chunk = await this.read();
if (chunk) yield chunk;
else return;
if (!chunk) return;
yield chunk;
}

@@ -157,8 +142,8 @@ }

class MetaWritable extends EventEmitter {
constructor(transport, initData) {
constructor(id, name, size, transport) {
super();
this.id = id;
this.name = name;
this.size = size;
this.transport = transport;
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.init();

@@ -168,7 +153,4 @@ }

init() {
const packet = {
stream: this.streamId,
name: this.name,
size: this.size,
};
const { id, name, size } = this;
const packet = { type: 'stream', id, name, size };
this.transport.send(JSON.stringify(packet));

@@ -178,3 +160,3 @@ }

write(data) {
const chunk = Chunk.encode(this.streamId, data);
const chunk = chunkEncode(this.id, data);
this.transport.send(chunk);

@@ -185,3 +167,3 @@ return true;

end() {
const packet = { stream: this.streamId, status: 'end' };
const packet = { type: 'stream', id: this.id, status: 'end' };
this.transport.send(JSON.stringify(packet));

@@ -191,3 +173,3 @@ }

terminate() {
const packet = { stream: this.streamId, status: 'terminate' };
const packet = { type: 'stream', id: this.id, status: 'terminate' };
this.transport.send(JSON.stringify(packet));

@@ -197,2 +179,2 @@ }

module.exports = { Chunk, MetaReadable, MetaWritable };
module.exports = { chunkEncode, chunkDecode, MetaReadable, MetaWritable };

@@ -12,5 +12,13 @@ import { EventEmitter } from 'node:events';

export class MetaReadable extends EventEmitter {
streamId: number;
id: number;
name: string;
size: number;
constructor(
id: number,
name: string,
size: number,
options?: {
highWaterMark?: number;
},
);
push(data: ArrayBufferView): Promise<ArrayBufferView>;

@@ -23,5 +31,6 @@ finalize(writable: Writable): Promise<void>;

export class MetaWritable extends EventEmitter {
streamId: number;
id: number;
name: string;
size: number;
constructor(id: number, name: string, size: number, transport: Transport);
write(data: ArrayBufferView): void;

@@ -33,3 +42,3 @@ end(): void;

export interface BlobUploader {
streamId: number;
id: number;
upload(): Promise<void>;

@@ -49,10 +58,10 @@ }

httpCall(
iname: string,
unit: string,
ver: string,
): (methodName: string) => (args: object) => Promise<void>;
socketCall(
iname: string,
unit: string,
ver: string,
): (methodName: string) => (args: object) => Promise<void>;
getStream(streamId: number): MetaReadable;
getStream(id: number): MetaReadable;
createStream(name: string, size: number): MetaWritable;

@@ -73,3 +82,3 @@ createBlobUploader(blob: Blob): BlobUploader;

export interface ErrorOptions {
callId?: number;
id?: number;
error?: Error;

@@ -91,82 +100,73 @@ pass?: boolean;

ip: string | undefined;
callId: number;
eventId: number;
streamId: number;
auth: Auth;
events: { close: Array<Function> };
redirect(location: string): void;
startSession(token: string, data: object): boolean;
restoreSession(token: string): boolean;
getStream(streamId: number): MetaReadable;
createStream(name: string, size: number): MetaWritable;
session: Session;
}
export class Channel {
server: Server;
auth: Auth;
export class Transport {
console: Console;
req: ClientRequest;
res: ServerResponse;
res?: ServerResponse;
connection?: WebSocket;
ip: string;
client: Client;
session?: Session;
eventId: number;
streamId: number;
streams: Map<number, MetaReadable>;
token: string;
constructor(application: object, req: ClientRequest, res: ServerResponse);
message(data: string): void;
binary(data: Buffer): void;
handleRpcPacket(packet: object): void;
handleStreamPacket(packet: object): Promise<void>;
createContext(): Context;
rpc(
callId: number,
interfaceName: string,
methodName: string,
args: [],
): Promise<void>;
hook(
proc: object,
interfaceName: string,
methodName: string,
args: Array<any>,
): Promise<void>;
constructor(
console: Console,
req: ClientRequest,
target: ServerResponse | WebSocket,
);
error(code: number, errorOptions?: ErrorOptions): void;
sendEvent(name: string, data: object): void;
getStream(streamId: number): MetaWritable;
createStream(name: string, size: number): MetaWritable;
resumeCookieSession(): Promise<void>;
destroy(): void;
}
export class HttpChannel extends Channel {
write(data: any, httpCode?: number, ext?: string): void;
write(data: string | Buffer, httpCode?: number, ext?: string): void;
send(obj: object, httpCode?: number): void;
redirect(location: string): void;
options(): void;
redirect?(location: string): void;
options?(): void;
getCookies?(): object;
sendSessionCookie(token: string): void;
removeSessionCookie(): void;
close(): void;
}
export class WsChannel extends Channel {
connection: WebSocket;
constructor(application: object, req: ClientRequest, connection: WebSocket);
write(data: any): void;
send(obj: object): void;
export interface CallPacket {
type: 'call';
id: number;
method: string;
args: object;
meta: object;
}
export interface StreamPacket {
type: 'stream';
id: number;
name: string;
size: number;
}
export class Server {
application: object;
options: Options;
application: object;
balancer: boolean;
console: Console;
semaphore: Semaphore;
server?: any;
ws?: any;
channels?: Map<Client, Channel>;
httpServer: any;
wsServer: any;
clients: Set<Client>;
constructor(options: Options, application: object);
bind(): void;
listener(req: ClientRequest, res: ServerResponse): void;
request(channel: Channel): void;
closeChannels(): void;
message(client: Client, data: string): void;
rpc(client: Client, packet: CallPacket): Promise<void>;
binary(client: Client, data: Buffer): void;
handleRpcPacket(client: Client, packet: CallPacket): void;
handleStreamPacket(client: Client, packet: StreamPacket): Promise<void>;
handleRequest(
client: Client,
transport: Transport,
data: Buffer,
application: object,
): void;
hook(
client: Client,
proc: object,
packet: CallPacket,
verb: string,
headers: object,
): Promise<void>;
balancing(transport: Transport): void;
closeClients(): void;
close(): Promise<void>;

@@ -173,0 +173,0 @@ }

@@ -5,6 +5,3 @@ 'use strict';

const { Server } = require('./lib/server.js');
const { Channel } = require('./lib/channel.js');
module.exports.Metacom = Metacom;
module.exports.Server = Server;
module.exports.Channel = Channel;
module.exports = { Metacom, Server };
{
"name": "metacom",
"version": "3.0.0-alpha.8",
"version": "3.0.0-alpha.9",
"author": "Timur Shemsedinov <timur.shemsedinov@gmail.com>",

@@ -49,3 +49,3 @@ "description": "Communication protocol for Metarhia stack with rpc, events, binary streams, memory and db access",

"engines": {
"node": "^14.18 || 16 || 18 || 19"
"node": "^14.18 || 16 || 18 || 19 || 20"
},

@@ -52,0 +52,0 @@ "dependencies": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc