Socket
Socket
Sign inDemoInstall

linux-device

Package Overview
Dependencies
132
Maintainers
1
Versions
39
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.0.16 to 2.1.1

.github/workflows/release.yml

3

index.js
"use strict";
module.exports = require('./lib/Compat.js');
module.exports.startRemote = require('./remote/server.js');
module.exports.LDUtils = require('./lib/Utils');
module.exports.startRemote = require('./remote/server.js');

@@ -1,5 +0,5 @@

"use strict";
'use strict';
const {Duplex} = require('stream');
const {EventEmitter} = require('events');
const { Duplex } = require('stream');
const { EventEmitter } = require('events');
const util = require('util');

@@ -12,2 +12,3 @@ const _fs = require('fs');

const LDUtils = require('./Utils');

@@ -17,12 +18,11 @@ const { Pipe } = process.binding('pipe_wrap');

const fs = {
open: util.promisify(_fs.open),
close: util.promisify(_fs.close),
writeFile: util.promisify(_fs.writeFile),
fstat: util.promisify(_fs.fstat),
createReadStream: _fs.createReadStream,
createWriteStream: _fs.createWriteStream,
constants: _fs.constants,
open: util.promisify(_fs.open),
close: util.promisify(_fs.close),
writeFile: util.promisify(_fs.writeFile),
fstat: util.promisify(_fs.fstat),
createReadStream: _fs.createReadStream,
createWriteStream: _fs.createWriteStream,
constants: _fs.constants,
};
const kSource = Symbol('source');

@@ -33,16 +33,16 @@

const MODEM_BITS = {
"BIT_LE" : 0x001,
"BIT_DTR" : 0x002,
"BIT_RTS" : 0x004,
"BIT_ST" : 0x008,
"BIT_SR" : 0x010,
"BIT_CTS" : 0x020,
"BIT_CAR" : 0x040,
"BIT_RNG" : 0x080,
"BIT_DSR" : 0x100,
"BIT_CD" : 0x040,
"BIT_RI" : 0x080,
"BIT_OUT1" : 0x2000,
"BIT_OUT2" : 0x4000,
"BIT_LOOP" : 0x8000
'BIT_LE': 0x001,
'BIT_DTR': 0x002,
'BIT_RTS': 0x004,
'BIT_ST': 0x008,
'BIT_SR': 0x010,
'BIT_CTS': 0x020,
'BIT_CAR': 0x040,
'BIT_RNG': 0x080,
'BIT_DSR': 0x100,
'BIT_CD': 0x040,
'BIT_RI': 0x080,
'BIT_OUT1': 0x2000,
'BIT_OUT2': 0x4000,
'BIT_LOOP': 0x8000,
};

@@ -55,3 +55,2 @@

