pythread

A Python library providing a user-friendly interface for managing both synchronous and asynchronous threads.

1.5.0
PyPI
Maintainers
1

pythread 🧵

Description 📚

pythread is a dual-threading Python library crafted to streamline the creation and management of threads in Python applications. It offers two distinct managers:

  • SyncThreadManager: Handles classic synchronous threads for executing blocking operations.
  • AsyncThreadManager: Manages asynchronous tasks to run coroutines in a non-blocking fashion.

Both managers let you start, stop, and supervise threads or async tasks through one interface, so lifecycle handling stays in a single place instead of being scattered across your code.

Features ✨

  • Initiate and terminate threads/tasks by name or reference.
  • Pass additional keyword arguments through to the thread or task function.
  • Simple management of thread/task lifecycle.
  • Suitable for both sync and async implementations.

Installation 📥

To get started with pythread, run:

pip install pythread

Use Cases 🔧

  • Running background work such as I/O operations, data processing, or scheduled jobs.
  • Handling multiple network connections in parallel.
  • Implementing producer-consumer patterns using thread-safe queues.
  • Crafting a basic task scheduler for time-based task execution.
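As a rough illustration of the producer-consumer use case, here is a minimal sketch built only on the standard library's threading and queue modules. It does not use pythread's managers, and the names producer, consumer, run_pipeline, and SENTINEL are hypothetical, chosen just for this example.

```python
import queue
import threading

# Unique marker object telling the consumer that no more items are coming.
SENTINEL = object()


def producer(q, items):
    """Put each item on the queue, then the sentinel."""
    for item in items:
        q.put(item)
    q.put(SENTINEL)


def consumer(q, results):
    """Take items off the queue until the sentinel arrives."""
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        results.append(item * 2)


def run_pipeline(items):
    """Run one producer and one consumer thread and return the doubled items."""
    # A bounded queue makes a fast producer block instead of piling up work.
    q = queue.Queue(maxsize=4)
    results = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With a single producer and a single consumer, items come out in the order they went in. The same two functions could presumably be handed to a thread manager instead of being started by hand.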

Documentation 📄

SyncThreadManager

Creates named synchronous threads that run a function with a configurable delay.

from pythread import SyncThreadManager

# Initialize the SyncThreadManager
manager = SyncThreadManager()

# Function to run in a separate thread
def print_message(message):
    print(f"Thread message: {message}")

# Start a new thread with a specific action and delay;
# extra keyword arguments (here, message) are passed on to the function
thread = manager.start_thread(
    name='PrinterThread',
    func=print_message,
    delay=1,
    message='Hello from SyncThread!'
)

# Stop the thread using its name
manager.stop_thread('PrinterThread')

# Alternatively, pass the thread object instead of the name
manager.stop_thread(thread)
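To make the idea of stopping a thread by name or by reference more concrete, the following is a minimal standard-library sketch. It is not pythread's implementation: NamedThreadRegistry and its methods are hypothetical, and treating delay as the interval between repeated calls is this sketch's own choice, not documented pythread behaviour.

```python
import threading


class NamedThreadRegistry:
    """Hypothetical helper, not part of pythread: tracks worker threads by name."""

    def __init__(self):
        self._entries = {}  # name -> (thread, stop_event)
        self._lock = threading.Lock()

    def start(self, name, func, delay=1.0, **kwargs):
        """Call func(**kwargs) every `delay` seconds until stop() is called."""
        stop_event = threading.Event()

        def loop():
            # Event.wait returns True once the event is set, which ends the loop.
            while not stop_event.wait(delay):
                func(**kwargs)

        thread = threading.Thread(target=loop, name=name, daemon=True)
        with self._lock:
            if name in self._entries:
                raise ValueError(f"thread {name!r} is already registered")
            self._entries[name] = (thread, stop_event)
        thread.start()
        return thread

    def stop(self, name_or_thread):
        """Stop by name or by Thread object; return False if nothing matched."""
        # A Thread carries its name; a plain string is used as the name directly.
        name = getattr(name_or_thread, "name", name_or_thread)
        with self._lock:
            entry = self._entries.pop(name, None)
        if entry is None:
            return False
        thread, stop_event = entry
        stop_event.set()
        thread.join()
        return True
```

Python threads cannot be killed from outside, so the sketch stops cooperatively: the worker checks a threading.Event between calls and exits once it is set.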

AsyncThreadManager

Runs coroutines concurrently as named tasks that can be started and stopped.

import asyncio
from pythread import AsyncThreadManager

# Example coroutine function
async def async_print_message(message):
    print(f"Async message: {message}")

async def main():
    # Initialize the AsyncThreadManager inside the running event loop
    manager = AsyncThreadManager()

    # Start an async task
    task = await manager.start_task(
        name='AsyncPrinter',
        coro_func=async_print_message,
        message='Hello from AsyncThread!'
    )

    # Stop the task using its name
    await manager.stop_task('AsyncPrinter')

    # Alternatively, pass the task object instead of the name
    await manager.stop_task(task)

# asyncio.run() creates and closes the event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated in recent Python versions
asyncio.run(main())
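For comparison, stopping a named task can be sketched with plain asyncio, where stopping means cancelling. This is an illustration under stated assumptions, not pythread's code: ticker, demo, and the tasks dictionary are hypothetical names.

```python
import asyncio


async def ticker(name, interval, log):
    """Append to `log` every `interval` seconds until cancelled."""
    try:
        while True:
            log.append(f"{name} tick")
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise  # re-raise so the task is marked as cancelled


async def demo():
    log = []
    tasks = {}  # name -> asyncio.Task

    # Start: schedule the coroutine and remember the task under its name.
    tasks["Ticker"] = asyncio.create_task(ticker("Ticker", 0.01, log), name="Ticker")
    await asyncio.sleep(0.05)

    # Stop: look the task up by name, cancel it, and wait for it to finish.
    task = tasks.pop("Ticker")
    task.cancel()
    # return_exceptions=True collects the CancelledError instead of raising it here.
    await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled()
```

Calling asyncio.run(demo()) returns the log together with a flag showing that the task ended in the cancelled state.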
