
# synchronaut

A “write-once, run-anywhere” sync/async bridge that’s thread-safe, decorator-driven, and plays nicely with FastAPI (or other frameworks) and with DB connections.

## Overview

`synchronaut` is a tiny bridge that lets you write your business logic once and run it in both sync and async contexts: thread-safe, decorator-driven, and DB-friendly. It provides:

- `call_any`: a single entrypoint for all sync↔async combinations, with an optional `executor=` argument
- `@synchronaut(...)`: a decorator with `.sync` / `.async_` bypass methods
- `call_map` (aliased as `parallel_map`): batch execution with per-call timeouts and exception capture
- `uvloop` integration: installs `uvloop` only if you haven’t already, and exposes `get_preferred_loop()`, `is_uvloop()` & `has_uvloop_policy()` so it always joins your existing asyncio (or uvloop) event loop
- `CallAnyTimeout`: the exception raised when a call exceeds its timeout
Install:

```bash
# “standard” install (no uvloop):
pip install synchronaut

# “fast” (with uvloop) for maximum asyncio performance:
pip install synchronaut[fast]
```
Create `quickstart.py`:

```python
import time
import asyncio

from synchronaut import synchronaut, call_any, call_map, CallAnyTimeout

# ——— plain functions ———
def sync_add(a, b):
    return a + b

async def async_add(a, b):
    return a + b

# ——— decorated versions ———
@synchronaut()
def dec_sync_add(a, b):
    return a + b

@synchronaut(timeout=1.0)
async def dec_async_add(a, b):
    return a + b

async def main():
    # sync → sync
    print('sync_add:', sync_add(1, 2))
    print('call_any(sync_add):', await call_any(sync_add, 3, 4))

    # sync → async (in an async context, sync funcs are auto-offloaded)
    print('offloaded sync_add:', await call_any(sync_add, 5, 6))

    # async → async
    print('async_add:', await async_add(7, 8))
    print('call_any(async_add):', await call_any(async_add, 7, 8))

    # batch helper in async
    print('call_map:', await call_map([sync_add, async_add], 4, 5))

    # decorator shortcuts in async
    print('await dec_sync_add.async_:', await dec_sync_add.async_(6, 7))
    print('await dec_async_add:', await dec_async_add(8, 9))

    # timeout demo (pure-sync offload)
    try:
        await call_any(lambda: time.sleep(2), timeout=0.5)
    except CallAnyTimeout as e:
        print('Timeout caught:', e)

if __name__ == '__main__':
    # sync-land examples
    print('dec_sync_add(2,3):', dec_sync_add(2, 3))
    print('call_any(async_add) in sync:', call_any(async_add, 9, 10))

    # then run the async demonstrations
    asyncio.run(main())
```
Run it:

```bash
python quickstart.py
```

Expected output:

```text
dec_sync_add(2,3): 5
call_any(async_add) in sync: 19
sync_add: 3
call_any(sync_add): 7
offloaded sync_add: 11
async_add: 15
call_any(async_add): 15
call_map: [9, 9]
await dec_sync_add.async_: 13
await dec_async_add: 17
Timeout caught: Function <lambda> timed out after 0.5s
```
Notes:

- If you ever need to offload into your own thread pool, pass it explicitly rather than relying on the built-in default:

  ```python
  call_any(some_sync_fn, arg1, arg2, executor=my_custom_executor)
  ```

- By tuning `timeout`, `force_offload`, or using the `.sync` / `.async_` bypasses, you get seamless sync↔async interoperability without rewriting your core logic.
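The `.sync` / `.async_` bypass pattern can be pictured as attributes the decorator attaches to the wrapped function. Below is a simplified, stdlib-only sketch of that idea; `sync_async_bridge` is a hypothetical name for illustration, not synchronaut's implementation:

```python
import asyncio
import functools

def sync_async_bridge(fn):
    """Wrap a sync function so it can be called directly, via .sync,
    or awaited via .async_ (which offloads to a worker thread)."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    async def async_(*args, **kwargs):
        # offload the sync function so it doesn't block the event loop
        return await asyncio.to_thread(fn, *args, **kwargs)

    wrapper.sync = fn        # direct bypass: no wrapping overhead
    wrapper.async_ = async_  # awaitable bypass for async callers
    return wrapper

@sync_async_bridge
def add(a, b):
    return a + b

print(add(1, 2))                      # 3
print(add.sync(3, 4))                 # 7
print(asyncio.run(add.async_(5, 6)))  # 11
```

The real decorator adds context detection, timeouts, and context propagation on top of this shape, but the bypasses exist precisely so hot paths can skip that machinery.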
Copy this into `app.py`; it’ll just work once you `pip install synchronaut`:

```python
from typing import AsyncGenerator

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

from synchronaut import synchronaut

# ——— Dummy DB & models ———
class User(BaseModel):
    id: int
    name: str

class DummyDB:
    def __init__(self):
        self._data = {
            1: {'id': 1, 'name': 'Alice'},
            2: {'id': 2, 'name': 'Bob'},
        }

    def query(self, user_id: int):
        return self._data.get(user_id)

async def get_db_async() -> AsyncGenerator[DummyDB, None]:
    db = DummyDB()
    try:
        yield db
    finally:
        ...

# ——— App & routes ———
app = FastAPI()

@synchronaut()
def get_user(user_id: int, db: DummyDB = Depends(get_db_async)) -> User:
    data = db.query(user_id)
    if not data:
        raise HTTPException(status_code=404, detail='User not found')
    return User(**data)

@app.get('/')
async def hello():
    return {'message': 'Hello, @synchronauts!'}

@app.get('/users/{user_id}', response_model=User)
async def read_user(user: User = Depends(get_user)):
    return user
```

Run:

```bash
uvicorn app:app --reload
```

This will produce:

- http://127.0.0.1:8000/ → `{"message": "Hello, @synchronauts!"}`
- http://127.0.0.1:8000/users/1 → `{"id": 1, "name": "Alice"}`
- http://127.0.0.1:8000/users/2 → `{"id": 2, "name": "Bob"}`
- http://127.0.0.1:8000/users/3 → `{"detail": "User not found"}`
Put this in `ctx_prop.py`:

```python
from synchronaut.utils import (
    request_context,
    spawn_thread_with_ctx,
    set_request_ctx,
    get_request_ctx,
)

# set a global context
set_request_ctx({'user_id': 42})
print('Global, user_id:', get_request_ctx()['user_id'])  # 42

# override in a block
with request_context({'user_id': 99}):
    print('Inside block, user_id:', get_request_ctx()['user_id'])  # 99

# back to global
print('Global again, user_id:', get_request_ctx()['user_id'])  # 42

# a worker in a thread sees the global context
def work():
    print('Inside thread, user_id:', get_request_ctx()['user_id'])  # 42

thread = spawn_thread_with_ctx(work)
thread.join()
```

Run:

```bash
python ctx_prop.py
```

Expected:

```text
Global, user_id: 42
Inside block, user_id: 99
Global again, user_id: 42
Inside thread, user_id: 42
```
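The thread-propagation behavior shown above matches what `contextvars.copy_context()` gives you in the standard library: copy the caller's context and run the worker inside it. Here is a minimal sketch of that mechanism; `spawn_with_ctx` and `_request_ctx` are illustrative names, not synchronaut internals:

```python
import contextvars
import threading

_request_ctx = contextvars.ContextVar('request_ctx', default={})

def spawn_with_ctx(fn, *args):
    """Run fn in a new thread inside a copy of the caller's context."""
    ctx = contextvars.copy_context()
    thread = threading.Thread(target=ctx.run, args=(fn, *args))
    thread.start()
    return thread

_request_ctx.set({'user_id': 42})
seen = []

def worker():
    # without ctx.run, a fresh thread would only see the ContextVar default
    seen.append(_request_ctx.get()['user_id'])

t = spawn_with_ctx(worker)
t.join()
print(seen)  # [42]
```

The key detail is that a bare `threading.Thread(target=worker)` starts in a fresh context, so the `ContextVar` value set in the main thread would not be visible; wrapping the target in `ctx.run` is what carries it across.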
## `parallel_map`

A key feature: run multiple calls in parallel, in both sync and async contexts, each with its own timeout.

- In sync-land, calls are offloaded to a thread pool (bounded by its `max_workers`).
- In `asyncio`-land, each call is wrapped in an `asyncio.create_task(...)` and then awaited with a single `asyncio.gather(...)`.

```python
def parallel_map(
    calls: list[tuple[Callable, tuple, dict, float | None]],
    *,
    executor: ThreadPoolExecutor | None = None,
    return_exceptions: bool = False,
) -> Any
```

- `calls` is a list of 4-tuples, `(func, args_tuple, kwargs_dict, per_call_timeout)`, where `per_call_timeout` is a `float` (seconds) or `None` (no timeout).
- `executor` (optional) lets you supply your own `ThreadPoolExecutor` for offloading in sync-land or asyncio-land; if omitted, the built-in `_SHARED_EXECUTOR` is used.
- `return_exceptions` (`bool`) controls whether exceptions are returned in the results list instead of immediately propagating.

Each call’s 4th element is either:

- a `float` (e.g. `0.2`), causing `call_any(..., timeout=0.2, ...)` to be used, so that if the function runs longer, a `CallAnyTimeout` is returned or raised; or
- `None`, meaning no timeout is applied to that call.

With `return_exceptions=False` (the default), the first exception (or timeout) anywhere will immediately bubble up. With `return_exceptions=True`, each call is wrapped in a `try`/`except` that returns the exception object in that position instead of raising.
### Sync (plain `def`) usage

```python
import time

from synchronaut import parallel_map

def sync_greet(name):
    return f"Hello, {name}!"

def sync_add(a, b):
    return a + b

def sync_sleep_and_return(x):
    time.sleep(x)
    return f"Slept {x}s"

# We want:
#   - sync_greet("Alice") with no timeout
#   - sync_add(2, 3) with a 1.0s timeout
#   - sync_sleep_and_return(2) with a 1.0s timeout (this one should time out)
calls = [
    (sync_greet, ("Alice",), {}, None),
    (sync_add, (2, 3), {}, 1.0),
    (sync_sleep_and_return, (2,), {}, 1.0),
]

# Collect exceptions instead of letting the sleeping call raise
results = parallel_map(calls, return_exceptions=True)

# `results` is a list, in the same order:
# [
#     "Hello, Alice!",      # from sync_greet
#     5,                    # from sync_add
#     CallAnyTimeout(...),  # from sync_sleep_and_return: slept 2s > 1.0s timeout
# ]
print(results)
```

Possible output:

```text
['Hello, Alice!', 5, CallAnyTimeout('Function sync_sleep_and_return timed out after 1.0s')]
```
### `asyncio` coroutine usage

```python
import asyncio

from synchronaut import parallel_map

async def async_hello(name):
    await asyncio.sleep(0.1)
    return f"Hi, {name}!"

def sync_double(x):
    return x * 2

async def main():
    calls = [
        # run async_hello("Charlie") with a 0.5s timeout
        (async_hello, ("Charlie",), {}, 0.5),
        # run sync_double(21) with no timeout
        (sync_double, (21,), {}, None),
        # run async_hello but force it to time out after 0.01s
        (async_hello, ("TooSlow",), {}, 0.01),
    ]

    # Capture exceptions so we can see who timed out
    results = await parallel_map(calls, return_exceptions=True)

    # results[0] → "Hi, Charlie!"
    # results[1] → 42
    # results[2] → CallAnyTimeout(...) because async_hello slept 0.1s > 0.01s
    print(results)

asyncio.run(main())
```

Possible output:

```text
['Hi, Charlie!', 42, CallAnyTimeout('Function async_hello timed out after 0.01s')]
```
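The per-call timeout pattern used above can be modeled with the standard library alone: wrap each call in `asyncio.wait_for`, schedule all of them as tasks, and collect with one `gather`. This sketch is independent of synchronaut (it handles coroutine functions only and drops kwargs for brevity); `run_all` is a hypothetical name:

```python
import asyncio

async def run_all(calls):
    """calls: list of (coro_fn, args, timeout_or_None); exceptions captured in place."""
    async def run_one(fn, args, timeout):
        try:
            coro = fn(*args)
            if timeout is not None:
                return await asyncio.wait_for(coro, timeout)
            return await coro
        except Exception as exc:
            return exc  # captured in the result slot instead of raising

    tasks = [asyncio.create_task(run_one(fn, args, t)) for fn, args, t in calls]
    return await asyncio.gather(*tasks)

async def slow(x):
    await asyncio.sleep(x)
    return x

async def demo():
    results = await run_all([
        (slow, (0.01,), 1.0),  # finishes well within its timeout
        (slow, (0.2,), 0.05),  # exceeds its timeout
    ])
    print(results[0])                                    # 0.01
    print(isinstance(results[1], asyncio.TimeoutError))  # True

asyncio.run(demo())
```

`wait_for` cancels the overrunning coroutine when its deadline passes, which is why async calls can be stopped promptly while sync offloads can only be abandoned.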
### `trio` usage

```python
import trio

from synchronaut import parallel_map

async def async_square(n):
    await trio.sleep(0.05)
    return n * n

def sync_subtract(a, b):
    return a - b

async def trio_main():
    calls = [
        (async_square, (5,), {}, 0.1),      # finishes in 0.05s < 0.1s
        (sync_subtract, (10, 3), {}, None),
        (async_square, (10,), {}, 0.01),    # will time out
    ]
    results = await parallel_map(calls, return_exceptions=True)
    # → [25, 7, CallAnyTimeout(...)]
    print(results)

trio.run(trio_main)
```

Possible output:

```text
[25, 7, CallAnyTimeout('Function async_square timed out after 0.01s')]
```
### Sync branch (plain “no async loop”)

```python
futures = [
    executor.submit(
        partial(
            call_any,
            fn, *args,
            timeout=per_timeout,
            executor=executor,
            **kwargs
        )
    )
    for (fn, args, kwargs, per_timeout) in calls
]

for fut in futures:
    try:
        results.append(fut.result())
    except Exception as exc:
        if return_exceptions:
            results.append(exc)
        else:
            raise
```
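The sync branch above can be mimicked with `concurrent.futures` directly: submit every call first so they overlap, then collect results with per-call timeouts. This is an illustrative stand-in, not the library's code (`parallel_sync` is a hypothetical name); note that a timed-out future is not cancelled, the collector simply stops waiting for its result:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def parallel_sync(calls, return_exceptions=False):
    """calls: list of (fn, args, timeout_or_None) tuples."""
    with ThreadPoolExecutor() as pool:
        # submit everything up front so the calls run concurrently
        futures = [(pool.submit(fn, *args), timeout) for fn, args, timeout in calls]
        results = []
        for fut, timeout in futures:
            try:
                # result(timeout=...) raises TimeoutError if the call overruns
                results.append(fut.result(timeout=timeout))
            except Exception as exc:
                if return_exceptions:
                    results.append(exc)
                else:
                    raise
        return results

res = parallel_sync(
    [
        (lambda a, b: a + b, (2, 3), 1.0),
        (time.sleep, (0.3,), 0.05),  # overruns its 0.05s result timeout
    ],
    return_exceptions=True,
)
print(res[0])                         # 5
print(isinstance(res[1], Exception))  # True: a concurrent.futures TimeoutError
```

Because the worker thread keeps running after the timeout, the `with` block still waits for it on exit; timely cancellation of sync work is exactly the limitation the library's notes call out.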
Each `call_any(...)` may, in turn, spin up an `asyncio.run_coroutine_threadsafe(...)` (for async fns) or run your sync fn directly in that thread. Parallelism is bounded by the `max_workers` threads in your pool.

### Asyncio branch (`mode == 'asyncio'`)
```python
async def _run_all_asyncio():
    tasks = []
    for (fn, args, kwargs, per_timeout) in calls:
        async def _run_one(fn=fn, args=args, kwargs=kwargs, per_timeout=per_timeout):
            try:
                return await call_any(fn, *args, timeout=per_timeout, executor=_exec, **kwargs)
            except Exception as exc:
                if return_exceptions:
                    return exc
                raise
        tasks.append(asyncio.create_task(_run_one()))
    return await asyncio.gather(*tasks, return_exceptions=return_exceptions)
```

Each call is scheduled with `asyncio.create_task(...)`, and a single `await asyncio.gather(...)` waits for all of them to complete or timeout/raise.

### Trio branch (`mode == 'trio'`)
```python
async def _run_all_trio():
    results = []
    for (fn, args, kwargs, per_timeout) in calls:
        try:
            val = await call_any(fn, *args, timeout=per_timeout, **kwargs)
            results.append(val)
        except Exception as exc:
            if return_exceptions:
                results.append(exc)
            else:
                raise
    return results
```

Exceptions are either captured in the results list (`return_exceptions=True`) or re-raised.

All these options are available via `call_any(...)` or the `@synchronaut(...)` decorator:
- `timeout=N`: raises `CallAnyTimeout` if the call exceeds `N` seconds
- `force_offload=True`: always run sync funcs in the background loop (enables timely cancellation)
- `executor=`: send offloaded sync work to a caller-provided `ThreadPoolExecutor` (instead of the default)
- `call_map([...], *args)`: runs in parallel in an async context, sequentially in a sync context

Context propagation helpers:

- `set_request_ctx()` / `get_request_ctx()` to set and read a global `ContextVar`
- `request_context({...})` context manager to temporarily override it
- `spawn_thread_with_ctx(fn, *args)` to ensure `ContextVar` state flows into threads

Tips and caveats:

- Per-call overhead is an `inspect` + coroutine check (nanoseconds to µs). For ultra-hot loops, consider a bypass (`.sync` or `.async_`).
- Sync calls only honor `timeout=` if they’re offloaded; otherwise they block until completion.
- `.sync` bypasses run on the preferred loop returned by `get_preferred_loop()` (which now uses `new_event_loop()` under your installed policy). That loop is started once and lives until process exit: no hidden extra loops or deprecation warnings.
- Pass a custom `ThreadPoolExecutor` via `executor=` to `call_any` / `parallel_map`; otherwise the built-in `_SHARED_EXECUTOR` is used.
- Use `spawn_thread_with_ctx` (or the decorator’s offload) to carry your `ContextVar` into threads.
- `synchronaut` installs `uvloop` on first import only if you haven’t already set the uvloop policy; otherwise it simply joins your loop. Use `has_uvloop_policy()` / `is_uvloop()` to detect what you’ve got.
- `_in_async_context()` recognizes only `asyncio` and `trio`. If you’re on some other event loop, calls may mis-route.
- Use `inspect.trace()` for deep dives.
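The async-context check that routing depends on can be illustrated with the standard library: `asyncio.get_running_loop()` raises `RuntimeError` when no loop is running. The helper below is a hypothetical sketch of that mechanism for asyncio only; synchronaut's actual `_in_async_context()` also recognizes trio:

```python
import asyncio

def in_async_context() -> bool:
    """True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

async def probe():
    return in_async_context()

print(in_async_context())    # False: plain sync code, no loop running
print(asyncio.run(probe()))  # True: inside a coroutine the loop is running
```

This is the same split the quickstart relies on: `call_any` behaves like a plain function in sync-land and like an awaitable in async-land because it first asks which world it was called from.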