Orchestra is a job scheduler on top of Celery.
Install from PyPI:
pip install pyorchestra
- name: sample-tasks                 # optional name for this block; it has no bearing on how Orchestra works, name your blocks however you like
  module: tasks                      # the Python module where Celery will look for the task
  schedules:                         # a list of schedules
    - name: "short_task_every_1_second"  # name of the task that shows up in the logs, *has to be unique*
      task: short_task               # the function in the module decorated by `orchestra.task`
      enabled: false                 # if it's not enabled it will be ignored
      schedule:
        timing: "every 00:00:01"     # see the examples for a list of understood expressions
        timezone: Europe/Paris       # name of a time zone from the tz database
        resume_from_previous: true   # if enabled, Orchestra will try to calculate the next run based on the last known run; false by default
      additional_options:            # everything in this optional section will be passed to the Celery task when invoked
        queue: background-jobs
      tags:                          # tags are a list of strings; they may be used to group tasks together
        - cpu
        - fast
The examples in this README show several forms of understood timing expressions, including cyclic intervals, daily and weekly times, and one-off runs; a few of them are collected in the sketch below. By default, a timing expression that contains only a time or timestamp (without a date), like 11:23:00, is interpreted as a time instant and will trigger the next time the wall clock shows the specified time.
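For illustration, here is a minimal, hedged sketch of the schedules portion of a definition block, using only timing strings that already appear elsewhere in this README (the set of understood expressions is larger, and the exact accepted string forms may vary):

# a hedged sketch of the "schedules" list from a definition block;
# only timing strings shown elsewhere in this README are used, and the
# exact accepted string forms may vary
example_schedules = [
    # cyclic: run every second
    {"name": "short_task_every_1_second", "task": "short_task", "enabled": True,
     "schedule": {"timing": "every 00:00:01"}},
    # cyclic: run every 48 hours
    {"name": "task_every_48_hours", "task": "long_task", "enabled": True,
     "schedule": {"timing": "every 48:00:00"}},
    # time only: triggers the next time the wall clock reads 11:23:00
    {"name": "task_at_11_23_00", "task": "long_task", "enabled": True,
     "schedule": {"timing": "11:23:00", "timezone": "Europe/Berlin"}},
]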
main.py
import asyncio

from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",  # any Celery supported broker connection string will work here
    backend_conn_str="sqlite:///log.db",  # any SQLAlchemy 2 supported backend connection string will work here
    broker_connection_retry_on_startup=True,  # any Celery supported keyword argument can be passed through to Celery
)


@orchestra.task
def simple_task() -> str:
    return "hello world"


async def main():
    schedule_definitions = [
        {
            "module": "main",  # if you're using the same Python module to declare your Celery tasks and run the scheduler, this has to be that module's name
            "schedules": [
                {
                    "name": "Simple Task every 48 hours",
                    "task": "simple_task",
                    "enabled": True,
                    "schedule": {
                        "timing": "every 48:00:00",
                    },
                }
            ],
        }
    ]
    await orchestra.create_schedule(schedule_definitions)  # you have to create a schedule first, optionally preloading a schedule definition file
    await orchestra.run()  # run Orchestra


if __name__ == "__main__":
    asyncio.run(main())  # Orchestra is based on the scheduler package, which uses asyncio under the hood
The output would look something like the following:
[08:38:50] INFO Job Simple Task every 48 hours was scheduled to core.py:214
run 2 days, 0:00:00
INFO Orchestra starting core.py:82
╭───────────┬─────────────┬────────────────────────────────────┬───────────────────────┬───────────────────────────┬─────────────┬───────────┬────────────┬──────╮
│State │ Schedule │ Job name │ Module and task │ Due at │ Timezone │ Due in │ Attempts │ Tags │
├───────────┼─────────────┼────────────────────────────────────┼───────────────────────┼───────────────────────────┼─────────────┼───────────┼────────────┼──────┤
│Running │ CYCLIC │ Simple Task every 48 hours │ main.simple_task │ 2024-03-05 07:38:50 │ UTC │ 1 day │ 0/inf │ │
╰───────────┴─────────────┴────────────────────────────────────┴───────────────────────┴───────────────────────────┴─────────────┴───────────┴────────────┴──────╯
╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Orchestrating jobs ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
You may also use a standalone Python module to hold your Celery tasks, but you have to declare Orchestra there as well or import it from the main module (a sketch of the import variant follows the tasks.py listing below):
tasks.py
from time import sleep

from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",  # any Celery supported broker connection string will work here
    backend_conn_str="sqlite:///log.db",  # any SQLAlchemy 2 supported backend connection string will work here
    broker_connection_retry_on_startup=True,  # any Celery supported keyword argument can be passed through to Celery
)


def fibo(n: int):
    if n <= 1:
        return n
    else:
        return fibo(n - 1) + fibo(n - 2)


@orchestra.task
def short_task() -> int:
    return fibo(10)


@orchestra.task
def long_task() -> int:
    sleep(10000)
    return fibo(33)
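As mentioned above, instead of constructing a second Orchestra instance inside tasks.py you can import the one declared in the main module. A minimal sketch of that variant, assuming the instance lives in main.py and your layout avoids circular imports:

# tasks.py (alternative sketch): reuse the Orchestra instance from main.py
from main import orchestra  # assumes main.py defines `orchestra` and does not import tasks at module level


@orchestra.task
def short_task() -> int:
    return 42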
main.py
import asyncio

import yaml

from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",
    backend_conn_str="sqlite:///log.db",
    broker_connection_retry_on_startup=True,
)


async def main():
    schedule_definitions = yaml.safe_load(open("schedule.yaml", "rt"))  # see the Schedule definition section for an example
    await orchestra.create_schedule(schedule_definitions)  # you have to create a schedule first, optionally preloading a schedule definition file
    await orchestra.run()  # run Orchestra


if __name__ == "__main__":
    asyncio.run(main())  # Orchestra is based on the scheduler package, which uses asyncio under the hood
import asyncio
import datetime

import pytz
from scheduler import trigger

from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",
    backend_conn_str="sqlite:///log.db",
    broker_connection_retry_on_startup=True,
)


@orchestra.task
def simple_task() -> str:
    return "Ran Simple Task"


async def hello():
    print("Hello")


async def main():
    await orchestra.create_schedule()
    # schedule a regular Python function; this won't be tracked in the database and Orchestra does not support resuming such jobs in case of a shutdown
    orchestra.scheduler.cyclic(timing=datetime.timedelta(seconds=10), alias="Hello World every 10s", handle=hello)
    # schedule a Celery task function; Orchestra fully supports scheduling Celery tasks programmatically as well as through a schedule definition
    orchestra.schedule_celery_task(
        schedule=orchestra.scheduler.cyclic,
        timing=datetime.timedelta(seconds=1),
        task=simple_task,
        job_name="Simple task every 1s",
    )
    orchestra.schedule_celery_task(
        schedule=orchestra.scheduler.weekly,
        timing=trigger.Monday(datetime.time(hour=16, minute=30, tzinfo=pytz.timezone("Europe/Berlin"))),
        task=simple_task,
        job_name="Simple task every Monday at 16:30 UTC",
    )
    await orchestra.run()


if __name__ == "__main__":
    asyncio.run(main())
The output would look something like the following:
[08:36:47] INFO Job task_every_48_hours was scheduled to run 2 core.py:214
days, 0:00:00
INFO Job task_every_31_minutes_12_seconds was core.py:214
scheduled to run 0:31:12
INFO Job task_every_Monday_at_03_15_00 was scheduled core.py:214
to run Monday(time=datetime.time(3, 15,
tzinfo=<DstTzInfo 'Europe/Berlin' LMT+1:16:00
STD>))
...
INFO Orchestra starting core.py:82
╭────────┬───────────┬────────────────────────────────────────────┬──────────────────────┬─────────────────────┬─────────────────┬──────────┬──────────┬─────────╮
│State │ Schedule │ Job name │ Module and task │ Due at │ Timezone │ Due in │ Attempts │ Tags │
├────────┼───────────┼────────────────────────────────────────────┼──────────────────────┼─────────────────────┼─────────────────┼──────────┼──────────┼─────────┤
│Running │ ONCE │ task_at_9_13 │ tasks.long_task │ 2024-03-03 09:13:00 │ Europe/Berlin │ 0:36:09 │ 0/1 │ gpu │
│Running │ HOURLY │ task_every_hour_at_05_00 │ tasks.long_task │ 2024-03-03 09:05:00 │ Europe/Berlin │ 0:28:09 │ 0/inf │ gpu │
│Running │ CYCLIC │ exception_task_every_10_second │ tasks.exception_task │ 2024-03-03 07:36:57 │ UTC │ 0:00:06 │ 0/inf │ cpu,fast│
│Running │ DAILY │ task_every_day_at_9_15 │ tasks.long_task │ 2024-03-03 09:15:00 │ Europe/Berlin │ 0:38:09 │ 0/inf │ gpu │
│Running │ ONCE │ task_next_23_00_00 │ tasks.long_task │ 2024-03-03 23:00:00 │ Europe/Berlin │ 14:23:09 │ 0/1 │ gpu │
│Running │ ONCE │ task_once_in_13_11_52 │ tasks.long_task │ 2024-03-03 20:48:39 │ UTC │ 13:11:49 │ 0/1 │ gpu │
│Running │ CYCLIC │ task_every_31_minutes_12_seconds │ tasks.long_task │ 2024-03-03 08:07:59 │ UTC │ 0:31:09 │ 0/inf │ gpu │
│Running │ CYCLIC │ short_task_every_1_second │ tasks.short_task │ 2024-03-03 07:36:51 │ UTC │ 0:00:00 │ 3/inf │ cpu,fast│
│Running │ ONCE │ task_at_11_23_00 │ tasks.long_task │ 2024-03-03 11:23:00 │ Europe/Berlin │ 2:46:09 │ 0/1 │ gpu │
│Running │ ONCE │ task_once_in_1_hour_and_10_minutes │ tasks.long_task │ 2024-03-03 08:46:47 │ UTC │ 1:09:57 │ 0/1 │ gpu │
│Running │ WEEKLY │ task_every_Monday_at_03_15_00 │ tasks.long_task │ 2024-03-04 03:15:00 │ Europe/Berlin │ 18:38:09 │ 0/inf │ gpu │
│Running │ DAILY │ task_every_day_at_13_hours │ tasks.long_task │ 2024-03-03 13:00:00 │ Europe/Berlin │ 4:23:09 │ 0/inf │ gpu │
│Running │ ONCE │ task_next_Tuesday_at_13_11_00 │ tasks.long_task │ 2024-03-05 13:11:00 │ Europe/Berlin │ 2 days │ 0/1 │ gpu │
│Running │ DAILY │ task_every_day_on_9_03 │ tasks.long_task │ 2024-03-03 09:03:00 │ Europe/Berlin │ 0:26:09 │ 0/inf │ gpu │
│Running │ MINUTELY │ task_every_minute_at_15_seconds │ tasks.long_task │ 2024-03-03 08:37:15 │ Europe/Berlin │ 0:00:24 │ 0/inf │ gpu │
│Running │ ONCE │ task_2024_04_01_09_13 │ tasks.long_task │ 2024-04-01 09:13:00 │ Europe/Berlin │ 28 days │ 0/1 │ gpu │
│Running │ ONCE │ task_once_Wednesday_at_01_15_00 │ tasks.long_task │ 2024-03-06 01:15:00 │ Europe/Berlin │ 2 days │ 0/1 │ gpu │
│Running │ ONCE │ task_once_on_Thursday_00_31_41 │ tasks.long_task │ 2024-03-07 00:31:41 │ Europe/Berlin │ 3 days │ 0/1 │ gpu │
│Running │ ONCE │ task_once_in_5_hours_2_minutes_and_15_sec… │ tasks.long_task │ 2024-03-03 12:39:02 │ UTC │ 5:02:12 │ 0/1 │ gpu │
│Running │ DAILY │ task_every_day_at_09_15_14 │ tasks.long_task │ 2024-03-03 09:15:14 │ Europe/Berlin │ 0:38:23 │ 0/inf │ gpu │
│Running │ CYCLIC │ task_every_48_hours │ tasks.long_task │ 2024-03-05 07:36:47 │ UTC │ 1 day │ 0/inf │ gpu │
│Running │ WEEKLY │ task_every_Monday_at_03_15_00_US │ tasks.long_task │ 2024-03-04 03:15:00 │ America/Boise │ 1 day │ 0/inf │ gpu │
╰────────┴───────────┴────────────────────────────────────────────┴──────────────────────┴─────────────────────┴─────────────────┴──────────┴──────────┴─────────╯
╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Orchestrating jobs ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
import asyncio
import datetime

import pytz
from scheduler import trigger

from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",
    backend_conn_str="sqlite:///log.db",
    broker_connection_retry_on_startup=True,
)


@orchestra.task
def simple_task() -> str:
    return "Ran Simple Task"


async def hello():
    print("Hello")


async def main():
    await orchestra.create_schedule()
    # schedule a regular Python function; this won't be tracked in the database and Orchestra does not support resuming such jobs in case of a shutdown
    orchestra.scheduler.cyclic(timing=datetime.timedelta(seconds=10), alias="Hello World every 10s", handle=hello)
    # schedule a Celery task function; Orchestra fully supports scheduling Celery tasks programmatically as well as through a schedule definition
    job1 = orchestra.schedule_celery_task(
        schedule=orchestra.scheduler.cyclic,
        timing=datetime.timedelta(seconds=1),
        task=simple_task,
        job_name="Simple task every 1s",
        tags={"latency", "gpu"},
    )
    job2 = orchestra.schedule_celery_task(
        schedule=orchestra.scheduler.weekly,
        timing=trigger.Monday(datetime.time(hour=16, minute=30, tzinfo=pytz.timezone("Europe/Berlin"))),
        task=simple_task,
        job_name="Simple task every Monday at 16:30 UTC",
        tags={"cpu"},
    )
    # you may pause a job by name, by reference, or by matching tags
    orchestra.pause_job("Simple task every Monday at 16:30 UTC")
    orchestra.pause_jobs({job1, job2})
    # resume jobs by name or by matching tags
    orchestra.resume_jobs(orchestra.get_jobs(tags={"latency"}))
    await orchestra.run()


if __name__ == "__main__":
    asyncio.run(main())
The output would look something like the following:
[08:39:41] INFO Job Simple task every 1s was scheduled to run core.py:214
0:00:01
INFO Job Simple task every Monday at 16:30 UTC was core.py:214
scheduled to run Monday(time=datetime.time(16,
30, tzinfo=<DstTzInfo 'Europe/Berlin'
LMT+0:53:00 STD>))
INFO Paused job Simple task every Monday at 16:30 UTC core.py:298
INFO Paused job Simple task every 1s core.py:298
INFO Resumed job Simple task every 1s core.py:314
INFO Orchestra starting core.py:82
╭─────────┬───────────┬─────────────────────────────────────────┬───────────────────────┬──────────────────────┬───────────────┬─────────┬──────────┬────────────╮
│State │ Schedule │ Job name │ Module and task │ Due at │ Timezone │ Due in │ Attempts │ Tags │
├─────────┼───────────┼─────────────────────────────────────────┼───────────────────────┼──────────────────────┼───────────────┼─────────┼──────────┼────────────┤
│Running │ CYCLIC │ Hello World every 10s │ hello │ 2024-03-03 07:39:51 │ UTC │ 0:00:04 │ 0/inf │ │
│Running │ CYCLIC │ Simple task every 1s │ __main__.simple_task │ 2024-03-03 07:39:47 │ UTC │ 0:00:00 │ 5/inf │ gpu,latency│
│Paused │ WEEKLY │ Simple task every Monday at 16:30 UTC │ __main__.simple_task │ 2024-03-04 16:30:00 │ Europe/Berlin │ 1 day │ 0/inf │ cpu │
╰─────────┴───────────┴─────────────────────────────────────────┴───────────────────────┴──────────────────────┴───────────────┴─────────┴──────────┴────────────╯
╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Orchestrating jobs ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
Orchestra comes with a built-in API, accessible by setting enable_api=True on the Orchestra instance. You may optionally provide an api_address parameter to control which host:port gets bound by uvicorn.
import asyncio

import yaml

from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",
    backend_conn_str="sqlite:///log.db",
    enable_api=True,
    api_address="0.0.0.0:5000",
    broker_connection_retry_on_startup=True,
)


async def main():
    schedule_definitions = yaml.safe_load(open("schedule.yaml", "rt"))
    await orchestra.create_schedule(schedule_definitions)
    await orchestra.run()


if __name__ == "__main__":
    asyncio.run(main())
You can check the Swagger docs created by FastAPI at the specified api_address, which is http://localhost:5000 in the example above and by default.
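As a quick check once Orchestra is running, something like the following opens the interactive documentation; this is a hedged sketch that assumes FastAPI's default Swagger UI route (/docs) and the api_address from the example above:

import webbrowser

# assumes FastAPI's default documentation route ("/docs") and the example
# api_address above; adjust the URL if your deployment binds a different host or port
webbrowser.open("http://localhost:5000/docs")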