Security News
Weekly Downloads Now Available in npm Package Search Results
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.
ABC for asynchronous, point-readable, exception-free read/write queues
pip install queue-api
ReadQueue[A]
A = TypeVar('A', covariant=True)
class ReadQueue(Generic[A]):
async def pop(self) -> Either[QueueError, tuple[str, A]]:
...
async def pop(self, id: str) -> Either[ReadError, A]:
...
async def read(self) -> Either[QueueError, tuple[str, A]]:
...
async def read(self, id: str) -> Either[ReadError, A]:
...
async def items(self) -> AsyncIter[Either[QueueError, tuple[str, A]]]:
...
async def keys(self) -> AsyncIter[Either[ReadError, str]]:
...
async def values(self) -> AsyncIter[Either[ReadError, A]]:
...
WriteQueue[A]
A = TypeVar('A', contravariant=True)
class WriteQueue(Generic[A]):
async def push(self, key: str, value: A) -> Either[QueueError, None]:
...
Queue[A] = ReadQueue[A] & WriteQueue[A]
AppendQueue[A]
class AppendQueue(WriteQueue[Sequence[A]], Generic[A]):
async def append(self, id: str, values: Sequence[A], *, create: bool) -> bool:
...
In general, a Queue[A]
can be treated as an AsyncIterable[A]
. However, there are some distintions between Read-
and WriteQueues
ReadQueue[A]
is covariant
Since ReadQueue
is immutable, someone expecting a ReadQueue[Animal]
will be happy with a ReadQueue[Cat]
.
WriteQueue[A]
is contravariant
This is a bit of a weird one: WriteQueue
is mutable but not readable. Thus, someone expecting a WriteQueue[Cat]
will be happy with a WriteQueue[Animal]
.
ReadQueue
s can bemap
-ed andfilter
-ed. This is akin tomap
andfilter
onAsyncIter
-ables.
WriteQueue
s can bepremap
-ed andprefilter
-ed.
Again a bit of a weird one: let's use an example:
async def cats(queue: WriteQueue[tuple[str, Cat]]):
await queue.push('key1', ('Garfield', Cat(...)))
await queue.push('key2', ('Puss in Boots', Cat(...)))
q_cats: Queue[Cat] = ...
Here, our q_cats
queue wants Cat
s, but cats
insists on yielding a tuple (name, Cat)
. So, we can premap
a function to adapt the queue:
await cats(q_cats.premap(lambda t: t[1]))
Queue[B]
is both a ReadQueue[B]
and a WriteQueue[B]
. However, we'll generally want to have a producer function (that receives a WriteQueue[A]
) and a consumer function, that receives a ReadQueue[C]
.
Still, for whatever reason, we may want to actually store data in a Queue[B]
. The setup is then as follows:
queue: Queue[B] = ...
async def producer(queue: WriteQueue[A]):
...
async def consumer(queue: ReadQueue[C]):
...
producer(queue.premap(f))
consumer(queue.map(g))
Now, premap
always returns a WriteQueue
; map
always a ReadQueue
. Thus, we can compose indefinetely in either direction:
producer(queue.premap(f1).premap(f2).premap(f3))
consumer(queue.map(g1).map(g2).map(g3))
FAQs
ABC for asynchronous, point-readable, exception-free read/write queues
We found that queue-api demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.
Security News
A Stanford study reveals 9.5% of engineers contribute almost nothing, costing tech $90B annually, with remote work fueling the rise of "ghost engineers."
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.