Security News
Fluent Assertions Faces Backlash After Abandoning Open Source Licensing
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.
python setup.py sdist bdist_wheel && python -m twine upload dist/* --skip-existing # UPLOAD TO PYPI
GraphQL Simple Tiny Object Relational Mapping - GraphQL ORM for Python
There are several aliases for most Query-building functions, which make queries more concise:
Type.q() # vs Type.query()
Type.qm() # vs Type.query()
Type.q1() # vs Type.query_one()
Type.qs() # vs Type.query_one()
Type.q().fil(...) # vs Type.query().filter(...)
Type.query().filter(after={'attribute': x, 'date': y})
Type.q().after(attr=x, date=y)
Type.query().filter(before={'attribute': x, 'date': y})
Type.q().before(attr=x, date=y)
Type.query().filter(nullAttribute={'attribute': x, 'isNull': y})
Type.q().isNull(attr=x, value=True)
Type.q().isNull(attr=x)
(The value parameter defaults to True.)
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
Type.q(). # vs Type.query().
from tests.models import Tank
all_tanks = Tank.query().get() # returns -> List[Tank]
print(all_tanks) # [Tank(id=1,capacity=10),Tank(id=2,capacity=20),...]
You can chain storm methods to manipulate the data and control how you receive it. These methods accept parameters in several ways:
.filter(name='L1')
.filter({'name': 'L1'})
.filter(name='L1').filter(capacity=1250)
.filter(name='L1', capacity=1250)
.filter({'name': 'L1', 'capacity': 1250})
.orderBy(asc=TypeAttrs.ID)
.orderBy(desc=TypeAttrs.INSERTED_AT)
.limit(1)
from tests.models import Tank
all_tanks = (
Tank.query()
.filter(capacity=1250)
).get() # returns -> List[Tank]
print(all_tanks) # [Tank(id=1,capacity=1250),Tank(id=3,capacity=1250),...]
from tests.models import Tank
my_tank = Tank.query_one().findBy(id=3).get() # returns -> Tank
print(my_tank) # Tank(id='3', name='R342', type='Reposo', capacity=0, room=None, inventories=[])
from tests.models import Tank
gov_tanks = Tank.query().filter(type='^Gob').get()
rest_tanks = Tank.query().filter(type='^Rep').get()
print(len(gov_tanks))
print(len(rest_tanks))
from tests.models import Tank
ok_tanks = (
Tank.query().filter(nullAttribute= { 'attribute': 'CAPACITY', 'isNull': False })
).get()
print(ok_tanks)
# short-hand version
from tests.models import Tank
ok_tanks = Tank.q().isNull(attr='CAPACITY', value=False).get()
print(ok_tanks)
# using null default value:
from tests.models import Tank
bad_tanks = Tank.q().isNull(attr='CAPACITY').get() # value param defaults to True
print(bad_tanks)
from tests.models import BbtInventory
from datetime import datetime as dt, timedelta as td
from gstorm.helpers.date_helpers import get_iso8601_str
today = get_iso8601_str(dt.now() - td(days=1))
yesterday = get_iso8601_str(dt.now() - td(days=2))
today_inventories = (
BbtInventory.query()
.filter(after={
'attribute': 'INSERTED_AT',
'date': "2020-02-27T23:01:44Z" # or: 'date': dt.now() - td(days=1)
})
).get()
print(today_inventories)
# Short-hand version
today_inventories = (
BbtInventory.query()
.after(attr='INSERTED_AT', date="2020-02-27T23:01:44Z") # or: date=dt.now() - td(days=1)
).get()
print(today_inventories)
# Short-hand version
yesterday_inventories = (
BbtInventory.query()
.after(attr='INSERTED_AT', date=yesterday)
.before(attr='INSERTED_AT', date=today)
).get()
print(yesterday_inventories)
from tests.models import BbtInventory
latest_inventories = BbtInventory.query().orderBy(desc=BbtInventoryAttrs.ID).limit(5).get()
print(latest_inventories)
smallest_inventories = BbtInventory.query().orderBy(asc=BbtInventoryAttrs.VOLUME).limit(5).get()
print(smallest_inventories)
from tests.models import BottlingLine
first_10_lines = BottlingLine.query().limit(10).get()
print(first_10_lines)
from tests.models import BottlingOrder
# We want to include the sub-attributes:
guid = 'abc123'
orders = BottlingOrder.query()._with({
'line': Line.query(),
'plans': BottlingPlan.query().orderBy(asc=BottlingPlanAttrs.ID),
'brightBeer': BrightBeer.query().filter(groupGuid=guid)
}).get()
print(orders[0])
# >> BottlingOrder(id=1, name='123', line=Line(name='LINEA001'), ...)
from tests.models import Datum
order = {'desc': 'ID'}
for datum_page in Datum.limit(100).offset(0).orderBy(asc=DatumAttrs.ORDER).paginate():
for datum in datum_page:
print(datum) # type: Datum(id=x,value=y)
NOT WORKING IN VALIOT-APP
from tests.models import Tank
tank = Tank.load(csv='tanks.csv').limit(1).get() # load from any source
response = storm.create(tank) # GraphqlType object
from tests.models import Tank
[gov_tank] = Tank.query().filter(name='L').limit(1).get()
# process data...
# ...
response = storm.update(gov_tank).apply() # GraphqlType object
from tests.models import Tank
[gov_tank] = Tank.load(csv='tanks.csv').limit(1).get() # load from any source
response = storm.upsert(gov_tank).apply() # GraphqlType object
if not response.successful:
print(response.messages)
# everything ok, do other stuff...
print(gov_tank) # has updated data (New ID, etc)
See above examples
from tests.models import Tank
gov_tanks = Tank.load(csv='tanks.csv').get() # load from any source
# ! OPTION 1: one by one
for tank in gov_tanks:
response = storm.upsert(tank).apply() # GraphqlType object
print(tank) # has updated data (New ID, etc)
# ! OPTION 2: All handled by storm:
response = storm.upsert(gov_tanks).apply()
# response type -> List[GraphqlMutationResponse]
from tests.models import Tank
from storm import UploadMode as mode
gov_tanks = Tank.load(csv='tanks.csv') # load from any source
response = storm.upsert(gov_tanks).mode(mode.BATCH).apply()
# default:
# response = storm.upsert(gov_tanks, mode=mode.SEQUENTIAL)
API WIP:
from tests.models import BbtProgram, BbtPlan
from storm import UploadMode as mode
# algorithm runs...
program = BbtProgram() # New with defaults
for plan in algorithm_plans:
program.plans.append(BbtPlan(**plan))
# OPTION 1:
response = storm.create(program)
.nested({
'plans': Plan.create()
}).apply()
# OPTION 2:
attrs = ['plans']
response = storm.create(program, nested=attrs)
NOT PRIORITY
NOT PRIORITY
# # ! old way [No additional libraries]:
# import requests
# import json
# @dataclass
# class Line():
# id: str
# name: str
# speed: float
# LINE = '''
# query getLine($name: String!){
# line(findBy:{ name: $name }){
# id
# name
# speed
# }
# }
# '''
# url = 'https://test.valiot.app/'
# content = {
# 'query': LINE,
# 'variables': {'name': 'LINEA001'}
# }
# response = requests.post(url, json=content)
# line_data = json.loads(str(response.content))
# line = Line(**line_data)
# line.name # * >> LINEA001
# # ! current way [pygqlc]:
# gql = GraphQLClient()
# @dataclass
# class Line():
# id: str
# name: str
# speed: float
# LINE = '''
# query getLine($name: String!){
# line(findBy:{ name: $name }){
# id
# name
# speed
# }
# }
# '''
# line_data, _ = gql.query_one(LINE, {'name': 'LINEA001'})
# line = Line(**line_data)
# line.name # * >> LINEA001
# # * New way (TBD):
# gql = GraphQLClient()
# orm = GStorm(client=gql, schema=SCHEMA_PATH)
# Line = orm.getType('Line')
# line = Line.find_one({'name': 'LINEA001'})
# line.name # * >> LINEA001
FAQs
GraphQL ORM for Python (based on pygqlc)
We found that gstorm demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.
Research
Security News
Socket researchers uncover the risks of a malicious Python package targeting Discord developers.
Security News
The UK is proposing a bold ban on ransomware payments by public entities to disrupt cybercrime, protect critical services, and lead global cybersecurity efforts.