Comparing version 2.4.3 to 2.5.0
@@ -5,2 +5,8 @@ # Changelog | ||
## [2.5.0][] - 2021-07-22 | ||
- Rework scheduler, use `every` syntax | ||
- Run scheduler in separate thread | ||
- Use `Semaphore` to organize task queue in scheduler thread | ||
## [2.4.3][] - 2021-07-10 | ||
@@ -180,3 +186,4 @@ | ||
[unreleased]: https://github.com/metarhia/impress/compare/v2.4.3...HEAD | ||
[unreleased]: https://github.com/metarhia/impress/compare/v2.5.0...HEAD | ||
[2.5.0]: https://github.com/metarhia/impress/compare/v2.4.3...v2.5.0 | ||
[2.4.3]: https://github.com/metarhia/impress/compare/v2.4.2...v2.4.3 | ||
@@ -183,0 +190,0 @@ [2.4.2]: https://github.com/metarhia/impress/compare/v2.4.1...v2.4.2 |
@@ -53,3 +53,6 @@ 'use strict'; | ||
const { balancer, ports = [], workers = {} } = config.server; | ||
const count = ports.length + (balancer ? 1 : 0) + (workers.pool || 0); | ||
const serversCount = ports.length + (balancer ? 1 : 0); | ||
const schedulerCount = 1; | ||
const schedulerId = serversCount; | ||
const count = serversCount + schedulerCount + (workers.pool || 0); | ||
let startTimer = null; | ||
@@ -66,2 +69,3 @@ let active = 0; | ||
let scheduler = null; | ||
const start = (id) => { | ||
@@ -71,2 +75,3 @@ const workerPath = path.join(__dirname, 'lib/worker.js'); | ||
threads[id] = worker; | ||
if (id === schedulerId) scheduler = worker; | ||
@@ -87,3 +92,6 @@ worker.on('exit', (code) => { | ||
worker.on('message', (data) => { | ||
if (data.type === 'event' && data.name === 'started') active++; | ||
if (data.type === 'event') { | ||
if (data.name === 'started') active++; | ||
if (data.name.startsWith('task:')) scheduler.postMessage(data); | ||
} | ||
if (active === count && startTimer) { | ||
@@ -90,0 +98,0 @@ clearTimeout(startTimer); |
@@ -11,2 +11,3 @@ 'use strict'; | ||
const { Schemas } = require('./schemas.js'); | ||
const { Scheduler } = require('./scheduler.js'); | ||
const auth = require('./auth.js'); | ||
@@ -26,2 +27,3 @@ | ||
super(); | ||
this.kind = ''; | ||
this.initialization = true; | ||
@@ -38,2 +40,3 @@ this.finalization = false; | ||
this.domain = new Modules('domain', this); | ||
this.scheduler = new Scheduler(this); | ||
@@ -55,3 +58,4 @@ this.starts = []; | ||
async init() { | ||
async init(kind) { | ||
this.kind = kind; | ||
this.startWatch(); | ||
@@ -70,2 +74,3 @@ this.createSandbox(); | ||
await this.api.load(); | ||
if (kind === 'scheduler') await this.scheduler.load(); | ||
const { api } = this.sandbox; | ||
@@ -103,7 +108,7 @@ if (api.auth) { | ||
createSandbox() { | ||
const { config, console, resources, schemas } = this; | ||
const { config, console, resources, schemas, scheduler } = this; | ||
const { server: { host, port, protocol } = {} } = this; | ||
const worker = { id: 'W' + node.worker.threadId.toString() }; | ||
const server = { host, port, protocol }; | ||
const application = { worker, server, resources, schemas }; | ||
const application = { worker, server, resources, schemas, scheduler }; | ||
application.introspect = async (interfaces) => this.introspect(interfaces); | ||
@@ -110,0 +115,0 @@ const sandbox = { ...SANDBOX, console, application, config }; |
'use strict'; | ||
const { metarhia } = require('./dependencies.js'); | ||
const { node, metarhia } = require('./dependencies.js'); | ||
const { fsp, path, worker } = node; | ||
const { metautil } = metarhia; | ||
const findHandler = (sandbox, name) => { | ||
const [key, rest] = metautil.split(name, '.'); | ||
const element = sandbox[key]; | ||
if (element) { | ||
if (rest === '') return element; | ||
return findHandler(element, rest); | ||
} | ||
return null; | ||
}; | ||
class Scheduler { | ||
constructor() { | ||
constructor(application) { | ||
this.application = application; | ||
this.path = application.absolute('tasks'); | ||
this.tasks = new Map(); | ||
this.nextId = 0; | ||
this.executor = false; | ||
this.semaphore = null; | ||
} | ||
setTask(taskName, task) { | ||
this.stopTask(taskName); | ||
this.tasks.set(taskName, task); | ||
task.name = taskName; | ||
task.success = undefined; | ||
task.error = null; | ||
task.lastStart = 0; | ||
task.lastEnd = 0; | ||
task.executing = false; | ||
task.active = false; | ||
task.count = 0; | ||
this.startTask(taskName); | ||
} | ||
async load() { | ||
this.executor = true; | ||
const config = this.application.config.server.scheduler; | ||
const { concurrency, size, timeout } = config; | ||
this.semaphore = new metautil.Semaphore(concurrency, size, timeout); | ||
// Start Task | ||
// taskName <string> | ||
startTask(taskName) { | ||
const task = this.tasks.get(taskName); | ||
if (task && !task.active) { | ||
task.active = true; | ||
task.interval = metarhia.metautil.duration(task.interval); | ||
task.timer = setInterval(() => { | ||
if (!task.executing) { | ||
task.lastStart = Date.now(); | ||
task.executing = true; | ||
task.run(task, (taskResult) => { | ||
task.error = taskResult; | ||
task.success = taskResult === null; | ||
task.lastEnd = Date.now(); | ||
task.executing = false; | ||
task.count++; | ||
}); | ||
worker.parentPort.on('message', async (data) => { | ||
if (data.type !== 'event') return; | ||
if (data.name === 'task:add') this.add(data.task); | ||
else if (data.name === 'task:remove') this.remove(data.task.id); | ||
else if (data.name === 'task:stop') this.stop(data.task.name); | ||
}); | ||
const now = metautil.nowDate(); | ||
try { | ||
const files = await fsp.readdir(this.path, { withFileTypes: true }); | ||
for (const file of files) { | ||
if (file.isDirectory()) continue; | ||
const { name } = file; | ||
if (!name.endsWith('.json') || name.startsWith('.')) continue; | ||
const base = path.basename(name, '.json'); | ||
const [date, id] = metautil.split(base, '-id-'); | ||
if (date === now) { | ||
const nextId = parseInt(id); | ||
if (nextId > this.nextId) this.nextId = nextId; | ||
} | ||
}, task.interval); | ||
const filePath = path.join(this.path, name); | ||
const data = await fsp.readFile(filePath, 'utf8'); | ||
this.restore(JSON.parse(data)); | ||
} | ||
} catch (err) { | ||
this.application.console.error(err.stack); | ||
} | ||
} | ||
// Stop Task | ||
// taskName <string> | ||
stopTask(taskName) { | ||
const task = this.tasks.get(taskName); | ||
restore(record) { | ||
const { id, name, every, args, run } = record; | ||
const task = { | ||
id, | ||
name, | ||
every: metautil.parseEvery(every), | ||
success: undefined, | ||
result: null, | ||
error: null, | ||
lastStart: 0, | ||
lastEnd: 0, | ||
executing: false, | ||
runCount: 0, | ||
timer: null, | ||
args, | ||
run, | ||
handler: findHandler(this.application.sandbox, run), | ||
}; | ||
this.tasks.set(id, task); | ||
return this.start(id); | ||
} | ||
async add(record) { | ||
if (!this.executor) { | ||
const msg = { type: 'event', name: 'task:add', task: record }; | ||
worker.parentPort.postMessage(msg); | ||
return; | ||
} | ||
const id = metautil.nowDate() + '-id-' + this.nextId.toString(); | ||
this.nextId++; | ||
const task = { id, ...record }; | ||
const started = this.restore(task); | ||
if (!started) return; | ||
const filePath = path.join(this.path, id + '.json'); | ||
try { | ||
const data = JSON.stringify(task); | ||
await fsp.writeFile(filePath, data); | ||
} catch (err) { | ||
this.application.console.error(err.stack); | ||
} | ||
} | ||
async remove(id) { | ||
if (!this.executor) { | ||
const msg = { type: 'event', name: 'task:remove', task: { id } }; | ||
worker.parentPort.postMessage(msg); | ||
return; | ||
} | ||
const task = this.tasks.get(id); | ||
if (task) { | ||
if (task.timer) clearInterval(task.timer); | ||
this.tasks.delete(taskName); | ||
this.tasks.delete(id); | ||
if (task.timer) { | ||
clearTimeout(task.timer); | ||
task.timer = null; | ||
} | ||
} | ||
const filePath = path.join(this.path, id + '.json'); | ||
try { | ||
await fsp.unlink(filePath); | ||
} catch (err) { | ||
if (err.code !== 'ENOENT') { | ||
this.application.console.error(err.stack); | ||
} | ||
} | ||
} | ||
stopTasks() { | ||
start(id) { | ||
const task = this.tasks.get(id); | ||
if (!task || task.timer) return false; | ||
const next = metautil.nextEvent(task.every); | ||
if (next === -1) { | ||
this.remove(id); | ||
return false; | ||
} | ||
if (next === 0) { | ||
this.execute(task, true); | ||
return true; | ||
} | ||
task.timer = setTimeout(() => { | ||
const once = task.every.interval === 0; | ||
this.execute(task, once); | ||
}, next); | ||
return true; | ||
} | ||
async execute(task, once = false) { | ||
if (task.executing) { | ||
this.fail(task, 'Already started task'); | ||
return; | ||
} | ||
task.lastStart = Date.now(); | ||
task.executing = true; | ||
try { | ||
await this.semaphore.enter(); | ||
task.result = await task.handler(task.args); | ||
} catch (err) { | ||
task.error = err; | ||
if (err.message === 'Semaphore timeout') { | ||
this.fail(task, 'Scheduler queue is full'); | ||
} else { | ||
this.application.console.error(err.stack); | ||
} | ||
} finally { | ||
this.semaphore.leave(); | ||
} | ||
task.success = !task.error; | ||
task.lastEnd = Date.now(); | ||
task.executing = false; | ||
task.runCount++; | ||
task.timer = null; | ||
if (once) this.remove(task.id); | ||
else this.start(task.id); | ||
} | ||
fail(task, reason) { | ||
const { id, name, run, args } = task; | ||
const target = `${name} (${id}) ${run}(${JSON.stringify(args)})`; | ||
const msg = `${reason}, can't execute: ${target}`; | ||
this.application.console.error(msg); | ||
} | ||
stop(name = '') { | ||
if (!this.executor) { | ||
const msg = { type: 'event', name: 'task:stop', task: { name } }; | ||
worker.parentPort.postMessage(msg); | ||
return; | ||
} | ||
for (const task of this.tasks.values()) { | ||
if (task.timer) clearInterval(task.timer); | ||
this.tasks.delete(task.name); | ||
if (name !== '' && name !== task.name) continue; | ||
this.remove(task.id); | ||
} | ||
@@ -65,2 +196,2 @@ } | ||
module.exports = Scheduler; | ||
module.exports = { Scheduler }; |
@@ -56,7 +56,12 @@ 'use strict'; | ||
const servingThreads = ports.length + (balancer ? 1 : 0); | ||
if (threadId <= servingThreads) { | ||
let kind = 'worker'; | ||
if (threadId <= servingThreads) kind = 'server'; | ||
if (threadId === servingThreads + 1) kind = 'scheduler'; | ||
if (kind === 'server') { | ||
application.server = new Server(config.server, application); | ||
} | ||
await application.init(); | ||
await application.init(kind); | ||
console.info(`Application started in worker ${threadId}`); | ||
@@ -63,0 +68,0 @@ worker.parentPort.postMessage({ type: 'event', name: 'started' }); |
{ | ||
"name": "impress", | ||
"version": "2.4.3", | ||
"version": "2.5.0", | ||
"author": "Timur Shemsedinov <timur.shemsedinov@gmail.com>", | ||
@@ -71,12 +71,12 @@ "description": "Enterprise application server for Node.js", | ||
"metaconfiguration": "^2.1.4", | ||
"metalog": "^3.1.2", | ||
"metaschema": "^1.3.2", | ||
"metautil": "^3.5.5", | ||
"metavm": "^1.0.2", | ||
"metawatch": "^1.0.3" | ||
"metalog": "^3.1.3", | ||
"metaschema": "^1.3.3", | ||
"metautil": "^3.5.8", | ||
"metavm": "^1.0.3", | ||
"metawatch": "^1.0.4" | ||
}, | ||
"devDependencies": { | ||
"@types/node": "^16.3.1", | ||
"@types/node": "^16.3.3", | ||
"@types/ws": "^7.4.6", | ||
"eslint": "^7.30.0", | ||
"eslint": "^7.31.0", | ||
"eslint-config-metarhia": "^7.0.1", | ||
@@ -83,0 +83,0 @@ "eslint-config-prettier": "^8.3.0", |
@@ -19,2 +19,7 @@ ({ | ||
}, | ||
scheduler: { | ||
concurrency: 'number', | ||
size: 'number', | ||
timeout: 'number', | ||
}, | ||
workers: { | ||
@@ -21,0 +26,0 @@ pool: 'number', |
@@ -35,2 +35,7 @@ export interface LogConfig { | ||
}; | ||
scheduler: { | ||
concurrency: number; | ||
size: number; | ||
timeout: number; | ||
}; | ||
workers: { | ||
@@ -37,0 +42,0 @@ pool: number; |
@@ -0,1 +1,14 @@ | ||
export interface Task { | ||
name: string; | ||
every: string; | ||
args: object; | ||
run: string; | ||
} | ||
export interface Scheduler { | ||
add(task: Task): void; | ||
remove(id: string): void; | ||
stop(name: string): void; | ||
} | ||
export interface Application { | ||
@@ -7,2 +20,3 @@ worker: object; | ||
introspect: () => Promise<any>; | ||
scheduler: Scheduler; | ||
} | ||
@@ -9,0 +23,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
56857
1294
Updatedmetalog@^3.1.3
Updatedmetaschema@^1.3.3
Updatedmetautil@^3.5.8
Updatedmetavm@^1.0.3
Updatedmetawatch@^1.0.4