PostgreSQL adapter to stream results of multi-statement queries without a server-side cursor
Stream results of multi-statement PostgreSQL queries from Python without server-side cursors. It has benefits over some other Python PostgreSQL libraries:
Streams results from complex multi-statement queries even though SQL doesn't allow server-side cursors for such queries - suitable for large amounts of results that don't fit in memory.
CTRL+C (SIGINT) by default behaves as expected even during slow queries - a KeyboardInterrupt is raised and quickly bubbles up through streampq code. Unless client code prevents it, the program will exit.
Every effort is made to cancel queries on KeyboardInterrupt, SystemExit, or errors - the server doesn't continue needlessly using resources.
Particularly useful when temporary tables are needed to store intermediate results in multi-statement SQL scripts, or when using a server-side cursor would make a query slower. Instead of server-side cursors, libpq's single-row mode is used.
pip install streampq
The libpq binary library is also required. This is typically either already installed, or installed by:
brew install libpq
apt install libpq5
yum install postgresql-libs
The only runtime dependencies are libpq and Python itself.
from streampq import streampq_connect

# libpq connection parameters
# https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
#
# Any can be omitted and environment variables will be used instead
# https://www.postgresql.org/docs/current/libpq-envars.html
connection_params = (
    ('host', 'localhost'),
    ('port', '5432'),
    ('dbname', 'postgres'),
    ('user', 'postgres'),
    ('password', 'password'),
)

# SQL statement(s) - if more than one, separate by ;
sql = '''
SELECT * FROM my_table;
SELECT * FROM my_other_table;
'''

# Connection and querying is via a context manager
with streampq_connect(connection_params) as query:
    for (columns, rows) in query(sql):
        print(columns)  # Tuple of column names (see below)
        for row in rows:
            print(row)  # Tuple of row values
You can create a Pandas dataframe of SQL query results. This loads all results into memory.
import pandas as pd
from streampq import streampq_connect

connection_params = (
    ('host', 'localhost'),
    ('port', '5432'),
    ('dbname', 'postgres'),
    ('user', 'postgres'),
    ('password', 'password'),
)

sql = '''
SELECT * FROM my_table;
SELECT * FROM my_other_table;
'''

with streampq_connect(connection_params) as query:
    for columns, rows in query(sql):
        # Consumes every row of this statement's results into memory
        df = pd.DataFrame(rows, columns=columns)
        print(df)
For larger sets of results that don't fit into memory, you can create chunked Pandas dataframes of SQL query results, similar to the output of using the chunksize parameter of the Pandas read_csv function. This uses a helper function to do the chunking.
import itertools
import pandas as pd
from streampq import streampq_connect

connection_params = (
    ('host', 'localhost'),
    ('port', '5432'),
    ('dbname', 'postgres'),
    ('user', 'postgres'),
    ('password', 'password'),
)

sql = '''
SELECT * FROM my_table;
SELECT * FROM my_other_table;
'''

def query_chunked_dfs(query, sql, chunk_size):

    def _chunked_df(columns, rows):
        it = iter(rows)
        while True:
            # Take at most chunk_size rows; an empty dataframe means the rows are exhausted
            df = pd.DataFrame(itertools.islice(it, chunk_size), columns=columns)
            if len(df) == 0:
                break
            yield df

    for columns, rows in query(sql):
        yield _chunked_df(columns, rows)

with streampq_connect(connection_params) as query:
    for chunked_dfs in query_chunked_dfs(query, sql, chunk_size=10000):
        for df in chunked_dfs:
            print(df)
Each column is an instance of streampq.Column, a subclass of str with two extra attributes:
type_id
The PostgreSQL type ID of the column. For example, 25 indicates text.
type_modifier
The PostgreSQL type modifier of the column. This is often -1 to indicate no modification.
While it's not typical to subclass str in Python, it's much more frequent to only need the column name for query results than the PostgreSQL type ID and modifier.
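The idea of a str subclass carrying extra attributes can be sketched as follows. This is a minimal illustration, not streampq's actual implementation: the Column class below is a hypothetical stand-in, and its constructor signature is an assumption made for this example.

```python
class Column(str):
    """Hypothetical stand-in for streampq.Column: a str with type metadata."""

    def __new__(cls, name, type_id, type_modifier):
        # str is immutable, so the text value has to be set in __new__
        obj = super().__new__(cls, name)
        obj.type_id = type_id
        obj.type_modifier = type_modifier
        return obj


col = Column('my_col', type_id=25, type_modifier=-1)

# Usable anywhere a plain column name is expected
print(col == 'my_col')  # True
print(col.upper())      # MY_COL

# The type information is there when it is needed
print(col.type_id, col.type_modifier)  # 25 -1
```

Because the object is still a str, it can be passed directly as a dictionary key or as a Pandas column label without any conversion.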
There are 194 built-in PostgreSQL data types (including array types), and streampq converts them to Python types. In summary:
PostgreSQL types | Python type |
---|---|
null | None |
text (e.g. varchar), xml, network addresses, and money | str |
byte (e.g. bytea) | bytes |
integer (e.g. int4) | int |
inexact real number (e.g. float4) | float |
exact real number (e.g. numeric) | Decimal |
date | date (+ and - infinity are mapped to date.max and date.min respectively) |
timestamp | datetime (without timezone, + and - infinity are mapped to datetime.max and datetime.min respectively) |
timestamptz | datetime (with offset timezone, + and - infinity are mapped to datetime.max and datetime.min respectively) |
json and jsonb | output of json.loads |
interval | streampq.Interval |
range (e.g. daterange) | streampq.Range |
multirange (e.g. datemultirange) | tuples of streampq.Range |
arrays and vectors | tuple (of any of the above types, or of nested tuples) |
To customise these, override the default value of the get_decoders parameter of the streampq_connect function in streampq.py.
In general, built-in types are preferred over custom types, and immutable types are preferred over mutable.
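To illustrate the date row of the table above, a decoder could look roughly like the sketch below. The function decode_date is hypothetical and is not streampq's code; it assumes values arrive in PostgreSQL's text format with the ISO DateStyle, and it does not handle BC dates.

```python
from datetime import date


def decode_date(raw):
    """Hypothetical decoder: PostgreSQL text-format date -> datetime.date."""
    # PostgreSQL allows infinite dates, which Python's date cannot represent,
    # so they are clamped to the largest and smallest representable dates
    if raw == 'infinity':
        return date.max
    if raw == '-infinity':
        return date.min
    return date.fromisoformat(raw)


print(decode_date('2024-02-29'))  # 2024-02-29
print(decode_date('infinity'))    # 9999-12-31
print(decode_date('-infinity'))   # 0001-01-01
```

One consequence of this mapping is that an infinite date and a genuine 9999-12-31 cannot be told apart after decoding.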
The Python built-in timedelta type is not used for PostgreSQL interval since timedelta does not offer a way to store PostgreSQL intervals of years or months, other than converting to days which would be a loss of information.
Instead, a namedtuple is defined, streampq.Interval, with members:
Member | Type |
---|---|
years | int |
months | int |
days | int |
hours | int |
minutes | int |
seconds | Decimal |
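The loss of information can be seen in a short sketch. The Interval namedtuple below is a stand-in with the same members as listed above, and to_timedelta_lossy is a hypothetical helper whose 30-days-per-month rule is an arbitrary assumption chosen for illustration.

```python
from collections import namedtuple
from datetime import timedelta
from decimal import Decimal

# Stand-in with the same members as streampq.Interval
Interval = namedtuple(
    'Interval', ['years', 'months', 'days', 'hours', 'minutes', 'seconds'])

one_month = Interval(0, 1, 0, 0, 0, Decimal('0'))
thirty_days = Interval(0, 0, 30, 0, 0, Decimal('0'))


def to_timedelta_lossy(interval, days_per_month=30):
    """Hypothetical conversion: months and years are approximated as days."""
    days = (interval.years * 12 + interval.months) * days_per_month + interval.days
    return timedelta(
        days=days,
        hours=interval.hours,
        minutes=interval.minutes,
        seconds=float(interval.seconds),  # timedelta does not accept Decimal
    )


# The namedtuple keeps the two intervals distinct
print(one_month == thirty_days)  # False

# After conversion to timedelta they are indistinguishable
print(to_timedelta_lossy(one_month) == to_timedelta_lossy(thirty_days))  # True
```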
There is no Python built-in type for a PostgreSQL range. So for these, a namedtuple is defined, streampq.Range, with members:
Member | Type |
---|---|
lower | int, date, datetime (without timezone), or datetime (with offset timezone) |
upper | int, date, datetime (without timezone), or datetime (with offset timezone) |
bounds | str - one of () , (] , [) , or [] |
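As a sketch of how client code might interpret the bounds member, the helper below tests whether a value falls inside a range. Range here is a stand-in namedtuple with the members listed above, contains is a hypothetical helper that is not part of streampq, and treating None as an unbounded side is an assumption of this example.

```python
from collections import namedtuple
from datetime import date

# Stand-in with the same members as streampq.Range
Range = namedtuple('Range', ['lower', 'upper', 'bounds'])


def contains(r, value):
    """Hypothetical helper: True if value is inside r, honouring its bounds."""
    # Assumption: None on either side means that side is unbounded
    lower_ok = r.lower is None or (
        value >= r.lower if r.bounds[0] == '[' else value > r.lower)
    upper_ok = r.upper is None or (
        value <= r.upper if r.bounds[1] == ']' else value < r.upper)
    return lower_ok and upper_ok


q1 = Range(lower=date(2024, 1, 1), upper=date(2024, 4, 1), bounds='[)')

print(contains(q1, date(2024, 1, 1)))   # True: lower bound is inclusive
print(contains(q1, date(2024, 2, 15)))  # True
print(contains(q1, date(2024, 4, 1)))   # False: upper bound is exclusive
```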
Dynamic SQL literals can be bound using the literals parameter of the query function. It must be an iterable of key-value pairs.
sql = '''
SELECT * FROM my_table WHERE my_col = {my_col_value};
'''

with streampq_connect(connection_params) as query:
    for (columns, rows) in query(sql, literals=(
        ('my_col_value', 'my-value'),
    )):
        for row in rows:
            pass
Dynamic SQL identifiers, e.g. column names, can be bound using the identifiers parameter of the query function. It must be an iterable of key-value pairs.
sql = '''
SELECT * FROM my_table WHERE {column_name} = 'my-value';
'''

with streampq_connect(connection_params) as query:
    for (columns, rows) in query(sql, identifiers=(
        ('column_name', 'my_col'),
    )):
        for row in rows:
            pass
Identifiers and literals use different escaping rules - hence the need for two different parameters.
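The difference between the two sets of rules can be shown with a deliberately simplified sketch. The functions quote_literal and quote_identifier are hypothetical and written only for illustration; they are not how streampq escapes values, and they ignore details such as backslashes and encodings, so real code should rely on the literals and identifiers parameters instead.

```python
def quote_literal(value):
    """Simplified illustration: single quotes, with embedded ones doubled."""
    return "'" + value.replace("'", "''") + "'"


def quote_identifier(name):
    """Simplified illustration: double quotes, with embedded ones doubled."""
    return '"' + name.replace('"', '""') + '"'


# The same text is escaped differently depending on its role in the query
print(quote_literal("O'Reilly"))     # 'O''Reilly'
print(quote_identifier("O'Reilly"))  # "O'Reilly"
print(quote_identifier('my "col"'))  # "my ""col"""
```

Using the literal rules for an identifier, or the reverse, would produce either a syntax error or a query with a different meaning, which is why the role of each value has to be stated explicitly.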
While this library is specialised for multi-statement queries, it works fine when there is only one. In this case the iterable returned from the query function yields only a single (columns, rows) pair.
Exceptions derive from streampq.StreamPQError. If there is any more information available on the error, it's added as a string in its args property. This is included in the string representation of the exception by default.
StreamPQError
Base class for all explicitly-raised exceptions.
ConnectionError
An error occurred while attempting to connect to the database.
QueryError
An error occurred while attempting to run a query. Typically this is due to a syntax error or a missing column. A QueryError has an extra attribute:
fields
A tuple of (error field name, value) pairs with more detailed information on the error. The libpq error field names are used, but in lowercase and without the PG_DIAG_ prefix.
CancelError
An error occurred while attempting to cancel a query.
CommunicationError
An error occurred communicating with the database after successful connection.
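A sketch of how the fields attribute of a QueryError might be consumed is shown below. So that it runs without a database, it defines stand-in exception classes that mirror the hierarchy described above; their constructors are assumptions of this example, and in real code the classes would be imported from streampq. The helper summarise is also hypothetical.

```python
# Stand-ins mirroring the documented hierarchy (constructors are assumptions)
class StreamPQError(Exception):
    pass


class QueryError(StreamPQError):
    def __init__(self, message, fields):
        super().__init__(message)
        self.fields = fields


def summarise(exc):
    """Hypothetical helper: build a short description from a QueryError."""
    # fields is a tuple of (name, value) pairs, so dict() gives lookup by name
    fields = dict(exc.fields)
    return '{} [{}]: {}'.format(
        fields.get('severity', 'ERROR'),
        fields.get('sqlstate', 'unknown'),
        fields.get('message_primary', str(exc)),
    )


try:
    # Simulates what a query on a missing column might raise
    raise QueryError('column "nope" does not exist', fields=(
        ('severity', 'ERROR'),
        ('sqlstate', '42703'),
        ('message_primary', 'column "nope" does not exist'),
    ))
except StreamPQError as e:
    # Catching the base class also catches every more specific error
    summary = summarise(e)
    print(summary)  # ERROR [42703]: column "nope" does not exist
```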
We found that streampq demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.