# synchronaut

## Overview

`synchronaut` is a tiny bridge that lets you write your business logic once and run it in both sync and async contexts—thread-safe, decorator-driven, and DB-friendly. It provides:

- A single `call_any` entrypoint for all sync↔async combinations, with an optional `executor=` argument
- 🆕 A decorator `@synchronaut(...)` with `.sync` / `.async_` bypass methods
- Batch helper `parallel_map` (aliased as `call_map`), with per-call timeouts and exception capture
- Context-var propagation across threads
- 🆕 Enhanced event-loop policy support: synchronaut now installs `uvloop` only if you haven't already, and exposes `get_preferred_loop()`, `is_uvloop()`, and `has_uvloop_policy()` so it always joins your existing asyncio (or uvloop) event loop
- Customizable timeouts with `CallAnyTimeout`
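The policy helpers can be approximated with the stdlib alone. The sketch below is a plausible reading of what `has_uvloop_policy()` and the guarded install do—hypothetical code, not the library's actual implementation:

```python
import asyncio

def has_uvloop_policy() -> bool:
    # True if the active asyncio policy already comes from uvloop.
    policy = asyncio.get_event_loop_policy()
    return type(policy).__module__.split('.')[0] == 'uvloop'

def install_uvloop_if_absent() -> None:
    # Respect a user-chosen policy; only swap in uvloop when none is set.
    if has_uvloop_policy():
        return
    try:
        import uvloop
    except ImportError:
        return  # uvloop not installed; stay on the default policy
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
```

Checking the policy before installing is what lets synchronaut join a loop you have already configured instead of silently replacing it.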
## Quickstart

Install:

```bash
pip install synchronaut
pip install synchronaut[fast]
```

Create `quickstart.py`:
```python
import time
import asyncio

from synchronaut import synchronaut, call_any, call_map, CallAnyTimeout

def sync_add(a, b):
    return a + b

async def async_add(a, b):
    return a + b

@synchronaut()
def dec_sync_add(a, b):
    return a + b

@synchronaut(timeout=1.0)
async def dec_async_add(a, b):
    return a + b

async def main():
    print('sync_add:', sync_add(1, 2))
    print('call_any(sync_add):', await call_any(sync_add, 3, 4))
    print('offloaded sync_add:', await call_any(sync_add, 5, 6))
    print('async_add:', await async_add(7, 8))
    print('call_any(async_add):', await call_any(async_add, 7, 8))
    print('call_map:', await call_map([sync_add, async_add], 4, 5))
    print('await dec_sync_add.async_:', await dec_sync_add.async_(6, 7))
    print('await dec_async_add:', await dec_async_add(8, 9))
    try:
        await call_any(lambda: time.sleep(2), timeout=0.5)
    except CallAnyTimeout as e:
        print('Timeout caught:', e)

if __name__ == '__main__':
    print('dec_sync_add(2,3):', dec_sync_add(2, 3))
    print('call_any(async_add) in sync:', call_any(async_add, 9, 10))
    asyncio.run(main())
```
Run it:

```bash
python quickstart.py
```

Expected output:

```text
dec_sync_add(2,3): 5
call_any(async_add) in sync: 19
sync_add: 3
call_any(sync_add): 7
offloaded sync_add: 11
async_add: 15
call_any(async_add): 15
call_map: [9, 9]
await dec_sync_add.async_: 13
await dec_async_add: 17
Timeout caught: Function <lambda> timed out after 0.5s
```
## FastAPI Integration

Copy this into `app.py`—it'll just work once you `pip install synchronaut`:

```python
from typing import AsyncGenerator

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

from synchronaut import synchronaut

class User(BaseModel):
    id: int
    name: str

class DummyDB:
    def __init__(self):
        self._data = {
            1: {'id': 1, 'name': 'Alice'},
            2: {'id': 2, 'name': 'Bob'},
        }

    def query(self, user_id: int):
        return self._data.get(user_id)

async def get_db_async() -> AsyncGenerator[DummyDB, None]:
    db = DummyDB()
    try:
        yield db
    finally:
        ...

app = FastAPI()

@synchronaut()
def get_user(user_id: int, db: DummyDB = Depends(get_db_async)) -> User:
    data = db.query(user_id)
    if not data:
        raise HTTPException(status_code=404, detail='User not found')
    return User(**data)

@app.get('/')
async def hello():
    return {'message': 'Hello, @synchronauts!'}

@app.get('/users/{user_id}', response_model=User)
async def read_user(user: User = Depends(get_user)):
    return user
```

Run:

```bash
uvicorn app:app --reload
```

This will produce:

- http://127.0.0.1:8000/ → `{"message":"Hello, @synchronauts!"}`
- http://127.0.0.1:8000/users/1 → `{"id":1,"name":"Alice"}`
- http://127.0.0.1:8000/users/2 → `{"id":2,"name":"Bob"}`
- http://127.0.0.1:8000/users/3 → `{"detail":"User not found"}`
## Context Propagation

Put this in `ctx_prop.py`:

```python
from synchronaut.utils import (
    request_context,
    spawn_thread_with_ctx,
    set_request_ctx,
    get_request_ctx,
)

set_request_ctx({'user_id': 42})
print('Global, user_id:', get_request_ctx()['user_id'])

with request_context({'user_id': 99}):
    print('Inside block, user_id:', get_request_ctx()['user_id'])

print('Global again, user_id:', get_request_ctx()['user_id'])

def work():
    print('Inside thread, user_id:', get_request_ctx()['user_id'])

thread = spawn_thread_with_ctx(work)
thread.join()
```

Run:

```bash
python ctx_prop.py
```

Expected:

```text
Global, user_id: 42
Inside block, user_id: 99
Global again, user_id: 42
Inside thread, user_id: 42
```
## Batch Helper: `parallel_map`

`parallel_map` runs multiple calls in parallel (in both sync and async contexts), each with its own timeout.

- In sync-land, all calls are submitted to a thread pool at once and run truly in parallel (up to `max_workers`).
- In `asyncio`-land, each call is wrapped in an `asyncio.create_task(...)` and then awaited with a single `asyncio.gather(...)`.
- In Trio-land, calls are run sequentially under Trio's task runner—but any sync call still offloads to threads if needed.
### Signature

```python
def parallel_map(
    calls: list[tuple[Callable, tuple, dict, float | None]],
    *,
    executor: ThreadPoolExecutor | None = None,
    return_exceptions: bool = False,
) -> Any
```

- `calls` is a list of 4-tuples: `(func, args_tuple, kwargs_dict, per_call_timeout)`, where `per_call_timeout` is a `float` (seconds) or `None` (no timeout).
- `executor` (optional) lets you supply your own `ThreadPoolExecutor` for offloading in sync-land or asyncio-land; if omitted, the built-in `_SHARED_EXECUTOR` is used.
- `return_exceptions` (`bool`) controls whether exceptions are returned in the results list instead of immediately propagating.
### Per-Function Timeouts

Each call's 4th element is either:

- A `float` (e.g. `0.2`), causing `call_any(..., timeout=0.2, ...)` to be used, so that if the function runs longer, a `CallAnyTimeout` is returned or raised.
- `None`, meaning no timeout is applied on that call.
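A timeout on a blocking sync function can only be enforced from outside the blocked thread, which is why offloading matters here. The stdlib sketch below (`call_with_timeout` is a hypothetical stand-in, not synchronaut's API) shows the underlying pattern:

```python
import concurrent.futures
import time

def call_with_timeout(fn, *args, timeout=None, **kwargs):
    # Offload the call to a worker thread so the caller can stop waiting
    # after `timeout` seconds (the worker itself runs to completion).
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(fn, *args, **kwargs)
        return fut.result(timeout=timeout)

print(call_with_timeout(lambda: 'done', timeout=1.0))  # -> done
try:
    call_with_timeout(time.sleep, 0.5, timeout=0.05)
except concurrent.futures.TimeoutError:
    print('timed out')
```

Note the caveat baked into this pattern: the caller stops waiting, but the worker thread still finishes its blocking work in the background.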
### Exception Capture

- If `return_exceptions=False` (the default), the first exception (or timeout) anywhere immediately bubbles up.
- If `return_exceptions=True`, each call is wrapped in a `try/except` that returns the exception object in that position instead of raising.
### Examples

#### 1. Synchronous (plain-old `def`) usage

```python
import time

from synchronaut import parallel_map

def sync_greet(name):
    return f'Hello, {name}!'

def sync_add(a, b):
    return a + b

def sync_sleep_and_return(x):
    time.sleep(x)
    return f'Slept {x}s'

calls = [
    (sync_greet, ('Alice',), {}, None),
    (sync_add, (2, 3), {}, 1.0),
    (sync_sleep_and_return, (2,), {}, 1.0),
]

results = parallel_map(calls, return_exceptions=True)
print(results)
```

Possible output:

```text
['Hello, Alice!', 5, CallAnyTimeout('Function sync_sleep_and_return timed out after 1.0s')]
```
#### 2. Mixed sync + async in an `asyncio` coroutine

```python
import asyncio

from synchronaut import parallel_map

async def async_hello(name):
    await asyncio.sleep(0.1)
    return f'Hi, {name}!'

def sync_double(x):
    return x * 2

async def main():
    calls = [
        (async_hello, ('Charlie',), {}, 0.5),
        (sync_double, (21,), {}, None),
        (async_hello, ('TooSlow',), {}, 0.01),
    ]
    results = await parallel_map(calls, return_exceptions=True)
    print(results)

asyncio.run(main())
```

Possible output:

```text
['Hi, Charlie!', 42, CallAnyTimeout('Function async_hello timed out after 0.01s')]
```
#### 3. Inside a Trio-based function

```python
import trio

from synchronaut import parallel_map

async def async_square(n):
    await trio.sleep(0.05)
    return n * n

def sync_subtract(a, b):
    return a - b

async def trio_main():
    calls = [
        (async_square, (5,), {}, 0.1),
        (sync_subtract, (10, 3), {}, None),
        (async_square, (10,), {}, 0.01),
    ]
    results = await parallel_map(calls, return_exceptions=True)
    print(results)

trio.run(trio_main)
```

Possible output:

```text
[25, 7, CallAnyTimeout('Function async_square timed out after 0.01s')]
```
## ⚙️ How It Works
- **Sync branch** (plain, no async loop running):

  ```python
  futures = [
      executor.submit(
          partial(
              call_any,
              fn, *args,
              timeout=per_timeout,
              executor=executor,
              **kwargs,
          )
      )
      for (fn, args, kwargs, per_timeout) in calls
  ]
  for fut in futures:
      try:
          results.append(fut.result())
      except Exception as exc:
          if return_exceptions:
              results.append(exc)
          else:
              raise
  ```

  - Each `call_any(...)` may, in turn, spin up an `asyncio.run_coroutine_threadsafe(...)` (for async fns) or run your sync fn directly in that thread.
  - Because all calls are submitted at once, they run in parallel up to `max_workers` threads in your pool.
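Stripped of synchronaut's specifics, the submit-all-then-collect shape of this branch can be reproduced with the stdlib alone (hypothetical `fan_out` helper, ignoring per-call timeouts):

```python
from concurrent.futures import ThreadPoolExecutor

def fan_out(calls, *, max_workers=4, return_exceptions=False):
    # Submit every call first so they overlap, then collect results in order.
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn, *args, **kwargs)
                   for fn, args, kwargs, _ in calls]
        for fut in futures:
            try:
                results.append(fut.result())
            except Exception as exc:
                if return_exceptions:
                    results.append(exc)  # capture in place of the result
                else:
                    raise
    return results

def boom():
    raise ValueError('nope')

calls = [
    (str.upper, ('hi',), {}, None),
    (boom, (), {}, None),
]
print(fan_out(calls, return_exceptions=True))  # -> ['HI', ValueError('nope')]
```

Collecting in submission order keeps results aligned with `calls`, which is also why a captured exception occupies the slot of the call that raised it.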
- **Asyncio branch** (`mode == 'asyncio'`):

  ```python
  async def _run_all_asyncio():
      tasks = []
      for (fn, args, kwargs, per_timeout) in calls:
          async def _run_one(fn=fn, args=args, kwargs=kwargs, per_timeout=per_timeout):
              try:
                  return await call_any(fn, *args, timeout=per_timeout, executor=_exec, **kwargs)
              except Exception as exc:
                  if return_exceptions:
                      return exc
                  raise
          tasks.append(asyncio.create_task(_run_one()))
      return await asyncio.gather(*tasks, return_exceptions=return_exceptions)
  ```

  - Each call is scheduled immediately with `asyncio.create_task(...)`.
  - A single `await asyncio.gather(...)` then waits for all of them to complete, time out, or raise.
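The same shape can be sketched with plain asyncio (hypothetical `fan_out_async` helper, using `asyncio.wait_for` where synchronaut would rely on `call_any`'s own timeout handling):

```python
import asyncio

async def fan_out_async(calls, *, return_exceptions=False):
    async def run_one(fn, args, kwargs, per_timeout):
        coro = fn(*args, **kwargs)
        # wait_for cancels the call and raises TimeoutError past its budget
        return await (asyncio.wait_for(coro, per_timeout) if per_timeout else coro)

    # create_task schedules every call immediately; gather awaits them all.
    tasks = [asyncio.create_task(run_one(*c)) for c in calls]
    return await asyncio.gather(*tasks, return_exceptions=return_exceptions)

async def slow(x):
    await asyncio.sleep(x)
    return x

calls = [(slow, (0.01,), {}, 1.0), (slow, (0.5,), {}, 0.05)]
results = asyncio.run(fan_out_async(calls, return_exceptions=True))
print(results)  # first call succeeds; second exceeds its 0.05 s budget
```

With `return_exceptions=True`, `gather` places the `TimeoutError` in the failed call's slot instead of cancelling its siblings.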
- **Trio branch** (`mode == 'trio'`):

  ```python
  async def _run_all_trio():
      results = []
      for (fn, args, kwargs, per_timeout) in calls:
          try:
              val = await call_any(fn, *args, timeout=per_timeout, **kwargs)
              results.append(val)
          except Exception as exc:
              if return_exceptions:
                  results.append(exc)
              else:
                  raise
      return results
  ```

  - Calls are run sequentially under Trio's task runner.
  - If any individual call raises, it's either captured (if `return_exceptions=True`) or re-raised.
## Advanced

All of these options are available via `call_any(...)` or the `@synchronaut(...)` decorator:

- `timeout=`: raises `CallAnyTimeout` if the call exceeds N seconds
- `force_offload=True`: always run sync funcs in the background loop (enables timely cancellation)
- `executor=`: send offloaded sync work to a caller-provided `ThreadPoolExecutor` (instead of the default)
- `call_map([...], *args)`: runs in parallel in async contexts, sequentially in sync contexts
- Context propagation:
  - `set_request_ctx()` / `get_request_ctx()` to set and read a global `ContextVar`
  - `request_context({...})` context manager to temporarily override it
  - `spawn_thread_with_ctx(fn, *args)` to ensure `ContextVar` state flows into threads
## ⚠️ Gotchas

- **Decorator overhead:** each decorated call does an `inspect` + coroutine check (nanoseconds to microseconds). For ultra-hot loops, consider a bypass (`.sync` or `.async_`).
- **Timeouts on sync code:** pure-sync functions only honor `timeout=` if they're offloaded; otherwise they block until completion.
- **Background loop lifecycle:** all offloads and `.sync` bypasses run on the preferred loop returned by `get_preferred_loop()` (which now uses `new_event_loop()` under your installed policy). That loop is started once and lives until process exit—no hidden extra loops or deprecation warnings.
- **Custom executor:** pass your own `ThreadPoolExecutor` via `executor=` to `call_any`/`parallel_map`; otherwise the built-in `_SHARED_EXECUTOR` is used.
- **Context-var propagation:** only works if you use `spawn_thread_with_ctx` (or the decorator's offload) to carry your `ContextVar` into threads.
- **Event-loop policy management:** synchronaut will warn and install `uvloop` on first import only if you haven't already set the uvloop policy—otherwise it simply joins your loop. Use `has_uvloop_policy()`/`is_uvloop()` to detect what you've got.
- **Non-asyncio stacks:** `_in_async_context()` recognizes only `asyncio` and `trio`. If you're on some other event loop, calls may mis-route.
- **Tracebacks:** decorators + offloads can obscure original frames. Enable debug logging or use `inspect.trace()` for deep dives.
## ✅ When to use synchronaut

- I/O-bound web services (DB calls, HTTP, file I/O)
- Mixed sync/async code-bases (one API, two contexts)
- FastAPI / DI: sync ORMs auto-offload under the hood
- Context-scoped resources: a single “request context” across threads and coroutines

## 🚫 When not to use synchronaut

- CPU-bound tight loops where microseconds matter
- Pure-sync or pure-async projects (no context switching)
- Non-asyncio async frameworks (e.g. Curio)
- Strict loop-lifecycle environments that forbid background loops