/**

@@ -73,282 +72,287 @@ * NodeJS Stream API

class DeviceHandle extends Duplex {
constructor(options) {
super(options);
options = options || {};
if(typeof options === 'string') options = {path: options};
const mode = options.mode || CONSTANTS.O_RDWR;
this[kSource] = {
path: options.path,
mode: mode,
readable: mode & CONSTANTS.O_RDONLY || mode & CONSTANTS.O_RDWR,
writable: mode & CONSTANTS.O_WRONLY || mode & CONSTANTS.O_RDWR,
parser: options.parser || undefined,
reading: 0,
forcedDataSize: options.absoluteSize,
}
if(this[kSource].readable) this[kSource].mode |= CONSTANTS.O_NONBLOCK;
if(options && options.autoOpen) this.open().catch(err => this.emit('error', err));
}
constructor(options) {
super(options);
options = options || {};
if (typeof options === 'string') options = { path: options };
const mode = options.mode || CONSTANTS.O_RDWR;
this[kSource] = {
path: options.path,
mode: mode,
readable: mode & CONSTANTS.O_RDONLY || mode & CONSTANTS.O_RDWR,
writable: mode & CONSTANTS.O_WRONLY || mode & CONSTANTS.O_RDWR,
parser: options.parser || undefined,
reading: 0,
forcedDataSize: options.absoluteSize,
};
if (this[kSource].readable) this[kSource].mode |= CONSTANTS.O_NONBLOCK;
if (options && options.autoOpen) this.open().catch(err => this.emit('error', err));
}
__onEnd(err) {
if(err) this.close().catch(e => {});
}
__onEnd(err) {
if (err) this.close().catch(e => {
});
}
__onError(err) {
if(err) this.close().catch(e => {});
this.emit('error', err);
}
__onError(err) {
if (err) this.close().catch(e => {
});
this.emit('error', err);
}
__pushSmart(res) {
let result = true;
if(res && this[kSource].parser) {
if(!this[kSource].emitter) {
this[kSource].emitter = new EventEmitter();
this[kSource].emitter.on('data', (data) => this.push(data));
}
this[kSource].parser(this[kSource].emitter, res);
} if(res && this[kSource].forcedDataSize) {
for(let i = 0; i < res.length; i+= this[kSource].forcedDataSize) {
result = this.push(res.slice(i, i+this[kSource].forcedDataSize));
}
} else {
result = this.push(res);
}
return result;
}
__pushSmart(res) {
let result = true;
if (res && this[kSource].parser) {
if (!this[kSource].emitter) {
this[kSource].emitter = new EventEmitter();
this[kSource].emitter.on('data', (data) => this.push(data));
}
this[kSource].parser(this[kSource].emitter, res);
}
if (res && this[kSource].forcedDataSize) {
for (let i = 0; i < res.length; i += this[kSource].forcedDataSize) {
result = this.push(res.slice(i, i + this[kSource].forcedDataSize));
}
} else {
result = this.push(res);
}
return result;
}
async _createStreams() {
if(tty.isatty(this.fd)) {
this[kSource].isTTY = true;
if(this[kSource].readable) {
this[kSource].inStream = new tty.ReadStream(this.fd);
}
//write uses writeRepeated instead of a stream
} else {
this[kSource].isTTY = false;
const fstat = await fs.fstat(this.fd);
if(fstat.isCharacterDevice() && this[kSource].mode & CONSTANTS.O_NONBLOCK) {
let handle = new Pipe(0);
handle.open(this.fd);
this[kSource].inStream = new net.Socket({
handle: handle,
readable: this[kSource].readable,
writable: false,
})
} else if(fstat.isFile() || fstat.isCharacterDevice() || fstat.isBlockDevice() ) {
//do nothing
this[kSource].useFSRead = true;
} else if(fstat.isSocket()) {
this[kSource].inStream = new net.Socket({
fd: this.fd,
readable: this[kSource].readable,
writable: false,
})
} else {
throw new Error("unknown_type");
}
}
async _createStreams() {
if (tty.isatty(this.fd)) {
this[kSource].isTTY = true;
if (this[kSource].readable) {
this[kSource].inStream = new tty.ReadStream(this.fd);
}
//write uses writeRepeated instead of a stream
} else {
this[kSource].isTTY = false;
const fstat = await fs.fstat(this.fd);
if(this[kSource].inStream) {
this[kSource].inStream.on('error', err => this.__onError(err));
this[kSource].inStream.on('end', err => this.__onEnd(err));
this[kSource].inStream.pause();
this[kSource].inStream.on('data', (data) => {
if(!this.__pushSmart(data)) {
this[kSource].inStream.pause();
}
});
if(this[kSource].resumeOnCreate){
delete this[kSource].resumeOnCreate;
this[kSource].inStream.resume();
}
} else {
if(this[kSource].resumeOnCreate){
delete this[kSource].resumeOnCreate;
this._read();
}
}
if (fstat.isCharacterDevice() && this[kSource].mode & CONSTANTS.O_NONBLOCK) {
let handle = new Pipe(0);
handle.open(this.fd);
this[kSource].inStream = new net.Socket({
handle: handle,
readable: this[kSource].readable,
writable: false,
});
} else if (fstat.isFile() || fstat.isCharacterDevice() || fstat.isBlockDevice()) {
//do nothing
this[kSource].useFSRead = true;
} else if (fstat.isSocket()) {
this[kSource].inStream = new net.Socket({
fd: this.fd,
readable: this[kSource].readable,
writable: false,
});
} else {
throw new Error('unknown_type');
}
}
/**
* Opens the device
* @returns {number} Device fd
*/
async open() {
if(this.fd) await this.close().catch(()=>{});
if(this[kSource].tainted) throw new Error('attempt to reuse closed DeviceHandle');
if(!this[kSource].opening) {
return this[kSource].opening = (async () => {
const res = await fs.open(this[kSource].path, this[kSource].mode);
this.fd = res;
this[kSource].opening = false;
await this._createStreams();
this.emit('open', res);
return res;
})();
} else {
return this[kSource].opening;
}
}
/**
* Closes the device
*/
async close() {
if(this[kSource].outStream) {
this[kSource].outStream.cork();
this[kSource].outStream.removeAllListeners();
delete this[kSource].outStream;
if (this[kSource].inStream) {
this[kSource].inStream.on('error', err => this.__onError(err));
this[kSource].inStream.on('end', err => this.__onEnd(err));
this[kSource].inStream.pause();
this[kSource].inStream.on('data', (data) => {
if (!this.__pushSmart(data)) {
this[kSource].inStream.pause();
}
if(this[kSource].inStream) {
this[kSource].inStream.removeAllListeners();
this[kSource].inStream.on('error', ()=>{});
this[kSource].inStream.destroy();
delete this[kSource].inStream;
}
if(!this.fd) return;
await fs.close(this.fd);
delete this.fd;
this.emit('close');
}
/**
* Checks if the device is open
* @returns {boolean} True when the device is open, false otherwise
*/
isOpen() {
return !!this.fd;
}
/**
* Performs an ioctl on the device
* @param {number} direction - Either constants.IOCTL_NONE, IOCTL_READ, IOCTL_WRITE, IOCTL_RW
* @param {number} type - The ioctl type
* @param {number} cmd - The ioctl command
* @param {Buffer} data - The ioctl data, the data may be changed by the ioctl
*/
async ioctl(direction, type, cmd, data) {
if(!this.fd) throw new Error('not_open');
await FH.ioctl(this.fd, direction, type, cmd, data||Buffer.alloc(0));
}
/**
* Performs a raw ioctl on the device
* @param {number} cmd - The ioctl command
* @param {Buffer} data - The ioctl data, the data may be changed by the ioctl
*/
async ioctlRaw(cmd, data) {
if(!this.fd) throw new Error('not_open');
await FH.ioctlRaw(this.fd, cmd, data||Buffer.alloc(0));
}
});
if (this[kSource].resumeOnCreate) {
delete this[kSource].resumeOnCreate;
this[kSource].inStream.resume();
}
} else {
if (this[kSource].resumeOnCreate) {
delete this[kSource].resumeOnCreate;
this._read();
}
}
}
async setModemBits(bits) {
var data = Buffer.allocUnsafe(4);
data.writeUInt32LE(bits, 0);
return this.ioctlRaw(IOCTL_TIOCMBIS, data);
}
/**
* Opens the device
* @returns {number} Device fd
*/
async open() {
if (this.fd) await this.close().catch(() => {
});
if (this[kSource].tainted) throw new Error('attempt to reuse closed DeviceHandle');
if (!this[kSource].opening) {
return this[kSource].opening = (async () => {
const res = await fs.open(this[kSource].path, this[kSource].mode);
this.fd = res;
this[kSource].opening = false;
await this._createStreams();
this.emit('open', res);
return res;
})();
} else {
return this[kSource].opening;
}
}
async clearModemBits(bits) {
var data = Buffer.allocUnsafe(4);
data.writeUInt32LE(bits, 0);
return this.ioctlRaw(IOCTL_TIOCMBIC, data);
};
async flush() {
console.log('[linux-device] Calling unimplemented flush method, invoking callback without doing anything.');
};
/**
* Alias for DeviceHandle.gpio
*/
async gpio(...args) {
return await this.constructor.gpio(...args);
}
/**
* Sets a GPIO output pin
* @param {number} gpio - The gpio number
* @param {Buffer} value - Either true or false
*/
static async gpio(gpio, value) {
await fs.writeFile( SYSFS_GPIO+'/export', ''+gpio).catch(e => {});
await fs.writeFile(SYSFS_GPIO+'/gpio'+gpio+'/direction', value ? 'high': 'low');
};
/**
* DeviceHandle constants
*/
static get constants() {
return CONSTANTS;
}
_write(chunk, encoding, callback) {
util.callbackify(this._writePromise).apply(this, arguments);
}
/**
* Closes the device
*/
async close() {
if (this[kSource].outStream) {
this[kSource].outStream.cork();
this[kSource].outStream.removeAllListeners();
delete this[kSource].outStream;
}
if (this[kSource].inStream) {
this[kSource].inStream.removeAllListeners();
this[kSource].inStream.on('error', () => {
});
this[kSource].inStream.destroy();
delete this[kSource].inStream;
}
if (!this.fd) return;
await fs.close(this.fd);
delete this.fd;
this.emit('close');
}
/**
* Checks if the device is open
* @returns {boolean} True when the device is open, false otherwise
*/
isOpen() {
return !!this.fd;
}
_allocNewPool(poolSize) {
this[kSource].pool = Buffer.allocUnsafe(poolSize);
this[kSource].pool.used = 0;
}
/**
* Performs an ioctl on the device
* @param {number} direction - Either constants.IOCTL_NONE, IOCTL_READ, IOCTL_WRITE, IOCTL_RW
* @param {number} type - The ioctl type
* @param {number} cmd - The ioctl command
* @param {Buffer} data - The ioctl data, the data may be changed by the ioctl
*/
async ioctl(direction, type, cmd, data) {
if (!this.fd) throw new Error('not_open');
await FH.ioctl(this.fd, direction, type, cmd, data || Buffer.alloc(0));
}
_readFS(size) {
// the actual read.
/**
* Performs a raw ioctl on the device
* @param {number} cmd - The ioctl command
* @param {Buffer} data - The ioctl data, the data may be changed by the ioctl
*/
async ioctlRaw(cmd, data) {
if (!this.fd) throw new Error('not_open');
await FH.ioctlRaw(this.fd, cmd, data || Buffer.alloc(0));
}
if (!this[kSource].pool || this[kSource].pool.length - this[kSource].pool.used < 512) {
// discard the old pool.
this._allocNewPool(2048);
}
async setModemBits(bits) {
var data = Buffer.allocUnsafe(4);
data.writeUInt32LE(bits, 0);
return this.ioctlRaw(IOCTL_TIOCMBIS, data);
}
// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
var thisPool = this[kSource].pool;
var toRead = Math.min(thisPool.length - thisPool.used, size||512);
var start = thisPool.used;
async clearModemBits(bits) {
var data = Buffer.allocUnsafe(4);
data.writeUInt32LE(bits, 0);
return this.ioctlRaw(IOCTL_TIOCMBIC, data);
};
_fs.read(this.fd, thisPool, start, toRead, null, (err, bytesRead) => {
if (err) {
return this.__onError(err);
}
async flush() {
console.log('[linux-device] Calling unimplemented flush method, invoking callback without doing anything.');
};
if (bytesRead > 0) {
let buffer = thisPool.slice(start, start+bytesRead);
if(thisPool.used == start+toRead) thisPool.used = start+bytesRead;
this.__pushSmart(buffer);
} else {
//setTimeout(() => this._readFS(), 1000);
}
});
thisPool.used += toRead;
}
/**
* Alias for DeviceHandle.gpio
*/
async gpio(...args) {
return await this.constructor.gpio(...args);
}
_read(size) {
if(this[kSource].inStream && this[kSource].inStream.isPaused()) {
this[kSource].inStream.resume();
} else if(this[kSource].useFSRead) {
this._readFS(size);
} else {
this[kSource].resumeOnCreate = true;
}
/**
* Sets a GPIO output pin
* @param {number} gpio - The gpio number
* @param {Buffer} value - Either true or false
*/
static async gpio(gpio, value) {
await fs.writeFile(SYSFS_GPIO + '/export', '' + gpio).catch(e => {
});
await fs.writeFile(SYSFS_GPIO + '/gpio' + gpio + '/direction', value ? 'high' : 'low');
};
/**
* DeviceHandle constants
*/
static get constants() {
return CONSTANTS;
}
_write(chunk, encoding, callback) {
util.callbackify(this._writePromise).apply(this, arguments);
}
_allocNewPool(poolSize) {
this[kSource].pool = Buffer.allocUnsafe(poolSize);
this[kSource].pool.used = 0;
}
_readFS(size) {
// the actual read.
if (!this[kSource].pool || this[kSource].pool.length - this[kSource].pool.used < 512) {
// discard the old pool.
this._allocNewPool(2048);
}
async _writePromise(chunk, encoding) {
if(!this.fd) throw new Error('not_open');
if(!this[kSource].writable) throw new Error('not_writable');
return await FH.writeRepeated(this.fd, chunk, chunk.interval || 0, chunk.repetitions || 1); //retain chunks
}
_destroy(error, callback) {
if(!this.fd) return;
util.callbackify(this.close).call(this, (err) => {
callback(err||error);
});
}
// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
var thisPool = this[kSource].pool;
var toRead = Math.min(thisPool.length - thisPool.used, size || 512);
var start = thisPool.used;
_fs.read(this.fd, thisPool, start, toRead, null, (err, bytesRead) => {
if (err) {
return this.__onError(err);
}
if (bytesRead > 0) {
let buffer = thisPool.slice(start, start + bytesRead);
if (thisPool.used == start + toRead) thisPool.used = start + bytesRead;
this.__pushSmart(buffer);
} else {
//setTimeout(() => this._readFS(), 1000);
}
});
thisPool.used += toRead;
}
_read(size) {
if (this[kSource].inStream && this[kSource].inStream.isPaused()) {
this[kSource].inStream.resume();
} else if (this[kSource].useFSRead) {
this._readFS(size);
} else {
this[kSource].resumeOnCreate = true;
}
}
async _writePromise(chunk, encoding) {
if (!this.fd) throw new Error('not_open');
if (!this[kSource].writable) throw new Error('not_writable');
// Check if a custom header is available which specifies the interval and repetitions of this command. Encoding happens in node-homey-infrared/index.js.
const { buffer, interval, repetitions } = LDUtils.decodeWriteBuffer(chunk);
return await FH.writeRepeated(this.fd, buffer, interval, repetitions); //retain chunks
}
_destroy(error, callback) {
if (!this.fd) return;
util.callbackify(this.close).call(this, (err) => {
callback(err || error);
});
}
}
module.exports = DeviceHandle;
{
"name": "linux-device",
"version": "2.0.16",
"version": "2.1.1",
"description": "Native addon to communicate with linux devices (can also be used for sockets or FIFOs)",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -1,4 +0,4 @@

"use strict";
'use strict';
const {Duplex} = require('stream');
const { Duplex } = require('stream');
const fs = require('fs');

@@ -10,2 +10,4 @@ const util = require('util');

const LDUtils = require('../lib/Utils');
let server;

@@ -19,77 +21,88 @@ let handles = {};

function makeRemoteFunc(...meta) {
return async function(...args) {
server = await server;
try {
if(this._currentCall) await this._currentCall;
} catch(e) {}
const work = new Promise((resolve, reject) => {
function done(err, res) {
//console.log('done', ...meta, err, res);
if(err) return reject(err);
resolve(res);
}
//console.log('executing', ...meta, ...args);
args.push(done);
if(this && this._handle)
server.emit(...meta, this._handle, ...args);
else
server.emit(...meta, ...args);
});
this._currentCall = work;
try {
await work;
} catch(e) {}
if(this._currentCall === work) this._currentCall = null;
return work;
};
return async function (...args) {
server = await server;
try {
if (this._currentCall) await this._currentCall;
}
catch (e) {
}
const work = new Promise((resolve, reject) => {
function done(err, res) {
//console.log('done', ...meta, err, res);
if (err) return reject(err);
resolve(res);
}
//console.log('executing', ...meta, ...args);
args.push(done);
if (this && this._handle)
server.emit(...meta, this._handle, ...args);
else
server.emit(...meta, ...args);
});
this._currentCall = work;
try {
await work;
}
catch (e) {
}
if (this._currentCall === work) this._currentCall = null;
return work;
};
}
class DeviceHandleProxy extends Duplex {
constructor(options) {
super(options);
this._handle = ++i;
handles[i] = this;
this.__init(options);
}
isOpen() {
return !!this.fd;
}
_write(chunk, encoding, callback) {
this._writeChunk(chunk, {gap: chunk.gap, repetitions: chunk.repetitions})
.then(res => callback(null, res)).catch(err => callback(err));
}
_writePromise(chunk, encoding) {
return this._writeChunk(chunk, {gap: chunk.gap, repetitions: chunk.repetitions});
}
_read(size) {
this._startRead(size);
}
_destroy(error, callback) {
this.close()
.then(res => callback(null, res)).catch(err => callback(err));
if(error) this.emit('error', error);
}
_handle_open(fd) {
this.fd = fd;
this.emit('open', fd);
}
_handle_close() {
delete this.fd;
this.emit('close');
}
_handle_data(data) {
this.push(Buffer.from(data))
}
static get constants() {
return CONSTANTS;
}
constructor(options) {
super(options);
this._handle = ++i;
handles[i] = this;
this.__init(options);
}
isOpen() {
return !!this.fd;
}
_write(chunk, encoding, callback) {
// Check if a custom header is available which specifies the interval and repetitions of this command. Encoding happens in node-homey-infrared/index.js.
const { buffer, interval, repetitions } = LDUtils.decodeWriteBuffer(chunk);
this._writeChunk(buffer, { interval, repetitions })
.then(res => callback(null, res)).catch(err => callback(err));
}
_writePromise(chunk, encoding) {
// Check if a custom header is available which specifies the interval and repetitions of this command. Encoding happens in node-homey-infrared/index.js.
const { buffer, interval, repetitions } = LDUtils.decodeWriteBuffer(chunk);
return this._writeChunk(buffer, { interval, repetitions });
}
_read(size) {
this._startRead(size);
}
_destroy(error, callback) {
this.close()
.then(res => callback(null, res)).catch(err => callback(err));
if (error) this.emit('error', error);
}
_handle_open(fd) {
this.fd = fd;
this.emit('open', fd);
}
_handle_close() {
delete this.fd;
this.emit('close');
}
_handle_data(data) {
this.push(Buffer.from(data));
}
static get constants() {
return CONSTANTS;
}
}

@@ -101,39 +114,38 @@

functions.forEach((func) => {
DeviceHandleProxy.prototype[func] = makeRemoteFunc('function', func);
DeviceHandleProxy.prototype[func] = makeRemoteFunc('function', func);
});
metadata.static_functions.forEach((func) => {
DeviceHandleProxy[func] = makeRemoteFunc('static_func', func);
DeviceHandleProxy[func] = makeRemoteFunc('static_func', func);
});
module.exports = function (url) {
server = new Promise((resolve, reject) => {
let serv = io.connect(url, { transports: ['websocket'] });
module.exports = function(url) {
server = new Promise((resolve, reject) => {
let serv = io.connect(url, {transports: ['websocket']});
serv.on('connect', () => resolve(serv));
serv.on('constants', (constants) => Object.assign(CONSTANTS, constants));
serv.on('server_error', (error) => {
console.error('[linux-device] An remote error occured:', error);
});
serv.on('connect', () => console.log('[linux-device] REMOTE DEVICE SUPPORT ENABLED, CONNECTED TO:', url));
serv.on('handle_event', (handle, event, ...args) => {
if(!handles[handle]) return;
if(handles[handle]['_handle_'+event])
handles[handle]['_handle_'+event](...args);
else
handles[handle].emit(event, ...args);
});
serv.on('disconnect', () => {
Object.keys(handles).forEach((handle) => {
handles[handle].destroy(new Error('Remote connection unexpectedly closed.'));
});
});
});
serv.on('connect', () => resolve(serv));
serv.on('constants', (constants) => Object.assign(CONSTANTS, constants));
return DeviceHandleProxy;
serv.on('server_error', (error) => {
console.error('[linux-device] An remote error occured:', error);
});
serv.on('connect', () => console.log('[linux-device] REMOTE DEVICE SUPPORT ENABLED, CONNECTED TO:', url));
serv.on('handle_event', (handle, event, ...args) => {
if (!handles[handle]) return;
if (handles[handle]['_handle_' + event])
handles[handle]['_handle_' + event](...args);
else
handles[handle].emit(event, ...args);
});
serv.on('disconnect', () => {
Object.keys(handles).forEach((handle) => {
handles[handle].destroy(new Error('Remote connection unexpectedly closed.'));
});
});
});
return DeviceHandleProxy;
};

@@ -1,2 +0,2 @@

"use strict";
'use strict';

@@ -7,115 +7,120 @@ const util = require('util');

class Handler {
constructor(handle, socket, options) {
this.socket = socket;
this._handle = handle;
this._rc = 0;
this.handle = new DeviceHandle(options);
this.__bindEvents();
}
__destruct() {
this.handle.close();
}
__bindEvents() {
const that = this;
this.handle.emit = function(...args) {
if(args[0] === 'error') args[1] = args[1] && args[1].stack ? args[1].stack : args[1];
that.socket.emit('handle_event', that._handle, ...args);
return this.constructor.prototype.emit.apply(this, args);
}
this.handle.on('error', ()=>{});
}
async _handleFunction(func, ...args) {
if(this._currentFunc) {
try {
await this._currentFunc;
} catch(e) {}
}
var res;
if(this[func]) res = this[func](...args);
else res = this.handle[func](...args);
this._currentFunc = res;
try {
await res;
} catch(e) {}
if(res === this._currentFunc) this._currentFunc = null;
return res;
}
_writeChunk(chunk, opts) {
Object.assign(chunk, opts);
return this.handle._writePromise(chunk);
}
_startRead(size) {
this._rc++;
this._rdListener = this._rdListener || (() => {
this._rc--;
if(this._rc < 0)
process.nextTick(() => {
this._rc = 0;
this.handle.removeListener('data', this._rdListener);
this.handle.pause();
},1000);
});
if(this.handle.listeners('data') <= 0)
this.handle.on('data', this._rdListener);
this.handle.resume();
}
constructor(handle, socket, options) {
this.socket = socket;
this._handle = handle;
this._rc = 0;
this.handle = new DeviceHandle(options);
this.__bindEvents();
}
__destruct() {
this.handle.close();
}
__bindEvents() {
const that = this;
this.handle.emit = function (...args) {
if (args[0] === 'error') args[1] = args[1] && args[1].stack ? args[1].stack : args[1];
that.socket.emit('handle_event', that._handle, ...args);
return this.constructor.prototype.emit.apply(this, args);
};
this.handle.on('error', () => {
});
}
async _handleFunction(func, ...args) {
if (this._currentFunc) {
try {
await this._currentFunc;
}
catch (e) {
}
}
var res;
if (this[func]) res = this[func](...args);
else res = this.handle[func](...args);
this._currentFunc = res;
try {
await res;
}
catch (e) {
}
if (res === this._currentFunc) this._currentFunc = null;
return res;
}
_writeChunk(chunk, opts) {
Object.assign(chunk, opts);
return this.handle._writePromise(chunk);
}
_startRead(size) {
this._rc++;
this._rdListener = this._rdListener || (() => {
this._rc--;
if (this._rc < 0)
process.nextTick(() => {
this._rc = 0;
this.handle.removeListener('data', this._rdListener);
this.handle.pause();
}, 1000);
});
if (this.handle.listeners('data') <= 0)
this.handle.on('data', this._rdListener);
this.handle.resume();
}
}
module.exports = function (port) {
port = port || 8081;
const io = require('socket.io')(port);
const metadata = require('./metadata.json');
const emitter = new (require('events').EventEmitter)();
module.exports = function(port) {
port = port || 8081;
const io = require('socket.io')(port);
const metadata = require('./metadata.json');
const emitter = new (require('events').EventEmitter)();
console.log('Started listening at port', port);
io.on('connection', function (socket) {
let connections = {};
console.log('Incoming connection.');
socket.emit('constants', DeviceHandle.constants);
socket.on('create_handle', function(handle, options, cb) {
console.log('Creating handle:', handle, options);
if(connections[handle]) {
connections[handle].__destruct();
delete connections[handle];
}
connections[handle] = new Handler(handle, socket, options);
cb();
});
async function handleFunction(func, handle, ...args) {
const cb = args.pop();
if(!connections[handle]) return cb();
return connections[handle]._handleFunction(func, ...args)
.then(res => cb(null, res)).catch(err => cb(err.stack));
}
async function handleStaticFunction(func, ...args) {
const cb = args.pop();
return DeviceHandle[func](...args)
.then(res => cb(null, res)).catch(err => cb(err.stack));
}
socket.on('function', handleFunction);
socket.on('static_func', handleStaticFunction)
socket.on('disconnect', function () {
//destruct all
Object.keys(connections).forEach((handle) => { connections[handle].__destruct(); });
emitter.emit('disconnect');
});
emitter.emit('connect');
});
return emitter;
};
console.log('Started listening at port', port);
io.on('connection', function (socket) {
let connections = {};
console.log('Incoming connection.');
socket.emit('constants', DeviceHandle.constants);
socket.on('create_handle', function (handle, options, cb) {
console.log('Creating handle:', handle, options);
if (connections[handle]) {
connections[handle].__destruct();
delete connections[handle];
}
connections[handle] = new Handler(handle, socket, options);
cb();
});
async function handleFunction(func, handle, ...args) {
const cb = args.pop();
if (!connections[handle]) return cb();
return connections[handle]._handleFunction(func, ...args)
.then(res => cb(null, res)).catch(err => cb(err.stack));
}
async function handleStaticFunction(func, ...args) {
const cb = args.pop();
return DeviceHandle[func](...args)
.then(res => cb(null, res)).catch(err => cb(err.stack));
}
socket.on('function', handleFunction);
socket.on('static_func', handleStaticFunction);
socket.on('disconnect', function () {
//destruct all
Object.keys(connections).forEach((handle) => {
connections[handle].__destruct();
});
emitter.emit('disconnect');
});
emitter.emit('connect');
});
return emitter;
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc