windyquery - A non-blocking Python PostgreSQL query builder
Windyquery is a non-blocking PostgreSQL query builder with Asyncio.
Installation
$ pip install windyquery
Connection
import asyncio
from windyquery import DB
db = DB()
asyncio.get_event_loop().run_until_complete(db.connect('db_name', {
'host': 'localhost',
'port': '5432',
'database': 'db_name',
'username': 'db_user_name',
'password': 'db_user_password'
}, default=True))
asyncio.get_event_loop().run_until_complete(db.connect('other_db_name', {
'host': 'localhost',
'port': '5432',
'database': 'other_db_name',
'username': 'db_user_name',
'password': 'db_user_password'
}, default=False))
db.connection('other_db_name')
db.default = 'other_db_name'
asyncio.get_event_loop().run_until_complete(db.stop())
CRUD examples
A DB instance can be used to construct a SQL query. The instance is a coroutine object.
It can be scheduled to run by all asyncio mechanisms.
Build a SQL and execute it
async def main(db):
users = await db.table('users').select('id', 'name')
print(users[0]['name'])
asyncio.run(main(db))
SELECT
await db.table('users').select('name AS username', 'address addr')
await db.table('users').select().where('id', 1).where('name', 'Tom')
await db.table('users').select().where('id', '=', 1).where('name', '=', 'Tom')
await db.table('users').select().where('id = ? AND name = ?', 1, 'Tom')
await db.table('cards').select().where("id", [1, 2])
await db.table('cards').select().where("id", 'IN', [1, 2])
await db.table('cards').select().where("id IN (?, ?)", 1, 2)
await db.table('users').select().order_by('id', 'name DESC')
await db.table('users').select().group_by('id', 'name')
await db.table('users').select().limit(100).offset(10)
await db.table('users').select('users.*', 'orders.total').\
join('orders', 'orders.user_id', '=', 'users.id')
await db.table('users').select('users.*', 'orders.total').\
join('orders', 'orders.user_id = users.id AND orders.total > ?', 100)
INSERT
await db.table('users').insert(
{'id': 1, 'name': 'Tom'},
{'id': 2, 'name': 'Jerry'},
{'id': 3, 'name': 'DEFAULT'}
)
await db.table('users').insert(
{'id': 1, 'name': 'Tom'},
{'id': 2, 'name': 'Jerry'},
{'id': 3, 'name': 'DEFAULT'}
).returning('id', 'name')
await db.table('users').insert(
{'id': 1, 'name': 'Tom'},
{'id': 2, 'name': 'Jerry'},
{'id': 3, 'name': 'DEFAULT'}
).returning()
await db.table('users').insert(
{'id': 1, 'name': 'Tom'},
).on_conflict('(id)', 'DO NOTHING')
await db.table('users AS u').insert(
{'id': 1, 'name': 'Tom'},
).on_conflict(
'ON CONSTRAINT users_pkey',
"DO UPDATE SET name = EXCLUDED.name || ' (formerly ' || u.name || ')'"
)
UPDATE
await db.table('cards').where('id', 9).update({'name': 'Tom'})
await db.table('cards').update('total = total + 1').where('id', 9)
await db.table('users').update({'name': 'Tom'}).where('id', '=', 9).returning()
await db.table('users').update({'name': 'Tom'}).where('id', '=', 9).returning('id', 'name')
await db.table('users').update('name = orders.name').\
from_table('orders').\
where('orders.user_id = users.id')
await db.table('users').update('name = product.name, purchase = products.name, is_paid = ?', True).\
from_table('orders').\
join('products', 'orders.product_id', '=', 'products.id').\
where('orders.user_id = users.id')
DELETE
await db.table('users').where('id', 1).delete()
await db.table('users').where('id', 1).delete().returning('id', 'name')
Migration examples
The DB instance can also be used to migrate database schema.
CREATE TABLE
await db.schema('TABLE users').create(
'id serial PRIMARY KEY',
'group_id integer references groups (id) ON DELETE CASCADE',
'created_at timestamp not null DEFAULT NOW()',
'email text not null unique',
'is_admin boolean not null default false',
'address jsonb',
'payday integer not null',
'CONSTRAINT unique_email UNIQUE(group_id, email)',
'check(payday > 0 and payday < 8)',
)
await db.schema('TABLE accounts').create(
'like users'
)
await db.schema('TABLE IF NOT EXISTS accounts').create(
'like users'
)
Modify TABLE
await db.schema('TABLE users').alter(
'alter id TYPE bigint',
'alter name SET DEFAULT \'no_name\'',
'alter COLUMN address DROP DEFAULT',
'alter "user info" SET NOT NULL',
'add CONSTRAINT check(payday > 1 and payday < 6)',
'add UNIQUE(name, email) WITH (fillfactor=70)',
'add FOREIGN KEY (group_id) REFERENCES groups (id) ON DELETE SET NULL',
'drop CONSTRAINT IF EXISTS idx_email CASCADE',
)
await db.schema('TABLE users').alter('RENAME TO accounts')
await db.schema('TABLE users').alter('RENAME email TO email_address')
await db.schema('TABLE users').alter('RENAME CONSTRAINT idx_name TO index_name')
await db.schema('TABLE users').alter('ADD COLUMN address text')
await db.schema('TABLE users').alter('DROP address')
await db.schema('INDEX idx_email ON users').create('name', 'email')
await db.schema('UNIQUE INDEX unique_name ON users').create('name',).where('soft_deleted', False)
await db.schema('INDEX idx_email').drop('CASCADE')
await db.schema('TABLE users').drop()
Raw
The raw
method can be used to execute any form of SQL. Usually the raw
method is used to execute complex hard-coded (versus dynamically built) queries. It's also very common to use the raw
method to run migrations.
The input to the raw
method is not validated, so it is not safe from SQL injection.
RAW for complex SQL
await db.raw('SELECT ROUND(AVG(group_id),1) AS avg_id, COUNT(1) AS total_users FROM users WHERE id in ($1, $2, $3)', 4, 5, 6)
await db.raw("SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num, letter)")
await db.raw("""
INSERT INTO user (id, name)
SELECT $1, $2 WHERE NOT EXISTS (SELECT id FROM users WHERE id = $1)
""", 1, 'Tom')
RAW for migration
await db.raw("""
CREATE TABLE users(
id INT NOT NULL,
created_at DATE NOT NULL,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
birthday_mmddyyyy CHAR(10) NOT NULL,
)
""")
WITH Clause using VALUES Lists
The Postgres VALUES provides a way to generate a "constant table" from a list of values. Together with the WITH clause, a small set of data can be loaded into the DB and queried like a table.
SELECT using WITH VALUES
result = await db.with_values('my_values', {
'text_col': 'Tom',
'bool_col': True,
'num_col': 2,
'dict_col': {'id': 1},
'datetime_col': datetime.now(),
'null_col': 'null',
'null_col2': None
}).table('my_values').select()
result[0]['text_col']
result[0]['bool_col']
result[0]['num_col']
result[0]['dict_col']
result[0]['datetime_col']
result[0]['null_col']
result[0]['null_col2']
await db.with_values('workers', {
'task_id': 1,
'name': 'Tom'
}, {
'task_id': 2,
'name': 'Jerry'
}).table('workers').select(
'workers.name AS worker_name',
'tasks.name AS task_name'
).join('tasks', 'workers.task_id = tasks.id').order_by('tasks.id')
await db.with_values('workers1', {
'task_id': 1,
'name': 'Tom'
}, {
'task_id': 2,
'name': 'Jerry'
}).with_values('workers2', {
'task_id': 1,
'name': 'Topsy'
}, {
'task_id': 2,
'name': 'Nibbles'
}).table('tasks').select(
'workers1.name AS primary_worker_name',
'workers2.name AS secondary_worker_name',
'tasks.name AS task_name'
).join('workers1', 'workers1.task_id = tasks.id').\
join('workers2', 'workers2.task_id = tasks.id')
UPDATE using WITH VALUES
await db.with_values('workers', {
'task_id': 1,
'name': 'Tom'
}, {
'task_id': 2,
'name': 'Jerry'
}).table('tasks').update("name = tasks.name || ' (worked by ' || workers.name || ')'").\
from_table('workers').\
where('workers.task_id = tasks.id').\
returning(
'workers.name AS worker_name',
'tasks.name AS task_name'
)
RAW using WITH VALUES
await db.with_values('workers', {
'task_id': 1,
'name': 'Tom'
}, {
'task_id': 2,
'name': 'Jerry'
}).raw("""
SELECT * FROM tasks WHERE EXISTS(
SELECT 1 FROM workers
JOIN task_results ON workers.task_id = task_results.task_id
WHERE workers.task_id = tasks.id
)
""")
JSONB examples
Methods are created to support jsonb data type for some simple use cases.
Create a table with jsonb data type
await db.schema('TABLE users').create(
'id serial PRIMARY KEY',
'data jsonb',
)
Select jsonb field
rows = await db.table('users').select('data', 'data->name AS name', 'data->>name AS name_text')
await db.table('users').select('data->name AS name').where('data->>name', 'LIKE', 'Tom%')
await db.table('users').select('data->name AS name').where("data->name", 'Tom')
Insert jsonb field
await db.table('users').insert(
{'data': {'name': 'Tom'}},
{'data': {'name': 'Jerry'}},
).returning()
Update jsonb field
await db.table('users').update({'data': {'address': {'city': 'New York'}}})
await db.table('users').update({'data->address->city': 'Chicago'})
Migrations
Windyquery has preliminary support for database migrations. The provided command-line script is called wq
.
Generate a migration file
A migration file can be created by,
$ wq make_migration --name=create_my_table
By default, the new file is added to database/migrations/
under the current working directory. If the directory does not exist, it will be created first. The file contains an empty function to be filled by the user,
async def run(db):
pass
Some sample migration templates are provided here. They can be automatically inserted in the generated file by specifying the --template
parameter,
$ wq make_migration --name=create_my_table --template="create table"
$ wq make_migration --name=create_my_table --template=all
Run migrations
To run all of the outstanding migrations, use the migrate
sub-command,
$ wq migrate --host=localhost --port=5432 --database=my-db --username=my-name --password=my-pass
$ DB_HOST=localhost DB_PORT=5432 DB_DATABASE=my-db DB_USERNAME=my-name DB_PASSWORD=my-pass wq migrate
Use custom directory and database table
The wq
command requires a directory to save the migration files, and a database table to store executed migrations. By default, the migration directory is database/migrations/
under the current working directory, and the database table is named migrations
. They are created automatically if they do not already exist.
The directory and table name can be customized by using --migrations_dir
and --migrations_table
parameters,
$ wq make_migration --name=create_my_table --migrations_dir="my_db_work/migrations"
$ wq migrate --host=localhost --port=5432 --database=my-db --username=my-name --password=my-pass --migrations_dir="my_db_work/migrations" --migrations_table=my_migrations
Syntax checker
A very important part of windyquery is to validate the inputs of the various builder methods. It defines a Validator class, which is used to reject input strings not following the proper syntax.
As a result, it can be used separately as a syntax checker for other DB libraries. For example, it is very common for a REST API to support filtering or searching parameters specified by the users,
......
url_query = "name=Tom&state=AZ;DROP TABLE Students"
where = url_query.replace("&", " AND ")
from windyquery.validator import Validator
from windyquery.validator import ValidationError
from windyquery.ctx import Ctx
try:
ctx = Ctx()
validator = Validator()
where = validator.validate_where(where, ctx)
except ValidationError:
abort(400, f'Invalid query parameters: {url_query}')
connection = psycopg2.connect(**dbConfig)
cursor = connection.cursor()
cursor.execute(f'SELECT * FROM users WHERE {where}')
......
Please note,
- Except
raw
, all windyquery's own builder methods, such as select
, update
, where
, and so on, already implicitly use these validation functions. They may be useful when used alone, for example, to help other DB libraries validate SQL snippets;
- These validation functions only cover a very small (though commonly used) subset of the SQL grammar of Postgres.
Listen for a notification
Postgres implements LISTEN/NOTIFY for interprocess communications.
In order to listen on a channel, use the DB.listen() method. It returns an awaitable object, which resolves to a dict when a notification fires.
from windyquery.exceptions import ListenConnectionClosed
listener = db.listen('my_table')
await listener.start()
try:
for _ in range(100):
result = await listener
print(result)
except ListenConnectionClosed as e:
print(e)
finally:
await listener.stop()
async with db.listen('my_table') as listener:
for _ in range(100):
result = await listener
print(result)
RRULE
Windyquery has an rrule function that can "expand" an rrule string into its occurrences (a list of datetimes) by using dateutil. A values CTE is prepared from the rrule occurrences, which can be further used by other queries.
A simple rrule example
rruleStr = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
await db.rrule('my_rrules', {'rrule': rruleStr}).table('my_rrules').select()
More than one rrule
rruleStr1 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
rruleStr2 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;INTERVAL=10;COUNT=3
RRULE:FREQ=DAILY;INTERVAL=5;COUNT=3
"""
)
await db.rrule('my_rrules', {
'rrule': rruleStr1
}, {
'rrule': rruleStr2
}).table('my_rrules').select()
await db.rrule('my_rrules', {
'rrule': [rruleStr1, rruleStr2]
}).table('my_rrules').select()
Use exrule
rruleStr = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
exruleStr = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;BYWEEKDAY=SA,SU
"""
await db.rrule('my_rrules', {'rrule': rruleStr, 'exrule': exruleStr}).table('my_rrules').select()
Use rdate
await db.rrule('my_rrules', {'rdate': '20210503T100000Z'}).table('my_rrules').select()
rruleStr = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
await db.rrule('my_rrules', {'rrule': rruleStr, 'rdate': '20210503T100000Z'}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rdate': ['20210503T100000Z','20210603T100000Z']}).table('my_rrules').select()
Use exdate
rruleStr = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
await db.rrule('my_rrules', {'rrule': rruleStr, 'exdate': '20210304T100000Z'}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'exdate': ['20210304T100000Z','20210306T100000Z']}).table('my_rrules').select()
Use after, before, and between
rruleStr = """
DTSTART:20210715T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_after': {'dt': '20210716T100000Z'}}]}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_after': {'dt': '20210716T100000Z', 'inc': True}}]}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_before': {'dt': '20210716T100000Z'}}]}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_before': {'dt': '20210716T100000Z', 'inc': True}}]}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_between': {'after': '20210716T100000Z', 'before': '20210719T100000Z'}}]}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_between': {'after': '20210716T100000Z', 'before': '20210719T100000Z', 'inc': True}}]}).table('my_rrules').select()
Join rrule with other tables
import datetime
rruleStr1 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
rruleStr2 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;INTERVAL=10;COUNT=3
RRULE:FREQ=DAILY;INTERVAL=5;COUNT=3
"""
await db.rrule('task_rrules', {
'task_id': 1, 'rrule': rruleStr1
}, {
'task_id': 2, 'rrule': rruleStr2
}).table('task_rrules').
join('tasks', 'tasks.id', '=', 'task_rrules.task_id').
where('rrule > ? AND rrule < ?',
datetime.datetime(2021, 3, 5, 10, 0,
tzinfo=datetime.timezone.utc),
datetime.datetime(2021, 3, 8, 10, 0,
tzinfo=datetime.timezone.utc),
).select('task_rrules.rrule', 'tasks.name')
Using rrule in update
import datetime
rruleStr1 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
rruleStr2 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;INTERVAL=10;COUNT=3
RRULE:FREQ=DAILY;INTERVAL=5;COUNT=3
"""
await db.rrule('task_rrules', {
'task_id': 1, 'rrule': rruleStr1
}, {
'task_id': 2, 'rrule': rruleStr2
}).table('tasks').update("result = 'done'").
from_table('task_rrules').
where('task_rrules.task_id = tasks.id')
Using rrule with raw method
import datetime
rruleStr1 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;COUNT=5
"""
rruleStr2 = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY;INTERVAL=10;COUNT=3
RRULE:FREQ=DAILY;INTERVAL=5;COUNT=3
"""
await db.rrule('task_rrules', {
'task_id': 1, 'rrule': rruleStr1
}, {
'task_id': 3, 'rrule': rruleStr2
}).raw("""
DELETE FROM tasks
WHERE EXISTS(
SELECT 1 FROM task_rrules
WHERE
task_id = tasks.id AND
rrule > $1
)
RETURNING id, task_id
""", datetime.datetime(2021, 3, 20, 10, 0,
tzinfo=datetime.timezone.utc))
Using a slice to limit the occurrences
import datetime
rruleStr = """
DTSTART:20210303T100000Z
RRULE:FREQ=DAILY
"""
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_slice': slice(3)}).table('my_rrules').select()
await db.rrule('my_rrules', {'rrule': rruleStr, 'rrule_slice': slice(10,20,2)}).table('my_rrules').select()
Tests
Windyquery includes tests. These tests also serve as examples of how to use this library.
Running tests
Install pytest to run the included tests,
pip install -U pytest
Set up a postgres server with preloaded data. This can be done by using docker with the official postgres Docker image,
docker run --rm --name windyquery-test -p 5432:5432 -v ${PWD}/windyquery/tests/seed_test_data.sql:/docker-entrypoint-initdb.d/seed_test_data.sql -e POSTGRES_USER=windyquery-test -e POSTGRES_PASSWORD=windyquery-test -e POSTGRES_DB=windyquery-test -d postgres:12-alpine
Note: to use an existing postgres server, it must be configured to have the correct user, password, and database needed in tests/conftest.py. Data needed by tests is in tests/seed_test_data.sql.
To run the tests,
pytest