Unified Synchronous and Asynchronous Python client for KeyDB and Redis.
It implements the majority of the redis-py API methods and is fully compatible with them, while also providing a unified client for both sync and async methods.
Additionally, newer APIs such as KeyDBClient are strongly typed to provide a more robust experience.
aiokeydb can be installed from PyPI using pip, or from source.
# Install from pypi
pip install aiokeydb
# Install from source
pip install git+https://github.com/trisongz/aiokeydb-py.git
Below are some examples of how to use aiokeydb. Additional examples can be found in the tests directory.
import time
import asyncio
import uuid
from aiokeydb import KeyDBClient

# The session can be explicitly initialized, or
# will be lazily initialized on first use
# through environment variables with all
# params being prefixed with `KEYDB_`
keydb_uri = "keydb://localhost:6379/0"

# Initialize the Unified Client
KeyDBClient.init_session(
    uri = keydb_uri,
)

# Cache the results of these functions
# cachify works for both sync and async functions
# and has many params to customize the caching behavior
# and supports both `redis` and `keydb` backends
# as well as `api` frameworks such as `fastapi` and `starlette`
@KeyDBClient.cachify()
async def async_fibonacci(number: int):
    if number == 0: return 0
    elif number == 1: return 1
    return await async_fibonacci(number - 1) + await async_fibonacci(number - 2)

@KeyDBClient.cachify()
def fibonacci(number: int):
    if number == 0: return 0
    elif number == 1: return 1
    return fibonacci(number - 1) + fibonacci(number - 2)

async def test_fib(n: int = 100, runs: int = 10):
    # Test that both results are the same.
    sync_t, async_t = 0.0, 0.0
    for i in range(runs):
        t = time.time()
        print(f'[Async - {i}/{runs}] Fib Result: {await async_fibonacci(n)}')
        tt = time.time() - t
        print(f'[Async - {i}/{runs}] Fib Time: {tt:.2f}s')
        async_t += tt

        t = time.time()
        print(f'[Sync - {i}/{runs}] Fib Result: {fibonacci(n)}')
        tt = time.time() - t
        print(f'[Sync - {i}/{runs}] Fib Time: {tt:.2f}s')
        sync_t += tt

    print(f'[Async] Cache Average Time: {async_t / runs:.2f}s | Total Time: {async_t:.2f}s')
    print(f'[Sync ] Cache Average Time: {sync_t / runs:.2f}s | Total Time: {sync_t:.2f}s')

async def test_setget(runs: int = 10):
    # By default, the client utilizes `pickle` to serialize
    # and deserialize objects. This can be changed by setting
    # the `serializer`
    sync_t, async_t = 0.0, 0.0
    for i in range(runs):
        value = str(uuid.uuid4())
        key = f'async-test-{i}'
        t = time.time()
        await KeyDBClient.async_set(key, value)
        assert await KeyDBClient.async_get(key) == value
        tt = time.time() - t
        print(f'[Async - {i}/{runs}] Get/Set: {key} -> {value} = {tt:.2f}s')
        async_t += tt

        value = str(uuid.uuid4())
        key = f'sync-test-{i}'
        t = time.time()
        KeyDBClient.set(key, value)
        assert KeyDBClient.get(key) == value
        tt = time.time() - t
        print(f'[Sync - {i}/{runs}] Get/Set: {key} -> {value} = {tt:.2f}s')
        sync_t += tt

    print(f'[Async] GetSet Average Time: {async_t / runs:.2f}s | Total Time: {async_t:.2f}s')
    print(f'[Sync ] GetSet Average Time: {sync_t / runs:.2f}s | Total Time: {sync_t:.2f}s')

async def run_tests(fib_n: int = 100, fib_runs: int = 10, setget_runs: int = 10):
    # You can explicitly wait for the client to be ready
    # Sync version
    # KeyDBClient.wait_for_ready()
    await KeyDBClient.async_wait_for_ready()

    # Run the tests
    await test_fib(n = fib_n, runs = fib_runs)
    await test_setget(runs = setget_runs)

    # Utilize the current session
    await KeyDBClient.async_set('async_test_0', 'test')
    assert await KeyDBClient.async_get('async_test_0') == 'test'
    KeyDBClient.set('sync_test_0', 'test')
    assert KeyDBClient.get('sync_test_0') == 'test'

    # You can access the `KeyDBSession` object directly
    # which mirrors the APIs in `KeyDBClient`
    await KeyDBClient.session.async_set('async_test_1', 'test')
    assert await KeyDBClient.session.async_get('async_test_1') == 'test'
    KeyDBClient.session.set('sync_test_1', 'test')
    assert KeyDBClient.session.get('sync_test_1') == 'test'

    # The underlying client can be accessed directly
    # if the desired api methods aren't mirrored
    # KeyDBClient.keydb
    # KeyDBClient.async_keydb
    # Since encoding / decoding is not handled by the client
    # you must encode / decode the data yourself
    await KeyDBClient.async_keydb.set('async_test_2', b'test')
    assert await KeyDBClient.async_keydb.get('async_test_2') == b'test'
    KeyDBClient.keydb.set('sync_test_2', b'test')
    assert KeyDBClient.keydb.get('sync_test_2') == b'test'

    # You can also explicitly close the client
    # However, this closes the connectionpool and will terminate
    # all connections. This is not recommended unless you are
    # explicitly closing the client.
    # Sync version
    # KeyDBClient.close()
    await KeyDBClient.aclose()

asyncio.run(run_tests())
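To make the idea behind a decorator that caches both sync and async functions more concrete, here is a minimal sketch. It is not how aiokeydb implements cachify: the name simple_cachify and the in-memory dict standing in for a KeyDB backend are assumptions made purely for illustration. It only shows the general pattern of detecting a coroutine function and storing pickled results under a key derived from the arguments.

```python
import asyncio
import functools
import inspect
import pickle

# Hypothetical in-memory stand-in for a KeyDB/Redis backend.
_CACHE = {}


def simple_cachify(func):
    """Cache results of a sync or async function, keyed by its arguments."""

    def make_key(args, kwargs) -> bytes:
        # Serialize the function name and call arguments into a bytes key.
        return pickle.dumps((func.__qualname__, args, sorted(kwargs.items())))

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            key = make_key(args, kwargs)
            if key in _CACHE:
                return pickle.loads(_CACHE[key])
            result = await func(*args, **kwargs)
            _CACHE[key] = pickle.dumps(result)
            return result
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        key = make_key(args, kwargs)
        if key in _CACHE:
            return pickle.loads(_CACHE[key])
        result = func(*args, **kwargs)
        _CACHE[key] = pickle.dumps(result)
        return result
    return sync_wrapper


@simple_cachify
def fib(n: int) -> int:
    return n if n < 2 else fib(n - 1) + fib(n - 2)


@simple_cachify
async def async_fib(n: int) -> int:
    return n if n < 2 else await async_fib(n - 1) + await async_fib(n - 2)
```

Because every recursive call goes through the wrapper, each Fibonacci value is computed once and then served from the cache, which is why the cached fibonacci(100) in the example above can finish at all. A real backend would also need expiry and a serializer that can be swapped out.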
Additionally, you can use the previous APIs, which are still expected to be present:
from aiokeydb import KeyDB, AsyncKeyDB, from_url
sync_client = KeyDB()
async_client = AsyncKeyDB()
# Alternative methods
sync_client = from_url('keydb://localhost:6379/0')
async_client = from_url('keydb://localhost:6379/0', asyncio = True)
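The connection URIs used in these examples follow a scheme://host:port/db shape. As a rough illustration of what such a URI carries, the sketch below splits one apart with the standard library's urllib.parse. The helper name split_keydb_uri and the fallback values are assumptions for this example; it is not the parser that aiokeydb or redis-py uses.

```python
from urllib.parse import urlparse


def split_keydb_uri(uri: str) -> dict:
    """Break a URI such as keydb://localhost:6379/0 into its parts."""
    parsed = urlparse(uri)
    db_text = parsed.path.lstrip("/")
    return {
        "scheme": parsed.scheme,
        "host": parsed.hostname,
        # Assumption: fall back to 6379, the conventional Redis/KeyDB port.
        "port": parsed.port or 6379,
        # Assumption: treat a missing path as database 0.
        "db": int(db_text) if db_text else 0,
    }
```

For instance, split_keydb_uri("keydb://localhost:6379/0") yields the scheme keydb, the host localhost, the port 6379, and the database index 0.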
The library is designed to be explicit and not have any global state. However, you can set the global client settings by using the KeyDBClient.configure method. This is useful if you are using the KeyDBClient class directly, or if you are using the KeyDBClient class in a library that you are developing.
For example, if you initialize a session with a specific uri, the global client settings will still inherit from the default settings.
from aiokeydb import KeyDBClient
from lazyops.utils import logger

keydb_uris = {
    'default': 'keydb://127.0.0.1:6379/0',
    'cache': 'keydb://localhost:6379/1',
    'public': 'redis://public.redis.db:6379/0',
}

sessions = {}

# these will now be initialized
for name, uri in keydb_uris.items():
    sessions[name] = KeyDBClient.init_session(
        name = name,
        uri = uri,
    )
    logger.info(f'Session {name}: uri: {sessions[name].uri}')

# however if you initialize another session
# it will use the global environment vars
sessions['test'] = KeyDBClient.init_session(
    name = 'test',
)
logger.info(f'Session test: uri: {sessions["test"].uri}')
By configuring the global settings, any downstream sessions will inherit the global settings.
from aiokeydb import KeyDBClient
from lazyops.utils import logger

default_uri = 'keydb://public.host.com:6379/0'
keydb_dbs = {
    'cache': {
        'db_id': 1,
    },
    'db': {
        'uri': 'keydb://127.0.0.1:6379/0',
    },
}

KeyDBClient.configure(
    url = default_uri,
    debug_enabled = True,
    queue_db = 1,
)

# now any sessions that are initialized will use the global settings
sessions = {}

# these will now be initialized
# Initialize the first default session
# which should utilize the `default_uri`
KeyDBClient.init_session()

for name, config in keydb_dbs.items():
    sessions[name] = KeyDBClient.init_session(
        name = name,
        **config
    )
    logger.info(f'Session {name}: uri: {sessions[name].uri}')
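One way to picture how a session might inherit from global settings is sketched below. This is a model under stated assumptions, not aiokeydb's actual resolution logic: GlobalSettings and resolve_session_uri are hypothetical names, and the rule that db_id replaces the database index of the inherited URI is a guess at the behaviour the example above suggests.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse


@dataclass(frozen=True)
class GlobalSettings:
    """Hypothetical container for process-wide defaults."""
    url: str = "keydb://localhost:6379/0"
    debug_enabled: bool = False


def resolve_session_uri(
    settings: GlobalSettings,
    uri: Optional[str] = None,
    db_id: Optional[int] = None,
) -> str:
    """Pick the session URI: an explicit uri wins, otherwise use the global url.

    Assumption: when db_id is given, it replaces the database index in the
    path of whichever URI was selected.
    """
    base = uri or settings.url
    if db_id is None:
        return base
    parsed = urlparse(base)
    return parsed._replace(path=f"/{db_id}").geturl()
```

Under these assumptions, a 'cache' session configured with only db_id = 1 would resolve to the global host with database 1, while a 'db' session with its own uri would ignore the global url entirely.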
Released since v0.1.1
KeyDB Worker Queues is a simple, fast, and reliable queue system for KeyDB. It is designed to work both in a distributed environment, where multiple KeyDB instances are used to process jobs, and in a single-instance environment, where one KeyDB instance processes all jobs.
import asyncio
from aiokeydb import KeyDBClient
from aiokeydb.queues import TaskQueue, Worker
from lazyops.utils import logger

# Configure the KeyDB Client - the default keydb client will use
# db = 0, and the queue uses db = 2 so that the two don't conflict;
# by configuring it here, you can explicitly set the db to use
keydb_uri = "keydb://127.0.0.1:6379/0"

# Configure the Queue to use db = 1 instead of 2
KeyDBClient.configure(
    url = keydb_uri,
    debug_enabled = True,
    queue_db = 1,
)

# Register a cron job that runs every minute
@Worker.add_cronjob("*/1 * * * *")
async def test_cron_task(*args, **kwargs):
    logger.info("Cron task ran")
    await asyncio.sleep(5)

# Register a function that the worker can execute as a job
@Worker.add_function()
async def test_task(*args, **kwargs):
    logger.info("Task ran")
    await asyncio.sleep(5)

async def run_tests():
    queue = TaskQueue("test_queue")
    worker = Worker(queue)
    await worker.start()

asyncio.run(run_tests())
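The general pattern behind a worker queue (register functions by name, enqueue jobs, let workers pull and run them) can be sketched with the standard library alone. The code below uses an in-process asyncio.Queue instead of KeyDB, and the names REGISTRY, register, worker, and run_jobs are hypothetical. It illustrates the pattern only and says nothing about how TaskQueue or Worker are implemented.

```python
import asyncio

# Hypothetical registry mapping task names to coroutine functions.
REGISTRY = {}


def register(func):
    """Register a coroutine function so workers can look it up by name."""
    REGISTRY[func.__name__] = func
    return func


@register
async def double(x: int) -> int:
    await asyncio.sleep(0)  # yield control, standing in for real I/O
    return x * 2


async def worker(queue: asyncio.Queue, results: list) -> None:
    """Pull (name, kwargs) jobs off the queue until the task is cancelled."""
    while True:
        name, kwargs = await queue.get()
        try:
            results.append(await REGISTRY[name](**kwargs))
        finally:
            queue.task_done()


async def run_jobs(values, concurrency: int = 2) -> list:
    """Enqueue one 'double' job per value and process them concurrently."""
    queue = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(concurrency)
    ]
    for value in values:
        queue.put_nowait(("double", {"x": value}))
    await queue.join()  # block until every job has been marked done
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```

Calling asyncio.run(run_jobs([1, 2, 3])) returns the doubled values, though not necessarily in submission order, since two workers drain the queue concurrently. A KeyDB-backed queue replaces the in-process queue with shared storage so that workers in different processes or hosts can pull from it.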
deprecated>=1.2.3
packaging>=20.4
importlib-metadata >= 1.0; python_version < "3.8"
typing-extensions; python_version<"3.8"
async-timeout>=4.0.2
pydantic
anyio
lazyops
Major Changes:
v0.0.8 -> v0.0.11:
Fully migrates away from aioredis to redis-py @ v4.3.4.
This inherits all API methods from redis-py to enforce compatibility going forward, since aioredis has been deprecated.
This fork of redis-py has some modified semantics, while attempting to replicate all the API methods of redis-py to avoid compatibility issues with underlying libraries that require a pinned redis version.
Notably, all async classes and methods are prefixed by Async to avoid name collisions with the sync version.
v0.0.11 -> v0.1.0:
Migration of aiokeydb.client -> aiokeydb.core and aiokeydb.asyncio.client -> aiokeydb.asyncio.core. However, these have been aliased to their original names to avoid breaking changes.
Creates a unified API available through the new KeyDBClient class, which creates sessions. Sessions are KeyDBSession objects, which inherit from KeyDB, alongside an AsyncKeyDBClient class that inherits from the AsyncKeyDB class.
FAQs
Python client for KeyDB database and key-value store
We found that aiokeydb demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.