An integration package created by LOGYCA that connects to Postgres and standardizes connections and dependency injection in synchronous or asynchronous mode. Tested with FastAPI and with console/worker scripts.
LOGYCA public libraries
Source code | Package (PyPI) | Samples
The project that uses this library must select and install the libraries and versions it needs. This package only checks that they are already present when it is installed.
To install the logyca_postgres package and check only the SQLAlchemy prerequisite, without validating any Postgres connection driver, use the following command:
# Checks that the SQLAlchemy dependency is installed
pip install logyca_postgres
To install the logyca_postgres package and also validate an asynchronous or synchronous Postgres connection driver, use one of the following commands:
# Checks that the asyncpg driver is installed
pip install logyca_postgres[async]
# Checks that the psycopg2 driver is installed
pip install logyca_postgres[sync-psycopg2]
# Checks that the psycopg2-binary driver is installed
pip install logyca_postgres[sync-psycopg2-binary]
# Checks that both the asyncpg and psycopg2-binary drivers are installed
pip install logyca_postgres[async-sync-psycopg2]
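Since the extras above check for drivers rather than choose one for you, it can help to confirm what the current environment already provides. The following is a minimal sketch using only the standard library. The helper name `installed_drivers` is hypothetical and not part of logyca_postgres; the module names are the import names of the packages listed above (psycopg2-binary is also imported as `psycopg2`).

```python
from importlib.util import find_spec

# Import names of the packages mentioned in the install commands above.
CANDIDATES = ("sqlalchemy", "asyncpg", "psycopg2")


def installed_drivers(names=CANDIDATES):
    """Return a dict mapping each module name to True if it can be imported."""
    return {name: find_spec(name) is not None for name in names}


if __name__ == "__main__":
    for name, present in installed_drivers().items():
        print(f"{name}: {'installed' if present else 'missing'}")
```

Running the script before `pip install logyca_postgres[...]` shows which extra matches the environment.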
logyca <MAJOR>.<MINOR>.<PATCH>
https://peps.python.org/pep-0440/
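As an illustration of the MAJOR.MINOR.PATCH scheme, the sketch below splits a plain three-part version string and compares two versions. It is an assumption-laden simplification: it handles only plain releases, not the full PEP 440 grammar (pre-releases, post-releases, epochs), and the function names and the compatibility rule (same MAJOR, not older) are hypothetical choices for this example. For real projects, `packaging.version.Version` is the more complete tool.

```python
def parse_release(version: str) -> tuple[int, int, int]:
    """Split a plain 'MAJOR.MINOR.PATCH' string into three integers."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_compatible(installed: str, required: str) -> bool:
    """True when both share MAJOR and installed is not older than required."""
    have, need = parse_release(installed), parse_release(required)
    return have[0] == need[0] and have >= need
```

Tuples compare element by element, so `(1, 4, 0) >= (1, 2, 3)` behaves like a version comparison for plain releases.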
The following applies to each engine, synchronous or asynchronous.
The library uses a singleton pattern, "class SyncConnEngine(metaclass=Singleton):", so each engine class can be instantiated only once. To open a connection to another database, create a class that inherits from the engine and instantiate it with its own configuration.
Example:
class SyncConnEngineX(SyncConnEngine):
    def __init__(self, url_connection, server_settings):
        super().__init__(url_connection, server_settings)

sync_session_x=SyncConnEngineX(
    url_connection=SyncConnEngineX.build_url_connection(user=settings.DB_USER_X,password=settings.DB_PASS_X,host=settings.DB_HOST_X,port=settings.DB_PORT_X,database=settings.DB_NAME_X,ssl_enable=settings.DB_SSL_X),
    server_settings=SyncConnEngineX.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name=f"{App.Settings.NAME} - SyncConnEngineX")
)
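To see why a subclass is needed for a second connection, here is a minimal sketch of a per-class singleton metaclass. It is not the library's actual `Singleton` implementation, which is not shown in this document and may differ; `Engine` and `EngineX` are hypothetical stand-ins for `SyncConnEngine` and `SyncConnEngineX`.

```python
class Singleton(type):
    """Metaclass that keeps one instance per class (not per class hierarchy)."""

    _instances = {}

    def __call__(cls, *args, **kwargs):
        # The cache is keyed by the class itself, so a subclass gets its own slot.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Engine(metaclass=Singleton):  # stand-in for SyncConnEngine
    def __init__(self, url):
        self.url = url


class EngineX(Engine):  # stand-in for SyncConnEngineX
    pass


first = Engine("postgresql://host-a/db")
again = Engine("postgresql://host-b/db")   # arguments ignored: instance already exists
other = EngineX("postgresql://host-b/db")  # separate instance for the subclass
```

In this sketch, calling `Engine` a second time returns the first instance with its original configuration, which is why a new configuration requires a new class.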
FastAPI (asynchronous mode)
from fastapi import FastAPI, Depends, HTTPException
from logyca_postgres import AsyncConnEngine, commit_rollback_async, check_connection_async
from sqlalchemy import text as text_to_sql
from sqlalchemy.ext.asyncio import AsyncSession
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; convert the port
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

app = FastAPI()

conn_async_session=AsyncConnEngine(
    url_connection=AsyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=AsyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the application is stopped.

@app.get("/simulated_query_async/")
async def read_item(async_session:AsyncSession = Depends(conn_async_session)):
    try:
        status, date_time_check_conn = await check_connection_async(async_session)
        if status:
            query = text_to_sql("SELECT now();")
            result = await async_session.execute(query)
            simulated_query = result.scalar_one_or_none()
            await commit_rollback_async(async_session)
            return {"date_time_check_conn": date_time_check_conn, "simulated_query": simulated_query}
        else:
            # The database is unreachable: report the service as unavailable
            raise HTTPException(status_code=503, detail="async_session connect db error...")
    except HTTPException:
        raise  # keep the status code chosen above
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"error: {e}")
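The samples read their connection settings from environment variables. Values that come from the environment are always strings, so the port needs an explicit conversion. The sketch below groups the same DB_* variables in one place; `DbSettings` and `load_db_settings` are hypothetical names for this illustration and are not part of logyca_postgres.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DbSettings:
    user: str
    password: str
    host: str
    port: int
    name: str


def load_db_settings(env=os.environ) -> DbSettings:
    """Read the DB_* variables used in the samples, converting the port to an int."""
    return DbSettings(
        user=env.get("DB_USER", "postgres"),
        password=env.get("DB_PASS", "xxx"),
        host=env.get("DB_HOST", "localhost"),
        port=int(env.get("DB_PORT", "5432")),
        name=env.get("DB_NAME", "test"),
    )
```

Passing a plain dict instead of `os.environ` makes the loader easy to exercise in tests.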
Worker or script (asynchronous mode)
from logyca_postgres import AsyncConnEngine, commit_rollback_async, check_connection_async
from sqlalchemy import text as text_to_sql
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; convert the port
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

conn_async_session=AsyncConnEngine(
    url_connection=AsyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=AsyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the
# application is stopped or the engine is terminated with close_engine().

async def methods(async_session:AsyncSession):
    status, date_time_check_conn = await check_connection_async(async_session)
    if status:
        query = text_to_sql("SELECT now();")
        result = await async_session.execute(query)
        simulated_query = result.scalar_one_or_none()
        await commit_rollback_async(async_session)
        print(f"date_time_check_conn={date_time_check_conn},simulated_query={simulated_query}")
    else:
        print("async_session connect db error...")

async def main():
    async for async_session in conn_async_session.get_async_session():
        await methods(async_session)
    await conn_async_session.close_engine()

if __name__ == "__main__":
    asyncio.run(main())
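The `async for ... in conn_async_session.get_async_session()` loop in the worker sample consumes a session from an asynchronous generator. The sketch below shows the general pattern with a fake session so it runs without a database. It is an assumption about the shape of such a generator, not the library's implementation; `FakeSession`, `get_session`, and `run` are hypothetical names.

```python
import asyncio


class FakeSession:
    """Stand-in for AsyncSession that only records what happens to it."""

    def __init__(self, log):
        self.log = log

    async def close(self):
        self.log.append("close")


async def get_session(log):
    """Yield one session, then close it once the consumer's loop finishes."""
    session = FakeSession(log)
    try:
        yield session
    finally:
        await session.close()


async def run():
    log = []
    async for session in get_session(log):
        log.append("work")  # the body runs once, with the yielded session
    log.append("after loop")
    return log


if __name__ == "__main__":
    print(asyncio.run(run()))
```

The loop body runs exactly once, and the cleanup in `finally` runs before execution continues past the loop, which is why the sample can call `close_engine()` right after it.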
FastAPI (synchronous mode)
from fastapi import FastAPI, Depends, HTTPException
from logyca_postgres import SyncConnEngine, commit_rollback_sync, check_connection_sync
from sqlalchemy import text as text_to_sql
from sqlalchemy.orm.session import Session
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; convert the port
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

app = FastAPI()

conn_sync_session=SyncConnEngine(
    url_connection=SyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - SyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the application is stopped.

@app.get("/simulated_query_sync/")
def read_item(sync_session:Session = Depends(conn_sync_session)):
    try:
        status, date_time_check_conn = check_connection_sync(sync_session)
        if status:
            query = text_to_sql("SELECT now();")
            result = sync_session.execute(query)
            simulated_query = result.fetchone()[0]
            commit_rollback_sync(sync_session)
            return {"date_time_check_conn": date_time_check_conn, "simulated_query": simulated_query}
        else:
            # The database is unreachable: report the service as unavailable
            raise HTTPException(status_code=503, detail="sync_session connect db error...")
    except HTTPException:
        raise  # keep the status code chosen above
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"error: {e}")
Worker or script (synchronous mode)
from logyca_postgres import SyncConnEngine, commit_rollback_sync, check_connection_sync
from sqlalchemy import text as text_to_sql
from sqlalchemy.orm.session import Session
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','***')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; convert the port
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

conn_sync_session=SyncConnEngine(
    url_connection=SyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - SyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the
# application is stopped or the engine is terminated with close_engine().

def methods(sync_session: Session):
    status, date_time_check_conn = check_connection_sync(sync_session)
    if status:
        query = text_to_sql("SELECT now();")
        result = sync_session.execute(query)
        simulated_query = result.fetchone()[0]
        commit_rollback_sync(sync_session)
        print(f"date_time_check_conn={date_time_check_conn},simulated_query={simulated_query}")
    else:
        print("sync_session connect db error...")

def main():
    for sync_session in conn_sync_session.get_sync_session():
        methods(sync_session)
    conn_sync_session.close_engine()

if __name__ == "__main__":
    main()
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.