Security News
Weekly Downloads Now Available in npm Package Search Results
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.
This library is designed for asynchronous execution of scheduled tasks.
Simple example:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Example without using decorators:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Эта библиотека предназначена для асинхронного выполнения задач по расписанию.
Простой пример:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Пример без использования декораторов:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
FAQs
This library is designed for asynchronous execution of scheduled tasks.
We found that aiobgjobs demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.
Security News
A Stanford study reveals 9.5% of engineers contribute almost nothing, costing tech $90B annually, with remote work fueling the rise of "ghost engineers."
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.