queue-utilities
Let's make using Queues great again! Queue utilities and conveniences for those using sync libraries.
The package currently uses threads and threading queues by default; multiprocessing queues can be used by passing in the relevant multiprocessing.Queue arguments. Worker threads use threading.Thread, not multiprocessing.Process, by design. If you need to run e.g. Select in an external process as a message broker, I recommend spawning a Process and then using the class as documented.
This utilities package contains the following classes:
- Pipe - Pipe messages from one queue to another.
- Timer - Threaded timer that emits the time on an internal or provided queue after a given wait period. Can be cancelled.
- Ticker - Same as timer but emits time at regular intervals until stopped.
- Multiplex - Many-to-One (fan-in) queue management helper.
- Multicast - One-to-Many (fan-out) queue management helper.
- Select - Like Multiplex, but the output payload also contains the message's source queue, so it can be used for dynamic message-based switching. Inspired by Go's select statement on channels.
- as_thread - Decorator to run function in thread.
- with_input_queue - Decorator to attach input and optional output queues to function which will be run in a thread.
- with_output_queue - Decorator that sends function results to output queue.
Note that this package is in the early stages of development.
Installation
python3 -m pip install queue-utilities
Usage
Pipe
from queue_utilities import Pipe
from queue import Queue
original_q, output_q = Queue(), Queue()
p = Pipe(original_q, output_q)
original_q.put(1)
recv = output_q.get()
print(recv)
p.stop()
Timer
from queue_utilities import Timer
five_seconds = Timer(5)
five_seconds.get()
to_cancel = Timer(60)
print(to_cancel._is_finished)
to_cancel.stop()
print(to_cancel._is_finished)
Ticker
from queue_utilities import Ticker
tick = Ticker(5)
for _ in range(4):
print(f"The time is: {tick.get()}")
tick.stop()
Multiplex
from queue_utilities import Multiplex
from queue import Queue
queue_a, queue_b = Queue(), Queue()
multi_p = Multiplex(queue_a, queue_b)
queue_a.put("a")
queue_b.put("b")
for _ in range(2):
message = multi_p()
print(f"I got a message! {message}")
multi_p.stop()
multi_p.get()
Multicast
from queue_utilities import Multicast
from queue import Queue
out_a, out_b = Queue(), Queue()
multicast = Multicast(out_a, out_b)
multicast.send("A message!")
for q in (out_a, out_b):
print(q.get())
multicast.stop()
Select
Use Select as a context manager to build a cancellable thread
from queue_utilities import Select
from queue import Queue
from threading import Thread
out_a, cancel_sig = Queue(), Queue()
def selector(*queues):
with Select(*queues) as select:
for which, message in select:
if which is cancel_sig:
select.stop()
else:
print(f'Got a message {message}')
Thread(target=selector, args=(out_a, cancel_sig)).start()
out_a.put(1)
out_a.put(2)
out_a.put(3)
cancel_sig.put('Bye')
Timeout a function with Timer
from threading import Thread
import time
from queue import Queue
from queue_utilities import Select, Timer
def do_something_slow(q: Queue) -> None:
time.sleep(3)
q.put("Done")
output_q = Queue()
Thread(target=do_something_slow, args=(output_q,)).start()
timeout = Timer(2)
select = Select(output_q, timeout._output_q)
for (which_q, result) in select:
if which_q is output_q:
print("Received result", result)
else:
print("Function timed out!")
break
select.stop()
as_thread
from queue_utilities import as_thread
import time
@as_thread
def sleeper():
time.sleep(5)
print('Goodbye!')
sleeper()
print('Waiting...')
time.sleep(6)
print('Done!')
with_input_queue
from queue_utilities import with_input_queue
from queue import Queue
work_queue = Queue()
result_queue = Queue()
@with_input_queue(work_queue, result_queue)
def squarer(input: int):
return input**2
for i in range(10):
work_queue.put(i)
print(f'{i} squared is {result_queue.get()}')
with_output_queue
from queue_utilities import with_output_queue
from queue import Queue
result_queue = Queue()
@with_output_queue(result_queue)
def squarer(input: int):
return input**2
for i in range(10):
squarer(i)
print(f'{i} squared is {result_queue.get()}')
Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
License
MIT