Research
Security News
Malicious npm Package Targets Solana Developers and Hijacks Funds
A malicious npm package targets Solana developers, rerouting funds in 2% of transactions to a hardcoded address.
pip install think_sql[mysql]
pip install think_sql[dm]
Database:
test
Table:user
from think_sql import DB
config = {
'type': 'mysql',
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'root',
'database': 'test',
}
with DB(config) as db:
data = db.table('user').where('id',1).find()
print(data)
from think_sql import DB
with DB("root:'root'@127.0.0.1:3306/test") as db:
data = db.table('user').where('id',1).find()
print(data)
from think_sql import DB
from think_sql.tool.util import DBConfig
config = DBConfig(
type='mysql',
host='127.0.0.1',
port=3306,
user='root',
password='root',
database='test',
)
with DB(config) as db:
data = db.table('user').where('id',1).find()
print(data)
result
{
"id": 1,
"username": "hbh112233abc",
"age": "36",
"address": "FUJIAN.XIAMEN"
}
db
functionfrom think_sql import db
config = {
'type': 'mysql',
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'root',
'database': 'test',
}
conn = db(config)
data = conn.table('user').where('id',1).find()
print(data)
conn.close()
init(config:Union[str,dict,DBConfig],params={})
init database, return DB instance
type://user:'password'@host:port/database
{'type':'mysql','host':'127.0.0.1','port':3306,'user':'root','password':'root','database':'test'}
DBConfig(type='mysql',host='127.0.0.1',port=3306,user='root',password='root',database='test')
connect() connect database use init params
table(table_name):Table return class Table <think_sql.tool.interface.TableInterface>
check_connected():bool check connected, try reconnect database
query(sql,params=()) query sql return cursor.fetchall List[dict]
execute(sql,params=()) execute sql write operate(ex:insert,update,delete,...)
init(connector: Connection,cursor: Cursor,table_name: str,debug: bool = True)
init() initialize query condition
debug(flag=True) set debug flag
set_cache_storage(storage: CacheStorage) set cache storage ex: Redis
cache(key: str = None, expire: int = 3600) use cache at query
cursor(sql: str, params: list = []) -> Cursor return cursor object
get_last_sql() -> str return last sql string
get_lastid() -> str return last row id
get_rowcount() -> int return affect rows count
fetch_sql(flag: bool = True)
set fetch sql flag,if flag = True then query
and execute
will only return sql
build_sql(operation: str, params: list = []) -> str return build sql
query(sql: str, params: list = []) -> list
execute read operation sql and return cursor.fetchall()
when fetch_sql
=True then return sql and not execute the sql
execute(sql: str, params: list = []) -> int
execute write operation sql and return affect rows count
when fetch_sql
=True then return sql and not execute the sql
where(field: Union[str, list, tuple], symbol: str = '', value: Any = None) set query conditions, support multipe use
where(field,value)
where field = value
where(field,symbol,value)
where field symbol value
where(
[
[field1,symbol1,value1],
[field2,symbol2,value2]
]
)
where field1 symbol1 value1 and field2 symbol2 value2
where(field1,symbol1,value1).where(field2,symbol2,value2)
where field1 symbol1 value1 and field2 symbol2 value2
symbol | another | demo |
---|---|---|
= | eq ,= | where('id','=',1) |
<> | neq , != , <> | where('id','<>',1) |
> | gt ,> | where('id','>',1) |
>= | egt ,>= | where('id','>=',1) |
< | lt , < | where('id','<',1) |
<= | elt ,<= | where('id','<=',1) |
in | in ,not in | where('id','in',[1,2,3]) |
between | between ,not between | where('id','between',[1,5]) where('id','between','1,5') where('id','between','1 and 5') |
like | like , not like | where('name','like','%hbh%') |
null | is null ,null | where('remark','is null') |
not null | is not null ,not null | where('remark','is not null') |
exists | exists , not exists | where('remark','exists') |
exp | exp | where('id','exp','in (1,2,3)') |
where_or(field: Union[str, list], symbol: str = '', value: Any = None)
where('id',1).where_or('id',5)
where id = 1 or id = 5
limit(start: int, step: int = None) LIMIT start,step
page(index: int = 1, size: int = 20) LIMIT index*size-1,size
order(field: str, sort: str = 'asc') ORDER BY field sort
group(field:str) GROUP BY field
distinct(field:str) SELECT DISTINCT field
field(fields: Any, exclude: bool = False)
SELECT fields
if exclude
=True then select the fields of table (exlude:fields
)
select(build_sql: bool = False) -> list
return select query result
if build_sql
=True then return sql
find() return select ... limit 1
value(field: str) return the field of first row
column(field: str,key: str = '')
column('name')
return ['hbh','mondy']
column('name,score')
return [{'hbh':80},{'mondy':88}]
column('score','name')
return {'hbh':80, 'mondy':88}
column('id,score','name')
return { 'hbh':{'id':1,'score':80}, 'mondy':{'id':2,'score':88} }
alias(short_name: str = '') set alias table_name
join(table_name: str, as_name: str = '', on: str = '', join: str = 'inner', and_str: str = '')
table_name
join table_nameas_name
alias short_table_name for table_name
on
join conditionjoin
join type in 'INNER', 'LEFT', 'RIGHT', 'FULL OUTER'and_str
and condition
demodb.table('table1').alias('a').join(
'table2','b','a.id=b.a_id','left'
).join(
'table3','c','c.a_id=a.id'
).field(
'a.id,a.name,b.id as b_id,b.score,c.id as c_id,c.remark'
).where(
'a.id',1
).find()
sql
SELECT
a.id,
a.name,
b.id AS b_id,
b.score,
c.id AS c_id,
c.remark
FROM
table1 AS a
LEFT JOIN table2 AS b ON a.id = b.a_id
INNER JOIN table3 AS c ON c.a_id = a.id
WHERE
a.id = 1
LIMIT 1
union(sql1: str, sql2: str, union_all: bool = False) union sql1 and sql2
UNION ALL
demo
sql1 = db.table('table1').field('name,score').where('status',1).select(build_sql=True)
sql2 = db.table('table2').field('name,score').where('status',1).select(build_sql=True)
result = db.table().union(sql1,sql2).where('score','>',60).select()
sql
SELECT
*
FROM
( SELECT `name`, `score` FROM table1 WHERE `status` = 1 )
UNION
( SELECT `name`, `score` FROM table2 WHERE `status` = 1 )
WHERE
score > 60
insert(data: Union[dict, List[dict]], replace: bool = False) -> int insert data to database
data
dict: insert one record; list: insert multiple recordsreplace
bool if replace
is True then use REPLACE INTO
demo insert one record
db.table('table1').insert({'name':'test','score':100})
INSERT INTO table1 (`name`, `score`) VALUES ('test', 100)
insert multiple records
db.table('table1').insert([{'name':'test','score':100},{'name':'test2','score':101}])
INSERT INTO table1 (`name`, `score`) VALUES ('test', 100), ('test2', 101)
replace mode
db.table('table1').insert({'id':1,'name':'test','score':100},replace=True)
REPLACE INTO table1 (`id`, `name`, `score`) VALUES (1,'test', 100)
update(data: dict, all_record: bool = False) -> int update data
data
dict you want update dataall_record
bool if all_record
is False then you must set update condition; if you want to update all records then you need set all_record
= Truedelete(all_record: bool = False) -> int delete record
all_record
bool if all_record
is False then you must set delete condition; if you want to delete all records then you need set all_record
= Trueinc(field: str, step: Union[str, int, float] = 1) -> int
increase field
+step
dec(field: str, step: int = 1) -> int
decrease field
-step
max(field: str) -> Union[int, float]
get the max value of field
sum(field: str) -> Union[int, float, Decimal]
get the sum value of field
avg(field: str) -> Union[int, float, Decimal]
get the avg value of field
count(field: str = '*') -> int
get the count of records
copy_to(new_table: str = None, create_blank_table: bool = False) -> int
copy data to new_table
new_table
if new_table
is None then new_table
will auto set like {table_name}_copy
create_blank_table
bool if create_blank_table
is True then only create a blank table like current table.demo
db.table('user').field(
'name,score'
).where(
'score','>',60
).copy_to('good_boy')
sql
SELECT
`name`,
`score`
INTO `good_boy`
FROM
`user`
WHERE
score > 60
insert_to(new_table: str, fields: Union[str, list, tuple] = None) -> int
INSERT INTO {new_table} SELECT {select_fields} FROM {table} {join} WHERE {where}{group}{order}{limit}
exists(self) -> bool
check record exists with some query conditions, it use SELECT 1 FROM {table} {join} WHERE {where} LIMIT 1
batch_update(data:List[dict],key:str) -> int
batch update multiple records
demo
data = [
{'id':1,'score':66},
{'id':2,'score':59},
{'id':3,'score':86},
{'id':4,'score':90},
]
db.table('user').batch(data,key='id')
sql
update `user` set score = 66 where id = 1;
update `user` set score = 59 where id = 2;
update `user` set score = 86 where id = 3;
update `user` set score = 90 where id = 4;
from think_sql import DB
db_dsn = "root:'password'@127.0.0.1:3306/database"
with DB(db_dsn) as db:
# result: insert two records into database
with db.start_trans():
db.table('user').insert({'name':'think_sql1','score':98})
db.table('user').insert({'name':'think_sql2','score':99})
# result: nothing inserted
with db.start_trans():
db.table('user').insert({'name':'think_sql1','score':98})
db.table('user').insert({'name':'think_sql2','score':99})
raise Exception('error')
# The above operation does not affect subsequent operations.
db.table('user').insert({'name':'think_sql3','score':100})
from think_sql import DB
from think_sql.mysql.sql_helper import help
db_dsn = "root:'password'@127.0.0.1:3306/database"
with DB(db_dsn) as db:
sql = "slow query sql"
help(db, sql)
result
1) 输入的SQL语句是:
----------------------------------------------------------------------------------------------------
SELECT *
FROM hy_cabrecs
WHERE finished_count > 0
----------------------------------------------------------------------------------------------------
2) EXPLAIN执行计划:
+------+---------------+------------+--------------+--------+-----------------+-------+-----------+-------+--------+------------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+======+===============+============+==============+========+=================+=======+===========+=======+========+============+=============+
| 1 | SIMPLE | hy_cabrecs | None | ALL | None | None | None | None | 14422 | 33.33 | Using where |
+------+---------------+------------+--------------+--------+-----------------+-------+-----------+-------+--------+------------+-------------+
3) 索引优化建议:
----------------------------------------------------------------------------------------------------
取出表 【hy_cabrecs】 where条件字段 【finished_count】 100000 条记录,重复的数据有:【16093】 条,没有必要为该字段创建索引。
【hy_cabrecs】 表,无需添加任何索引。
parse alter sql
from think_sql.mysql import parse
sql = """
alter table
slow_log_test
add
iii int not null default 0 comment 'a';
"""
print(parse.alter_sql(sql))
result:
{
"table": "slow_log_test",
"field": "iii",
"field_type": "int",
"is_null": "NOT NULL",
"default": "0",
"comment": "'a'",
"after": ""
}
# 配置虚拟环境在项目目录下
poetry config virtualenvs.path true
# 安装依赖
poetry install
# 进入虚拟环境
poetry shell
名称 | 功能 |
---|---|
new | 创建一个项目脚手架,包含基本结构、pyproject.toml 文件 |
init | 基于已有的项目代码创建 pyproject.toml 文件,支持交互式填写 |
install | 安装依赖库 |
update | 更新依赖库 |
add | 添加依赖库 |
remove | 移除依赖库 |
show | 查看具体依赖库信息,支持显示树形依赖链 |
build | 构建 tar.gz 或 wheel 包 |
publish | 发布到 PyPI |
run | 运行脚本和代码 |
pytest --cov --cov-report=html
poetry build
poetry config pypi-token.pypi "your pypi.org api token"
poetry publish -n
FAQs
ThinkSQL link think-orm(ThinkPHP)
We found that think-sql demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
A malicious npm package targets Solana developers, rerouting funds in 2% of transactions to a hardcoded address.
Security News
Research
Socket researchers have discovered malicious npm packages targeting crypto developers, stealing credentials and wallet data using spyware delivered through typosquats of popular cryptographic libraries.
Security News
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.