
Security News
Deno 2.4 Brings Back deno bundle, Improves Dependency Management and Observability
Deno 2.4 brings back bundling, improves dependency updates and telemetry, and makes the runtime more practical for real-world JavaScript projects.
This python library is mainly used for task scheduling, for example, there are a bunch of tasks here, the same type of tasks must be queued for execution, and the tasks need strong management and monitoring
Asynchronous code and normal code are now supported, specifically with event loops for asynchronous code
This task scheduling is suitable for:
1.Network Requests: Handling multiple HTTP requests concurrently, where each request can be scheduled and executed asynchronously.
2.File I/O Operations: Reading from or writing to multiple files concurrently, especially when dealing with large files or when the I/O operations are slow.
3.Database Queries: Executing multiple database queries concurrently, especially when the queries involve waiting for database responses.
4.Web Scraping: Running multiple web scraping tasks concurrently, where each task involves fetching and processing web pages.
5.Real-time Data Processing: Processing real-time data streams, where tasks need to be executed as soon as data is available.
6.Background Tasks: Running background tasks that perform periodic operations, such as data aggregation, cleanup, or monitoring.
7.CPU-intensive task execution.
8.Threads in new processes can be well managed.
9.With analysis function, it can analyze which scheduler the execution function conforms.
1.You can send a termination command to the execution code
2.You can enable timeout processing for a task, and terminate the task if it runs for too long
3.When a task fails to run, it can be added to the disabled list and will not be executed thereafter
4.You can directly obtain the current task status through the interface, such as executing: "completed, error, timeout"
5.Automatically hibernate when there are no tasks
!!! WARNING: If I/O-intensive task is running in a series of blocking tasks such as time.sleep, the task cannot be
forced terminated, it is recommended to use interruptible_sleep
instead of time.sleep
for long waits
So, use await asyncio.sleep
for asynchronous tasks
pip install --upgrade task_scheduling
Use the function in code
from task_scheduling.control_interface import start_task_status_ui
start_task_status_ui()
Task status UI available at http://localhost:8000
1.Detailed information requires changing the log level to debug
.
2.This scheduler does not support terminating blocking tasks due to various factors, so it only supports terminating tasks by injecting exceptions.
3.Most of the API calls for this task scheduling can be added to the scheduler for execution as tasks.
from task_scheduling.common import set_log_level
set_log_level("DEBUG") # INFO, DEBUG, ERROR, WARNING
if __name__ == "__main__":
......
The core function is used to create a task and submit it to the scheduler. Currently the scheduler has
cpu_asyncio_task, cpu_liner_task, io_asyncio_task, io_liner_task, timer_task
(The task is of the "io" type) I/O intensive tasks run in the threads of the main process.
import asyncio
import time
from task_scheduling.utils import interruptible_sleep
def line_task(input_info):
while True:
interruptible_sleep(1)
print(input_info)
async def asyncio_task(input_info):
while True:
await asyncio.sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
task_id2 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task2",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
asyncio_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
print(task_id1, task_id2)
# cf478b6e-5e02-49b8-9031-4adc6ff915c2, cf478b6e-5e02-49b8-9031-4adc6ff915c2
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
(The task is of the "timer" type) Timer tasks are all run in the threads of a process, and are committed to the thread pool when the specified time is reached
import time
def line_task(input_info):
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.variable import *
task_id1 = task_creation(10,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None, # 14:00
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_timer,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
task_id2 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
"13:03", # 13.03
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_timer,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task2",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
print(task_id1, task_id2)
# cf478b6e-5e02-49b8-9031-4adc6ff915c2, cf478b6e-5e02-49b8-9031-4adc6ff915c2
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
(The task is of the "cpu" type)Each task is separate in a process.
This task_manager
will end the process when there are no threads in the process, so that the process will be recycled
Threaded tasks can be created in this process, and the library provides task_manager
that will pass the configuration file thread_management=True
to the main function
where a new threaded task is created and managed through the task manager, as shown in an example below
!!! WARNING: The method is still experimental and there is no guarantee that there will be no unknown problems
thread_management=False
import time
def line_task(input_info):
while True:
interruptible_sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.scheduler import cpu_liner_task
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_cpu,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
thread_management=True
import time
from task_scheduling.stopit import skip_on_demand
def line_task(task_manager, input_info):
with skip_on_demand() as skip_ctx:
task_id = 1001001
# Create your own thread and give a unique ID to the incoming task_manager
task_manager.add(skip_ctx, task_id)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.scheduler import cpu_liner_task
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_cpu,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
task_id = 1001001
time.sleep(2.0)
cpu_liner_task.force_stop_task(task_id)
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
A task that detects what type of function is executed. The detected parameters are stored in a file and read directly at the next time
!!! WARNING: If you use this function, the CPU usage may increase slightly
import time
import numpy as np
def example_cpu_intensive_function(size, iterations):
start_time = time.time()
for _ in range(iterations):
# Create two random matrices
matrix_a = np.random.rand(size, size)
matrix_b = np.random.rand(size, size)
# Perform matrix multiplication
np.dot(matrix_a, matrix_b)
end_time = time.time()
print(
f"It took {end_time - start_time:.2f} seconds to calculate {iterations} times {size} times {size} matrix multiplication")
async def example_io_intensive_function():
for i in range(5):
with open(f"temp_file_{i}.txt", "w") as f:
f.write("Hello, World!" * 1000000)
time.sleep(1)
if __name__ == "__main__":
from task_scheduling.utils import FunctionRunner
cpu_runner = FunctionRunner(example_cpu_intensive_function, "CPU_Task", 10000, 2)
cpu_runner.run()
io_runner = FunctionRunner(example_io_intensive_function, "IO_Task")
io_runner.run()
Add a function type and a view type. Files are stored in:task_scheduling/function_data/task_type.pkl
if __name__ == "__main__":
from task_scheduling.function_data import task_function_type
task_function_type.append_to_dict("CPU_Task", "test")
print(task_function_type.read_from_dict("CPU_Task"))
print(task_function_type.read_from_dict("CPU_Task"))
Get the data returned by the task.
import asyncio
import time
def line_task(input_info):
time.sleep(4)
return input_info
async def asyncio_task(input_info):
await asyncio.sleep(4)
return input_info
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.scheduler import io_liner_task
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
while True:
result = io_liner_task.get_task_result(task_id1)
if result is not None:
print(result)
# test
break
else:
time.sleep(0.1)
shutdown(True)
Get information on all tasks. It is convenient to manage all tasks and move on to the next step
import asyncio
import time
def line_task(input_info):
time.sleep(4)
return input_info
async def asyncio_task(input_info):
await asyncio.sleep(4)
return input_info
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.control_interface import get_tasks_info
from task_scheduling.variable import *
task_id1 = task_creation(5,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_timer,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
try:
while True:
print(get_tasks_info())
# tasks queue size: 1, running tasks count: 0, failed tasks count: 0
# name: task1, id: 79185539-01e5-4576-8f10-70bb4f75374f, status: waiting, elapsed time: nan seconds
time.sleep(2.0)
except KeyboardInterrupt:
shutdown(True)
Get information about a single task. This will return a dictionary for easy user access
import asyncio
import time
def line_task(input_info):
while True:
time.sleep(1)
print(input_info)
async def asyncio_task(input_info):
while True:
await asyncio.sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.scheduler_management import task_status_manager
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
print(task_status_manager.get_task_status(task_id1))
# {'task_name': 'task1', 'status': 'waiting', 'start_time': None, 'end_time': None, 'error_info': None, 'is_timeout_enabled': True}
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
Gets the total presence of a task
import asyncio
import time
def line_task(input_info):
while True:
time.sleep(1)
print(input_info)
async def asyncio_task(input_info):
while True:
await asyncio.sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.scheduler_management import task_status_manager
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
print(task_status_manager.get_task_count("task1"))
# 1
print(task_status_manager.get_all_task_count())
# OrderedDict({'task1': 1})
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
Add and remove disabled task names. After you add a disabled task name, the task cannot be added but is blocked
import asyncio
import time
def line_task(input_info):
while True:
time.sleep(1)
print(input_info)
async def asyncio_task(input_info):
while True:
await asyncio.sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown, task_scheduler
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
task_scheduler.add_ban_task_name("task1")
task_id2 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
task_scheduler.remove_ban_task_name("task1")
# Start running io linear task, task ID: 19a643f3-d8fd-462f-8f36-0eca7a447741
# Task name 'task1' has been added to the ban list.
# Task name 'task1' is banned, cannot add task, task ID: a4bc60b1-95d1-423d-8911-10f520ee88f5
# Task name 'task1' has been removed from the ban list.
try:
while True:
time.sleep(2.0)
except KeyboardInterrupt:
shutdown(True)
Cancel a task that is being queued. If the task has already started, the cleanup function does not take effect
import asyncio
import time
def line_task(input_info):
while True:
time.sleep(1)
print(input_info)
async def asyncio_task(input_info):
while True:
await asyncio.sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown, task_scheduler
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
task_scheduler.cancel_the_queue_task_by_name("task1")
# This type of name task has been removed
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
All 4 schedulers have this function for terminating a running task. Warning: If the function is executing a
process-blocking function such as time.sleep
, it will have to wait for the execution to finish before terminating the function
Warn!!! cpu_liner_task.force_stop_task()
is different and requires the addition of additional parameters
import asyncio
import time
def line_task(input_info):
while True:
time.sleep(1)
print(input_info)
async def asyncio_task(input_info):
while True:
await asyncio.sleep(1)
print(input_info)
input_info = "test"
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation, shutdown
from task_scheduling.scheduler import io_liner_task
from task_scheduling.variable import *
task_id1 = task_creation(None,
# This is how long the delay is executed (in seconds)
# This parameter is required when the function_type is "timer",if this parameter is used, the daily_time is not required
None,
# This is to be performed at what point (24-hour clock)
# This parameter is required when the function_type is "timer",if this parameter is used, the delay is not required
scheduler_io,
# Running function type, there are "io, cpu, timer"
True,
# Set to True to enable timeout detection, tasks that do not finish within the runtime will be forcibly terminated
"task1",
# Task ID, in linear tasks, tasks with the same ID will be queued, different IDs will be executed directly, the same applies to asynchronous tasks
line_task,
# The function to be executed, parameters should not be passed here
input_info
# Pass the parameters required by the function, no restrictions
)
time.sleep(2.0)
io_liner_task.force_stop_task(task_id1)
"
cpu_liner_task.force_stop_task(task_id1, True)
If the task is terminated as the main task, the flag is True.
Only the terminating function of this scheduler requires additional parameters
"
# | Io linear task | 79a85db4-c75f-4acd-a2b1-d375617e5af4 | was cancelled
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
shutdown(True)
Once the task has been submitted, it is necessary to call this function to clean it up when exiting. Forced cleanup will interrupt all running tasks, but not if set to False
from task_scheduling.task_creation import shutdown
Update the parameters in the configuration file, note that this change will not work after a restart
from task_scheduling import update_config
Files are stored in:task_scheduling/config/config.yaml
The maximum number of CPU-optimized asynchronous tasks of the same type can run
cpu_asyncio_task: 4
The maximum number of tasks of the same type in an IO intensive asynchronous task
io_asyncio_task: 4
The maximum number of CPU-oriented linear tasks of the same type can run
cpu_liner_task: 4
The maximum number of tasks of the same type in an I-O intensive linear task
io_liner_task: 4
The timer performs the most tasks
timer_task: 30
When there are no tasks for many seconds, close the task scheduler(seconds)
max_idle_time: 60
When a task runs for a long time without finishing, it is forced to end(seconds)
watch_dog_time: 80
The maximum number of records that can be stored in a task status
maximum_task_info_storage: 20
How many seconds to check whether the task status is correct,recommended a longer interval(seconds)
status_check_interval: 800
Whether to enable thread management in the process
thread_management: False
In order to facilitate subsequent modifications,
some files are placed directly into the folder instead of being installed via pip,
so the libraries used are specifically stated here:https://github.com/glenfant/stopit
FAQs
It is mainly used for task scheduling
We found that task-scheduling demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Deno 2.4 brings back bundling, improves dependency updates and telemetry, and makes the runtime more practical for real-world JavaScript projects.
Security News
CVEForecast.org uses machine learning to project a record-breaking surge in vulnerability disclosures in 2025.
Security News
Browserslist-rs now uses static data to reduce binary size by over 1MB, improving memory use and performance for Rust-based frontend tools.