A collection of helpful queue utilities. Pipes, timers, tickers, multiplexors, multicasters and queue select.
Let's make using Queues great again! Queue utilities and conveniences for those using sync libraries.
The package currently uses threads and threading queues as standard; multiprocessing queues can be used by passing in the relevant multiprocessing.Queue arguments. By design, worker threads use threading.Thread, not multiprocessing.Process. If you need to run, for example, Select in an external process as a message broker, I recommend spawning a Process and then using Select as documented.
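To make the thread-based design described above concrete, here is a minimal standard-library sketch of a worker thread that forwards items from one queue to another. The names `forward` and `_STOP` are hypothetical and chosen for illustration only; this is not the package's actual implementation.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel used to end the worker


def forward(source: queue.Queue, sink: queue.Queue) -> threading.Thread:
    """Start a daemon thread that moves items from source to sink."""

    def worker() -> None:
        while True:
            item = source.get()  # blocks until an item is available
            if item is _STOP:
                break
            sink.put(item)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


source_q, sink_q = queue.Queue(), queue.Queue()
worker_thread = forward(source_q, sink_q)

source_q.put("hello")
received = sink_q.get(timeout=1)

# Ask the worker to exit, then wait for it to finish.
source_q.put(_STOP)
worker_thread.join(timeout=1)
print(received)
```

The worker blocks on `get()` rather than polling, so it stays idle until an item arrives.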
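This utilities package contains the following classes: Pipe, Timer, Ticker, Multiplex, Multicast and Select. It also provides the decorators as_thread, with_input_queue and with_output_queue.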
Note that this package is in the early stages of development.
python3 -m pip install queue-utilities
import queue as _queue
from queue_utilities import Pipe
original_q, output_q = _queue.Queue(), _queue.Queue()
p = Pipe(original_q, output_q)
# put an item into the original queue
original_q.put(1)
# get the message off the output queue
recv = output_q.get()
print(recv) # 1
# don't forget to stop the pipe after you've finished.
p.stop()
from queue_utilities import Timer
# emit time after 5 seconds
five_seconds = Timer(5)
five_seconds.get()
# cancel a timer
to_cancel = Timer(60)
print(to_cancel._is_finished) # False
to_cancel.stop()
print(to_cancel._is_finished) # True
from queue_utilities import Ticker
# print the time every 5 seconds 4 times
tick = Ticker(5)
for _ in range(4):
    print(f"The time is: {tick.get()}")
# cancel the ticker thread
tick.stop()
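For readers curious how a ticker of this kind could be built, the following is a rough standard-library sketch. `SketchTicker` is a hypothetical class written for illustration; it is not the package's Ticker and may differ from it in behaviour. It uses `threading.Event.wait` as an interruptible sleep so that `stop()` takes effect promptly.

```python
import queue
import threading
import time


class SketchTicker:
    """Hypothetical ticker: puts the current time on a queue every `interval` seconds."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._output_q: queue.Queue = queue.Queue()
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stopped.wait(self._interval):
            self._output_q.put(time.time())

    def get(self) -> float:
        # Block until the next tick is available.
        return self._output_q.get()

    def stop(self) -> None:
        self._stopped.set()
        self._thread.join()


tick = SketchTicker(0.05)
ticks = [tick.get() for _ in range(3)]
tick.stop()
print(f"Collected {len(ticks)} ticks")
```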
from queue_utilities import Multiplex
from queue import Queue
# create two queues and pass them into the Multiplex
queue_a, queue_b = Queue(), Queue()
multi_p = Multiplex(queue_a, queue_b)
# send messages to any of the queues
queue_a.put("a")
queue_b.put("b")
# read the messages
for _ in range(2):
    message = multi_p()  # or multi_p.get()
    print(f"I got a message! {message}")
# cleanup
multi_p.stop()
# if you try to read a message after stop
# it raises a MultiplexClosed exception
multi_p.get()
from queue_utilities import Multicast
from queue import Queue
out_a, out_b = Queue(), Queue()
multicast = Multicast(out_a, out_b)
multicast.send("A message!")
for q in (out_a, out_b):
    print(q.get())  # prints "A message!" twice!
multicast.stop()
from queue_utilities import Select
from queue import Queue
from threading import Thread
out_a, cancel_sig = Queue(), Queue()
def selector(*queues):
    with Select(*queues) as select:
        for which, message in select:
            if which is cancel_sig:
                # stop select on any message on the cancel_sig queue
                select.stop()
            else:
                print(f'Got a message {message}')
Thread(target=selector, args=(out_a, cancel_sig)).start()
out_a.put(1)
out_a.put(2)
out_a.put(3)
cancel_sig.put('Bye')
from threading import Thread
import time
from queue import Queue
from queue_utilities import Select, Timer
def do_something_slow(q: Queue) -> None:
    # do something slow
    time.sleep(3)
    q.put("Done")
output_q = Queue()
Thread(target=do_something_slow, args=(output_q,)).start()
timeout = Timer(2)
select = Select(output_q, timeout._output_q)
for (which_q, result) in select:
    if which_q is output_q:
        print("Received result", result)
    else:
        print("Function timed out!")
    break
select.stop()
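For comparison, when only a single queue is involved, a similar timeout can be expressed with the standard library alone: `Queue.get(timeout=...)` raises `queue.Empty` if nothing arrives in time. The sketch below does not use queue-utilities and uses shorter delays than the example above so that it runs quickly.

```python
import queue
import threading
import time


def do_something_slow(q: queue.Queue) -> None:
    # Simulate slow work that takes longer than the caller is willing to wait.
    time.sleep(0.5)
    q.put("Done")


output_q = queue.Queue()
threading.Thread(target=do_something_slow, args=(output_q,), daemon=True).start()

try:
    # Wait at most 0.1 seconds for a result.
    result = output_q.get(timeout=0.1)
    outcome = f"Received result {result}"
except queue.Empty:
    outcome = "Function timed out!"

print(outcome)
```

This approach only covers one queue at a time; waiting on several queues at once is the case the Select and Timer combination above addresses.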
from queue_utilities import as_thread
import time
@as_thread
def sleeper():
    time.sleep(5)
    print('Goodbye!')
sleeper()
print('Waiting...')
time.sleep(6)
print('Done!')
from queue_utilities import with_input_queue
from queue import Queue
work_queue = Queue()
result_queue = Queue()
@with_input_queue(work_queue, result_queue)
def squarer(input: int):
    return input**2
for i in range(10):
    work_queue.put(i)
    print(f'{i} squared is {result_queue.get()}')
from queue_utilities import with_output_queue
from queue import Queue
result_queue = Queue()
@with_output_queue(result_queue)
def squarer(input: int):
    return input**2
for i in range(10):
    squarer(i)
    print(f'{i} squared is {result_queue.get()}')
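One way a decorator in the spirit of with_output_queue could be written with the standard library is sketched below. `put_result_on` is a hypothetical name, and the real decorator may behave differently (for example, it might run the wrapped function in a worker thread); treat this as an illustration of the pattern rather than the package's code.

```python
import functools
import queue
from typing import Any, Callable


def put_result_on(output_q: queue.Queue) -> Callable:
    """Hypothetical decorator factory: send each return value to output_q."""

    def decorator(func: Callable[..., Any]) -> Callable[..., None]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> None:
            # Call the wrapped function and publish its result on the queue.
            output_q.put(func(*args, **kwargs))

        return wrapper

    return decorator


results: queue.Queue = queue.Queue()


@put_result_on(results)
def squarer(value: int) -> int:
    return value ** 2


for i in range(3):
    squarer(i)

squares = [results.get() for _ in range(3)]
print(squares)
```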
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.