Comparing version 3.0.0-alpha.8 to 3.0.0-alpha.9
@@ -5,2 +5,9 @@ # Changelog | ||
## [3.0.0-alpha.9][] - 2023-04-22 | ||
- Implement metacom3 specs: https://github.com/metarhia/Contracts/blob/master/doc/Metacom.md | ||
- Refactor streams | ||
- Optimize chunk encoding | ||
- Fix setTimeout and setInterval leaks | ||
## [3.0.0-alpha.8][] - 2023-02-13 | ||
@@ -252,3 +259,4 @@ | ||
[unreleased]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.8...HEAD | ||
[unreleased]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.9...HEAD | ||
[3.0.0-alpha.9]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.8...v3.0.0-alpha.9 | ||
[3.0.0-alpha.8]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.7...v3.0.0-alpha.8 | ||
@@ -255,0 +263,0 @@ [3.0.0-alpha.7]: https://github.com/metarhia/metacom/compare/v3.0.0-alpha.6...v3.0.0-alpha.7 |
@@ -63,4 +63,8 @@ class EventEmitter { | ||
} | ||
static once(emitter, name) { | ||
return new Promise((resolve) => emitter.once(name, resolve)); | ||
} | ||
} | ||
export default EventEmitter; |
@@ -23,3 +23,3 @@ import EventEmitter from './events.js'; | ||
class MetacomInterface extends EventEmitter { | ||
class MetacomUnit extends EventEmitter { | ||
emit(...args) { | ||
@@ -41,3 +41,2 @@ super.emit('*', ...args); | ||
this.streamId = 0; | ||
this.eventId = 0; | ||
this.active = false; | ||
@@ -50,2 +49,3 @@ this.connected = false; | ||
this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT; | ||
this.ping = null; | ||
this.open(); | ||
@@ -60,11 +60,11 @@ } | ||
getStream(streamId) { | ||
const stream = this.streams.get(streamId); | ||
getStream(id) { | ||
const stream = this.streams.get(id); | ||
if (stream) return stream; | ||
throw new Error(`Stream ${streamId} is not initialized`); | ||
throw new Error(`Stream ${id} is not initialized`); | ||
} | ||
createStream(name, size) { | ||
const streamId = ++this.streamId; | ||
const initData = { streamId, name, size }; | ||
const id = ++this.streamId; | ||
const initData = { type: 'stream', id, name, size }; | ||
const transport = this; | ||
@@ -79,3 +79,3 @@ return new MetaWritable(transport, initData); | ||
return { | ||
streamId: consumer.streamId, | ||
id: consumer.id, | ||
upload: async () => { | ||
@@ -101,11 +101,10 @@ const reader = blob.stream().getReader(); | ||
} | ||
const [callType, target] = Object.keys(packet); | ||
const callId = packet[callType]; | ||
const args = packet[target]; | ||
if (callId) { | ||
if (callType === 'callback') { | ||
const promised = this.calls.get(callId); | ||
const { type, id, method } = packet; | ||
if (id) { | ||
if (type === 'callback') { | ||
const promised = this.calls.get(id); | ||
if (!promised) return; | ||
const [resolve, reject] = promised; | ||
this.calls.delete(callId); | ||
const [resolve, reject, timeout] = promised; | ||
this.calls.delete(id); | ||
clearTimeout(timeout); | ||
if (packet.error) { | ||
@@ -115,10 +114,10 @@ reject(new MetacomError(packet.error)); | ||
} | ||
resolve(args); | ||
} else if (callType === 'event') { | ||
const [interfaceName, eventName] = target.split('/'); | ||
const metacomInterface = this.api[interfaceName]; | ||
metacomInterface.emit(eventName, args); | ||
} else if (callType === 'stream') { | ||
const { stream: streamId, name, size, status } = packet; | ||
const stream = this.streams.get(streamId); | ||
resolve(packet.result); | ||
} else if (type === 'event') { | ||
const [unit, name] = method.split('/'); | ||
const metacomUnit = this.api[unit]; | ||
if (metacomUnit) metacomUnit.emit(name, packet.data); | ||
} else if (type === 'stream') { | ||
const { name, size, status } = packet; | ||
const stream = this.streams.get(id); | ||
if (name && typeof name === 'string' && Number.isSafeInteger(size)) { | ||
@@ -128,14 +127,14 @@ if (stream) { | ||
} else { | ||
const streamData = { streamId, name, size }; | ||
const streamData = { id, name, size }; | ||
const stream = new MetaReadable(streamData); | ||
this.streams.set(streamId, stream); | ||
this.streams.set(id, stream); | ||
} | ||
} else if (!stream) { | ||
console.error(new Error(`Stream ${streamId} is not initialized`)); | ||
console.error(new Error(`Stream ${id} is not initialized`)); | ||
} else if (status === 'end') { | ||
await stream.close(); | ||
this.streams.delete(streamId); | ||
this.streams.delete(id); | ||
} else if (status === 'terminate') { | ||
await stream.terminate(); | ||
this.streams.delete(streamId); | ||
this.streams.delete(id); | ||
} else { | ||
@@ -151,47 +150,47 @@ console.error(new Error('Stream packet structure error')); | ||
const byteView = new Uint8Array(buffer); | ||
const { streamId, payload } = Chunk.decode(byteView); | ||
const stream = this.streams.get(streamId); | ||
const { id, payload } = Chunk.decode(byteView); | ||
const stream = this.streams.get(id); | ||
if (stream) await stream.push(payload); | ||
else console.warn(`Stream ${streamId} is not initialized`); | ||
else console.warn(`Stream ${id} is not initialized`); | ||
} | ||
async load(...interfaces) { | ||
async load(...units) { | ||
const introspect = this.scaffold('system')('introspect'); | ||
const introspection = await introspect(interfaces); | ||
const introspection = await introspect(units); | ||
const available = Object.keys(introspection); | ||
for (const interfaceName of interfaces) { | ||
if (!available.includes(interfaceName)) continue; | ||
const methods = new MetacomInterface(); | ||
const iface = introspection[interfaceName]; | ||
const request = this.scaffold(interfaceName); | ||
const methodNames = Object.keys(iface); | ||
for (const unit of units) { | ||
if (!available.includes(unit)) continue; | ||
const methods = new MetacomUnit(); | ||
const instance = introspection[unit]; | ||
const request = this.scaffold(unit); | ||
const methodNames = Object.keys(instance); | ||
for (const methodName of methodNames) { | ||
methods[methodName] = request(methodName); | ||
} | ||
methods.on('*', (eventName, data) => { | ||
const target = `${interfaceName}/${eventName}`; | ||
const packet = { event: ++this.eventId, [target]: data }; | ||
methods.on('*', (event, data) => { | ||
const name = unit + '/' + event; | ||
const packet = { type: 'event', name, data }; | ||
this.send(JSON.stringify(packet)); | ||
}); | ||
this.api[interfaceName] = methods; | ||
this.api[unit] = methods; | ||
} | ||
} | ||
scaffold(iname, ver) { | ||
return (methodName) => | ||
scaffold(unit, ver) { | ||
return (method) => | ||
async (args = {}) => { | ||
const callId = ++this.callId; | ||
const interfaceName = ver ? `${iname}.${ver}` : iname; | ||
const target = interfaceName + '/' + methodName; | ||
const id = ++this.callId; | ||
const unitName = unit + (ver ? '.' + ver : ''); | ||
const target = unitName + '/' + method; | ||
if (this.opening) await this.opening; | ||
if (!this.connected) await this.open(); | ||
return new Promise((resolve, reject) => { | ||
setTimeout(() => { | ||
if (this.calls.has(callId)) { | ||
this.calls.delete(callId); | ||
const timeout = setTimeout(() => { | ||
if (this.calls.has(id)) { | ||
this.calls.delete(id); | ||
reject(new Error('Request timeout')); | ||
} | ||
}, this.callTimeout); | ||
this.calls.set(callId, [resolve, reject]); | ||
const packet = { call: callId, [target]: args }; | ||
this.calls.set(id, [resolve, reject, timeout]); | ||
const packet = { type: 'call', id, method: target, args }; | ||
this.send(JSON.stringify(packet)); | ||
@@ -231,3 +230,3 @@ }); | ||
setInterval(() => { | ||
this.ping = setInterval(() => { | ||
if (this.active) { | ||
@@ -253,2 +252,3 @@ const interval = new Date().getTime() - this.lastActivity; | ||
connections.delete(this); | ||
clearInterval(this.ping); | ||
if (!this.socket) return; | ||
@@ -297,2 +297,2 @@ this.socket.close(); | ||
export { Metacom, MetacomInterface }; | ||
export { Metacom, MetacomUnit }; |
import EventEmitter from './events.js'; | ||
const STREAM_ID_LENGTH = 4; | ||
const ID_LENGTH = 4; | ||
const createStreamIdBuffer = (num) => { | ||
const buffer = new ArrayBuffer(STREAM_ID_LENGTH); | ||
const view = new DataView(buffer); | ||
view.setInt32(0, num); | ||
return buffer; | ||
const chunkEncode = (id, payload) => { | ||
const chunk = new Uint8Array(ID_LENGTH + payload.length); | ||
const view = new DataView(chunk.buffer); | ||
view.setInt32(0, id); | ||
chunk.set(payload, ID_LENGTH); | ||
return chunk; | ||
}; | ||
const getStreamId = (buffer) => { | ||
const view = new DataView(buffer); | ||
return view.getInt32(0); | ||
const chunkDecode = (chunk) => { | ||
const view = new DataView(chunk.buffer); | ||
const id = view.getInt32(0); | ||
const payload = chunk.subarray(ID_LENGTH); | ||
return { id, payload }; | ||
}; | ||
class Chunk { | ||
static encode(streamId, payload) { | ||
const streamIdView = new Uint8Array(createStreamIdBuffer(streamId)); | ||
const chunkView = new Uint8Array(STREAM_ID_LENGTH + payload.length); | ||
chunkView.set(streamIdView); | ||
chunkView.set(payload, STREAM_ID_LENGTH); | ||
return chunkView; | ||
} | ||
static decode(chunkView) { | ||
const streamId = getStreamId(chunkView.buffer); | ||
const payload = chunkView.subarray(STREAM_ID_LENGTH); | ||
return { streamId, payload }; | ||
} | ||
} | ||
const PUSH_EVENT = Symbol(); | ||
@@ -39,11 +26,11 @@ const PULL_EVENT = Symbol(); | ||
class MetaReadable extends EventEmitter { | ||
constructor(initData, options = {}) { | ||
constructor(id, name, size, options = {}) { | ||
super(); | ||
this.streamId = initData.streamId; | ||
this.name = initData.name; | ||
this.size = initData.size; | ||
this.id = id; | ||
this.name = name; | ||
this.size = size; | ||
this.highWaterMark = options.highWaterMark || DEFAULT_HIGH_WATER_MARK; | ||
this.queue = []; | ||
this.streaming = true; | ||
this.status = null; | ||
this.status = 'active'; | ||
this.bytesRead = 0; | ||
@@ -65,4 +52,5 @@ this.maxListenersCount = this.getMaxListeners() - 1; | ||
async finalize(writable) { | ||
const waitWritableEvent = this.waitEvent.bind(writable); | ||
writable.once('error', () => this.terminate()); | ||
const waitWritableEvent = EventEmitter.once.bind(writable); | ||
const onError = () => this.terminate(); | ||
writable.once('error', onError); | ||
for await (const chunk of this) { | ||
@@ -76,6 +64,7 @@ const needDrain = !writable.write(chunk); | ||
await this.close(); | ||
writable.removeListener('error', onError); | ||
} | ||
pipe(writable) { | ||
void this.finalize(writable); | ||
this.finalize(writable); | ||
return writable; | ||
@@ -142,4 +131,4 @@ } | ||
const chunk = await this.read(); | ||
if (chunk) yield chunk; | ||
else return; | ||
if (!chunk) return; | ||
yield chunk; | ||
} | ||
@@ -150,8 +139,8 @@ } | ||
class MetaWritable extends EventEmitter { | ||
constructor(transport, initData) { | ||
constructor(id, name, size, transport) { | ||
super(); | ||
this.id = id; | ||
this.name = name; | ||
this.size = size; | ||
this.transport = transport; | ||
this.streamId = initData.streamId; | ||
this.name = initData.name; | ||
this.size = initData.size; | ||
this.init(); | ||
@@ -161,7 +150,4 @@ } | ||
init() { | ||
const packet = { | ||
stream: this.streamId, | ||
name: this.name, | ||
size: this.size, | ||
}; | ||
const { id, name, size } = this; | ||
const packet = { type: 'stream', id, name, size }; | ||
this.transport.send(JSON.stringify(packet)); | ||
@@ -171,3 +157,3 @@ } | ||
write(data) { | ||
const chunk = Chunk.encode(this.streamId, data); | ||
const chunk = chunkEncode(this.id, data); | ||
this.transport.send(chunk); | ||
@@ -178,3 +164,3 @@ return true; | ||
end() { | ||
const packet = { stream: this.streamId, status: 'end' }; | ||
const packet = { type: 'stream', id: this.id, status: 'end' }; | ||
this.transport.send(JSON.stringify(packet)); | ||
@@ -184,3 +170,3 @@ } | ||
terminate() { | ||
const packet = { stream: this.streamId, status: 'terminate' }; | ||
const packet = { type: 'stream', id: this.id, status: 'terminate' }; | ||
this.transport.send(JSON.stringify(packet)); | ||
@@ -190,2 +176,2 @@ } | ||
export { Chunk, MetaReadable, MetaWritable }; | ||
export { chunkEncode, chunkDecode, MetaReadable, MetaWritable }; |
@@ -21,3 +21,3 @@ 'use strict'; | ||
class MetacomInterface extends EventEmitter { | ||
class MetacomUnit extends EventEmitter { | ||
emit(...args) { | ||
@@ -39,3 +39,2 @@ super.emit('*', ...args); | ||
this.streamId = 0; | ||
this.eventId = 0; | ||
this.active = false; | ||
@@ -48,2 +47,3 @@ this.connected = false; | ||
this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT; | ||
this.ping = null; | ||
this.open(); | ||
@@ -58,13 +58,13 @@ } | ||
getStream(streamId) { | ||
const stream = this.streams.get(streamId); | ||
getStream(id) { | ||
const stream = this.streams.get(id); | ||
if (stream) return stream; | ||
throw new Error(`Stream ${streamId} is not initialized`); | ||
throw new Error(`Stream ${id} is not initialized`); | ||
} | ||
createStream(name, size) { | ||
const streamId = ++this.streamId; | ||
const initData = { streamId, name, size }; | ||
const id = ++this.streamId; | ||
const packet = { type: 'stream', id, name, size }; | ||
const transport = this; | ||
return new MetaWritable(transport, initData); | ||
return new MetaWritable(transport, packet); | ||
} | ||
@@ -77,3 +77,3 @@ | ||
return { | ||
streamId: consumer.streamId, | ||
id: consumer.id, | ||
upload: async () => { | ||
@@ -99,11 +99,10 @@ const reader = blob.stream().getReader(); | ||
} | ||
const [callType, target] = Object.keys(packet); | ||
const callId = packet[callType]; | ||
const args = packet[target]; | ||
if (callId) { | ||
if (callType === 'callback') { | ||
const promised = this.calls.get(callId); | ||
const { type, id, method } = packet; | ||
if (id) { | ||
if (type === 'callback') { | ||
const promised = this.calls.get(id); | ||
if (!promised) return; | ||
const [resolve, reject] = promised; | ||
this.calls.delete(callId); | ||
const [resolve, reject, timeout] = promised; | ||
this.calls.delete(id); | ||
clearTimeout(timeout); | ||
if (packet.error) { | ||
@@ -113,10 +112,10 @@ reject(new MetacomError(packet.error)); | ||
} | ||
resolve(args); | ||
} else if (callType === 'event') { | ||
const [interfaceName, eventName] = target.split('/'); | ||
const metacomInterface = this.api[interfaceName]; | ||
metacomInterface.emit(eventName, args); | ||
} else if (callType === 'stream') { | ||
const { stream: streamId, name, size, status } = packet; | ||
const stream = this.streams.get(streamId); | ||
resolve(packet.result); | ||
} else if (type === 'event') { | ||
const [unit, name] = method.split('/'); | ||
const metacomUnit = this.api[unit]; | ||
if (metacomUnit) metacomUnit.emit(name, packet.data); | ||
} else if (type === 'stream') { | ||
const { name, size, status } = packet; | ||
const stream = this.streams.get(id); | ||
if (name && typeof name === 'string' && Number.isSafeInteger(size)) { | ||
@@ -126,14 +125,13 @@ if (stream) { | ||
} else { | ||
const streamData = { streamId, name, size }; | ||
const stream = new MetaReadable(streamData); | ||
this.streams.set(streamId, stream); | ||
const stream = new MetaReadable({ id, name, size }); | ||
this.streams.set(id, stream); | ||
} | ||
} else if (!stream) { | ||
console.error(new Error(`Stream ${streamId} is not initialized`)); | ||
console.error(new Error(`Stream ${id} is not initialized`)); | ||
} else if (status === 'end') { | ||
await stream.close(); | ||
this.streams.delete(streamId); | ||
this.streams.delete(id); | ||
} else if (status === 'terminate') { | ||
await stream.terminate(); | ||
this.streams.delete(streamId); | ||
this.streams.delete(id); | ||
} else { | ||
@@ -149,47 +147,47 @@ console.error(new Error('Stream packet structure error')); | ||
const byteView = new Uint8Array(buffer); | ||
const { streamId, payload } = Chunk.decode(byteView); | ||
const stream = this.streams.get(streamId); | ||
const { id, payload } = Chunk.decode(byteView); | ||
const stream = this.streams.get(id); | ||
if (stream) await stream.push(payload); | ||
else console.warn(`Stream ${streamId} is not initialized`); | ||
else console.warn(`Stream ${id} is not initialized`); | ||
} | ||
async load(...interfaces) { | ||
async load(...units) { | ||
const introspect = this.scaffold('system')('introspect'); | ||
const introspection = await introspect(interfaces); | ||
const introspection = await introspect(units); | ||
const available = Object.keys(introspection); | ||
for (const interfaceName of interfaces) { | ||
if (!available.includes(interfaceName)) continue; | ||
const methods = new MetacomInterface(); | ||
const iface = introspection[interfaceName]; | ||
const request = this.scaffold(interfaceName); | ||
const methodNames = Object.keys(iface); | ||
for (const unit of units) { | ||
if (!available.includes(unit)) continue; | ||
const methods = new MetacomUnit(); | ||
const instance = introspection[unit]; | ||
const request = this.scaffold(unit); | ||
const methodNames = Object.keys(instance); | ||
for (const methodName of methodNames) { | ||
methods[methodName] = request(methodName); | ||
} | ||
methods.on('*', (eventName, data) => { | ||
const target = `${interfaceName}/${eventName}`; | ||
const packet = { event: ++this.eventId, [target]: data }; | ||
methods.on('*', (event, data) => { | ||
const name = unit + '/' + event; | ||
const packet = { type: 'event', name, data }; | ||
this.send(JSON.stringify(packet)); | ||
}); | ||
this.api[interfaceName] = methods; | ||
this.api[unit] = methods; | ||
} | ||
} | ||
scaffold(iname, ver) { | ||
return (methodName) => | ||
scaffold(unit, ver) { | ||
return (method) => | ||
async (args = {}) => { | ||
const callId = ++this.callId; | ||
const interfaceName = ver ? `${iname}.${ver}` : iname; | ||
const target = interfaceName + '/' + methodName; | ||
const id = ++this.callId; | ||
const unitName = unit + (ver ? '.' + ver : ''); | ||
const target = unitName + '/' + method; | ||
if (this.opening) await this.opening; | ||
if (!this.connected) await this.open(); | ||
return new Promise((resolve, reject) => { | ||
setTimeout(() => { | ||
if (this.calls.has(callId)) { | ||
this.calls.delete(callId); | ||
const timeout = setTimeout(() => { | ||
if (this.calls.has(id)) { | ||
this.calls.delete(id); | ||
reject(new Error('Request timeout')); | ||
} | ||
}, this.callTimeout); | ||
this.calls.set(callId, [resolve, reject]); | ||
const packet = { call: callId, [target]: args }; | ||
this.calls.set(id, [resolve, reject, timeout]); | ||
const packet = { type: 'call', id, method: target, args }; | ||
this.send(JSON.stringify(packet)); | ||
@@ -229,3 +227,3 @@ }); | ||
setInterval(() => { | ||
this.ping = setInterval(() => { | ||
if (this.active) { | ||
@@ -251,2 +249,3 @@ const interval = new Date().getTime() - this.lastActivity; | ||
connections.delete(this); | ||
clearInterval(this.ping); | ||
if (!this.socket) return; | ||
@@ -295,2 +294,2 @@ this.socket.close(); | ||
module.exports = { Metacom, MetacomInterface }; | ||
module.exports = { Metacom, MetacomUnit }; |
@@ -65,4 +65,8 @@ 'use strict'; | ||
} | ||
static once(emitter, name) { | ||
return new Promise((resolve) => emitter.once(name, resolve)); | ||
} | ||
} | ||
module.exports = EventEmitter; |
@@ -5,24 +5,167 @@ 'use strict'; | ||
const https = require('node:https'); | ||
const { EventEmitter } = require('node:events'); | ||
const metautil = require('metautil'); | ||
const { Semaphore } = metautil; | ||
const ws = require('ws'); | ||
const transport = { | ||
http: require('./http.js'), | ||
ws: require('./ws.js'), | ||
}; | ||
const { HttpTransport, WsTransport, HEADERS } = require('./transport.js'); | ||
const { MetaReadable, MetaWritable, Chunk } = require('./streams.js'); | ||
const SHORT_TIMEOUT = 500; | ||
const EMPTY_PACKET = Buffer.from('{}'); | ||
const createProxy = (data, save) => | ||
new Proxy(data, { | ||
get: (data, key) => { | ||
return Reflect.get(data, key); | ||
}, | ||
set: (data, key, value) => { | ||
const res = Reflect.set(data, key, value); | ||
if (save) save(data); | ||
return res; | ||
}, | ||
}); | ||
class Session { | ||
constructor(token, data, server) { | ||
this.token = token; | ||
const { application, console } = server; | ||
this.state = createProxy(data, (data) => { | ||
application.auth.saveSession(token, data).catch((err) => { | ||
console.error(err); | ||
}); | ||
}); | ||
} | ||
} | ||
const sessions = new Map(); // token: Session | ||
class Context { | ||
constructor(client) { | ||
this.client = client; | ||
this.uuid = metautil.generateUUID(); | ||
this.state = {}; | ||
this.session = client?.session || null; | ||
} | ||
} | ||
class Client extends EventEmitter { | ||
#transport; | ||
#streamId; | ||
constructor(transport) { | ||
super(); | ||
this.#transport = transport; | ||
this.#streamId = 0; | ||
this.ip = transport.ip; | ||
this.session = null; | ||
transport.server.clients.add(this); | ||
transport.on('close', () => { | ||
this.destroy(); | ||
transport.server.clients.delete(this); | ||
}); | ||
} | ||
error(code, options) { | ||
this.#transport.error(code, options); | ||
} | ||
send(obj, code) { | ||
this.#transport.send(obj, code); | ||
} | ||
createContext() { | ||
return new Context(this); | ||
} | ||
emit(name, data) { | ||
if (name === 'close') { | ||
super.emit(name, data); | ||
return; | ||
} | ||
this.#transport.sendEvent(name, data); | ||
} | ||
sendEvent(name, data) { | ||
const packet = { type: 'event', name, data }; | ||
if (!this.connection) { | ||
throw new Error(`Can't send metacom event to http transport`); | ||
} | ||
this.send(packet); | ||
} | ||
getStream(id) { | ||
if (!this.#transport.connection) { | ||
throw new Error(`Can't receive stream from http transport`); | ||
} | ||
const stream = this.#transport.streams.get(id); | ||
if (stream) return stream; | ||
throw new Error(`Stream ${id} is not initialized`); | ||
} | ||
createStream(name, size) { | ||
if (!this.#transport.connection) { | ||
throw new Error(`Can't send metacom streams to http transport`); | ||
} | ||
if (!name) throw new Error('Stream name is not provided'); | ||
if (!size) throw new Error('Stream size is not provided'); | ||
const id = --this.#streamId; | ||
const packet = { id, name, size }; | ||
return new MetaWritable(this.#transport.connection, packet); | ||
} | ||
initializeSession(token, data = {}) { | ||
this.finalizeSession(); | ||
const session = new Session(token, data, this.#transport.server); | ||
sessions.set(token, session); | ||
return true; | ||
} | ||
finalizeSession() { | ||
if (!this.session) return false; | ||
sessions.delete(this.session.token); | ||
this.session = null; | ||
return true; | ||
} | ||
startSession(token, data = {}) { | ||
this.initializeSession(token, data); | ||
if (!this.#transport.connection) this.#transport.sendSessionCookie(token); | ||
return true; | ||
} | ||
restoreSession(token) { | ||
const session = sessions.get(token); | ||
if (!session) return false; | ||
this.session = session; | ||
return true; | ||
} | ||
close() { | ||
this.#transport.close(); | ||
} | ||
destroy() { | ||
this.emit('close'); | ||
if (!this.session) return; | ||
sessions.delete(this.session.token); | ||
} | ||
} | ||
const addHeaders = ({ origin }) => { | ||
if (origin) HEADERS['Access-Control-Allow-Origin'] = origin; | ||
}; | ||
class Server { | ||
constructor(options, application) { | ||
const { cors, queue } = options; | ||
constructor(application, options) { | ||
this.application = application; | ||
this.options = options; | ||
this.application = application; | ||
this.balancer = options.kind === 'balancer'; | ||
this.console = application.console; | ||
if (cors) transport.http.addHeaders(cors); | ||
if (options.cors) addHeaders(options.cors); | ||
const { queue } = options; | ||
const concurrency = queue.concurrency || options.concurrency; | ||
this.semaphore = new Semaphore(concurrency, queue.size, queue.timeout); | ||
this.server = null; | ||
this.ws = null; | ||
this.channels = new Set(); | ||
this.httpServer = null; | ||
this.wsServer = null; | ||
this.clients = new Set(); | ||
this.bind(); | ||
@@ -32,21 +175,42 @@ } | ||
bind() { | ||
const { options, application, console } = this; | ||
const { host, port, kind, protocol, timeouts, nagle = true } = options; | ||
const proto = protocol === 'http' || kind === 'balancer' ? http : https; | ||
const listener = this.listener.bind(this); | ||
this.server = proto.createServer({ ...application.cert }, listener); | ||
if (!nagle) { | ||
this.server.on('connection', (socket) => { | ||
socket.setNoDelay(true); | ||
}); | ||
} | ||
this.server.on('listening', () => { | ||
const { options, application, console, balancer } = this; | ||
const { host, port, protocol, timeouts, nagle = true } = options; | ||
const proto = protocol === 'http' || balancer ? http : https; | ||
const { cert } = application; | ||
this.httpServer = proto.createServer({ ...cert, noDelay: !nagle }); | ||
this.httpServer.on('listening', () => { | ||
console.info(`Listen port ${port}`); | ||
}); | ||
this.ws = new ws.Server({ server: this.server }); | ||
this.ws.on('connection', (connection, req) => { | ||
const channel = transport.ws.createChannel(this, req, connection); | ||
this.channels.add(channel); | ||
this.httpServer.on('request', async (req, res) => { | ||
const transport = new HttpTransport(this, req, res); | ||
if (!req.url.startsWith('/api')) { | ||
application.serveStatic(req.url, transport); | ||
return; | ||
} | ||
if (balancer) this.balancing(transport); | ||
if (req.method !== 'POST') this.error(403); | ||
if (res.writableEnded) return; | ||
const client = new Client(transport); | ||
const data = await metautil.receiveBody(req).catch(() => null); | ||
if (req.url === '/api') this.message(client, data); | ||
else this.request(client, transport, data); | ||
}); | ||
this.ws.on('error', (err) => { | ||
this.wsServer = new ws.Server({ server: this.httpServer }); | ||
this.wsServer.on('connection', (connection, req) => { | ||
const transport = new WsTransport(this, req, connection); | ||
const client = new Client(transport); | ||
connection.on('message', (data, isBinary) => { | ||
if (isBinary) this.binary(client, data); | ||
else this.message(client, data); | ||
}); | ||
}); | ||
this.wsServer.on('error', (err) => { | ||
if (err.code !== 'EADDRINUSE') return; | ||
@@ -58,82 +222,160 @@ console.warn(`Address in use: ${host}:${port}, retry...`); | ||
}); | ||
this.server.listen(port, host); | ||
this.httpServer.listen(port, host); | ||
} | ||
listener(req, res) { | ||
const { url } = req; | ||
const channel = transport.http.createChannel(this, req, res); | ||
this.channels.add(channel); | ||
if (this.options.kind === 'balancer') { | ||
const host = metautil.parseHost(req.headers.host); | ||
const port = metautil.sample(this.options.ports); | ||
const { protocol } = this.options; | ||
channel.redirect(`${protocol}://${host}:${port}/`); | ||
message(client, data) { | ||
if (Buffer.compare(EMPTY_PACKET, data) === 0) { | ||
client.send('{}'); | ||
return; | ||
} | ||
if (url.startsWith('/api')) this.request(channel); | ||
else this.application.serveStatic(channel); | ||
const packet = metautil.jsonParse(data) || {}; | ||
const { id, method, type } = packet; | ||
if (id && type === 'call' && method) { | ||
this.rpc(client, packet); | ||
} else if (id && type === 'stream') { | ||
this.stream(client, packet); | ||
} else { | ||
const error = new Error('Packet structure error'); | ||
client.error(500, { error, pass: true }); | ||
} | ||
} | ||
request(channel) { | ||
const { req } = channel; | ||
if (req.method === 'OPTIONS') { | ||
channel.options(); | ||
async rpc(client, packet) { | ||
const { id, method, args } = packet; | ||
const [unitName, methodName] = method.split('/'); | ||
const [unit, ver = '*'] = unitName.split('.'); | ||
const proc = this.application.getMethod(unit, ver, methodName); | ||
if (!proc) { | ||
client.error(404, { id }); | ||
return; | ||
} | ||
if (req.url === '/api' && req.method !== 'POST') { | ||
channel.error(403); | ||
const context = client.createContext(); | ||
if (!client.session && proc.access !== 'public') { | ||
client.error(403, { id }); | ||
return; | ||
} | ||
const body = metautil.receiveBody(req); | ||
if (req.url === '/api') { | ||
body.then((data) => { | ||
channel.message(data); | ||
}); | ||
} else { | ||
body.then((data) => { | ||
let args = null; | ||
if (data.length > 0) { | ||
args = metautil.jsonParse(data); | ||
if (!args) { | ||
const error = new Error('JSON parsing error'); | ||
channel.error(500, { error, pass: true }); | ||
return; | ||
} | ||
} | ||
const pathname = req.url.slice('/api/'.length); | ||
const [path, params] = metautil.split(pathname, '?'); | ||
if (!args) args = metautil.parseParams(params); | ||
const [interfaceName, methodName] = metautil.split(path, '/'); | ||
const { headers } = req; | ||
const hook = this.application.getHook(interfaceName); | ||
if (hook) channel.hook(hook, interfaceName, methodName, args, headers); | ||
else channel.rpc(-1, interfaceName, methodName, args, headers); | ||
}); | ||
try { | ||
await proc.enter(); | ||
} catch { | ||
client.error(503, { id }); | ||
return; | ||
} | ||
body.catch((error) => { | ||
channel.error(500, { error }); | ||
}); | ||
let result = null; | ||
try { | ||
result = await proc.invoke(context, args); | ||
} catch (error) { | ||
if (error.message === 'Timeout reached') { | ||
error.code = error.httpCode = 408; | ||
} | ||
client.error(error.code, { id, error }); | ||
return; | ||
} finally { | ||
proc.leave(); | ||
} | ||
if (result?.constructor?.name === 'Error') { | ||
const { code, httpCode = 200 } = result; | ||
client.error(code, { id, error: result, httpCode }); | ||
return; | ||
} | ||
client.send({ type: 'callback', id, result }); | ||
this.console.log(`${client.ip}\t${method}`); | ||
} | ||
closeChannels() { | ||
for (const channel of this.channels.values()) { | ||
if (channel.connection) { | ||
channel.connection.terminate(); | ||
async stream(client, packet) { | ||
const { id, name, size, status } = packet; | ||
const tag = id + '/' + name; | ||
try { | ||
const stream = client.streams.get(id); | ||
if (status) { | ||
if (!stream) throw new Error(`Stream ${tag} is not initialized`); | ||
if (status === 'end') await stream.close(); | ||
if (status === 'terminate') await stream.terminate(); | ||
client.streams.delete(id); | ||
return; | ||
} | ||
const valid = typeof name === 'string' && Number.isSafeInteger(size); | ||
if (!valid) throw new Error('Stream packet structure error'); | ||
if (stream) throw new Error(`Stream ${tag} is already initialized`); | ||
{ | ||
const stream = new MetaReadable({ id, name, size }); | ||
client.streams.set(id, stream); | ||
this.console.log(`${client.ip}\tstream ${tag} init`); | ||
} | ||
} catch (error) { | ||
this.console.error(`${client.ip}\tstream ${tag} error`); | ||
client.error(400, { id, error, pass: true }); | ||
} | ||
} | ||
binary(client, data) { | ||
const { id, payload } = Chunk.decode(data); | ||
try { | ||
const upstream = client.streams.get(id); | ||
if (upstream) { | ||
upstream.push(payload); | ||
} else { | ||
channel.error(503); | ||
channel.req.connection.destroy(); | ||
const error = new Error(`Stream ${id} is not initialized`); | ||
client.error(400, { id, error, pass: true }); | ||
} | ||
} catch (error) { | ||
this.console.error(`${client.ip}\tstream ${id} error`); | ||
client.error(400, { id: 0, error }); | ||
} | ||
} | ||
request(client, transport, data) { | ||
const { application } = this; | ||
const { headers, url, method: verb } = transport.req; | ||
const pathname = url.slice('/api/'.length); | ||
const [path, params] = metautil.split(pathname, '?'); | ||
const parameters = metautil.parseParams(params); | ||
const [unit, method] = metautil.split(path, '/'); | ||
const body = metautil.jsonParse(data) || {}; | ||
const args = { ...parameters, ...body }; | ||
const packet = { id: 0, method: unit + '/' + method, args }; | ||
const hook = application.getHook(unit); | ||
if (hook) this.hook(client, hook, packet, verb, headers); | ||
else this.rpc(client, packet); | ||
} | ||
async hook(client, proc, packet, verb, headers) { | ||
const { id, method, args } = packet; | ||
if (!proc) { | ||
client.error(404, { id }); | ||
return; | ||
} | ||
const context = client.createContext(); | ||
try { | ||
const par = { verb, method, args, headers }; | ||
const result = await proc.invoke(context, par); | ||
client.send(result); | ||
} catch (error) { | ||
client.error(500, { id, error }); | ||
} | ||
this.console.log(`${client.ip}\t${method}`); | ||
} | ||
balancing(transport) { | ||
const host = metautil.parseHost(transport.req.headers.host); | ||
const { protocol, port } = this.options; | ||
const targetPort = metautil.sample(port); | ||
transport.redirect(`${protocol}://${host}:${targetPort}/`); | ||
} | ||
closeClients() { | ||
for (const client of this.clients) { | ||
client.close(); | ||
} | ||
} | ||
async close() { | ||
this.server.close((err) => { | ||
this.httpServer.close((err) => { | ||
if (err) this.console.error(err); | ||
}); | ||
if (this.channels.size === 0) { | ||
if (this.clients.size === 0) return; | ||
this.closeClients(); | ||
while (this.clients.size > 0) { | ||
await metautil.delay(SHORT_TIMEOUT); | ||
return; | ||
} | ||
await metautil.delay(this.options.timeouts.stop); | ||
this.closeChannels(); | ||
} | ||
@@ -140,0 +382,0 @@ } |
@@ -6,32 +6,19 @@ 'use strict'; | ||
const STREAM_ID_LENGTH = 4; | ||
const ID_LENGTH = 4; | ||
const createStreamIdBuffer = (num) => { | ||
const buffer = new ArrayBuffer(STREAM_ID_LENGTH); | ||
const view = new DataView(buffer); | ||
view.setInt32(0, num); | ||
return buffer; | ||
const chunkEncode = (id, payload) => { | ||
const chunk = new Uint8Array(ID_LENGTH + payload.length); | ||
const view = new DataView(chunk.buffer); | ||
view.setInt32(0, id); | ||
chunk.set(payload, ID_LENGTH); | ||
return chunk; | ||
}; | ||
const getStreamId = (buffer) => { | ||
const view = new DataView(buffer); | ||
return view.getInt32(0); | ||
const chunkDecode = (chunk) => { | ||
const view = new DataView(chunk.buffer); | ||
const id = view.getInt32(0); | ||
const payload = chunk.subarray(ID_LENGTH); | ||
return { id, payload }; | ||
}; | ||
class Chunk { | ||
static encode(streamId, payload) { | ||
const streamIdView = new Uint8Array(createStreamIdBuffer(streamId)); | ||
const chunkView = new Uint8Array(STREAM_ID_LENGTH + payload.length); | ||
chunkView.set(streamIdView); | ||
chunkView.set(payload, STREAM_ID_LENGTH); | ||
return chunkView; | ||
} | ||
static decode(chunkView) { | ||
const streamId = getStreamId(chunkView.buffer); | ||
const payload = chunkView.subarray(STREAM_ID_LENGTH); | ||
return { streamId, payload }; | ||
} | ||
} | ||
const PUSH_EVENT = Symbol(); | ||
@@ -43,11 +30,11 @@ const PULL_EVENT = Symbol(); | ||
class MetaReadable extends EventEmitter { | ||
constructor(initData, options = {}) { | ||
constructor(id, name, size, options = {}) { | ||
super(); | ||
this.streamId = initData.streamId; | ||
this.name = initData.name; | ||
this.size = initData.size; | ||
this.id = id; | ||
this.name = name; | ||
this.size = size; | ||
this.highWaterMark = options.highWaterMark || DEFAULT_HIGH_WATER_MARK; | ||
this.queue = []; | ||
this.streaming = true; | ||
this.status = null; | ||
this.status = 'active'; | ||
this.bytesRead = 0; | ||
@@ -60,3 +47,3 @@ this.maxListenersCount = this.getMaxListeners() - 1; | ||
this.checkStreamLimits(); | ||
await this.waitEvent(PULL_EVENT); | ||
await EventEmitter.once(this, PULL_EVENT); | ||
return this.push(data); | ||
@@ -70,4 +57,5 @@ } | ||
async finalize(writable) { | ||
const waitWritableEvent = this.waitEvent.bind(writable); | ||
writable.once('error', () => this.terminate()); | ||
const waitWritableEvent = EventEmitter.once.bind(writable); | ||
const onError = () => this.terminate(); | ||
writable.once('error', onError); | ||
for await (const chunk of this) { | ||
@@ -81,6 +69,7 @@ const needDrain = !writable.write(chunk); | ||
await this.close(); | ||
writable.removeListener('error', onError); | ||
} | ||
pipe(writable) { | ||
void this.finalize(writable); | ||
this.finalize(writable); | ||
return writable; | ||
@@ -109,3 +98,3 @@ } | ||
while (this.bytesRead !== this.size) { | ||
await this.waitEvent(PULL_EVENT); | ||
await EventEmitter.once(this, PULL_EVENT); | ||
} | ||
@@ -118,3 +107,3 @@ this.streaming = false; | ||
if (this.queue.length > 0) return this.pull(); | ||
const finisher = await this.waitEvent(PUSH_EVENT); | ||
const finisher = await EventEmitter.once(this, PUSH_EVENT); | ||
if (finisher === null) return null; | ||
@@ -142,11 +131,7 @@ return this.pull(); | ||
waitEvent(event) { | ||
return new Promise((resolve) => this.once(event, resolve)); | ||
} | ||
async *[Symbol.asyncIterator]() { | ||
while (this.streaming) { | ||
const chunk = await this.read(); | ||
if (chunk) yield chunk; | ||
else return; | ||
if (!chunk) return; | ||
yield chunk; | ||
} | ||
@@ -157,8 +142,8 @@ } | ||
class MetaWritable extends EventEmitter { | ||
constructor(transport, initData) { | ||
constructor(id, name, size, transport) { | ||
super(); | ||
this.id = id; | ||
this.name = name; | ||
this.size = size; | ||
this.transport = transport; | ||
this.streamId = initData.streamId; | ||
this.name = initData.name; | ||
this.size = initData.size; | ||
this.init(); | ||
@@ -168,7 +153,4 @@ } | ||
init() { | ||
const packet = { | ||
stream: this.streamId, | ||
name: this.name, | ||
size: this.size, | ||
}; | ||
const { id, name, size } = this; | ||
const packet = { type: 'stream', id, name, size }; | ||
this.transport.send(JSON.stringify(packet)); | ||
@@ -178,3 +160,3 @@ } | ||
write(data) { | ||
const chunk = Chunk.encode(this.streamId, data); | ||
const chunk = chunkEncode(this.id, data); | ||
this.transport.send(chunk); | ||
@@ -185,3 +167,3 @@ return true; | ||
end() { | ||
const packet = { stream: this.streamId, status: 'end' }; | ||
const packet = { type: 'stream', id: this.id, status: 'end' }; | ||
this.transport.send(JSON.stringify(packet)); | ||
@@ -191,3 +173,3 @@ } | ||
terminate() { | ||
const packet = { stream: this.streamId, status: 'terminate' }; | ||
const packet = { type: 'stream', id: this.id, status: 'terminate' }; | ||
this.transport.send(JSON.stringify(packet)); | ||
@@ -197,2 +179,2 @@ } | ||
module.exports = { Chunk, MetaReadable, MetaWritable }; | ||
module.exports = { chunkEncode, chunkDecode, MetaReadable, MetaWritable }; |
136
metacom.d.ts
@@ -12,5 +12,13 @@ import { EventEmitter } from 'node:events'; | ||
export class MetaReadable extends EventEmitter { | ||
streamId: number; | ||
id: number; | ||
name: string; | ||
size: number; | ||
constructor( | ||
id: number, | ||
name: string, | ||
size: number, | ||
options?: { | ||
highWaterMark?: number; | ||
}, | ||
); | ||
push(data: ArrayBufferView): Promise<ArrayBufferView>; | ||
@@ -23,5 +31,6 @@ finalize(writable: Writable): Promise<void>; | ||
export class MetaWritable extends EventEmitter { | ||
streamId: number; | ||
id: number; | ||
name: string; | ||
size: number; | ||
constructor(id: number, name: string, size: number, transport: Transport); | ||
write(data: ArrayBufferView): void; | ||
@@ -33,3 +42,3 @@ end(): void; | ||
export interface BlobUploader { | ||
streamId: number; | ||
id: number; | ||
upload(): Promise<void>; | ||
@@ -49,10 +58,10 @@ } | ||
httpCall( | ||
iname: string, | ||
unit: string, | ||
ver: string, | ||
): (methodName: string) => (args: object) => Promise<void>; | ||
socketCall( | ||
iname: string, | ||
unit: string, | ||
ver: string, | ||
): (methodName: string) => (args: object) => Promise<void>; | ||
getStream(streamId: number): MetaReadable; | ||
getStream(id: number): MetaReadable; | ||
createStream(name: string, size: number): MetaWritable; | ||
@@ -73,3 +82,3 @@ createBlobUploader(blob: Blob): BlobUploader; | ||
export interface ErrorOptions { | ||
callId?: number; | ||
id?: number; | ||
error?: Error; | ||
@@ -91,82 +100,73 @@ pass?: boolean; | ||
ip: string | undefined; | ||
callId: number; | ||
eventId: number; | ||
streamId: number; | ||
auth: Auth; | ||
events: { close: Array<Function> }; | ||
redirect(location: string): void; | ||
startSession(token: string, data: object): boolean; | ||
restoreSession(token: string): boolean; | ||
getStream(streamId: number): MetaReadable; | ||
createStream(name: string, size: number): MetaWritable; | ||
session: Session; | ||
} | ||
export class Channel { | ||
server: Server; | ||
auth: Auth; | ||
export class Transport { | ||
console: Console; | ||
req: ClientRequest; | ||
res: ServerResponse; | ||
res?: ServerResponse; | ||
connection?: WebSocket; | ||
ip: string; | ||
client: Client; | ||
session?: Session; | ||
eventId: number; | ||
streamId: number; | ||
streams: Map<number, MetaReadable>; | ||
token: string; | ||
constructor(application: object, req: ClientRequest, res: ServerResponse); | ||
message(data: string): void; | ||
binary(data: Buffer): void; | ||
handleRpcPacket(packet: object): void; | ||
handleStreamPacket(packet: object): Promise<void>; | ||
createContext(): Context; | ||
rpc( | ||
callId: number, | ||
interfaceName: string, | ||
methodName: string, | ||
args: [], | ||
): Promise<void>; | ||
hook( | ||
proc: object, | ||
interfaceName: string, | ||
methodName: string, | ||
args: Array<any>, | ||
): Promise<void>; | ||
constructor( | ||
console: Console, | ||
req: ClientRequest, | ||
target: ServerResponse | WebSocket, | ||
); | ||
error(code: number, errorOptions?: ErrorOptions): void; | ||
sendEvent(name: string, data: object): void; | ||
getStream(streamId: number): MetaWritable; | ||
createStream(name: string, size: number): MetaWritable; | ||
resumeCookieSession(): Promise<void>; | ||
destroy(): void; | ||
} | ||
export class HttpChannel extends Channel { | ||
write(data: any, httpCode?: number, ext?: string): void; | ||
write(data: string | Buffer, httpCode?: number, ext?: string): void; | ||
send(obj: object, httpCode?: number): void; | ||
redirect(location: string): void; | ||
options(): void; | ||
redirect?(location: string): void; | ||
options?(): void; | ||
getCookies?(): object; | ||
sendSessionCookie(token: string): void; | ||
removeSessionCookie(): void; | ||
close(): void; | ||
} | ||
export class WsChannel extends Channel { | ||
connection: WebSocket; | ||
constructor(application: object, req: ClientRequest, connection: WebSocket); | ||
write(data: any): void; | ||
send(obj: object): void; | ||
export interface CallPacket { | ||
type: 'call'; | ||
id: number; | ||
method: string; | ||
args: object; | ||
meta: object; | ||
} | ||
export interface StreamPacket { | ||
type: 'stream'; | ||
id: number; | ||
name: string; | ||
size: number; | ||
} | ||
export class Server { | ||
application: object; | ||
options: Options; | ||
application: object; | ||
balancer: boolean; | ||
console: Console; | ||
semaphore: Semaphore; | ||
server?: any; | ||
ws?: any; | ||
channels?: Map<Client, Channel>; | ||
httpServer: any; | ||
wsServer: any; | ||
clients: Set<Client>; | ||
constructor(options: Options, application: object); | ||
bind(): void; | ||
listener(req: ClientRequest, res: ServerResponse): void; | ||
request(channel: Channel): void; | ||
closeChannels(): void; | ||
message(client: Client, data: string): void; | ||
rpc(client: Client, packet: CallPacket): Promise<void>; | ||
binary(client: Client, data: Buffer): void; | ||
handleRpcPacket(client: Client, packet: CallPacket): void; | ||
handleStreamPacket(client: Client, packet: StreamPacket): Promise<void>; | ||
handleRequest( | ||
client: Client, | ||
transport: Transport, | ||
data: Buffer, | ||
application: object, | ||
): void; | ||
hook( | ||
client: Client, | ||
proc: object, | ||
packet: CallPacket, | ||
verb: string, | ||
headers: object, | ||
): Promise<void>; | ||
balancing(transport: Transport): void; | ||
closeClients(): void; | ||
close(): Promise<void>; | ||
@@ -173,0 +173,0 @@ } |
@@ -5,6 +5,3 @@ 'use strict'; | ||
const { Server } = require('./lib/server.js'); | ||
const { Channel } = require('./lib/channel.js'); | ||
module.exports.Metacom = Metacom; | ||
module.exports.Server = Server; | ||
module.exports.Channel = Channel; | ||
module.exports = { Metacom, Server }; |
{ | ||
"name": "metacom", | ||
"version": "3.0.0-alpha.8", | ||
"version": "3.0.0-alpha.9", | ||
"author": "Timur Shemsedinov <timur.shemsedinov@gmail.com>", | ||
@@ -49,3 +49,3 @@ "description": "Communication protocol for Metarhia stack with rpc, events, binary streams, memory and db access", | ||
"engines": { | ||
"node": "^14.18 || 16 || 18 || 19" | ||
"node": "^14.18 || 16 || 18 || 19 || 20" | ||
}, | ||
@@ -52,0 +52,0 @@ "dependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
64346
15
1612