
cs-threads
Thread related convenience classes and functions.
Latest release 20250528: Small doc update.
Short summary:
AdjustableSemaphore: A semaphore whose value may be tuned after instantiation.
bg: Dispatch the callable func in its own Thread; return the Thread.
DeadlockError: Raised by NRLock when a lock is attempted from the Thread currently holding the lock.
HasThreadState: A mixin for classes with a cs.threads.ThreadState instance as .state providing a context manager which pushes current=self onto that state and a default() class method returning cls.perthread_state.current as the default instance of that class.
joinif: Call T.join() if T is not the current Thread.
LockableMixin: Trite mixin to control access to an object via its ._lock attribute. Exposes the ._lock as the property .lock. Presents a context manager interface for obtaining an object's lock.
locked: A decorator for instance methods that must run within a lock.
locked_property: A thread safe property whose value is cached. The lock is taken if the value needs to be computed.
monitor: Turn a class into a monitor, all of whose public methods are @locked.
NRLock: A nonrecursive lock. Attempting to take this lock when it is already held by the current Thread will raise DeadlockError. Otherwise this behaves like threading.Lock.
PriorityLock: A priority based mutex which is acquired by and released to waiters in priority order.
PriorityLockSubLock: The record for the per-acquirer Lock held by PriorityLock.acquire.
State: A Thread local object with attributes which can be used as a context manager to stack attribute values.
ThreadState: A Thread local object with attributes which can be used as a context manager to stack attribute values.
via: Return a callable that calls the supplied func inside a with statement using the context manager cmanager. The intended use case is deferred function calls.
Module contents:
AdjustableSemaphore.acquire(self, blocking=True):
The acquire() method calls the base acquire() method if not blocking.
If blocking is true, the base acquire() is called inside a lock to
avoid competing with a reducing adjust().
AdjustableSemaphore.adjust(self, newvalue):
Set capacity to newvalue
by calling release() or acquire() an appropriate number of times.
If newvalue lowers the semaphore capacity then adjust()
may block until the overcapacity is released.
AdjustableSemaphore.adjust_delta(self, delta):
Adjust capacity by delta by calling release() or acquire()
an appropriate number of times.
If delta lowers the semaphore capacity then adjust() may block
until the overcapacity is released.
AdjustableSemaphore.release(self):
Release the semaphore.
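The adjustment behaviour described above can be sketched with the standard library alone. This is a minimal illustration and not the cs.threads implementation: the class name SketchAdjustableSemaphore is hypothetical, and the sketch assumes only one thread calls adjust() at a time.

```python
import threading


class SketchAdjustableSemaphore:
    """Hypothetical stand-in for illustration; not the cs.threads class."""

    def __init__(self, value=1):
        self._sem = threading.Semaphore(value)
        self._value = value
        # Held by blocking acquires and by capacity reductions,
        # so the two do not compete for the same slots.
        self._lock = threading.Lock()

    def acquire(self, blocking=True):
        if not blocking:
            return self._sem.acquire(False)
        with self._lock:
            return self._sem.acquire()

    def release(self):
        self._sem.release()

    def adjust(self, newvalue):
        # Assumption: a single adjusting thread, so _value needs no extra guard.
        if newvalue < 0:
            raise ValueError("capacity must be >= 0")
        delta = newvalue - self._value
        self._value = newvalue
        if delta > 0:
            # Raising capacity: hand out extra slots.
            for _ in range(delta):
                self._sem.release()
        elif delta < 0:
            # Lowering capacity: absorb slots, blocking until they are free.
            with self._lock:
                for _ in range(-delta):
                    self._sem.acquire()

    def adjust_delta(self, delta):
        self.adjust(self._value + delta)
```

Lowering the capacity works by acquiring the surplus slots, which is why a reducing adjust() may block until current holders release.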
bg(func, *, daemon=None, name=None, no_start=False, no_logexc=False, args=None, kwargs=None, thread_factory=None, pre_enter_objects=None, **thread_factory_kw): Dispatch the callable func in its own Thread;
return the Thread.
Parameters:
func: a callable for the Thread target.
args, kwargs: passed to the Thread constructor.
daemon: optional argument specifying the .daemon attribute.
name: optional argument specifying the Thread name, default: the name of func.
no_logexc: if false (default False), wrap func in @logexc.
no_start: optional argument, default False. If true, do not start the Thread.
pre_enter_objects: an optional iterable of objects which should be entered using with.
If pre_enter_objects is supplied, these context manager
objects will be entered using with via the closeall()
function before the Thread is started and exited when the
Thread target function ends.
If the Thread is not started (no_start=True, very
unusual) then the objects will still be entered and it will
be the caller's responsibility to manage the entered objects.
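The dispatch pattern described above can be approximated with threading.Thread and contextlib.ExitStack. The function bg_sketch below is a hypothetical stand-in written for illustration; it omits @logexc, thread_factory and the HasThreadState integration, and it uses ExitStack where the documentation mentions closeall().

```python
import threading
from contextlib import ExitStack


def bg_sketch(func, *, daemon=None, name=None, no_start=False,
              args=(), kwargs=None, pre_enter_objects=()):
    """Run func in its own Thread and return the Thread (illustrative only)."""
    if name is None:
        name = getattr(func, "__name__", str(func))
    # Enter the supplied context managers before the Thread starts.
    stack = ExitStack()
    for obj in pre_enter_objects:
        stack.enter_context(obj)

    def target():
        # Exit the entered contexts when the target function ends.
        with stack:
            func(*args, **(kwargs or {}))

    T = threading.Thread(target=target, name=name, daemon=daemon)
    if not no_start:
        T.start()
    # With no_start=True the contexts stay entered until the caller
    # starts the Thread or closes them by other means.
    return T


if __name__ == "__main__":
    T = bg_sketch(print, args=("hello from", "a worker Thread"))
    T.join()
```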
Class DeadlockError(builtins.RuntimeError): Raised by NRLock when a lock is attempted from the Thread currently holding the lock.
Class HasThreadState(cs.context.ContextManagerMixin): A mixin for classes with a cs.threads.ThreadState instance as .state
providing a context manager which pushes current=self onto that state
and a default() class method returning cls.perthread_state.current
as the default instance of that class.
NOTE: the documentation here refers to cls.perthread_state, but in
fact we honour the cls.THREAD_STATE_ATTR attribute to name
the state attribute, which allows per-class state attributes
and also use with classes which already use .perthread_state for
another purpose.
NOTE: HasThreadState.Thread is a class method whose default
is to push state for all active HasThreadState subclasses.
Contrast with HasThreadState.bg which is an instance method
whose default is to push state for just that instance.
The top level cs.threads.bg function calls HasThreadState.Thread
to obtain its Thread.
HasThreadState.Thread(*, name=None, target, enter_objects=None, **Thread_kw):
Factory for a Thread to push the .current state for the
currently active classes.
The optional parameter enter_objects may be used to pass
an iterable of objects whose contexts should be entered
using with obj.
If this is set to True that indicates that every "current"
HasThreadState instance should be entered.
The default does not enter any object contexts.
The HasThreadState.bg method defaults to passing
enter_objects=(self,) to enter the context for self.
HasThreadState.__enter_exit__(self):
Push self.perthread_state.current=self as the Thread local current instance.
Include self.__class__ in the set of currently active classes for the duration.
HasThreadState.bg(self, func, *, enter_objects=None, **bg_kw):
Get a Thread using type(self).Thread and start it.
Return the Thread.
The HasThreadState.Thread factory duplicates the current Thread's
HasThreadState current objects as current in the new Thread.
Additionally it enters the contexts of various objects using
with obj according to the enter_objects parameter.
The value of the optional parameter enter_objects governs
which objects have their context entered using with obj
in the child Thread while running func as follows:
None: the default, meaning (self,)
False: no object contexts are entered
True: all current HasThreadState object contexts will be entered
(): an empty iterable, to enter no objects
HasThreadState.default(*, factory=None, raise_on_None=False):
The default instance of this class from cls.perthread_state.current.
Parameters:
factory: optional callable to create an instance of cls
if cls.perthread_state.current is None or missing;
if factory is True then cls is used as the factory
raise_on_None: if cls.perthread_state.current is None or missing
and factory is false and raise_on_None is true,
raise a RuntimeError;
this is primarily a debugging aid
HasThreadState.get_thread_states(all_classes=None):
Return a mapping of class->current_instance for use with HasThreadState.with_thread_states or HasThreadState.Thread or HasThreadState.bg.
The default behaviour returns just a mapping for this class, expecting the default instance to be responsible for what other resources it holds.
There is also a legacy mode for all_classes=True
where the mapping is for all active classes,
probably best used for Threads spawned outside
a HasThreadState context.
Parameters:
all_classes: optional flag, default False;
if true, return a mapping of class to current instance
for all HasThreadState subclasses with an open instance,
otherwise just a mapping from this class to its current instance
joinif(T: threading.Thread): Call T.join() if T is not the current Thread.
Unlike threading.Thread.join, this function is a no-op if
T is the current Thread.
The use case is situations such as the shutdown phase of the
MultiOpenMixin.startup_shutdown context manager. Because
the "initial open" startup phase is not necessarily run in
the same thread as the "final close" shutdown phase, it is
possible for example for a worker Thread to execute the
shutdown phase and try to join itself. Using this function
supports that scenario.
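The guard is small enough to sketch directly. The function joinif_sketch below is a hypothetical equivalent written against the standard library, not the cs.threads source. A plain T.join() from inside T raises RuntimeError, which is the failure the guard avoids.

```python
import threading


def joinif_sketch(T):
    """Join T unless T is the calling Thread (illustrative equivalent)."""
    if T is not threading.current_thread():
        T.join()


if __name__ == "__main__":
    def shutdown():
        # A worker running the shutdown phase may be handed its own Thread.
        joinif_sketch(threading.current_thread())  # no-op, no RuntimeError
        print("shutdown complete")

    worker = threading.Thread(target=shutdown)
    worker.start()
    joinif_sketch(worker)  # called from the main Thread: a real join
```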
Class LockableMixin: Trite mixin to control access to an object via its ._lock attribute.
Exposes the ._lock as the property .lock.
Presents a context manager interface for obtaining an object's lock.
LockableMixin.__exit__(self, exc_type, exc_value, traceback):
pylint: disable=unused-argument
LockableMixin.lock:
The internal lock object.
locked(*da, **dkw): A decorator for instance methods that must run within a lock.
Decorator keyword arguments:
initial_timeout:
the initial lock attempt timeout;
if this is >0 and exceeded a warning is issued
and then an indefinite attempt is made.
Default: 2.0s
lockattr:
the name of the attribute of self
which references the lock object.
Default '_lock'
locked_property(*da, **dkw): A thread safe property whose value is cached.
The lock is taken if the value needs to be computed.
The default lock attribute is ._lock.
The default attribute for the cached value is ._funcname
where funcname is func.__name__.
The default "unset" value for the cache is None.
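A cached, lock-protected property along these lines can be sketched as follows. The decorator locked_property_sketch and the Thing class are hypothetical names for illustration; the real decorator accepts its options through @cs.deco.decorator, which this sketch does not reproduce.

```python
import threading


def locked_property_sketch(func, *, lockattr="_lock", unset=None):
    """Cache func(self) on self as ._<funcname>, computing it under the lock."""
    attr = "_" + func.__name__

    def getter(self):
        value = getattr(self, attr, unset)
        if value is unset:
            # Only take the lock when the value still needs computing.
            with getattr(self, lockattr):
                # Re-check: another Thread may have computed it meanwhile.
                value = getattr(self, attr, unset)
                if value is unset:
                    value = func(self)
                    setattr(self, attr, value)
        return value

    return property(getter)


class Thing:
    def __init__(self):
        self._lock = threading.Lock()
        self.calls = 0

    @locked_property_sketch
    def answer(self):
        self.calls += 1  # counts how often the body really runs
        return 42
```

One consequence of using None as the "unset" marker: a property whose computed value is None is recomputed on every access.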
monitor(*da, **dkw): Turn a class into a monitor, all of whose public methods are @locked.
This is a simple approach which requires class instances to have a
._lock which is an RLock or compatible
because methods may naively call each other.
Parameters:
attrs: optional iterable of attribute names to wrap in @locked.
If omitted, all names commencing with a letter are chosen.
initial_timeout: optional initial lock timeout, default 10.0s.
lockattr: optional lock attribute name, default '_lock'.
Only attributes satisfying inspect.ismethod are wrapped
because @locked requires access to the instance ._lock attribute.
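The sketch below illustrates the @locked behaviour that @monitor applies to each public method, and why the lock should be an RLock when methods call each other. locked_sketch and Counter are hypothetical names; this is an approximation under those assumptions, not the cs.threads code.

```python
import functools
import threading
import warnings


def locked_sketch(method=None, *, initial_timeout=2.0, lockattr="_lock"):
    """Run the decorated instance method while holding getattr(self, lockattr)."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(self, *a, **kw):
            lock = getattr(self, lockattr)
            # First try with a timeout so slow-to-acquire locks get reported.
            got = initial_timeout > 0 and lock.acquire(timeout=initial_timeout)
            if not got:
                if initial_timeout > 0:
                    warnings.warn("slow lock acquisition in %s" % func.__name__)
                lock.acquire()  # then wait indefinitely
            try:
                return func(self, *a, **kw)
            finally:
                lock.release()
        return wrapper

    # Support both @locked_sketch and @locked_sketch(initial_timeout=...).
    return decorate if method is None else decorate(method)


class Counter:
    def __init__(self):
        self._lock = threading.RLock()  # reentrant: methods call each other
        self.n = 0

    @locked_sketch
    def incr(self):
        self.n += 1
        return self.n

    @locked_sketch(initial_timeout=0.5)
    def incr_twice(self):
        self.incr()  # re-enters the RLock held by this Thread
        return self.incr()
```

With a plain threading.Lock in place of the RLock, incr_twice would block forever on its nested incr call.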
Class NRLock: A nonrecursive lock.
Attempting to take this lock when it is already held by the current Thread
will raise DeadlockError.
Otherwise this behaves like threading.Lock.
NRLock.acquire(self, *a, caller_frame=None, **kw):
Acquire the lock as for threading.Lock.
Raises DeadlockError if the lock is already held by the current Thread.
NRLock.locked(self):
Return the lock status.
NRLock.release(self):
Release the lock as for threading.Lock.
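A nonrecursive lock of this kind can be sketched by recording the owning Thread next to a threading.Lock. NRLockSketch and DeadlockErrorSketch are hypothetical names for illustration; the real class also takes a caller_frame parameter and reports the lock name, which this sketch omits.

```python
import threading


class DeadlockErrorSketch(RuntimeError):
    """Raised when the owning Thread tries to take the lock again."""


class NRLockSketch:
    def __init__(self):
        self._lock = threading.Lock()
        self._owner = None  # the Thread currently holding the lock, if any

    def acquire(self, *a, **kw):
        # Only the current Thread can have set _owner to itself,
        # so this check cannot be confused by other Threads.
        if self._lock.locked() and self._owner is threading.current_thread():
            raise DeadlockErrorSketch("lock already held by the current Thread")
        acquired = self._lock.acquire(*a, **kw)
        if acquired:
            self._owner = threading.current_thread()
        return acquired

    def release(self):
        self._owner = None
        self._lock.release()

    def locked(self):
        return self._lock.locked()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *_):
        self.release()
```

Raising immediately turns a silent self-deadlock into an error at the point of the second acquire.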
Class PriorityLock: A priority based mutex which is acquired by and released to waiters
in priority order.
The initialiser sets a default priority, itself defaulting to 0.
The acquire() method accepts an optional priority value
which specifies the priority of the acquire request;
lower values have higher priorities.
acquire returns a new PriorityLockSubLock.
Note that internally this allocates a threading.Lock per acquirer.
When acquire is called, if the PriorityLock is taken
then the acquirer blocks on their personal Lock.
When release() is called the highest priority Lock is released.
Within a priority level acquires are served in FIFO order.
Used as a context manager, the mutex is obtained at the default priority.
The priority() method offers a context manager
with a specified priority.
Both context managers return the PriorityLockSubLock
allocated by the acquire.
PriorityLock.__init__(self, default_priority=0, name=None):
Initialise the PriorityLock.
Parameters:
default_priority: the default acquire priority,
default 0.
name: optional identifying name
PriorityLock.__enter__(self):
Enter the mutex as a context manager at the default priority.
Returns the new Lock.
PriorityLock.__exit__(self, *_):
Exit the context manager.
PriorityLock.acquire(self, priority=None):
Acquire the mutex with priority (default from default_priority).
Return the new PriorityLockSubLock.
This blocks behind any higher priority acquires
or any earlier acquires of the same priority.
PriorityLock.priority(self, this_priority):
A context manager with the specified this_priority.
Returns the new Lock.
PriorityLock.release(self):
Release the mutex.
Internally, this releases the highest priority Lock,
allowing that acquirer to go forward.
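The per-acquirer Lock scheme described above can be sketched with a heap ordered by (priority, arrival sequence). PriorityLockSketch is a hypothetical illustration of the idea, not the cs.threads class: it returns the bare personal Lock where the documentation describes a PriorityLockSubLock, and it omits the context manager methods.

```python
import heapq
import itertools
import threading


class PriorityLockSketch:
    def __init__(self, default_priority=0):
        self.default_priority = default_priority
        self._mutex = threading.Lock()  # guards the fields below
        self._held = False
        self._waiters = []              # heap of (priority, seq, personal Lock)
        self._seq = itertools.count()   # arrival order: FIFO within a priority

    def acquire(self, priority=None):
        if priority is None:
            priority = self.default_priority
        personal = threading.Lock()
        with self._mutex:
            if not self._held:
                self._held = True
                return personal
            # Taken: queue a locked personal Lock and wait on it below.
            personal.acquire()
            heapq.heappush(self._waiters, (priority, next(self._seq), personal))
        personal.acquire()  # blocks until release() unlocks it
        return personal

    def release(self):
        with self._mutex:
            if self._waiters:
                # Hand the mutex straight to the best waiter:
                # the lowest priority value, then the earliest arrival.
                _, _, personal = heapq.heappop(self._waiters)
                personal.release()
            else:
                self._held = False
```

Because the sequence number is unique, heap comparisons never reach the Lock objects, which do not support ordering.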
Class PriorityLockSubLock(PriorityLockSubLock): The record for the per-acquirer Lock held by PriorityLock.acquire.
Class State(_thread._local): A Thread local object with attributes
which can be used as a context manager to stack attribute values.
Example:
    from cs.threads import ThreadState
    S = ThreadState(verbose=False)
    with S(verbose=True) as prev_attrs:
        if S.verbose:
            print("verbose! (formerly verbose=%s)" % prev_attrs['verbose'])
State.__init__(self, **kw):
Initiate the ThreadState, providing the per-Thread initial values.
State.__call__(self, **kw):
Calling a ThreadState returns a context manager which stacks some state.
The context manager yields the previous values
for the attributes which were stacked.
Class ThreadState(_thread._local): A Thread local object with attributes
which can be used as a context manager to stack attribute values.
Example:
    from cs.threads import ThreadState
    S = ThreadState(verbose=False)
    with S(verbose=True) as prev_attrs:
        if S.verbose:
            print("verbose! (formerly verbose=%s)" % prev_attrs['verbose'])
ThreadState.__init__(self, **kw):
Initiate the ThreadState, providing the per-Thread initial values.
ThreadState.__call__(self, **kw):
Calling a ThreadState returns a context manager which stacks some state.
The context manager yields the previous values
for the attributes which were stacked.
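The stacking behaviour can be sketched on top of threading.local and contextlib.contextmanager. ThreadStateSketch is a hypothetical stand-in, not the cs.threads class; in particular, leaving attributes that did not exist before out of the yielded mapping is an assumption of this sketch.

```python
import threading
from contextlib import contextmanager

_MISSING = object()  # marks attributes that did not exist before stacking


class ThreadStateSketch(threading.local):
    def __init__(self, **kw):
        # threading.local reruns __init__ with the same arguments in each
        # Thread that touches the instance, so every Thread starts from
        # the same initial values.
        for k, v in kw.items():
            setattr(self, k, v)

    @contextmanager
    def __call__(self, **kw):
        previous = {k: getattr(self, k, _MISSING) for k in kw}
        for k, v in kw.items():
            setattr(self, k, v)
        try:
            # Yield the prior values of the attributes being stacked.
            yield {k: v for k, v in previous.items() if v is not _MISSING}
        finally:
            # Pop the stacked values, restoring what was there before.
            for k, v in previous.items():
                if v is _MISSING:
                    delattr(self, k)
                else:
                    setattr(self, k, v)


if __name__ == "__main__":
    S = ThreadStateSketch(verbose=False)
    with S(verbose=True) as prev_attrs:
        if S.verbose:
            print("verbose! (formerly verbose=%s)" % prev_attrs["verbose"])
```

Values pushed in one Thread are invisible to other Threads, which continue to see their own copies of the attributes.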
via(cmanager, func, *a, **kw): Return a callable that calls the supplied func inside a
with statement using the context manager cmanager.
The intended use case is deferred function calls.
Release 20250528: Small doc update.
Release 20250325: NRLock: include the lock name in DeadLock exceptions.
Release 20250306: HasThreadState: various fixes.
Release 20241005: Remove some debug noise.
Release 20240630:
Release 20240422: HasThreadState.default: make factory and raise_on keyword only.
Release 20240412:
Release 20240316: Fixed release upload artifacts.
Release 20240303:
Release 20231129:
Release 20230331:
Release 20230212:
Release 20230125: New HasThreadState mixin for classes with a state=State() attribute to provide a cls.default() class method for the default instance and a context manager to push/pop self.state.current=self.
Release 20221228:
Release 20221207: Small bug fix.
Release 20221118: REMOVE WorkerThreadPool, pulls in too many other things and was never used.
Release 20211208: bg: do not pass the current Pfx prefix into the new Thread, seems to leak and grow.
Release 20210306: bg: include the current Pfx prefix in the thread name and thread body Pfx, obsoletes cs.pfx.PfxThread.
Release 20210123: New @monitor class decorator for simple RLock based reentrance protection.
Release 20201025:
Release 20200718: @locked: apply the interior doc to the wrapper.
Release 20200521: @locked_property: decorate with @cs.deco.decorator to support keyword arguments.
Release 20191102: @locked: report slow-to-acquire locks, add initial_timeout and lockattr decorator keyword parameters.
Release 20190923.2: Fix annoying docstring typo.
Release 20190923.1: Docstring updates.
Release 20190923: Remove dependence on cs.obj.
Release 20190921: New PriorityLock class for a mutex which releases in (priority,fifo) order.
Release 20190812: bg: compute default name before wrapping func in @logexc.
Release 20190729: bg: provide default name, run callable inside Pfx, add optional no_logexc=False param preventing @logexc wrapper if true.
Release 20190422: bg(): new optional no_start=False keyword argument, preventing Thread.start if true.
Release 20190102:
Release 20160828: Use "install_requires" instead of "requires" in DISTINFO.
Release 20160827:
Release 20150115: First PyPI release.
FAQs
threading and communication/synchronisation conveniences
We found that cs-threads demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.