Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

message-queue-sqlite

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

message-queue-sqlite

A simple message queue demo

  • 2.0.1
  • PyPI
  • Socket score

Maintainers
1

MESSAGE QUEUE SQLITE

[TOC]

介绍

这是一个基于消息队列的任务处理系统。使用 SQLite 作为中间件实现了异步任务的提交与处理,支持多线程处理任务和结果的回调。

特性

  • 异步任务处理: 利用线程池实现任务的并发处理。
  • 动态任务注册: 可以通过装饰器轻松注册新任务。
  • 结果回调: 支持任务执行后的结果回调函数。
  • 状态管理: 通过状态枚举管理任务的生命周期,包括未开始、运行中、已完成、失败等状态

目录结构

message_queue_sqlite/ 
 ├── cache/ 
 │    └── __init__.py # 缓存模块 
 ├── config/ 
 │    └── __init__.py # 配置模块 
 ├── constants/ # 常量模块 
 │    ├── __init__.py 
 │    └── task_status.py # 任务状态常量 
 ├── middleware/ # 中间件模块 
 │    ├── __init__.py 
 │    ├── client.py # 客户端中间件 
 │    └── server.py # 服务器中间件 
 ├── model/ # 结果模型模块 
 │    ├── __init__.py │ 
 |    └── task_result.py # 任务结果模型 
 ├── task_service/ # 任务服务模块 
 │    ├── service/ 
 │    │    ├── __init__.py 
 │    │    └── services.py # 服务回调模块 
 │    ├── task/ 
 │    │    ├── __init__.py 
 │    │    ├── discover.py # 动态任务挂载模块 
 │    │    ├── task_base.py # 任务基类 
 │    │    └── tasks.py # 任务函数管理模块
 │    └── __init__.py 
 └── __init__.py # 初始化模块 

安装

  1. 创建虚拟环境

    mkdir demo
    cd demo
    python -m venv venv
    source venv/bin/activate
    
  2. 克隆仓库

    git clone https://gitee.com/cai-xinpenge/message_queue.git
    

    cd message_queue

  3. 安装

    pip install .
    

使用示例

请参考以下示例代码以了解如何使用 message_queue_sqlite

目录结构

demo/
├── app/
|    ├── engine/
|    |    ├── __init__.py
|    |    └── test.py # 回调函数
|    └── __init__.py
main.py

定义任务

app/engine/test.py

from message_queue_sqlite import task_function

@task_function(use_cache=True)
def test(message):
    print(message)
    return "test"

@task_function()
def test1(message):
    print(f"{message}1")
    return "test1"

app/engine/__init__.py

from .test import test, test1

__all__ = ["test", "test1"]

动态挂载任务并初始化服务

app/__init__.py

from message_queue_sqlite import discover_and_mount_ts, init_server
from message_queue_sqlite.config import Config 

# 动态挂在 app.engine 目录下的任务
tasks = discover_and_mount_ts("app.engine")
tasks.get_all_task_names()

# 初始化服务
config = Config(middleware_path="message_queue", max_workers=5, middleware_num=1)
server, client, send_message = init_server(tasks, config) 
# config 参数可选,默认 middleware_path 为 message_queue,max_workers 为 5,middleware_num 为 1

__all__ = ["model", "server"]

Config 参数可选

  • middleware_path 为中间件路径(不包括文件名)
  • max_workers 为线程池大小
  • middleware_num 为中间件数量(默认为 1)。

启动服务

main.py

from app import server, client, send_message
from message_queue_sqlite import stop_server, start_server
import sys
import time
from threading import Thread


if __name__ == '__main__':
    # 启动服务
    Thread(target=start_server, args=(server, client)).start()
    # 循环发送任务
    while(True):
        try:
            keyworded_args = {'message': 'hello world'}
            callback = lambda x: print(x)
            send_message('test', keyworded_args, callback, 1, True)
            send_message('test1', keyworded_args, callback, 2)
            time.sleep(0.002)
        except KeyboardInterrupt:
            break
    # 停止服务
    sys.exit(stop_server(server, client))

任务调用

# 缓存模式
send_message('test', keyworded_args, callback, priority=1, use_cache=True)  # priority 值越大优先级越高
# 非缓存模式
send_message('test1', keyworded_args, callback, priority=2)

缓存模式

当task任务的参数类型不方便被序列化时,可以选择使用缓存模式。该模式会将参数存入 cache 模块中,但执行任务时会先从 cache 中获取参数,由于 cache 只在运行时存在,所以该模式不支持持久化,若程序意外退出,缓存数据也会丢失,该模式主要针对参数类型不方便被序列化的场景。

多中间件负载均衡

区别于 1.x 版本,2.x 版本支持多中间件负载均衡,可以根据任务的执行时间和任务的负载情况,动态调整中间件的数量,以提高任务处理的效率。send_message 函数会根据指定的中间件数量,将任务发送到不同的中间件,以降低 sqlite 并发处理的压力。

任务状态

任务的生命周期通过枚举类型 TaskStatus 管理,包括:

  • NOT_STARTED: 任务尚未开始
  • RUNNING: 任务正在执行
  • FINISHED: 任务执行完成
  • FAILED: 任务执行失败
  • CALLBACKED: 任务结果已经回调

版本更新

v0.0.1

  • 实现基本功能,包括异步任务处理、动态任务注册、结果回调、状态管理。

v0.0.2

  • 代码优化

v1.0.0

  • 实现缓存模式

v1.0.1

  • 修复了一些 bug

v1.0.2

  • 性能优化

v2.0.0

  • 实现多中间件负载均衡

v2.0.1

  • 说明文档更新

联系

如有问题,请联系作者:

  • 姓名: chakcy
  • 邮箱: 947105045@qq.com
  • 仓库地址: https://gitee.com/cai-xinpenge/message_queue

许可

该项目遵循 MIT 许可证。请查看 LICENSE 文件以获取更多信息。

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc