@lage-run/scheduler
Advanced tools
Comparing version 0.5.15 to 0.6.0
@@ -5,3 +5,24 @@ { | ||
{ | ||
"date": "Fri, 18 Nov 2022 19:52:27 GMT", | ||
"date": "Mon, 21 Nov 2022 06:31:52 GMT", | ||
"tag": "@lage-run/scheduler_v0.6.0", | ||
"version": "0.6.0", | ||
"comments": { | ||
"minor": [ | ||
{ | ||
"author": "kchau@microsoft.com", | ||
"package": "@lage-run/scheduler", | ||
"commit": "aafe75c34b61ed10f11c829a7bb1f5ad86f0b810", | ||
"comment": "adding provision for the run() to pay attention to previous and currently running targetRun's" | ||
}, | ||
{ | ||
"author": "beachball", | ||
"package": "@lage-run/scheduler", | ||
"comment": "Bump @lage-run/scheduler-types to v0.2.9", | ||
"commit": "aafe75c34b61ed10f11c829a7bb1f5ad86f0b810" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Fri, 18 Nov 2022 19:52:38 GMT", | ||
"tag": "@lage-run/scheduler_v0.5.15", | ||
@@ -8,0 +29,0 @@ "version": "0.5.15", |
# Change Log - @lage-run/scheduler | ||
This log was last generated on Fri, 18 Nov 2022 19:52:27 GMT and should not be manually modified. | ||
This log was last generated on Mon, 21 Nov 2022 06:31:52 GMT and should not be manually modified. | ||
<!-- Start content --> | ||
## 0.6.0 | ||
Mon, 21 Nov 2022 06:31:52 GMT | ||
### Minor changes | ||
- adding provision for the run() to pay attention to previous and currently running targetRun's (kchau@microsoft.com) | ||
- Bump @lage-run/scheduler-types to v0.2.9 | ||
## 0.5.15 | ||
Fri, 18 Nov 2022 19:52:27 GMT | ||
Fri, 18 Nov 2022 19:52:38 GMT | ||
@@ -11,0 +20,0 @@ ### Patches |
@@ -6,3 +6,3 @@ /// <reference types="node" /> | ||
import type { Logger } from "@lage-run/logger"; | ||
import type { TargetGraph, Target } from "@lage-run/target-graph"; | ||
import type { TargetGraph } from "@lage-run/target-graph"; | ||
import type { TargetScheduler, SchedulerRunSummary } from "@lage-run/scheduler-types"; | ||
@@ -37,5 +37,6 @@ import type { Pool } from "@lage-run/worker-threads-pool"; | ||
export declare class SimpleScheduler implements TargetScheduler { | ||
#private; | ||
private options; | ||
targetRuns: Map<string, WrappedTarget>; | ||
targetsByPriority: Target[]; | ||
rerunTargets: Set<string>; | ||
abortController: AbortController; | ||
@@ -46,2 +47,3 @@ abortSignal: AbortSignal; | ||
constructor(options: SimpleSchedulerOptions); | ||
getTargetsByPriority(): import("@lage-run/target-graph").Target[]; | ||
/** | ||
@@ -57,3 +59,3 @@ * The job of the run method is to: | ||
*/ | ||
run(root: string, targetGraph: TargetGraph): Promise<SchedulerRunSummary>; | ||
run(root: string, targetGraph: TargetGraph, shouldRerun?: boolean): Promise<SchedulerRunSummary>; | ||
/** | ||
@@ -63,6 +65,6 @@ * Used by consumers of the scheduler to notify that the inputs to the target has changed | ||
*/ | ||
onTargetChange(targetId: string): Promise<void>; | ||
markTargetAndDependentsPending(targetId: any): void; | ||
getReadyTargets(): WrappedTarget[]; | ||
isAllDone(): boolean; | ||
scheduleReadyTargets(): Promise<any>; | ||
scheduleReadyTargets(): Promise<void>; | ||
cleanup(): Promise<void>; | ||
@@ -69,0 +71,0 @@ /** |
@@ -9,8 +9,29 @@ "use strict"; | ||
}); | ||
const _workerThreadsPool = require("@lage-run/worker-threads-pool"); | ||
const _formatBytesJs = require("./formatBytes.js"); | ||
const _categorizeTargetRunsJs = require("./categorizeTargetRuns.js"); | ||
const _targetGraph = require("@lage-run/target-graph"); | ||
const _wrappedTargetJs = require("./WrappedTarget.js"); | ||
const _workerThreadsPool = require("@lage-run/worker-threads-pool"); | ||
const _formatBytesJs = require("./formatBytes.js"); | ||
function _checkPrivateRedeclaration(obj, privateCollection) { | ||
if (privateCollection.has(obj)) { | ||
throw new TypeError("Cannot initialize the same private elements twice on an object"); | ||
} | ||
} | ||
function _classPrivateMethodGet(receiver, privateSet, fn) { | ||
if (!privateSet.has(receiver)) { | ||
throw new TypeError("attempted to get private field on non-instance"); | ||
} | ||
return fn; | ||
} | ||
function _classPrivateMethodInit(obj, privateSet) { | ||
_checkPrivateRedeclaration(obj, privateSet); | ||
privateSet.add(obj); | ||
} | ||
var _generateTargetRunPromise = /*#__PURE__*/ new WeakSet(); | ||
class SimpleScheduler { | ||
getTargetsByPriority() { | ||
return (0, _targetGraph.sortTargetsByPriority)([ | ||
...this.targetRuns.values() | ||
].map((run)=>run.target)); | ||
} | ||
/** | ||
@@ -25,3 +46,3 @@ * The job of the run method is to: | ||
* @returns | ||
*/ async run(root, targetGraph) { | ||
*/ async run(root, targetGraph, shouldRerun = false) { | ||
const startTime = process.hrtime(); | ||
@@ -31,18 +52,30 @@ const { continueOnError , logger , cacheProvider , shouldCache , shouldResetCache , hasher } = this.options; | ||
const { targets } = targetGraph; | ||
this.targetsByPriority = (0, _targetGraph.sortTargetsByPriority)([ | ||
...targets.values() | ||
]); | ||
for (const target of targets.values()){ | ||
const targetRun = new _wrappedTargetJs.WrappedTarget({ | ||
target, | ||
root, | ||
logger, | ||
cacheProvider, | ||
hasher, | ||
shouldCache, | ||
shouldResetCache, | ||
continueOnError, | ||
abortController, | ||
pool | ||
}); | ||
let targetRun; | ||
const prevTargetRun = this.targetRuns.get(target.id); | ||
if (prevTargetRun) { | ||
targetRun = prevTargetRun; | ||
// If previous run has been successful, then we may want to rerun | ||
if (prevTargetRun.successful && shouldRerun) { | ||
this.markTargetAndDependentsPending(target.id); | ||
} else if (prevTargetRun.waiting && shouldRerun) { | ||
this.rerunTargets.add(targetRun.target.id); | ||
} else if (!prevTargetRun.successful) { | ||
// If previous run has failed, we should rerun | ||
this.markTargetAndDependentsPending(target.id); | ||
} | ||
} else { | ||
targetRun = new _wrappedTargetJs.WrappedTarget({ | ||
target, | ||
root, | ||
logger, | ||
cacheProvider, | ||
hasher, | ||
shouldCache, | ||
shouldResetCache, | ||
continueOnError, | ||
abortController, | ||
pool | ||
}); | ||
} | ||
if (target.id === (0, _targetGraph.getStartTargetId)()) { | ||
@@ -88,5 +121,3 @@ targetRun.status = "success"; | ||
* @param targetId | ||
*/ async onTargetChange(targetId) { | ||
this.abortController.abort(); | ||
await this.runPromise; | ||
*/ markTargetAndDependentsPending(targetId) { | ||
const queue = [ | ||
@@ -100,2 +131,3 @@ targetId | ||
targetRun.status = "pending"; | ||
this.rerunTargets.add(targetRun.target.id); | ||
const dependents = targetRun.target.dependents; | ||
@@ -107,21 +139,10 @@ for (const dependent of dependents){ | ||
} | ||
this.abortController = new AbortController(); | ||
this.abortSignal = this.abortController.signal; | ||
for (const targetRun1 of this.targetRuns.values()){ | ||
targetRun1.abortController = this.abortController; | ||
} | ||
this.runPromise = this.scheduleReadyTargets(); | ||
} | ||
getReadyTargets() { | ||
const readyTargets = []; | ||
const runningTargets = this.targetsByPriority.filter((target)=>this.targetRuns.get(target.id).status === "running"); | ||
const runningTargetsCountByTask = {}; | ||
for (const target of runningTargets){ | ||
runningTargetsCountByTask[target.task] = typeof runningTargetsCountByTask[target.task] !== "number" ? 1 : runningTargetsCountByTask[target.task]++; | ||
} | ||
for (const target1 of this.targetsByPriority){ | ||
if (target1.id === (0, _targetGraph.getStartTargetId)()) { | ||
for (const target of this.getTargetsByPriority()){ | ||
if (target.id === (0, _targetGraph.getStartTargetId)()) { | ||
continue; | ||
} | ||
const targetRun = this.targetRuns.get(target1.id); | ||
const targetRun = this.targetRuns.get(target.id); | ||
const targetDeps = targetRun.target.dependencies; | ||
@@ -131,7 +152,6 @@ // filter all dependencies for those that are "ready" | ||
const fromTarget = this.targetRuns.get(dep); | ||
return fromTarget.status === "success" || fromTarget.status === "skipped" || dep === (0, _targetGraph.getStartTargetId)(); | ||
return fromTarget.successful || dep === (0, _targetGraph.getStartTargetId)(); | ||
}); | ||
if (ready && targetRun.status === "pending") { | ||
readyTargets.push(targetRun); | ||
runningTargetsCountByTask[target1.task] = (runningTargetsCountByTask[target1.task] ?? 0) + 1; | ||
} | ||
@@ -156,17 +176,6 @@ } | ||
for (const nextTarget of this.getReadyTargets()){ | ||
promises.push(nextTarget.run().then(()=>{ | ||
return this.scheduleReadyTargets(); | ||
}).catch((e)=>{ | ||
// if a continue option is set, this merely records what errors have been encountered | ||
// it'll continue down the execution until all the tasks that still works | ||
if (this.options?.continueOnError) { | ||
return this.scheduleReadyTargets(); | ||
} else { | ||
// immediately reject, if not using "continue" option | ||
throw e; | ||
} | ||
})); | ||
const runPromise = _classPrivateMethodGet(this, _generateTargetRunPromise, generateTargetRunPromise).call(this, nextTarget); | ||
promises.push(runPromise); | ||
} | ||
this.runPromise = Promise.all(promises); | ||
return this.runPromise; | ||
await Promise.all(promises); | ||
} | ||
@@ -183,5 +192,6 @@ async cleanup() { | ||
constructor(options){ | ||
_classPrivateMethodInit(this, _generateTargetRunPromise); | ||
this.options = options; | ||
this.targetRuns = new Map(); | ||
this.targetsByPriority = []; | ||
this.rerunTargets = new Set(); | ||
this.abortController = new AbortController(); | ||
@@ -207,2 +217,28 @@ this.abortSignal = this.abortController.signal; | ||
} | ||
async function generateTargetRunPromise(target) { | ||
let runError; | ||
if (target.result && target.successful && !this.rerunTargets.has(target.target.id)) { | ||
await target.result; | ||
} else { | ||
// This do-while loop only runs again if something causes this target to rerun (asynchronously triggering a re-run) | ||
do { | ||
this.rerunTargets.delete(target.target.id); | ||
try { | ||
await target.run(); | ||
} catch (e) { | ||
runError = e; | ||
} | ||
}while (this.rerunTargets.has(target.target.id)) | ||
// if a continue option is set, this merely records what errors have been encountered | ||
// it'll continue down the execution until all the tasks that still works | ||
if (runError && !this.options?.continueOnError) { | ||
if (!this.options?.continueOnError) { | ||
// immediately reject, if not using "continue" option | ||
throw runError; | ||
} | ||
} | ||
} | ||
// finally do another round of scheduling to run next round of targets | ||
await this.scheduleReadyTargets(); | ||
} | ||
encodeURI; |
@@ -34,4 +34,10 @@ /// <reference types="node" /> | ||
status: TargetStatus; | ||
result: Promise<{ | ||
stdoutBuffer: string; | ||
stderrBuffer: string; | ||
}> | undefined; | ||
get abortController(): AbortController; | ||
set abortController(abortController: AbortController); | ||
get successful(): boolean; | ||
get waiting(): boolean; | ||
constructor(options: WrappedTargetOptions); | ||
@@ -38,0 +44,0 @@ onQueued(): void; |
@@ -27,2 +27,8 @@ "use strict"; | ||
} | ||
get successful() { | ||
return this.status === "skipped" || this.status === "success"; | ||
} | ||
get waiting() { | ||
return this.status === "pending" || this.status === "queued"; | ||
} | ||
onQueued() { | ||
@@ -183,3 +189,3 @@ this.status = "queued"; | ||
const bufferStderr = (0, _bufferTransformJs.bufferTransform)(); | ||
await pool.exec({ | ||
this.result = pool.exec({ | ||
target | ||
@@ -208,2 +214,3 @@ }, target.weight ?? 1, (_worker, stdout, stderr)=>{ | ||
}, abortSignal); | ||
await this.result; | ||
return { | ||
@@ -210,0 +217,0 @@ stdoutBuffer: bufferStdout.buffer, |
{ | ||
"name": "@lage-run/scheduler", | ||
"version": "0.5.15", | ||
"version": "0.6.0", | ||
"description": "Scheduler for Lage", | ||
@@ -24,3 +24,3 @@ "repository": { | ||
"devDependencies": { | ||
"@lage-run/scheduler-types": "^0.2.8", | ||
"@lage-run/scheduler-types": "^0.2.9", | ||
"monorepo-scripts": "*" | ||
@@ -27,0 +27,0 @@ }, |
95659
2503