
Product
Secure Your AI-Generated Code with Socket MCP
Socket MCP brings real-time security checks to AI-generated code, helping developers catch risky dependencies before they enter the codebase.
PGQueuer is a minimalist, high-performance job queue library for Python, leveraging PostgreSQL's robustness. Designed with simplicity and efficiency in mind, PGQueuer offers real-time, high-throughput processing for background jobs using PostgreSQL's LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED
mechanisms.
FOR UPDATE SKIP LOCKED
to ensure reliable concurrency control and smooth job processing without contention.LISTEN
and NOTIFY
commands for real-time job status updates.Install PGQueuer via pip:
pip install pgqueuer
Below is a minimal example of how to use PGQueuer to process data.
from __future__ import annotations
from datetime import datetime
import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job, Schedule
async def main() -> PgQueuer:
connection = await asyncpg.connect()
driver = AsyncpgDriver(connection)
pgq = PgQueuer(driver)
# Entrypoint for jobs whose entrypoint is named 'fetch'.
@pgq.entrypoint("fetch")
async def process_message(job: Job) -> None:
print(f"Processed message: {job!r}")
# Define and register recurring tasks using cron expressions
# The cron expression "* * * * *" means the task will run every minute
@pgq.schedule("scheduled_every_minute", "* * * * *")
async def scheduled_every_minute(schedule: Schedule) -> None:
print(f"Executed every minute {schedule!r} {datetime.now()!r}")
return pgq
The above example is located in the examples folder, and can be run by using the pgq
cli.
pgq run examples.consumer:main
from __future__ import annotations
import sys
import asyncpg
import uvloop
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
async def main(N: int) -> None:
connection = await asyncpg.connect()
driver = AsyncpgDriver(connection)
queries = Queries(driver)
await queries.enqueue(
["fetch"] * N,
[f"this is from me: {n}".encode() for n in range(1, N + 1)],
[0] * N,
)
if __name__ == "__main__":
N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
uvloop.run(main(N))
Run the producer:
python3 examples/producer.py 10000
Monitor job processing statistics in real-time using the built-in dashboard:
pgq dashboard --interval 10 --tail 25 --table-format grid
This provides a real-time, refreshing view of job queues and their status.
Example output:
+---------------------------+-------+------------+--------------------------+------------+----------+
| Created | Count | Entrypoint | Time in Queue (HH:MM:SS) | Status | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 | 49 | sync | 0:00:01 | successful | 0 |
...
+---------------------------+-------+------------+--------------------------+------------+----------+
We follow the Scientific Python SPEC 0 policy of dropping a Python minor version three years after its initial releas.
PGQueuer is MIT licensed. See LICENSE for more information.
Ready to supercharge your workflows? Install PGQueuer today and take your job management to the next level!
FAQs
Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.
We found that pgqueuer demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Product
Socket MCP brings real-time security checks to AI-generated code, helping developers catch risky dependencies before they enter the codebase.
Security News
As vulnerability data bottlenecks grow, the federal government is formally investigating NISTβs handling of the National Vulnerability Database.
Research
Security News
Socketβs Threat Research Team has uncovered 60 npm packages using post-install scripts to silently exfiltrate hostnames, IP addresses, DNS servers, and user directories to a Discord-controlled endpoint.