psycopg2_mq
===========

.. image:: https://img.shields.io/pypi/v/psycopg2_mq.svg
    :target: https://pypi.org/pypi/psycopg2_mq

.. image:: https://github.com/mmerickel/psycopg2_mq/workflows/Build%20and%20test/badge.svg?branch=main
    :target: https://github.com/mmerickel/psycopg2_mq/actions?query=workflow%3A%22Build+and+test%22
    :alt: main CI Status

``psycopg2_mq`` is a message queue implemented on top of
`PostgreSQL <https://www.postgresql.org/>`__,
`SQLAlchemy <https://www.sqlalchemy.org/>`__, and
`psycopg2 <http://initd.org/psycopg/>`__.
Currently the library provides only the low-level constructs that can be used
to build a multithreaded worker system. It is broken into two components:

- ``psycopg2_mq.MQWorker`` - a reusable worker object that manages a
  single-threaded worker that can accept jobs and execute them. An application
  should create one worker per thread. It supports an API for thread-safe
  graceful shutdown.

- ``psycopg2_mq.MQSource`` - a source object providing a client-side API for
  invoking and querying job states.
It is expected that these core components are then wrapped into your own
application in any way that you see fit; the library does not dictate a
specific CLI or framework.
Workers run jobs defined in queues. Currently each queue will run jobs
concurrently, while a future version may support serial execution on a
per-queue basis. Each registered queue should contain an ``execute_job(job)``
method.

The ``execute_job`` method of a queue is passed a ``Job`` object containing
the following attributes:

- ``id``
- ``queue``
- ``method``
- ``args``
- ``cursor``
As a convenience, there is an ``extend(**kw)`` method which can be used to
add extra attributes to the object. This is useful in individual queues to
define a contract between a queue and its methods.
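To illustrate that contract, here is a minimal sketch; the ``ReportQueue``
class is hypothetical, and a stand-in object is used in place of a real
``Job``:

```python
from types import SimpleNamespace

class ReportQueue:
    """Hypothetical queue that attaches a derived attribute via extend()."""

    def execute_job(self, job):
        # Attach a derived attribute so downstream methods can rely on it.
        job.extend(greeting=f'hello, {job.args["name"]}')
        return job.greeting

# A stand-in for the real Job object, which also exposes extend(**kw).
class FakeJob(SimpleNamespace):
    def extend(self, **kw):
        for k, v in kw.items():
            setattr(self, k, v)

job = FakeJob(args={'name': 'Andy'})
print(ReportQueue().execute_job(job))  # hello, Andy
```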
A ``Job`` can be scheduled with a ``cursor_key``. There can only be one
pending job and one running job for any cursor. New jobs scheduled while
another one is pending will be ignored and the pending job is returned.
A ``job.cursor`` dict is provided to the workers containing the cursor data,
and is saved back to the database when the job is completed. This effectively
gives jobs some persistent, shared state, and serializes all jobs over a given
cursor.
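As a sketch of how the cursor dict can carry state between runs, consider a
queue that counts its executions; the ``CounterQueue`` here is hypothetical,
and a plain dict stands in for the persisted cursor:

```python
class CounterQueue:
    """Hypothetical queue that counts its executions via the cursor."""

    def execute_job(self, job):
        # job.cursor is a dict; mutations are saved back to the database
        # when the job completes, so the count survives across jobs.
        job.cursor['count'] = job.cursor.get('count', 0) + 1
        return job.cursor['count']

# Simulate two sequential jobs sharing the same cursor_key.
class FakeJob:
    def __init__(self, cursor):
        self.cursor = cursor

cursor = {}
q = CounterQueue()
print(q.execute_job(FakeJob(cursor)))  # 1
print(q.execute_job(FakeJob(cursor)))  # 2
```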
A ``Job`` can be delayed to run in the future by providing a ``datetime``
object to the ``when`` argument. This, along with a cursor key, can provide a
nice throttle on how frequently a job runs. For example, delay jobs to run in
30 seconds with a ``cursor_key``, and any jobs that are scheduled in the
meantime will be dropped. The assumption here is that the arguments are
constant and data for execution is in the cursor or another table. As a last
resort, a ``conflict_resolver`` callback can be used to modify properties of
the job when arguments cannot be constant.
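A hedged sketch of the throttle pattern described above; ``mq`` is assumed to
be an ``MQSource``, and the queue, method, and cursor key names are
hypothetical:

```python
from datetime import datetime, timedelta, timezone

def throttle_at(seconds):
    """Compute a future execution time suitable for the `when` argument."""
    return datetime.now(timezone.utc) + timedelta(seconds=seconds)

# Hypothetical usage: at most one 'refresh' per 30-second window. Jobs
# scheduled on the same cursor while this one is pending are dropped.
# job = mq.call(
#     'reports', 'refresh', {},
#     when=throttle_at(30),
#     cursor_key='reports:refresh',
# )
print(throttle_at(30) - datetime.now(timezone.utc))
```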
A ``JobSchedule`` can be defined which supports
`RFC 5545 <https://tools.ietf.org/html/rfc5545>`__ ``RRULE`` schedules. These
are powerful and can support timezones, frequencies based on calendars as well
as simple recurring rules from an anchor time using ``DTSTART``. Cron jobs
can be converted to this syntax for simpler scenarios.
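For intuition, a simple recurring rule from an anchor time - what
``RRULE:FREQ=HOURLY;INTERVAL=6;COUNT=4`` with a ``DTSTART`` expresses - can be
expanded with nothing but the standard library; this sketch is independent of
psycopg2_mq's actual schedule model:

```python
from datetime import datetime, timedelta

def expand_hourly_rrule(dtstart, interval, count):
    """Expand FREQ=HOURLY;INTERVAL=<interval>;COUNT=<count> from DTSTART."""
    return [dtstart + timedelta(hours=interval * i) for i in range(count)]

# The cron rule "0 */6 * * *" anchored at midnight is equivalent to:
# DTSTART:20240101T000000
# RRULE:FREQ=HOURLY;INTERVAL=6;COUNT=4
occurrences = expand_hourly_rrule(datetime(2024, 1, 1), 6, 4)
for dt in occurrences:
    print(dt.isoformat())  # 00:00, 06:00, 12:00, 18:00
```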
``psycopg2-mq`` workers will automatically negotiate which worker is
responsible for managing schedules so clustered workers should operate as
expected.
.. code-block:: python

    from psycopg2_mq import (
        MQWorker,
        make_default_model,
    )
    from sqlalchemy import (
        MetaData,
        create_engine,
    )
    import sys

    class EchoQueue:
        def execute_job(self, job):
            return f'hello, {job.args["name"]} from method="{job.method}"'

    if __name__ == '__main__':
        engine = create_engine(sys.argv[1])
        metadata = MetaData()
        model = make_default_model(metadata)
        worker = MQWorker(
            engine=engine,
            queues={
                'echo': EchoQueue(),
            },
            model=model,
        )
        worker.run()
.. code-block:: python

    from psycopg2_mq import MQSource, make_default_model
    from sqlalchemy import MetaData, create_engine
    from sqlalchemy.orm import sessionmaker
    import sys

    engine = create_engine(sys.argv[1])
    metadata = MetaData()
    model = make_default_model(metadata)

    session_factory = sessionmaker()
    session_factory.configure(bind=engine)

    dbsession = session_factory()
    with dbsession.begin():
        mq = MQSource(
            dbsession=dbsession,
            model=model,
        )
        job = mq.call('echo', 'hello', {'name': 'Andy'})
        print(f'queued job={job.id}')
Changelog
=========

- Add support for Python 3.10 and 3.11.

- [breaking] Prevent retrying of collapsible jobs. Require them to be invoked
  using ``call`` instead for an opportunity to specify a
  ``conflict_resolver``.

- Fix a bug in the default model schema in which the collapsible database
  index was not marked unique.

- Copy trace info when retrying a job.

- Capture the stringified exception to the job result in the ``message`` key,
  alongside the existing ``tb``, ``exc``, and ``args`` keys.

- The worker was not recognizing ``capture_signals=False``, causing problems
  when running the event loop in other threads.

- Blackify the codebase and add some real tests. Yay!
- ``MQWorker.make_job_context``.

- Drop Python 3.6 support.
- [breaking] Require SQLAlchemy 1.4+ and resolve deprecation warnings related
  to SQLAlchemy 2.0.

- [breaking] Rename ``update_job_id`` to ``updated_job_id`` in the
  ``JobCursor`` model.

- Ensure the ``trace`` attribute is populated on the ``JobContext``.

- Add ``MQWorker.make_job_context`` which can be defined to completely
  override the ``JobContext`` factory using the ``Job`` object and open
  database session.

- [breaking] Add ``update_job_id`` foreign key to the ``JobCursor`` model to
  make it possible to know which job last updated the value in the cursor.

- [breaking] Add ``trace`` json blob to the ``Job`` model.

- Support a ``trace`` json blob when creating new jobs. This value is
  available on the running job context and can be used when creating sub-jobs
  or when making requests to external systems to pass through tracing
  metadata.

  See ``MQSource.call``'s new ``trace`` parameter when creating jobs.

  See the ``JobContext.trace`` attribute when handling jobs.

- Add a standard ``FailedJobError`` exception which can be raised by jobs to
  mark a failure with a custom result object. This is different from
  unhandled exceptions that cause the ``MQWorker.result_from_error`` method
  to be invoked.
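A sketch of raising ``FailedJobError`` from a job. So the example stands
alone, a local stub stands in for ``psycopg2_mq.FailedJobError`` (real usage
imports it from the library), and the queue and result shape are hypothetical:

```python
class FailedJobError(Exception):
    """Local stand-in for psycopg2_mq.FailedJobError, for illustration."""

    def __init__(self, result):
        super().__init__(result)
        self.result = result

class BillingQueue:
    """Hypothetical queue that fails with a structured result."""

    def execute_job(self, job):
        if job.args.get('amount', 0) <= 0:
            # Mark the job failed with a custom result object instead of
            # letting an unhandled exception reach result_from_error.
            raise FailedJobError({'reason': 'invalid amount'})
        return {'charged': job.args['amount']}
```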
- [breaking] Add model changes to mark jobs as collapsible.

- [breaking] Add model changes to the cursor index.

- Allow multiple pending jobs to be scheduled on the same cursor if either:

  - The queue or method are different from existing pending jobs on the
    cursor.

  - ``collapse_on_cursor`` is set to ``False`` when scheduling the job.
- ``schedule_id`` attribute to the job context for use in jobs that want to
  know whether they were executed from a schedule or not.

- ``schedule_id`` information to retried jobs.

- ``call_schedule`` to accept an id instead of an object.

- ``UNIQUE`` constraint on the background job ``lock_id`` column.

- Add a scheduler model with support for emitting periodic jobs based on
  RRULE syntax. See https://github.com/mmerickel/psycopg2_mq/pull/11

- Enable the workers to coordinate on a per-queue basis who is in control of
  scheduling jobs. See https://github.com/mmerickel/psycopg2_mq/pull/12

- Reduce the number of advisory locks held from one per job to one per
  worker. See https://github.com/mmerickel/psycopg2_mq/pull/12
- ``timeout`` parameter.

- Add a ``worker`` column to the ``Job`` model to track what worker is
  handling a job.

- Add an optional ``name`` argument to ``MQWorker`` to name the worker -
  the value will be recorded in each job.

- Add a ``threads`` argument (default ``1``) to ``MQWorker`` to support
  handling multiple jobs from the same worker instance instead of making a
  worker per thread.
- Add a ``capture_signals`` argument (default ``True``) to ``MQWorker`` which
  will capture ``SIGTERM``, ``SIGINT``, and ``SIGUSR1``. The first two will
  trigger graceful shutdown - they will make the process stop handling new
  jobs while finishing active jobs. The latter will dump to ``stderr`` a
  JSON dump of the current status of the worker.
- When attempting to schedule a job with a cursor and a ``scheduled_time``
  earlier than a pending job on the same cursor, the job will be updated to
  run at the earlier time.

- When attempting to schedule a job with a cursor and a pending job already
  exists on the same cursor, a ``conflict_resolver`` function may be supplied
  to ``MQSource.call`` to update the job properties, merging the arguments
  however the user wishes.
- ``cursor_snapshot`` to the ``Job`` model which will contain the value of
  the cursor when the job begins.

- ``cursor_key`` column, a new ``JobCursor`` model, and some new indices.

- ``psycopg2_mq.MQSource.call`` to allow custom columns on the job table.

- ``Job.params`` to ``Job.args``.

- ``psycopg2`` an optional dependency in order to allow apps to depend on
  ``psycopg2-binary`` if they wish.