tqueue package
This library makes it easy to run your tasks in multiple threads, which is helpful when you have a lot of data to process.
Assume you have a large list of items to process. You write a producer that puts items into the queue one by one, and workers that take items from the queue and process them. Putting an item into the queue should be quicker than processing it (the worker's job); otherwise the worker threads will mostly sit idle.
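The producer/worker pattern that tqueue wraps can be sketched with the standard library alone — a bounded `queue.Queue`, a pool of worker threads, and a producer loop. This is an illustrative sketch of the pattern, not tqueue's actual implementation:

```python
import queue
import threading


def worker(data, results):
    # Simulate processing: square each item
    results.append(data * data)


def run(items, num_threads=4):
    q = queue.Queue(maxsize=100)  # bounded queue: the producer blocks when it is full
    results = []

    def loop():
        while True:
            data = q.get()
            if data is None:      # sentinel value: stop this thread
                break
            worker(data, results)

    threads = [threading.Thread(target=loop) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:            # the producer puts items one by one
        q.put(item)
    for _ in threads:             # one sentinel per thread so all of them stop
        q.put(None)
    for t in threads:
        t.join()
    return results


print(sorted(run(range(5))))  # -> [0, 1, 4, 9, 16]
```

tqueue hides the thread pool, the sentinels, and the joins behind a context manager, so your code only keeps the `put` loop.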
Installation

```shell
pip install tqueue
```
Usage
- Import the library

```python
from tqueue import ThreadingQueue
```

- Create a worker
  - Create a worker function that receives the data as its first parameter
  - The worker can be a normal function or a coroutine function
  - The worker will be called in child threads

```python
def worker(data):
    pass


async def worker2(data):
    pass
```
- Set up threading for a producer:
  a. Set the number of threads and the worker
  b. Put data into the queue

  You can use ThreadingQueue as a context manager:

```python
def producer():
    with ThreadingQueue(40, worker) as tq:
        ...
        tq.put(data)
```
- You can also use it asynchronously:

```python
async def producer():
    async with ThreadingQueue(40, worker) as tq:
        ...
        await tq.put(data)
```
- Run the producer:

```python
await producer()
```

or

```python
asyncio.run(producer())
```
Note
- You can pass extra keyword params to all workers running in threads via `worker_params`.
- Apart from the number of threads and the worker, you can set `log_dir` to store logs in files, and `worker_params_builder` to generate parameters for each worker.
- `on_close_thread` is an optional param, a function that is helpful when you need to, for example, close a database connection when a thread is done.
- Apart from all the above params, the rest of the keyword params will be passed to the worker.
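The `worker_params_builder` / `on_close_thread` pair mirrors a common per-thread setup/teardown pattern: build resources once per thread, reuse them for every item, release them when the thread finishes. A minimal stdlib sketch of that pattern (using a hypothetical `FakeConnection` in place of a real database connection, not tqueue's internals):

```python
import queue
import threading


class FakeConnection:
    """Stand-in for a DB connection; tracks open/closed state."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def worker_params_builder():
    # Called once per thread: build the thread's own resources
    return {"connection": FakeConnection()}


def on_close_thread(connection):
    # Called when the thread is done: release its resources
    connection.close()


def thread_loop(q, processed):
    params = worker_params_builder()
    try:
        while True:
            data = q.get()
            if data is None:  # sentinel: stop the thread
                break
            # The connection stays open for every item this thread handles
            processed.append((data, params["connection"].closed))
    finally:
        on_close_thread(**params)


q = queue.Queue()
processed = []
t = threading.Thread(target=thread_loop, args=(q, processed))
t.start()
for i in range(3):
    q.put(i)
q.put(None)
t.join()
print(processed)  # each item was processed while the connection was open
```

Building the connection inside the thread matters because many database drivers are not safe to share across threads.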
- If you upgrade the lib from version 0.0.14 to a newer one, please update your code to avoid a bug. Change:

```python
with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    await tq.put(data)
```

to the sync form:

```python
with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    tq.put(data)
```

or the async form:

```python
async with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    await tq.put(data)
```
- In both sync and async cases, you can provide a worker as an async function.
- The async version performs slightly better because it uses `asyncio.sleep` to wait when the queue is full, while the sync version uses `time.sleep`. In most cases the difference in performance is small.
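The reason `asyncio.sleep` helps is that it suspends only the current coroutine and yields control to the event loop, while `time.sleep` blocks the whole thread. A small timing experiment (illustrative only, not part of tqueue) makes the difference visible:

```python
import asyncio
import time


async def waiter(delay):
    # Cooperative wait: yields control so other coroutines keep running
    await asyncio.sleep(delay)


async def main():
    start = time.monotonic()
    # Two 0.1 s cooperative waits overlap, so together they take about 0.1 s
    await asyncio.gather(waiter(0.1), waiter(0.1))
    concurrent = time.monotonic() - start

    start = time.monotonic()
    # time.sleep blocks the event-loop thread, so the waits run back to back
    for _ in range(2):
        time.sleep(0.1)
    blocking = time.monotonic() - start
    return concurrent, blocking


concurrent, blocking = asyncio.run(main())
print(f"cooperative: {concurrent:.2f}s, blocking: {blocking:.2f}s")
```

In an async producer, a blocking `time.sleep` while the queue is full would also stall every other coroutine on the loop, which is why the async version waits cooperatively.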
Example
```python
import json

import pymysql

from tqueue import ThreadingQueue

NUM_OF_THREADS = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


def worker_params_builder():
    # Each thread gets its own connection and cursor
    conn = get_db_connection()
    conn.autocommit(1)
    cursor = conn.cursor()
    return {"cursor": cursor, "connection": conn}


def on_close_thread(cursor, connection):
    cursor.close()
    connection.close()


def worker(image_info, cursor, uid: int, **kwargs):
    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))


def producer(source_file: str):
    with ThreadingQueue(
        NUM_OF_THREADS, worker,
        log_dir="logs/update-images",
        worker_params_builder=worker_params_builder,
        on_close_thread=on_close_thread,
        worker_params={"uid": 123},
        retry_count=1
    ) as tq:
        with open(source_file, 'r') as f:
            for line in f:
                if not line:
                    continue
                data = json.loads(line)
                tq.put(data)


if __name__ == "__main__":
    producer("images.jsonl")
```
Development
Build the project:

- Update the version number in `src/tqueue/__version__.py`
- Update the change log
- Build and publish the changes:

```shell
python3 -m build
python3 -m twine upload dist/*
```
Release Information
Fixed
- No more exceptions when logging to file
Full changelog