[TOC]
通用库
用法
- 此库作为git项目管理,凡是修改完后应及时通知到开发团队
- 需要使用库中的方法,使用pip install,这时候就可以在项目中引用库中的方法
- 引用的方法举例:
Make sure you have installed setuptools and wheel.
Important: You must modify the version of the package in setup.py and then delete those folders (build, dist, egg-info)
python setup.py sdist bdist_wheel
Install twine before doing this(qays:一只会抓鱼的熊)
twine upload dist/*
pip install wisdoms
-
find the latest package of wisdoms - 发现最新版本
pip list --outdated
pip install wisdoms --upgrade --no-cache-dir
packets usage:
from wisdoms.ms import permit, add_uid, add_user
host = {'AMQP_URI': "amqp://guest:guest@localhost"}
auth_ = permit(host)
user_ = add_user(host)
uid_ = add_uid(host)
class A:
@auth_
def func_need_auth(self, data):
"""
1.进入这个方法之前会验证权限,data需带有token
2.之后的data的数据结构:
{'uid':用户id, 'full_user':用户全部信息字典类型,'data':前端传来的data,'token':token,'full_org':当前组织, 'user':精简用户信息,'org':精简组织信息}
:param data:
:return:
"""
user = data.get('user')
data = data.get('data')
@user_
def func_need_user(self, data):
"""
1.进入这个方法不需要验证权限,需要token返回用户信息
2.之后的data的数据结构:
{'uid':用户id, 'full_user':用户全部信息字典类型,'data':前端传来的data,'token':token,'full_org':当前组织, 'user':精简用户信息,'org':精简组织信息}
:param data:
:return:
"""
user = data.get('user')
data = data.get('data')
@uid_
def func_need_uid(self, data):
"""
1.进入这个方法不需要验证权限,需要token只返回用户id
2.之后的data的数据结构:{'uid':用户id, 'data':前端传来的data,'token':token}
:param data:
:return:
"""
uid = data.get('uid')
data = data.get('data')
from wisdoms.config import c
print(c.get('name'))
d = c.to_dict()
print(d['name'])
from wisdoms.commons import revert, codes, success
def func():
return revert(codes.ERROR)
def foo():
return revert(codes.SUCCESS, {'data':'data'},'返回成功描述信息')
def done():
return success()
from wisdoms.utils import joint4path
print(joint4path('abc','dac','ccc'))
from wisdoms.utils import o2d
o2d(obj)
from wisdoms.utils import func_exception
ex = func_exception(codes.WARNING)
@ex
def func():
pass
from wisdoms.utils import cls_exception
xpt_cls = cls_exception(ex)
@xpt_cls
class A:
name = 'a'
def __init__(self, param):
self.desc = param
def func1(self, param):
return self.desc + param
@classmethod
def func2(cls, param):
print(cls)
print('func2', param)
raise Exception('func2 error')
@staticmethod
def func3(param):
print('func3', param)
raise Exception('func3 error')
aa = A('param')
from wisdoms.ms import ms_base
from xx import ROLES
MsBase = ms_base(MS_HOST, name='XX应用', roles=ROLES, types='free',entrance='/xx/index')
class A(MsBase):
pass
from wisdoms.ms import closure_crf
crf = closure_crf(config('ms_host'))
from wisdoms.pg_db import session_exception
se = session_exception(session)
@se
def func():
pass
from wisdoms.db import repo_ref
RepoBase = repo_ref(session)
class FooRepo(RepoBase):
"""
common add, delete, update, get(include search) function finished
"""
pass
foo = FooRepo(Foo)
foo.add(name='name', desc='desc',...)
foo.update(id, name = 'rename',desc = 'redesc',...)
foo.delete(id)
foo.get()
foo.get(id)
foo.get(name ='name',...)
postgresql 使用ARRAYJSON
使用 jsonb 来存储
example: notes = Column(CastingArray())
class CastingArray(ARRAY):
def bind_expression(self):
return cast(JSONB, self)
定制PG model 五个基本属性
class BaseModelPG(declarative_base()):
__abstract__ = True
id = Column(Integer, nullable=False, primary_key=True, unique=True, autoincrement=True, )
uid = Column(Integer, nullable=False)
oid = Column(Integer, nullable=False)
create_time = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
update_time = Column(DateTime, default=datetime.now(), )
使用例子
class SomeEntity(BaseModelPG):
name = models.CharField(max_length=1000)
address = models.CharField(max_length=1000, default='')
info = models.CharField(max_length=2000, default='')
如何设计包
- 顶级包:wisdom,代表天智,智慧
- 现阶段的约定:采用一级包的方式
- 不同的功能放在不同的文件(模块)即可做好方法的分类