Socket
Socket
Sign inDemoInstall

queue-api

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queue-api

ABC for asynchronous, point-readable, exception-free read/write queues


Maintainers
1

Queue Api

ABC for asynchronous, point-readable, exception-free read/write queues

pip install queue-api

Interfaces

ReadQueue[A]

A = TypeVar('A', covariant=True)

class ReadQueue(Generic[A]):
  async def pop(self) -> Either[QueueError, tuple[str, A]]:
    ...

  async def pop(self, id: str) -> Either[ReadError, A]:
    ...

  async def read(self) -> Either[QueueError, tuple[str, A]]:
    ...

  async def read(self, id: str) -> Either[ReadError, A]:
    ...

  async def items(self) -> AsyncIter[Either[QueueError, tuple[str, A]]]:
    ...
  
  async def keys(self) -> AsyncIter[Either[ReadError, str]]:
    ...
  
  async def values(self) -> AsyncIter[Either[ReadError, A]]:
    ...

WriteQueue[A]

A = TypeVar('A', contravariant=True)

class WriteQueue(Generic[A]):
  async def push(self, key: str, value: A) -> Either[QueueError, None]:
    ...

Queue[A] = ReadQueue[A] & WriteQueue[A]

AppendQueue[A]

class AppendQueue(WriteQueue[Sequence[A]], Generic[A]):
  async def append(self, id: str, values: Sequence[A], *, create: bool) -> bool:
    ...

R/W differences

In general, a Queue[A] can be treated as an AsyncIterable[A]. However, there are some distintions between Read- and WriteQueues

Variance

ReadQueue[A] is covariant

Since ReadQueue is immutable, someone expecting a ReadQueue[Animal] will be happy with a ReadQueue[Cat].

WriteQueue[A] is contravariant

This is a bit of a weird one: WriteQueue is mutable but not readable. Thus, someone expecting a WriteQueue[Cat] will be happy with a WriteQueue[Animal].

Operations

ReadQueues can be map-ed and filter-ed. This is akin to map and filter on AsyncIter-ables.

WriteQueues can be premap-ed and prefilter-ed.

Again a bit of a weird one: let's use an example:

async def cats(queue: WriteQueue[tuple[str, Cat]]):
  await queue.push('key1', ('Garfield', Cat(...)))
  await queue.push('key2', ('Puss in Boots', Cat(...)))

q_cats: Queue[Cat] = ...

Here, our q_cats queue wants Cats, but cats insists on yielding a tuple (name, Cat). So, we can premap a function to adapt the queue:

await cats(q_cats.premap(lambda t: t[1]))

Composition

Queue[B] is both a ReadQueue[B] and a WriteQueue[B]. However, we'll generally want to have a producer function (that receives a WriteQueue[A]) and a consumer function, that receives a ReadQueue[C].

Still, for whatever reason, we may want to actually store data in a Queue[B]. The setup is then as follows:

queue: Queue[B] = ...

async def producer(queue: WriteQueue[A]):
  ...

async def consumer(queue: ReadQueue[C]):
  ...

producer(queue.premap(f))
consumer(queue.map(g))

Now, premap always returns a WriteQueue; map always a ReadQueue. Thus, we can compose indefinetely in either direction:

producer(queue.premap(f1).premap(f2).premap(f3))
consumer(queue.map(g1).map(g2).map(g3))

Implementations

  • queue-kv: based on kv.api

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc