pydbapi
一个简单的数据库 API,支持多种数据库类型,包括 SQLite、Amazon Redshift、MySQL 和 Trino。
安装
pip install pydbapi
支持的数据库类型
SQLite
from pydbapi.api import SqliteDB
db = SqliteDB(database=None)
sql = 'select * from [table];'
cursor, action, result = db.execute(sql)
Amazon Redshift
from pydbapi.api import RedshiftDB
db = RedshiftDB(host, user, password, database, port='5439', safe_rule=True)
sql = 'select * from [schema].[table];'
cursor, action, result = db.execute(sql)
MySQL
from pydbapi.api import MysqlDB
db = MysqlDB(host, user, password, database, port=3306, safe_rule=True, isdoris=False)
sql = 'select * from [table];'
cursor, action, result = db.execute(sql)
Trino
from pydbapi.api import TrinoDB
db = TrinoDB(host, user, password, database, catalog, port=8443, safe_rule=True)
sql = 'select * from [table];'
cursor, action, result = db.execute(sql)
实例模式
from pydbapi.api import SqliteDB
db = SqliteDB.get_instance(database=None)
sql = 'select * from [table];'
cursor, action, result = db.execute(sql)
结果
转换为 DataFrame
from pydbapi.api import TrinoDB
db = TrinoDB(host, user, password, database, catalog, port=8443, safe_rule=True)
sql = 'select * from [table];'
cursor, action, result = db.execute(sql)
df = result.to_dataframe()
df
输出为 CSV
from pydbapi.api import TrinoDB
db = TrinoDB(host, user, password, database, catalog, port=8443, safe_rule=True)
sql = 'select * from [table];'
cursor, action, result = db.execute(sql)
result.to_csv(outfile)
列
from pydbapi.model import ColumnModel
ColumnModel
+ 代码
`col = ColumnModel(newname, coltype='varchar', sqlexpr=None, func=None, order=0)`
+ 参数
* `newname`: 新名称;
* `coltype`: 类型
* `sqlexpr`: SQL 表达式
* `func`: 查询函数,当前支持 'min'、'max'、'sum'、'count'
* `order`: 排序
ColumnsModel
+ 代码
`cols = ColumnsModel(ColumnModel, ColumnModel, ……)`
+ 属性
* `func_cols`: 返回列的列表
* `nonfunc_cols`: 返回列的列表
* `new_cols`: 返回拼接字符串
* `create_cols`: 返回拼接字符串
* `select_cols`: 返回拼接字符串
* `group_cols`: 返回拼接字符串
* `order_cols`: 返回拼接字符串
+ 方法
* get_column_by_name
- `cols.get_column_by_name(name)`
- 返回 `ColumnModel`
SQL
from pydbapi.sql import SqlStatement
sql = 'select * from tablename where part_date >= $part_date;'
sqlstmt = SqlStatement(sql)
-
属性
- tokens
- sql
- comment
- action
- tablename
- params
- subqueries
-
方法
sstmt1 = '-- comment'
sqlstmt = SqlStatement.from_sqlsnippets(sstmt1, sql)
sstmt2 = 'and part_date <= $end_date'
sqlstmt += sstmt2
sqlstmt -= sstmt2
sqlstmt = sqlstmt.substitute_params(part_date="'2024-01-01'")
- get_with_testsql(仅支持 CETs)
sqlstmt = sqlstmt.get_with_testsql(idx=1)
-
SqlStatements
from pydbapi.sql import SqlStatements
sql = '''
select * from tablename1 where part_date >= $part_date;
select * from tablename2 where part_date >= $part_date;
'''
sqlstmts = SqlStatements(sql)
支持的操作
- execute【db/base.py】
- 代码
db.execute(sql, count=None, ehandling=None, verbose=0)
- 参数
count
: 返回结果的数量;
ehandling
: SQL 执行出错时的处理方式,默认: None
verbose
: 执行的进度展示方式(0:不打印, 1:文字进度, 2:进度条)
- select
- 代码
db.select(tablename, columns, condition=None, verbose=0)
- 参数
tablename
: 表名;
columns
: 列内容;
condition
: SQL where 中的条件
- create
- sqlite/redshift
- 代码
db.create(tablename, columns, indexes=None, verbose=0)
- 参数
tablename
: 表名;
columns
: 列内容;
indexes
: 索引,sqlite 暂不支持索引
verbose
: 是否打印执行进度。
- mysql
- 代码
db.create(tablename, columns, indexes=None, index_part=128, ismultiple_index=True, partition=None, verbose=0)
- 参数
tablename
: 表名;
columns
: 列内容;
indexes
: 索引
index_part
: 索引部分
ismultiple_index
: 多重索引
partition
: 分区
verbose
: 是否打印执行进度。
- trino
- 代码
db.create(tablename, columns, partition=None, verbose=0)
- 参数
tablename
: 表名;
columns
: 列内容;
partition
: 分区
verbose
: 是否打印执行进度。
- insert【db/base.py】
- 代码
db.insert(tablename, columns, inserttype='value', values=None, chunksize=1000, fromtable=None, condition=None)
- 参数
tablename
: 表名;
columns
: 列内容;
inserttype
: 插入数据类型,支持 value、select
values
: inserttype='value',插入的数值;
chunksize
: inserttype='value',每个批次插入的量级;
fromtable
: inserttype='select',数据来源表;
condition
: inserttype='select',数据来源条件;
- drop【db/base.py】
- delete【db/base.py】
- 代码
db.delete(tablename, condition)
- 参数
tablename
: 表名;
condition
: 删除条件;
- get_columns
- 代码
db.get_columns(tablename)
- 参数
- add_columns
- 代码
db.add_columns(tablename, columns)
- 参数
tablename
: 表名;
columns
: 列内容;
- get_filesqls【db/fileexec.py】
- 代码
db.get_filesqls(filepath, **kw)
- 参数
filepath
: SQL 文件路径;
kw
: SQL 文件中需要替换的参数,会替换 SQL 文件中的 arguments;
- file_exec【db/fileexec.py】
魔法命令
-
注册方法
命令行中执行pydbapimagic
-
参数
- AUTO_RULES
可以自动执行表名(表名包含即可)