Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

impress

Package Overview
Dependencies
Maintainers
4
Versions
719
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

impress - npm Package Compare versions

Comparing version 2.4.3 to 2.5.0

9

CHANGELOG.md

@@ -5,2 +5,8 @@ # Changelog

## [2.5.0][] - 2021-07-22
- Rework scheduler, use `every` syntax
- Run scheduler in separate thread
- Use `Semaphore` to organize task queue in scheduler thread
## [2.4.3][] - 2021-07-10

@@ -180,3 +186,4 @@

[unreleased]: https://github.com/metarhia/impress/compare/v2.4.3...HEAD
[unreleased]: https://github.com/metarhia/impress/compare/v2.5.0...HEAD
[2.5.0]: https://github.com/metarhia/impress/compare/v2.4.3...v2.5.0
[2.4.3]: https://github.com/metarhia/impress/compare/v2.4.2...v2.4.3

@@ -183,0 +190,0 @@ [2.4.2]: https://github.com/metarhia/impress/compare/v2.4.1...v2.4.2

@@ -53,3 +53,6 @@ 'use strict';

const { balancer, ports = [], workers = {} } = config.server;
const count = ports.length + (balancer ? 1 : 0) + (workers.pool || 0);
const serversCount = ports.length + (balancer ? 1 : 0);
const schedulerCount = 1;
const schedulerId = serversCount;
const count = serversCount + schedulerCount + (workers.pool || 0);
let startTimer = null;

@@ -66,2 +69,3 @@ let active = 0;

