
Security News
Another Round of TEA Protocol Spam Floods npm, But It’s Not a Worm
Recent coverage mislabels the latest TEA protocol spam as a worm. Here’s what’s actually happening.
pip install think_sql[mysql]
pip install think_sql[dm]
Database: test
Table: user
from think_sql import DB
config = {
'type': 'mysql',
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'root',
'database': 'test',
}
with DB(config) as db:
data = db.table('user').where('id',1).find()
print(data)
from think_sql import DB
with DB("root:'root'@127.0.0.1:3306/test") as db:
data = db.table('user').where('id',1).find()
print(data)
from think_sql import DB
from think_sql.tool.util import DBConfig
config = DBConfig(
type='mysql',
host='127.0.0.1',
port=3306,
user='root',
password='root',
database='test',
)
with DB(config) as db:
data = db.table('user').where('id',1).find()
print(data)
result
{
"id": 1,
"username": "hbh112233abc",
"age": "36",
"address": "FUJIAN.XIAMEN"
}
db function
from think_sql import db
config = {
'type': 'mysql',
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'root',
'database': 'test',
}
conn = db(config)
data = conn.table('user').where('id',1).find()
print(data)
conn.close()
init(config:Union[str,dict,DBConfig],params={})
init database, return DB instance
config supports three forms:
- DSN string: type://user:'password'@host:port/database
- dict: {'type':'mysql','host':'127.0.0.1','port':3306,'user':'root','password':'root','database':'test'}
- DBConfig: DBConfig(type='mysql',host='127.0.0.1',port=3306,user='root',password='root',database='test')
connect() connect database use init params
table(table_name):Table return class Table <think_sql.tool.interface.TableInterface>
check_connected():bool check connected, try reconnect database
query(sql,params=()) query sql return cursor.fetchall List[dict]
execute(sql,params=()) execute sql write operate(ex:insert,update,delete,...)
init(connector: Connection,cursor: Cursor,table_name: str,debug: bool = True)
init() initialize query condition
debug(flag=True) set debug flag
set_cache_storage(storage: CacheStorage) set cache storage ex: Redis
cache(key: str = None, expire: int = 3600) use cache at query
cursor(sql: str, params: list = []) -> Cursor return cursor object
get_last_sql() -> str return last sql string
get_lastid() -> str return last row id
get_rowcount() -> int return affect rows count
fetch_sql(flag: bool = True)
set fetch sql flag,if flag = True then query and execute will only return sql
build_sql(operation: str, params: list = []) -> str return build sql
query(sql: str, params: list = []) -> list
execute read operation sql and return cursor.fetchall()
when fetch_sql=True then return sql and not execute the sql
execute(sql: str, params: list = []) -> int
execute write operation sql and return affect rows count
when fetch_sql=True then return sql and not execute the sql
where(field: Union[str, list, tuple], symbol: str = '', value: Any = None) set query conditions, support multiple use
where(field,value)
where field = value
where(field,symbol,value)
where field symbol value
where(
[
[field1,symbol1,value1],
[field2,symbol2,value2]
]
)
where field1 symbol1 value1 and field2 symbol2 value2
where(field1,symbol1,value1).where(field2,symbol2,value2)
where field1 symbol1 value1 and field2 symbol2 value2
| symbol | another | demo |
|---|---|---|
= | eq,= | where('id','=',1) |
<> | neq, !=, <> | where('id','<>',1) |
> | gt,> | where('id','>',1) |
>= | egt,>= | where('id','>=',1) |
< | lt, < | where('id','<',1) |
<= | elt,<= | where('id','<=',1) |
in | in,not in | where('id','in',[1,2,3]) |
between | between,not between | where('id','between',[1,5]) where('id','between','1,5') where('id','between','1 and 5') |
like | like, not like | where('name','like','%hbh%') |
null | is null,null | where('remark','is null') |
not null | is not null,not null | where('remark','is not null') |
exists | exists, not exists | where('remark','exists') |
exp | exp | where('id','exp','in (1,2,3)') |
where_or(field: Union[str, list], symbol: str = '', value: Any = None)
where('id',1).where_or('id',5)
where id = 1 or id = 5
limit(start: int, step: int = None) LIMIT start,step
page(index: int = 1, size: int = 20) LIMIT (index-1)*size,size
order(field: str, sort: str = 'asc') ORDER BY field sort
group(field:str) GROUP BY field
distinct(field:str) SELECT DISTINCT field
field(fields: Any, exclude: bool = False)
SELECT fields
if exclude=True then select the fields of table (exclude: fields)
select(build_sql: bool = False) -> list
return select query result
if build_sql=True then return sql
find() return select ... limit 1
value(field: str) return the field of first row
column(field: str,key: str = '')
column('name')
return ['hbh','mondy']
column('name,score')
return [{'hbh':80},{'mondy':88}]
column('score','name')
return {'hbh':80, 'mondy':88}
column('id,score','name')
return { 'hbh':{'id':1,'score':80}, 'mondy':{'id':2,'score':88} }
alias(short_name: str = '') set alias table_name
join(table_name: str, as_name: str = '', on: str = '', join: str = 'inner', and_str: str = '')
table_name the table name to join
as_name alias (short name) for the joined table
on the join condition
join join type, one of 'INNER', 'LEFT', 'RIGHT', 'FULL OUTER'
and_str extra AND condition
demo
db.table('table1').alias('a').join(
'table2','b','a.id=b.a_id','left'
).join(
'table3','c','c.a_id=a.id'
).field(
'a.id,a.name,b.id as b_id,b.score,c.id as c_id,c.remark'
).where(
'a.id',1
).find()
sql
SELECT
a.id,
a.name,
b.id AS b_id,
b.score,
c.id AS c_id,
c.remark
FROM
table1 AS a
LEFT JOIN table2 AS b ON a.id = b.a_id
INNER JOIN table3 AS c ON c.a_id = a.id
WHERE
a.id = 1
LIMIT 1
union(sql1: str, sql2: str, union_all: bool = False) union sql1 and sql2
if union_all=True then use UNION ALL
demo
sql1 = db.table('table1').field('name,score').where('status',1).select(build_sql=True)
sql2 = db.table('table2').field('name,score').where('status',1).select(build_sql=True)
result = db.table().union(sql1,sql2).where('score','>',60).select()
sql
SELECT
*
FROM
( SELECT `name`, `score` FROM table1 WHERE `status` = 1 )
UNION
( SELECT `name`, `score` FROM table2 WHERE `status` = 1 )
WHERE
score > 60
insert(data: Union[dict, List[dict]], replace: bool = False) -> int insert data to database
data dict: insert one record; list: insert multiple records
replace bool if replace is True then use REPLACE INTO
demo: insert one record
db.table('table1').insert({'name':'test','score':100})
INSERT INTO table1 (`name`, `score`) VALUES ('test', 100)
insert multiple records
db.table('table1').insert([{'name':'test','score':100},{'name':'test2','score':101}])
INSERT INTO table1 (`name`, `score`) VALUES ('test', 100), ('test2', 101)
replace mode
db.table('table1').insert({'id':1,'name':'test','score':100},replace=True)
REPLACE INTO table1 (`id`, `name`, `score`) VALUES (1,'test', 100)
update(data: dict, all_record: bool = False) -> int update data
data dict the data you want to update
all_record bool if all_record is False then you must set update condition; if you want to update all records then you need set all_record = True
delete(all_record: bool = False) -> int delete record
all_record bool if all_record is False then you must set delete condition; if you want to delete all records then you need set all_record = True
inc(field: str, step: Union[str, int, float] = 1) -> int
increase field +step
dec(field: str, step: int = 1) -> int
decrease field -step
max(field: str) -> Union[int, float]
get the max value of field
sum(field: str) -> Union[int, float, Decimal]
get the sum value of field
avg(field: str) -> Union[int, float, Decimal]
get the avg value of field
count(field: str = '*') -> int
get the count of records
copy_to(new_table: str = None, create_blank_table: bool = False) -> int
copy data to new_table
new_table if new_table is None then new_table will auto set like {table_name}_copy
create_blank_table bool if create_blank_table is True then only create a blank table like the current table.
demo
db.table('user').field(
'name,score'
).where(
'score','>',60
).copy_to('good_boy')
sql
SELECT
`name`,
`score`
INTO `good_boy`
FROM
`user`
WHERE
score > 60
insert_to(new_table: str, fields: Union[str, list, tuple] = None) -> int
INSERT INTO {new_table} SELECT {select_fields} FROM {table} {join} WHERE {where}{group}{order}{limit}
exists(self) -> bool
check record exists with some query conditions, it use SELECT 1 FROM {table} {join} WHERE {where} LIMIT 1
batch_update(data:List[dict],key:str) -> int
batch update multiple records
demo
data = [
{'id':1,'score':66},
{'id':2,'score':59},
{'id':3,'score':86},
{'id':4,'score':90},
]
db.table('user').batch_update(data,key='id')
sql
update `user` set score = 66 where id = 1;
update `user` set score = 59 where id = 2;
update `user` set score = 86 where id = 3;
update `user` set score = 90 where id = 4;
from think_sql import DB
db_dsn = "root:'password'@127.0.0.1:3306/database"
with DB(db_dsn) as db:
# result: insert two records into database
with db.start_trans():
db.table('user').insert({'name':'think_sql1','score':98})
db.table('user').insert({'name':'think_sql2','score':99})
# result: nothing inserted
with db.start_trans():
db.table('user').insert({'name':'think_sql1','score':98})
db.table('user').insert({'name':'think_sql2','score':99})
raise Exception('error')
# The above operation does not affect subsequent operations.
db.table('user').insert({'name':'think_sql3','score':100})
from think_sql import DB
from think_sql.mysql.sql_helper import help
db_dsn = "root:'password'@127.0.0.1:3306/database"
with DB(db_dsn) as db:
sql = "slow query sql"
help(db, sql)
result
1) 输入的SQL语句是:
----------------------------------------------------------------------------------------------------
SELECT *
FROM hy_cabrecs
WHERE finished_count > 0
----------------------------------------------------------------------------------------------------
2) EXPLAIN执行计划:
+------+---------------+------------+--------------+--------+-----------------+-------+-----------+-------+--------+------------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+======+===============+============+==============+========+=================+=======+===========+=======+========+============+=============+
| 1 | SIMPLE | hy_cabrecs | None | ALL | None | None | None | None | 14422 | 33.33 | Using where |
+------+---------------+------------+--------------+--------+-----------------+-------+-----------+-------+--------+------------+-------------+
3) 索引优化建议:
----------------------------------------------------------------------------------------------------
取出表 【hy_cabrecs】 where条件字段 【finished_count】 100000 条记录,重复的数据有:【16093】 条,没有必要为该字段创建索引。
【hy_cabrecs】 表,无需添加任何索引。
parse alter sql
from think_sql.mysql import parse
sql = """
alter table
slow_log_test
add
iii int not null default 0 comment 'a';
"""
print(parse.alter_sql(sql))
result:
{
"table": "slow_log_test",
"field": "iii",
"field_type": "int",
"is_null": "NOT NULL",
"default": "0",
"comment": "'a'",
"after": ""
}
# 配置虚拟环境在项目目录下
poetry config virtualenvs.path true
# 安装依赖
poetry install
# 进入虚拟环境
poetry shell
| 名称 | 功能 |
|---|---|
| new | 创建一个项目脚手架,包含基本结构、pyproject.toml 文件 |
| init | 基于已有的项目代码创建 pyproject.toml 文件,支持交互式填写 |
| install | 安装依赖库 |
| update | 更新依赖库 |
| add | 添加依赖库 |
| remove | 移除依赖库 |
| show | 查看具体依赖库信息,支持显示树形依赖链 |
| build | 构建 tar.gz 或 wheel 包 |
| publish | 发布到 PyPI |
| run | 运行脚本和代码 |
pytest --cov --cov-report=html
poetry build
poetry config pypi-token.pypi "your pypi.org api token"
poetry publish -n
FAQs
ThinkSQL: a SQL library in the style of think-orm (ThinkPHP)
We found that think-sql demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Recent coverage mislabels the latest TEA protocol spam as a worm. Here’s what’s actually happening.

Security News
PyPI adds Trusted Publishing support for GitLab Self-Managed as adoption reaches 25% of uploads

Research
/Security News
A malicious Chrome extension posing as an Ethereum wallet steals seed phrases by encoding them into Sui transactions, enabling full wallet takeover.