splut
Actor model for Python
Usage example
>>> from concurrent.futures import ThreadPoolExecutor
>>> from splut.actor import Join, Spawn
>>> from urllib.request import urlopen
>>> import sys
>>>
>>> class Pipe:
...
... traffic = 0
...
... def __init__(self, index):
... self.index = index
...
... def fetch(self, url):
... def log(message):
... print(f"[{self.index}] [{url}] {message}", file = sys.stderr) # Demonstrate worker (and thus thread) utilisation.
... log('Fetch.')
... with urlopen(url) as f:
... data = f.read()
... n = len(data)
... log(f"Got: {n}")
... self.traffic += n # Effectively self is locked, so this is thread-safe.
... return data
...
>>> class Sum:
...
... def __init__(self, pipeactor):
... self.pipeactor = pipeactor
...
... async def bytecount(self, dotcoms):
... futures = [self.pipeactor.fetch(f"https://www.{dotcom}.com/") for dotcom in dotcoms] # Collect futures eagerly.
... return sum(map(len, await Join(futures)))
...
>>> pipes = [Pipe(i) for i in range(3)]
>>> with ThreadPoolExecutor() as e:
... spawn = Spawn(e)
... pipeactor = spawn(*pipes) # One mailbox, multiple independent workers.
... sumactor = spawn(Sum(pipeactor)) # An actor is cheap to create, unlike a thread.
... bytecount = sumactor.bytecount(['facebook', 'google', 'tumblr', 'yahoo', 'youtube']).wait()
Total bytes piped by workers matches the total we have:
>>> sum(p.traffic for p in pipes) == bytecount
True
API
splut.actor
Spawn Objects
class Spawn()
__init__
def __init__(executor)
Spawned actors will use threads from the given executor.
__call__
def __call__(*objs)
Create an actor backed by the given worker object(s), each of which is used in a single-threaded way.
Calling a method on the returned actor returns a Future
immediately, which eventually becomes done with the result of a worker method of the same name (or never if the worker method hangs).
A worker method may be async, in which case it can await futures returned by other actors, releasing the worker in the meantime.
Join Objects
class Join()
Make multiple futures awaitable as a unit. In the zero futures case this resolves (to an empty list) without suspending execution.
Otherwise if any future hangs, so does this. Otherwise if any future failed, all such exceptions are raised as a chain. Otherwise all results are returned as a list.
splut.actor.future
Future Objects
class Future()
wait
def wait()
Block until there is an outcome, then return/raise it.
For use outside actors, or within one if you know the future is done and don't want to suspend execution with await
in that case.
andforget
def andforget(log)
Send any exception to the given log.
splut.bg
Sleeper Objects
class Sleeper()
interrupt
def interrupt()
If a sleep is in progress that sleep returns now, otherwise the next sleep will return immediately.
This is similar behaviour to interrupting a maybe-sleeping thread in Java.
splut.bg.delay