async-task-processor

Simple package to run async tasks

  • 0.2.6
  • PyPI
Async task processor

Used to distribute tasks between configurable workers.

Features

  • simple definition of a task as a normal function.
  • SimpleProcessor is used for simple tasks.
  • PeriodicProcessor is used for periodic tasks.
  • TarantoolProcessor listens to a Tarantool queue and triggers a task when data arrives.
  • ability to retry on error (max_retries and retry_countdown options).
  • ability to bind the task as the self argument of the worker function (bind option).
  • ability to implement your own task processor.
  • ability to build a control API on top of processors (to manage your workers).
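
The retry options listed above can be pictured with a small standalone sketch. It does not use async-task-processor and is not the library's implementation; `run_with_retries` and `flaky` are hypothetical names, and the choice that the first call does not count as a retry is an assumption made for illustration. Only the option names `max_retries` and `retry_countdown` and the exception name `MaxRetriesExceeded` come from this README.

```python
import time


class MaxRetriesExceeded(Exception):
    """Raised when all retries are used up (name borrowed from the README)."""


def run_with_retries(func, args=(), max_retries=3, retry_countdown=0.0, sleep=time.sleep):
    """Call func(*args); on failure wait retry_countdown seconds and try again.

    Assumption: the first call is not a retry, so func runs at most
    max_retries + 1 times before MaxRetriesExceeded is raised.
    """
    retries = 0
    while True:
        try:
            return func(*args)
        except Exception as exc:
            if retries >= max_retries:
                raise MaxRetriesExceeded('gave up after %d retries' % retries) from exc
            retries += 1
            sleep(retry_countdown)


attempts = []


def flaky(word):
    """Hypothetical task that fails twice and then succeeds."""
    attempts.append(word)
    if len(attempts) < 3:
        raise ValueError('not yet')
    return 'done: %s' % word


result = run_with_retries(flaky, args=['hello'], max_retries=5, retry_countdown=0.01)
```

Here `flaky` succeeds on its third call, so two retries are consumed out of the five allowed.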

TODO

  • Tests
  • Console utils
  • Sphinx docs

Installation

As usual, use pip:

.. code-block:: bash

pip install async-task-processor

Usage examples

Periodic task processor example:

.. code-block:: python

import time

from async_task_processor import ATP
from async_task_processor.processors import PeriodicProcessor
from examples import logger


# first test function
def test_func_one(sleep_time, word):
    """

    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('start working')
    time.sleep(sleep_time)
    logger.info('Job is done. Word is: %s' % word)


# second test function
def test_func_second(sleep_time, word):
    """

    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('start working')
    time.sleep(sleep_time)
    logger.info('Job is done. Word is: %s' % word)


# third function with exception
def test_func_bad(self, sleep_time, word):
    """

    :type self: async_task_processor.Task
    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('start working')
    try:
        a = 1 / 0
    except ZeroDivisionError:
        # optionally you can overload max_retries and retry_countdown here
        self.retry()
    time.sleep(sleep_time)
    logger.info('Job is done. Word is: %s' % word)


atp = ATP(asyncio_debug=True)
task_processor = PeriodicProcessor(atp=atp)

# Add function to task processor
task_processor.add_task(test_func_one, args=[5, 'first hello world'], max_workers=5, timeout=1,
                        max_retries=5, retry_countdown=1)

# Add one more function to task processor
task_processor.add_task(test_func_second, args=[3, 'second hello world'], max_workers=5, timeout=1,
                        max_retries=5, retry_countdown=1)

# Add one more function, a bad one that fails. Each failure triggers a retry; once the retries are exhausted,
# the workers of this function stop one by one with the MaxRetriesExceeded exception.
# The bind option passes the Task as the `self` argument.
task_processor.add_task(test_func_bad, args=[3, 'second hello world'], bind=True, max_workers=2, timeout=1,
                        max_retries=3, retry_countdown=3)

# Start async-task-processor
atp.start()
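
What `bind=True` means for the function signature can be sketched without the library. The `Task` class below is a hypothetical stand-in written for this illustration, not `async_task_processor.Task`; it only shows the calling convention the example above relies on, where a bound function receives the task object as its first argument.

```python
class Task:
    """Hypothetical stand-in for a task object; not the library's class."""

    def __init__(self, foo, args=(), bind=False):
        self.foo = foo
        self.args = list(args)
        self.bind = bind

    def run(self):
        # With bind=True the task passes itself as the first positional argument.
        if self.bind:
            return self.foo(self, *self.args)
        return self.foo(*self.args)


def plain(sleep_time, word):
    """Unbound function: receives only the values from args."""
    return 'plain got %r' % word


def bound(self, sleep_time, word):
    """Bound function: `self` is the Task, so it can inspect the task."""
    return 'bound got %r from task with args %r' % (word, self.args)


plain_result = Task(plain, args=[5, 'hello']).run()
bound_result = Task(bound, args=[3, 'hello'], bind=True).run()
```

In the real examples the bound object is what gives a function access to `self.retry()` and, for Tarantool tasks, `self.data`.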

Tarantool task processor example:

.. code-block:: python

import asyncio
import time

import asynctnt
import asynctnt_queue

from async_task_processor import ATP
from async_task_processor.processors import TarantoolProcessor
from examples import logger

TARANTOOL_QUEUE = 'test_queue'
TARANTOOL_HOST = 'localhost'
TARANTOOL_PORT = 3301
TARANTOOL_USER = None
TARANTOOL_PASS = None


def put_messages_to_tarantool(messages_count=1, tube_name='test_queue', host='localhost', port=3301, user=None,
                              password=None):
    """Put some test messages to tarantool queue

    :param messages_count: messages number to put in queue
    :param tube_name: tarantool queue name
    :type tube_name: str
    :param host: tarantool host
    :param port: tarantool port
    :param user: tarantool user
    :param password: tarantool password
    :return:
    """

    async def put_jobs():
        conn = asynctnt.Connection(host=host, port=port, username=user, password=password)
        await conn.connect()
        queue = asynctnt_queue.Queue(conn)
        tube = queue.tube(tube_name)
        for i in range(messages_count):
            await tube.put(dict(num=i, first_name='Jon', last_name='Smith'))
        await conn.disconnect()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(put_jobs()))
    loop.close()


# Let's put 100 messages to tarantool
put_messages_to_tarantool(messages_count=100, tube_name=TARANTOOL_QUEUE, host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                          user=TARANTOOL_USER, password=TARANTOOL_PASS)


# Test function
def test_func(self, sleep_time, word):
    """

    :type self: async_task_processor.TarantoolTask
    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('start working')
    time.sleep(sleep_time)
    logger.info('Job is done. Word is %s. Data is %s. ' % (word, self.data))


atp = ATP(asyncio_debug=True)
task_processor = TarantoolProcessor(atp=atp, host=TARANTOOL_HOST, port=TARANTOOL_PORT, user=TARANTOOL_USER,
                                    password=TARANTOOL_PASS, connection_max_retries=3, connection_retry_countdown=3)

# Add function to task processor. Data from the Tarantool queue is available in the function through the bound
# `self` argument (`self.data`). 20 parallel workers will be started.
task_processor.add_task(foo=test_func, queue=TARANTOOL_QUEUE, args=[1, 'hello world'], bind=True, max_workers=20,
                        max_retries=5, retry_countdown=1)
# Start async-task-processor
atp.start()
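
The pattern in the example above, several workers draining one queue, can be tried without a Tarantool server. The following is a rough sketch that swaps in the standard-library `asyncio.Queue` for the Tarantool tube; it says nothing about how TarantoolProcessor is implemented, and `worker` and `main` are hypothetical names.

```python
import asyncio


async def worker(name, queue, results):
    """Take messages off the queue until cancelled and record who handled each one."""
    while True:
        data = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stands in for the real work
            results.append((name, data['num']))
        finally:
            queue.task_done()


async def main(messages_count=10, max_workers=3):
    queue = asyncio.Queue()
    for i in range(messages_count):
        queue.put_nowait(dict(num=i, first_name='Jon', last_name='Smith'))

    results = []
    workers = [asyncio.ensure_future(worker('worker-%d' % n, queue, results))
               for n in range(max_workers)]
    await queue.join()  # wait until every message has been processed
    for w in workers:
        w.cancel()      # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(main())
```

Each message is handled exactly once, and the work is spread across the workers in whatever order they become free.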

Tarantool task processor example with the ability to scale workers via Tarantool:

.. code-block:: python

import asyncio
import importlib
import socket
import sys
import time

import asynctnt
import asynctnt_queue
import tarantool

from async_task_processor import ATP
from async_task_processor.processors import TarantoolProcessor
from examples import logger

TARANTOOL_QUEUE = 'test_queue'
TARANTOOL_HOST = 'localhost'
TARANTOOL_PORT = 3301
TARANTOOL_USER = None
TARANTOOL_PASS = None


def put_messages_to_tarantool(messages_count=1, tube_name='test_queue', host='localhost', port=3301, user=None,
                              password=None):
    """Put some test messages to tarantool queue

    :param messages_count: messages number to put in queue
    :param tube_name: tarantool queue name
    :type tube_name: str
    :param host: tarantool host
    :param port: tarantool port
    :param user: tarantool user
    :param password: tarantool password
    :return:
    """

    async def put_jobs():
        conn = asynctnt.Connection(host=host, port=port, username=user, password=password)
        await conn.connect()
        tube = asynctnt_queue.Queue(conn).tube(tube_name)
        for i in range(messages_count):
            await tube.put(dict(num=i, first_name='Jon', last_name='Smith'))
        await conn.disconnect()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(put_jobs()))
    loop.close()


# Let's put 100 messages to tarantool
put_messages_to_tarantool(messages_count=100, tube_name=TARANTOOL_QUEUE, host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                          user=TARANTOOL_USER, password=TARANTOOL_PASS)


# Create a tube in the queue for managing workers
def create_tube(tube_name):
    try:
        t = tarantool.connect(host=TARANTOOL_HOST, port=TARANTOOL_PORT, user=TARANTOOL_USER,
                              password=TARANTOOL_PASS)
        t.call("queue.create_tube", (tube_name, 'fifo', {'if_not_exists': True}))
    except tarantool.error.DatabaseError as e:
        # Error code 32 is tolerated; anything else is re-raised
        if e.args[0] != 32:
            raise


# Test function
def test_func(self, sleep_time, word):
    """

    :type self: async_task_processor.TarantoolTask
    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('Start working')
    time.sleep(sleep_time)
    logger.info('Job is done. Word is %s. Data is %s. ' % (word, self.data))


# Import a function by its dotted path
def func_import(foo_path):
    path_list = foo_path.split('.')
    func_name = path_list.pop()
    m = importlib.import_module('.'.join(path_list)) if path_list else sys.modules[__name__]
    func = getattr(m, func_name)
    return func


# Manage workers according to commands received from the control tube
def add_task(self, tp):
    """

    :type self: async_task_processor.primitives.TarantoolTask
    :type tp: TarantoolProcessor
    :return:
    """
    if self.data['command'] == 'stop':
        tp.stop(name=self.data['foo'], workers_count=self.data['max_workers'], leave_last=False)
        self.app.logger.info("%d workers were deleted from task %s" % (self.data['max_workers'], self.data['foo']))
    elif self.data['command'] == 'start':
        tp.add_task(foo=func_import(self.data['foo']), queue=TARANTOOL_QUEUE, args=[1, 'message from new worker'],
                    bind=True, max_workers=self.data['max_workers'], name=self.data['foo'])
        self.app.logger.info("Added %d workers for task %s" % (self.data['max_workers'], self.data['foo']))
    elif self.data['command'] == 'info':
        for task in self.app.tasks:
            logger.info(task.as_json())
    else:
        self.app.logger.info("Unknown command %s" % self.data['command'])


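# Get the host IP: prefer a non-loopback address that the hostname resolves to, otherwise use the local
# address of a UDP socket connected to a public host.
def get_host_ip():
    addresses = [a for a in socket.gethostbyname_ex(socket.gethostname())[2] if not a.startswith('127.')]
    if addresses:
        return addresses[0]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(('8.8.8.8', 53))
        return s.getsockname()[0]
    finally:
        s.close()


ip = get_host_ip().replace('.', '_')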

# control tube name
control_tube_name = 'control_queue_%s' % ip
logger.info("control tube is %s" % control_tube_name)

# create the tube for managing workers
create_tube(control_tube_name)

atp = ATP(asyncio_debug=True, logger=logger)

task_processor = TarantoolProcessor(atp=atp, host=TARANTOOL_HOST, port=TARANTOOL_PORT, user=TARANTOOL_USER,
                                    password=TARANTOOL_PASS, connection_max_retries=3, connection_retry_countdown=3)

# Add function to task processor. Data from the Tarantool queue is available in the function through the bound
# `self` argument (`self.data`). 20 parallel workers will be started.
task_processor.add_task(foo=test_func, queue=TARANTOOL_QUEUE, args=[1, 'hello world'], bind=True, max_workers=20,
                        max_retries=5, retry_countdown=1)

# Add a task that listens for commands on the control tube. Because the host IP is part of the control tube name,
# a separate control queue is created for each host, so an app started on several hosts can be managed per host.
# You can try to manage workers from the tarantool console. Example command:
# queue.tube.control_queue_<your ip>:put({ foo='test_func', command = 'start', max_workers = 2})
task_processor.add_task(foo=add_task, queue=control_tube_name, args=[task_processor], bind=True)

# Start async-task-processor
atp.start()
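
The `foo` field of a control message is a dotted path that `func_import` turns into a callable. The sketch below repeats that helper on its own so the lookup can be tried in isolation; the control message here points at a standard-library function purely for illustration, and no Tarantool connection is involved.

```python
import importlib
import sys


def func_import(foo_path):
    """Resolve 'package.module.func' to the function object.

    A bare name such as 'test_func' is looked up in the current module.
    """
    path_list = foo_path.split('.')
    func_name = path_list.pop()
    module = importlib.import_module('.'.join(path_list)) if path_list else sys.modules[__name__]
    return getattr(module, func_name)


# A control message shaped like the console example above; the 'foo' value
# is a standard-library function chosen only for this illustration.
command = {'foo': 'os.path.basename', 'command': 'start', 'max_workers': 2}
func = func_import(command['foo'])
basename = func('/tmp/example.txt')
```

Because any importable path is accepted, whoever can write to the control tube can make the worker import and run arbitrary functions, so access to that tube is worth restricting.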