let scheduler = null;
const start = (id) => {

@@ -71,2 +75,3 @@ const workerPath = path.join(__dirname, 'lib/worker.js');

threads[id] = worker;
if (id === schedulerId) scheduler = worker;

@@ -87,3 +92,6 @@ worker.on('exit', (code) => {

worker.on('message', (data) => {
if (data.type === 'event' && data.name === 'started') active++;
if (data.type === 'event') {
if (data.name === 'started') active++;
if (data.name.startsWith('task:')) scheduler.postMessage(data);
}
if (active === count && startTimer) {

@@ -90,0 +98,0 @@ clearTimeout(startTimer);

11

lib/application.js

@@ -11,2 +11,3 @@ 'use strict';

const { Schemas } = require('./schemas.js');
const { Scheduler } = require('./scheduler.js');
const auth = require('./auth.js');

@@ -26,2 +27,3 @@

super();
this.kind = '';
this.initialization = true;

@@ -38,2 +40,3 @@ this.finalization = false;

this.domain = new Modules('domain', this);
this.scheduler = new Scheduler(this);

@@ -55,3 +58,4 @@ this.starts = [];

async init() {
async init(kind) {
this.kind = kind;
this.startWatch();

@@ -70,2 +74,3 @@ this.createSandbox();

await this.api.load();
if (kind === 'scheduler') await this.scheduler.load();
const { api } = this.sandbox;

@@ -103,7 +108,7 @@ if (api.auth) {

createSandbox() {
const { config, console, resources, schemas } = this;
const { config, console, resources, schemas, scheduler } = this;
const { server: { host, port, protocol } = {} } = this;
const worker = { id: 'W' + node.worker.threadId.toString() };
const server = { host, port, protocol };
const application = { worker, server, resources, schemas };
const application = { worker, server, resources, schemas, scheduler };
application.introspect = async (interfaces) => this.introspect(interfaces);

@@ -110,0 +115,0 @@ const sandbox = { ...SANDBOX, console, application, config };

'use strict';
const { metarhia } = require('./dependencies.js');
const { node, metarhia } = require('./dependencies.js');
const { fsp, path, worker } = node;
const { metautil } = metarhia;
const findHandler = (sandbox, name) => {
const [key, rest] = metautil.split(name, '.');
const element = sandbox[key];
if (element) {
if (rest === '') return element;
return findHandler(element, rest);
}
return null;
};
class Scheduler {
constructor() {
constructor(application) {
this.application = application;
this.path = application.absolute('tasks');
this.tasks = new Map();
this.nextId = 0;
this.executor = false;
this.semaphore = null;
}
setTask(taskName, task) {
this.stopTask(taskName);
this.tasks.set(taskName, task);
task.name = taskName;
task.success = undefined;
task.error = null;
task.lastStart = 0;
task.lastEnd = 0;
task.executing = false;
task.active = false;
task.count = 0;
this.startTask(taskName);
}
async load() {
this.executor = true;
const config = this.application.config.server.scheduler;
const { concurrency, size, timeout } = config;
this.semaphore = new metautil.Semaphore(concurrency, size, timeout);
// Start Task
// taskName <string>
startTask(taskName) {
const task = this.tasks.get(taskName);
if (task && !task.active) {
task.active = true;
task.interval = metarhia.metautil.duration(task.interval);
task.timer = setInterval(() => {
if (!task.executing) {
task.lastStart = Date.now();
task.executing = true;
task.run(task, (taskResult) => {
task.error = taskResult;
task.success = taskResult === null;
task.lastEnd = Date.now();
task.executing = false;
task.count++;
});
worker.parentPort.on('message', async (data) => {
if (data.type !== 'event') return;
if (data.name === 'task:add') this.add(data.task);
else if (data.name === 'task:remove') this.remove(data.task.id);
else if (data.name === 'task:stop') this.stop(data.task.name);
});
const now = metautil.nowDate();
try {
const files = await fsp.readdir(this.path, { withFileTypes: true });
for (const file of files) {
if (file.isDirectory()) continue;
const { name } = file;
if (!name.endsWith('.json') || name.startsWith('.')) continue;
const base = path.basename(name, '.json');
const [date, id] = metautil.split(base, '-id-');
if (date === now) {
const nextId = parseInt(id);
if (nextId > this.nextId) this.nextId = nextId;
}
}, task.interval);
const filePath = path.join(this.path, name);
const data = await fsp.readFile(filePath, 'utf8');
this.restore(JSON.parse(data));
}
} catch (err) {
this.application.console.error(err.stack);
}
}
// Stop Task
// taskName <string>
stopTask(taskName) {
const task = this.tasks.get(taskName);
restore(record) {
const { id, name, every, args, run } = record;
const task = {
id,
name,
every: metautil.parseEvery(every),
success: undefined,
result: null,
error: null,
lastStart: 0,
lastEnd: 0,
executing: false,
runCount: 0,
timer: null,
args,
run,
handler: findHandler(this.application.sandbox, run),
};
this.tasks.set(id, task);
return this.start(id);
}
async add(record) {
if (!this.executor) {
const msg = { type: 'event', name: 'task:add', task: record };
worker.parentPort.postMessage(msg);
return;
}
const id = metautil.nowDate() + '-id-' + this.nextId.toString();
this.nextId++;
const task = { id, ...record };
const started = this.restore(task);
if (!started) return;
const filePath = path.join(this.path, id + '.json');
try {
const data = JSON.stringify(task);
await fsp.writeFile(filePath, data);
} catch (err) {
this.application.console.error(err.stack);
}
}
async remove(id) {
if (!this.executor) {
const msg = { type: 'event', name: 'task:remove', task: { id } };
worker.parentPort.postMessage(msg);
return;
}
const task = this.tasks.get(id);
if (task) {
if (task.timer) clearInterval(task.timer);
this.tasks.delete(taskName);
this.tasks.delete(id);
if (task.timer) {
clearTimeout(task.timer);
task.timer = null;
}
}
const filePath = path.join(this.path, id + '.json');
try {
await fsp.unlink(filePath);
} catch (err) {
if (err.code !== 'ENOENT') {
this.application.console.error(err.stack);
}
}
}
stopTasks() {
start(id) {
const task = this.tasks.get(id);
if (!task || task.timer) return false;
const next = metautil.nextEvent(task.every);
if (next === -1) {
this.remove(id);
return false;
}
if (next === 0) {
this.execute(task, true);
return true;
}
task.timer = setTimeout(() => {
const once = task.every.interval === 0;
this.execute(task, once);
}, next);
return true;
}
async execute(task, once = false) {
if (task.executing) {
this.fail(task, 'Already started task');
return;
}
task.lastStart = Date.now();
task.executing = true;
try {
await this.semaphore.enter();
task.result = await task.handler(task.args);
} catch (err) {
task.error = err;
if (err.message === 'Semaphore timeout') {
this.fail(task, 'Scheduler queue is full');
} else {
this.application.console.error(err.stack);
}
} finally {
this.semaphore.leave();
}
task.success = !task.error;
task.lastEnd = Date.now();
task.executing = false;
task.runCount++;
task.timer = null;
if (once) this.remove(task.id);
else this.start(task.id);
}
fail(task, reason) {
const { id, name, run, args } = task;
const target = `${name} (${id}) ${run}(${JSON.stringify(args)})`;
const msg = `${reason}, can't execute: ${target}`;
this.application.console.error(msg);
}
stop(name = '') {
if (!this.executor) {
const msg = { type: 'event', name: 'task:stop', task: { name } };
worker.parentPort.postMessage(msg);
return;
}
for (const task of this.tasks.values()) {
if (task.timer) clearInterval(task.timer);
this.tasks.delete(task.name);
if (name !== '' && name !== task.name) continue;
this.remove(task.id);
}

@@ -65,2 +196,2 @@ }

module.exports = Scheduler;
module.exports = { Scheduler };

@@ -56,7 +56,12 @@ 'use strict';

const servingThreads = ports.length + (balancer ? 1 : 0);
if (threadId <= servingThreads) {
let kind = 'worker';
if (threadId <= servingThreads) kind = 'server';
if (threadId === servingThreads + 1) kind = 'scheduler';
if (kind === 'server') {
application.server = new Server(config.server, application);
}
await application.init();
await application.init(kind);
console.info(`Application started in worker ${threadId}`);

@@ -63,0 +68,0 @@ worker.parentPort.postMessage({ type: 'event', name: 'started' });

{
"name": "impress",
"version": "2.4.3",
"version": "2.5.0",
"author": "Timur Shemsedinov <timur.shemsedinov@gmail.com>",

@@ -71,12 +71,12 @@ "description": "Enterprise application server for Node.js",

"metaconfiguration": "^2.1.4",
"metalog": "^3.1.2",
"metaschema": "^1.3.2",
"metautil": "^3.5.5",
"metavm": "^1.0.2",
"metawatch": "^1.0.3"
"metalog": "^3.1.3",
"metaschema": "^1.3.3",
"metautil": "^3.5.8",
"metavm": "^1.0.3",
"metawatch": "^1.0.4"
},
"devDependencies": {
"@types/node": "^16.3.1",
"@types/node": "^16.3.3",
"@types/ws": "^7.4.6",
"eslint": "^7.30.0",
"eslint": "^7.31.0",
"eslint-config-metarhia": "^7.0.1",

@@ -83,0 +83,0 @@ "eslint-config-prettier": "^8.3.0",

@@ -19,2 +19,7 @@ ({

},
scheduler: {
concurrency: 'number',
size: 'number',
timeout: 'number',
},
workers: {

@@ -21,0 +26,0 @@ pool: 'number',

@@ -35,2 +35,7 @@ export interface LogConfig {

};
scheduler: {
concurrency: number;
size: number;
timeout: number;
};
workers: {

@@ -37,0 +42,0 @@ pool: number;

@@ -0,1 +1,14 @@

export interface Task {
name: string;
every: string;
args: object;
run: string;
}
export interface Scheduler {
add(task: Task): void;
remove(id: string): void;
stop(name: string): void;
}
export interface Application {

@@ -7,2 +20,3 @@ worker: object;

introspect: () => Promise<any>;
scheduler: Scheduler;
}

@@ -9,0 +23,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc